Architecture Overview

This section provides an overview of the Dataflow codebase structure and how the components interact to create ETL pipelines.

Directory Structure

src/dataflow/
├── __init__.py              # Package initialization
├── main.py                  # Application entry point
├── connection.py            # Database connection abstractions
├── utils.py                 # Utility functions
├── triggers/                # Trigger implementations
│   └── __init__.py
└── workflows/               # Workflow definitions
    └── __init__.py

Core Components

Application Lifecycle

The following diagram illustrates the application’s lifecycle from startup to running state:

    flowchart TD
    A[Application Start] --> B[Load Workflow Module]
    B --> C[Discover All Triggers]
    C --> D{Startup Triggers<br/>Exist?}
    D -->|Yes| E[Execute @startup<br/>Decorated Functions]
    D -->|No| G[Start Other Triggers]
    E --> F[Wait for All<br/>Startup Jobs to Complete]
    F --> G[Start Other Triggers]
    G --> H["&lt;Triggers&gt;<br/>(CronJob, Change Streams, etc.)"]
    H --> K[Application Running]
    K --> L[Monitor & Execute Jobs]
    L --> M{Shutdown Signal?}
    M -->|No| L
    M -->|Yes| N[Graceful Shutdown]
    N --> O[Stop All Triggers]
    O --> P[Close Connections]
    P --> Q[Application Exit]

    style E fill:#90EE90
    style F fill:#FFD700
    style K fill:#87CEEB
    style N fill:#FFB6C1
    

Entry Point (main.py)

The main.py module serves as the application entry point. It:

  1. Reads the WORKFLOW environment variable to determine which workflow to run

  2. Dynamically imports the workflow module from src/dataflow/workflows/

  3. Discovers all Trigger instances defined in the workflow

  4. Starts each trigger as an asyncio task

  5. Manages graceful shutdown on interrupt

# Example workflow loading
WORKFLOW=conrad  # Environment variable
# Loads: src/dataflow/workflows/conrad.py
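A minimal sketch of steps 1–2, assuming main.py resolves the module path from the WORKFLOW value (the helper names below are illustrative, not the actual main.py API):

```python
import importlib
import os

def workflow_module_path(name: str) -> str:
    # e.g. WORKFLOW=conrad -> "dataflow.workflows.conrad"
    return f"dataflow.workflows.{name}"

def load_workflow():
    # Read the WORKFLOW environment variable and import the module dynamically
    name = os.environ["WORKFLOW"]
    return importlib.import_module(workflow_module_path(name))
```

Once the module is imported, its decorated functions have registered their Trigger instances, which main.py can then discover and start as asyncio tasks.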

Note

The @startup decorator is used to initialize or set up resources before other triggers begin execution. All functions decorated with @startup run first, followed by the other triggers. Multiple startup functions run concurrently, and the application waits for all of them to complete before starting the other triggers.

For detailed information on the @startup decorator, see the Triggers documentation.
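As a rough illustration of the concurrent startup phase (the task names here are hypothetical, not part of the Dataflow API):

```python
import asyncio

async def init_cache():
    # Hypothetical startup task: warm a local cache
    await asyncio.sleep(0.01)
    return "cache ready"

async def init_connections():
    # Hypothetical startup task: open database connections
    await asyncio.sleep(0.01)
    return "connections ready"

async def run_startup_phase():
    # All @startup functions run concurrently; the application waits
    # for every one to finish before starting the other triggers
    return await asyncio.gather(init_cache(), init_connections())

results = asyncio.run(run_startup_phase())
```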

Connections (connection.py)

The connection.py module provides database connection abstractions:

  • Connection: Abstract base class defining the connection interface

  • MicrosoftSQLServerConnection: SQL Server connectivity with parameterized queries

  • MongoDB: Synchronous MongoDB client with batch operations

  • MongoDBAsync: Asynchronous MongoDB client using motor

  • SQLiteConnection: Local SQLite database for caching

See Database Connections for detailed documentation.
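A minimal sketch of the shared interface, using SQLiteConnection as the concrete example (the connect/disconnect method names follow their usage in the workflow examples in this document; the real signatures may differ):

```python
import sqlite3
from abc import ABC, abstractmethod

class Connection(ABC):
    """Sketch of the abstract connection interface."""

    @abstractmethod
    def connect(self): ...

    @abstractmethod
    def disconnect(self): ...

class SQLiteConnection(Connection):
    """Illustrative local SQLite connection, e.g. for caching sync state."""

    def __init__(self, path: str = ":memory:"):
        self.path = path
        self.conn = None

    def connect(self):
        self.conn = sqlite3.connect(self.path)

    def disconnect(self):
        self.conn.close()
        self.conn = None

cache = SQLiteConnection()
cache.connect()
cache.conn.execute("CREATE TABLE sync_state (key TEXT, value TEXT)")
cache.disconnect()
```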

Triggers (triggers/)

The triggers/ directory contains trigger implementations that define when ETL jobs run:

  • Trigger (base.py): Abstract base class for all triggers

  • CronJob (cronjob.py): Time-based scheduling using cron expressions

  • MongoDBChangeStream (mongo_change_listener.py): Real-time MongoDB change detection

  • Startup (startup.py): One-time execution on application startup

See Triggers for detailed documentation.
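Conceptually, every trigger implements a small async start/stop interface that main.py drives during the lifecycle shown above (this is a sketch; the real base class in base.py may differ):

```python
import asyncio
from abc import ABC, abstractmethod

class Trigger(ABC):
    """Sketch of a trigger base class."""

    def __init__(self, name: str):
        self.name = name
        self.running = False

    @abstractmethod
    async def start(self): ...

    @abstractmethod
    async def stop(self): ...

class HeartbeatTrigger(Trigger):
    """Hypothetical trigger used only to illustrate the interface."""

    async def start(self):
        self.running = True

    async def stop(self):
        self.running = False

trigger = HeartbeatTrigger("heartbeat")
asyncio.run(trigger.start())
```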

Workflows (workflows/)

The workflows/ directory contains your ETL workflow definitions. Each workflow file contains one or more decorated functions that define triggers. Each function encapsulates its ETL logic and runs independently as an asyncio task.

Example workflow structure:

# src/dataflow/workflows/my_workflow.py
from dataflow.triggers.cronjob import cronjob
from dataflow.triggers.mongo_change_listener import mongodb_change_stream
from dataflow.connection import MongoDB, MicrosoftSQLServerConnection

@cronjob("0 * * * *", name="hourly_sync")
def sync_data():
    # Your ETL logic here
    pass

@cronjob("30 2 * * *", name="daily_cleanup")
def cleanup_data():
    # Your cleanup logic here
    pass

@mongodb_change_stream("mongodb://localhost:27017", "mydb", "mycollection", name="realtime_sync")
def realtime_sync(change_event):
    # Your real-time sync logic here
    pass

You can then set the WORKFLOW environment variable to my_workflow to run all the workflows defined in that file.

Utilities (utils.py)

The utils.py module provides helper functions for logging, configuration management, and common data transformations used across the codebase.

Full workflow example

A typical Dataflow ETL job follows this pattern:

from dataflow.triggers.startup import startup
from dataflow.triggers.cronjob import cronjob
from dataflow.triggers.mongo_change_listener import mongodb_change_stream
from dataflow.connection import MicrosoftSQLServerConnection, MongoDB
import pandas as pd

# First, initialize the workflow with @startup to set up any necessary resources
# (this is optional and not required for all workflows)
@startup(name="initial_setup")
def initial_setup():
    pass

# Define an extract-transform-load job that runs every 15 minutes
@cronjob("*/15 * * * *", name="etl_job")
def etl_job():
    # 1. EXTRACT - Connect to source and fetch data
    source = MicrosoftSQLServerConnection(
        server="source-server",
        database="source_db",
        username="user",
        password="pass"
    )
    source.connect()
    # last_sync would typically be loaded from a local cache (e.g. SQLiteConnection)
    last_sync = "2024-01-01 00:00:00"
    raw_data = source.fetch("SELECT * FROM source_table WHERE updated > ?", (last_sync,))

    # 2. TRANSFORM - Process and clean data
    df = pd.DataFrame(raw_data)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['value'] = df['value'].astype(float)
    transformed = df.to_dict('records')

    # 3. LOAD - Insert/update destination
    dest = MongoDB(
        hostname="dest-server",
        database="dest_db",
        username="user",
        password="pass"
    )
    dest.connect()
    for record in transformed:
        dest.upsert("collection", filters={"id": record["id"]}, data=record)

    # 4. CLEANUP - Close connections
    source.disconnect()
    dest.disconnect()


# Each time something is changed in the MongoDB collection, this function will run to sync changes back to SQL Server
@mongodb_change_stream("mongodb://dest-server:27017", "dest_db", "collection", name="realtime_sync", operation_types=["update", "replace"])
def realtime_sync(change_event):
    # Connect to SQL Server
    sql_conn = MicrosoftSQLServerConnection(
        server="source-server",
        database="source_db",
        username="user",
        password="pass"
    )
    sql_conn.connect()

    # Process change event and sync back to SQL Server
    updated_doc = change_event["fullDocument"]
    sql_conn.execute(
        "UPDATE source_table SET value = ?, timestamp = ? WHERE id = ?",
        (updated_doc["value"], updated_doc["timestamp"], updated_doc["id"])
    )

    # Cleanup
    sql_conn.disconnect()