Architecture Overview¶
This section provides an overview of the Dataflow codebase structure and how the components interact to create ETL pipelines.
Directory Structure¶
src/dataflow/
├── __init__.py # Package initialization
├── main.py # Application entry point
├── connection.py # Database connection abstractions
├── utils.py # Utility functions
├── triggers/ # Trigger implementations
│ ├── __init__.py
│ ├── base.py # Abstract Trigger base class
│ ├── cronjob.py # Time-based trigger
│ ├── mongo_db_listener.py # MongoDB change stream trigger
│ └── mssql_change_trigger.py # SQL Server change tracking trigger
└── workflows/ # Workflow definitions
    ├── __init__.py
    └── conrad.py # Example workflow
Core Components¶
Entry Point (main.py)¶
The main.py module serves as the application entry point. It:
- Reads the WORKFLOW environment variable to determine which workflow to run
- Dynamically imports the workflow module from src/dataflow/workflows/
- Discovers all Trigger instances defined in the workflow
- Starts each trigger as an asyncio task
- Manages graceful shutdown on interrupt
# Example workflow loading
WORKFLOW=conrad # Environment variable
# Loads: src/dataflow/workflows/conrad.py
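The loading and discovery steps above can be sketched as follows. This is a minimal illustration, not the actual main.py; `Trigger` here stands in for the base class from triggers/base.py:

```python
import importlib
import types

def load_workflow(name: str, package: str = "dataflow.workflows") -> types.ModuleType:
    """Import the workflow module named by the WORKFLOW variable."""
    return importlib.import_module(f"{package}.{name}")

def discover_triggers(module: types.ModuleType, trigger_type: type) -> list:
    """Collect every trigger instance defined at the module's top level."""
    return [obj for obj in vars(module).values() if isinstance(obj, trigger_type)]
```

Because decorated functions are replaced by trigger instances at import time, a simple `isinstance` scan over the module's attributes is enough to find them.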
Connections (connection.py)¶
The connection.py module provides database connection abstractions:
Connection: Abstract base class defining the connection interface
MicrosoftSQLServerConnection: SQL Server connectivity with parameterized queries
MongoDB: Synchronous MongoDB client with batch operations
MongoDBAsync: Asynchronous MongoDB client using motor
SQLiteConnection: Local SQLite database for caching
See Database Connections for detailed documentation.
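As an illustration of the shared interface, the base class looks roughly like this (a sketch: only connect/disconnect/fetch appear in the examples in this section, and anything beyond that is an assumption — see Database Connections for the real API):

```python
from abc import ABC, abstractmethod

class Connection(ABC):
    """Illustrative shape of the abstract connection interface.

    Method names mirror those used in the ETL example in this section;
    the actual class in connection.py may define more.
    """

    @abstractmethod
    def connect(self) -> None:
        """Open the underlying database connection."""

    @abstractmethod
    def disconnect(self) -> None:
        """Close the connection and release resources."""

    @abstractmethod
    def fetch(self, query: str, params: tuple = ()) -> list:
        """Run a parameterized query and return the rows."""
```

Concrete classes such as MicrosoftSQLServerConnection implement these methods for their specific driver.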
Triggers (triggers/)¶
The triggers/ directory contains trigger implementations that define when ETL jobs run:
- Trigger (base.py): Abstract base class for all triggers
- CronJob (cronjob.py): Time-based scheduling using cron expressions
- MongoDBChangeStream (mongo_db_listener.py): Real-time MongoDB change detection
- MSSQLChangeTrigger (mssql_change_trigger.py): SQL Server change tracking polling
See Triggers for detailed documentation.
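The mechanism that turns a decorated function into a trigger can be sketched roughly as follows. This is a hypothetical simplification, not the actual cronjob.py:

```python
class CronJob:
    """Simplified stand-in for the CronJob trigger in cronjob.py."""

    def __init__(self, expression: str, func, name: str):
        self.expression = expression  # cron expression, e.g. "0 * * * *"
        self.func = func              # the wrapped ETL function
        self.name = name              # label used for logging/identification

def cronjob(expression: str, name=None):
    """Sketch of the @cronjob decorator: it replaces the function with a
    trigger instance that main.py can discover at import time."""
    def decorator(func):
        return CronJob(expression, func, name or func.__name__)
    return decorator

@cronjob("0 * * * *", name="hourly")
def hourly_job():
    pass
```

After decoration, `hourly_job` is no longer a plain function but a trigger object, which is what makes module-level discovery possible.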
Workflows (workflows/)¶
The workflows/ directory contains your ETL workflow definitions. Each workflow file
contains one or more decorated functions that define triggers. Each function encapsulates its ETL logic and runs independently as an asyncio task.
Example workflow structure:
# src/dataflow/workflows/my_workflow.py
from dataflow.triggers.cronjob import cronjob
# Import path for the change stream decorator assumed from the directory layout
from dataflow.triggers.mongo_db_listener import mongodb_change_stream
from dataflow.connection import MongoDB, MicrosoftSQLServerConnection

@cronjob("0 * * * *", name="hourly_sync")
def sync_data():
    # Your ETL logic here
    pass

@cronjob("30 2 * * *", name="daily_cleanup")
def cleanup_data():
    # Your cleanup logic here
    pass

@mongodb_change_stream("mongodb://localhost:27017", "mydb", "mycollection", name="realtime_sync")
def realtime_sync():
    # Your real-time sync logic here
    pass
You can then set the WORKFLOW environment variable to my_workflow to run all the triggers defined in that file.
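For example, from the shell (assuming the package is installed and main.py is runnable as a module; the exact invocation may differ in your deployment):

```shell
# Select the workflow module by name, then start the application entry point
WORKFLOW=my_workflow python -m dataflow.main
```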
Utilities (utils.py)¶
The utils.py module provides helper functions for logging, configuration management,
and common data transformations used across the codebase.
ETL Pattern¶
A typical Dataflow ETL job follows this pattern:
from dataflow.triggers.cronjob import cronjob
from dataflow.connection import MicrosoftSQLServerConnection, MongoDB
import pandas as pd
@cronjob("*/15 * * * *", name="etl_job")
def etl_job():
    # 1. EXTRACT - Connect to source and fetch data
    source = MicrosoftSQLServerConnection(
        server="source-server",
        database="source_db",
        username="user",
        password="pass"
    )
    source.connect()
    last_sync = "2024-01-01 00:00:00"  # high-water mark, e.g. read from a state store
    raw_data = source.fetch("SELECT * FROM source_table WHERE updated > ?", (last_sync,))

    # 2. TRANSFORM - Process and clean data
    df = pd.DataFrame(raw_data)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['value'] = df['value'].astype(float)
    transformed = df.to_dict('records')

    # 3. LOAD - Insert/update destination
    dest = MongoDB(
        hostname="dest-server",
        database="dest_db",
        username="user",
        password="pass"
    )
    dest.connect()
    for record in transformed:
        dest.upsert("collection", filters={"id": record["id"]}, data=record)

    # 4. CLEANUP - Close connections
    source.disconnect()
    dest.disconnect()
Design Principles¶
- Async-First
All I/O operations use async/await for non-blocking execution. Multiple triggers run concurrently without blocking each other.
- Decorator Pattern
Functions decorated with @cronjob, @mongodb_change_stream, or @mssql_change_trigger automatically become Trigger instances.
- Security by Default
All database operations use parameterized queries to prevent injection attacks. NoSQL injection is prevented through operator filtering.
- Lazy Loading
Database drivers are imported only when connections are instantiated, reducing startup time and memory usage.
- Graceful Shutdown
On interrupt (SIGINT), all triggers are stopped cleanly and connections closed.
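The operator filtering mentioned under Security by Default could look something like this. A hypothetical sketch, not the actual connection.py code:

```python
def strip_operators(doc: dict) -> dict:
    """Recursively drop keys that start with '$' so a user-supplied
    filter document cannot inject MongoDB query operators such as
    $where or $gt."""
    return {
        key: strip_operators(value) if isinstance(value, dict) else value
        for key, value in doc.items()
        if not key.startswith("$")
    }
```

Sanitizing filter documents this way complements parameterized queries on the SQL side: user input can only ever match values, never alter query semantics.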
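Lazy loading can be illustrated with SQLite, whose driver ships with the standard library (a sketch under that assumption; the real SQLiteConnection may differ):

```python
class SQLiteConnection:
    """Sketch of lazy driver loading: sqlite3 is imported only when
    connect() is first called, not at module import time."""

    def __init__(self, path: str):
        self.path = path
        self._conn = None

    def connect(self):
        import sqlite3  # deferred import keeps startup fast and memory low
        self._conn = sqlite3.connect(self.path)

    def disconnect(self):
        if self._conn is not None:
            self._conn.close()
            self._conn = None
```

Merely constructing the object costs nothing; the driver is paid for only by workflows that actually open a connection.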