Architecture Overview
This section provides an overview of the Dataflow codebase structure and how the components interact to create ETL pipelines.
Directory Structure
src/dataflow/
├── __init__.py # Package initialization
├── main.py # Application entry point
├── connection.py # Database connection abstractions
├── utils.py # Utility functions
├── triggers/ # Trigger implementations
│   └── __init__.py
└── workflows/ # Workflow definitions
    └── __init__.py
Core Components
Application Lifecycle
The following diagram illustrates the application’s lifecycle from startup to running state:
flowchart TD
A[Application Start] --> B[Load Workflow Module]
B --> C[Discover All Triggers]
C --> D{Startup Triggers<br/>Exist?}
D -->|Yes| E[Execute @startup<br/>Decorated Functions]
D -->|No| G[Start Other Triggers]
E --> F[Wait for All<br/>Startup Jobs to Complete]
F --> G[Start Other Triggers]
G --> H["<Triggers><br/>(CronJob, Change Streams, etc.)"]
H --> K[Application Running]
K --> L[Monitor & Execute Jobs]
L --> M{Shutdown Signal?}
M -->|No| L
M -->|Yes| N[Graceful Shutdown]
N --> O[Stop All Triggers]
O --> P[Close Connections]
P --> Q[Application Exit]
style E fill:#90EE90
style F fill:#FFD700
style K fill:#87CEEB
style N fill:#FFB6C1
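The shutdown branch of the diagram (stop all triggers, then close connections) could look roughly like the sketch below. It assumes that triggers run as asyncio tasks and that each connection object exposes a `disconnect()` method; the `shutdown` helper itself is hypothetical and is not taken from main.py.

```python
import asyncio


async def shutdown(tasks, connections):
    """Hypothetical helper: stop all trigger tasks, then close every connection."""
    # Ask every running trigger task to stop.
    for task in tasks:
        task.cancel()
    # Wait until all tasks have actually finished; return_exceptions=True
    # collects each task's CancelledError instead of raising it here.
    await asyncio.gather(*tasks, return_exceptions=True)
    # Only close connections once no trigger can still be using them.
    for conn in connections:
        conn.disconnect()
```

Cancelling before closing connections keeps a trigger from touching a connection that has already been closed mid-job.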
Entry Point (main.py)
The main.py module serves as the application entry point. It:
Reads the WORKFLOW environment variable to determine which workflow to run
Dynamically imports the workflow module from src/dataflow/workflows/
Discovers all Trigger instances defined in the workflow
Starts each trigger as an asyncio task
Manages graceful shutdown on interrupt
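The steps above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the actual main.py: the `Trigger` class with its `start()` coroutine and the `load_workflow`, `discover_triggers`, and `run` helpers are invented for the example, and the package path `dataflow.workflows` is inferred from the directory layout.

```python
import asyncio
import importlib
import os


class Trigger:
    """Hypothetical stand-in for dataflow's Trigger base class."""

    def __init__(self, name):
        self.name = name
        self.started = False

    async def start(self):
        # A real trigger would loop here (cron schedule, change stream, ...).
        self.started = True


def load_workflow(package="dataflow.workflows"):
    """Import the module named by the WORKFLOW environment variable."""
    name = os.environ["WORKFLOW"]
    return importlib.import_module(f"{package}.{name}")


def discover_triggers(module):
    """Collect every Trigger instance defined at module level."""
    return [obj for obj in vars(module).values() if isinstance(obj, Trigger)]


async def run(module):
    """Start each discovered trigger as its own asyncio task."""
    triggers = discover_triggers(module)
    tasks = [asyncio.create_task(t.start()) for t in triggers]
    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        # Interrupt received: cancel whatever is still running, then re-raise.
        for task in tasks:
            task.cancel()
        raise
    return triggers
```

With `WORKFLOW=conrad`, `load_workflow()` would import `dataflow.workflows.conrad`, and `asyncio.run(run(module))` would start every trigger found in it.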
# Example workflow loading
WORKFLOW=conrad # Environment variable
# Loads: src/dataflow/workflows/conrad.py
Note
The @startup decorator marks functions that initialize or set up resources before any other trigger begins execution. Multiple startup functions run concurrently, and the application waits for all of them to complete before starting the other triggers.
For detailed information on the @startup decorator, see the Triggers documentation.
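The ordering described in the note can be sketched with `asyncio.gather`. This is an illustration only: it assumes startup jobs and triggers are plain coroutine functions, and `run_startup_then_triggers` is a hypothetical name, not part of Dataflow.

```python
import asyncio


async def run_startup_then_triggers(startup_jobs, other_triggers):
    """Run all startup jobs concurrently, then start the remaining triggers."""
    # gather() returns only after every startup coroutine has finished,
    # so no other trigger can start before setup is complete.
    await asyncio.gather(*(job() for job in startup_jobs))
    # Only now are the other triggers scheduled.
    return await asyncio.gather(*(trigger() for trigger in other_triggers))
```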
Connections (connection.py)
The connection.py module provides database connection abstractions:
Connection: Abstract base class defining the connection interface
MicrosoftSQLServerConnection: SQL Server connectivity with parameterized queries
MongoDB: Synchronous MongoDB client with batch operations
MongoDBAsync: Asynchronous MongoDB client using motor
SQLiteConnection: Local SQLite database for caching
See Database Connections for detailed documentation.
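To make the idea of a shared connection interface concrete, here is a small sketch of an abstract base class with one SQLite-backed implementation. The method names `connect`, `disconnect`, `fetch`, and `execute` mirror the calls used in the examples on this page, but the class names, constructor arguments, and return types below are assumptions made for illustration and may not match connection.py.

```python
import sqlite3
from abc import ABC, abstractmethod


class ConnectionSketch(ABC):
    """Hypothetical interface; the real Connection base class may differ."""

    @abstractmethod
    def connect(self): ...

    @abstractmethod
    def disconnect(self): ...

    @abstractmethod
    def fetch(self, query, params=()): ...


class SQLiteSketch(ConnectionSketch):
    """Illustrative SQLite implementation built on the standard library."""

    def __init__(self, path=":memory:"):
        self.path = path
        self._conn = None

    def connect(self):
        self._conn = sqlite3.connect(self.path)
        self._conn.row_factory = sqlite3.Row  # rows become name-addressable

    def disconnect(self):
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def execute(self, query, params=()):
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(query, params)

    def fetch(self, query, params=()):
        # Parameters are bound by the driver, never interpolated into the SQL.
        return [dict(row) for row in self._conn.execute(query, params)]
```

Because every backend implements the same methods, ETL code can be written against the interface and switched between databases without changing the job logic.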
Triggers (triggers/)
The triggers/ directory contains trigger implementations that define when ETL jobs run:
Trigger (base.py): Abstract base class for all triggers
CronJob (cronjob.py): Time-based scheduling using cron expressions
MongoDBChangeStream (mongo_change_listener.py): Real-time MongoDB change detection
Startup (startup.py): One-time execution on application startup
See Triggers for detailed documentation.
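One plausible way for a decorator to turn a plain function into a discoverable trigger object is shown below. It is a sketch of the general decorator-factory pattern, not Dataflow's implementation: `CronJobSketch` and `cronjob_sketch` are hypothetical names, and the real CronJob class may store different fields.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class CronJobSketch:
    """Hypothetical trigger object; not the library's CronJob class."""

    expression: str
    name: str
    func: Callable

    def fields(self):
        # Standard cron order: minute, hour, day of month, month, day of week.
        return self.expression.split()


def cronjob_sketch(expression, name):
    """Decorator factory: replaces the function with a trigger object."""

    def decorator(func):
        return CronJobSketch(expression=expression, name=name, func=func)

    return decorator


@cronjob_sketch("0 * * * *", name="hourly_sync")
def sync_data():
    return "synced"
```

After decoration, the module-level name `sync_data` refers to a trigger object rather than a bare function, which is what makes module-level discovery of triggers possible.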
Workflows (workflows/)
The workflows/ directory contains your ETL workflow definitions. Each workflow file
contains one or more decorated functions that define triggers. Each function encapsulates the ETL logic and runs independently within an asyncio-supported context.
Example workflow structure:
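# src/dataflow/workflows/my_workflow.py
from dataflow.triggers.cronjob import cronjob
# Assumed import path, inferred from the module names listed under Triggers
from dataflow.triggers.mongo_change_listener import mongodb_change_stream
from dataflow.connection import MongoDB, MicrosoftSQLServerConnection

# Runs at minute 0 of every hour
@cronjob("0 * * * *", name="hourly_sync")
def sync_data():
    # Your ETL logic here
    pass

# Runs every day at 02:30
@cronjob("30 2 * * *", name="daily_cleanup")
def cleanup_data():
    # Your cleanup logic here
    pass

# Runs whenever the watched collection changes
@mongodb_change_stream("mongodb://localhost:27017", "mydb", "mycollection", name="realtime_sync")
def realtime_sync(change_event):
    # Your real-time sync logic here
    pass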
You can then set the WORKFLOW environment variable to my_workflow to run all the workflows defined in that file.
Utilities (utils.py)
The utils.py module provides helper functions for logging, configuration management,
and common data transformations used across the codebase.
Full workflow example
A typical Dataflow ETL job follows this pattern:
from datetime import datetime, timedelta

import pandas as pd

from dataflow.connection import MicrosoftSQLServerConnection, MongoDB
from dataflow.triggers.cronjob import cronjob
# Assumed import paths, inferred from the module names listed under Triggers
from dataflow.triggers.mongo_change_listener import mongodb_change_stream
from dataflow.triggers.startup import startup

# Optionally, use @startup to set up any resources the workflow needs before the other triggers start
@startup(name="initial_setup")
def initial_setup():
    pass

# Define an extract-transform-load job that runs every 15 minutes
@cronjob("*/15 * * * *", name="etl_job")
def etl_job():
    # Placeholder: a real job would load the time of the last successful sync from persistent storage
    last_sync = datetime.now() - timedelta(minutes=15)

    # 1. EXTRACT - Connect to source and fetch data
    source = MicrosoftSQLServerConnection(
        server="source-server",
        database="source_db",
        username="user",
        password="pass"
    )
    source.connect()
    raw_data = source.fetch("SELECT * FROM source_table WHERE updated > ?", (last_sync,))

    # 2. TRANSFORM - Process and clean data
    df = pd.DataFrame(raw_data)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['value'] = df['value'].astype(float)
    transformed = df.to_dict('records')

    # 3. LOAD - Insert/update destination
    dest = MongoDB(
        hostname="dest-server",
        database="dest_db",
        username="user",
        password="pass"
    )
    dest.connect()
    for record in transformed:
        dest.upsert("collection", filters={"id": record["id"]}, data=record)

    # 4. CLEANUP - Close connections
    source.disconnect()
    dest.disconnect()
# Runs whenever a document in the MongoDB collection is updated or replaced, and syncs the change back to SQL Server
@mongodb_change_stream("mongodb://dest-server:27017", "dest_db", "collection", name="realtime_sync", operation_types=["update", "replace"])
def realtime_sync(change_event):
    # Connect to SQL Server
    sql_conn = MicrosoftSQLServerConnection(
        server="source-server",
        database="source_db",
        username="user",
        password="pass"
    )
    sql_conn.connect()
    try:
        # Process the change event and sync it back to SQL Server.
        # For "update" events, MongoDB includes fullDocument only when the
        # change stream is opened with full_document="updateLookup".
        updated_doc = change_event["fullDocument"]
        sql_conn.execute(
            "UPDATE source_table SET value = ?, timestamp = ? WHERE id = ?",
            (updated_doc["value"], updated_doc["timestamp"], updated_doc["id"]),
        )
    finally:
        # Cleanup, even if the update fails
        sql_conn.disconnect()