Architecture Overview

This section provides an overview of the Dataflow codebase structure and how the components interact to create ETL pipelines.

Directory Structure

src/dataflow/
├── __init__.py              # Package initialization
├── main.py                  # Application entry point
├── connection.py            # Database connection abstractions
├── utils.py                 # Utility functions
├── triggers/                # Trigger implementations
│   ├── __init__.py
│   ├── base.py              # Abstract Trigger base class
│   ├── cronjob.py           # Time-based trigger
│   ├── mongo_db_listener.py # MongoDB change stream trigger
│   └── mssql_change_trigger.py  # SQL Server change tracking trigger
└── workflows/               # Workflow definitions
    ├── __init__.py
    └── conrad.py            # Example workflow

Core Components

Entry Point (main.py)

The main.py module serves as the application entry point. It:

  1. Reads the WORKFLOW environment variable to determine which workflow to run

  2. Dynamically imports the workflow module from src/dataflow/workflows/

  3. Discovers all Trigger instances defined in the workflow

  4. Starts each trigger as an asyncio task

  5. Manages graceful shutdown on interrupt

# Example workflow loading
WORKFLOW=conrad  # Environment variable
# Loads: src/dataflow/workflows/conrad.py
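
A minimal sketch of how this loading flow might look, following the five steps above; the Trigger discovery details and the async start() method are assumptions, not the exact source:

# Illustrative sketch of main.py's loading flow; not the actual source.
import asyncio
import importlib
import os

from dataflow.triggers.base import Trigger

async def run():
    # 1. Read the WORKFLOW environment variable
    workflow_name = os.environ["WORKFLOW"]
    # 2. Dynamically import the workflow module
    module = importlib.import_module(f"dataflow.workflows.{workflow_name}")
    # 3. Discover all Trigger instances defined at module level
    triggers = [obj for obj in vars(module).values() if isinstance(obj, Trigger)]
    # 4. Start each trigger as an asyncio task
    tasks = [asyncio.create_task(t.start()) for t in triggers]
    await asyncio.gather(*tasks)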

Connections (connection.py)

The connection.py module provides database connection abstractions:

  • Connection: Abstract base class defining the connection interface

  • MicrosoftSQLServerConnection: SQL Server connectivity with parameterized queries

  • MongoDB: Synchronous MongoDB client with batch operations

  • MongoDBAsync: Asynchronous MongoDB client using motor

  • SQLiteConnection: Local SQLite database for caching

See Database Connections for detailed documentation.
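
Based on how connections are used in the ETL Pattern section below (connect(), fetch(), disconnect()), the Connection base class plausibly has a shape like the following; the exact signatures may differ:

# Plausible shape of the Connection interface, inferred from the usage
# examples in this document; the actual class may differ.
from abc import ABC, abstractmethod
from typing import Any

class Connection(ABC):
    @abstractmethod
    def connect(self) -> None:
        """Open the underlying database connection."""

    @abstractmethod
    def disconnect(self) -> None:
        """Close the connection and release resources."""

    @abstractmethod
    def fetch(self, query: str, params: tuple = ()) -> list[dict[str, Any]]:
        """Run a parameterized read query and return rows."""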

Triggers (triggers/)

The triggers/ directory contains trigger implementations that define when ETL jobs run:

  • Trigger (base.py): Abstract base class for all triggers

  • CronJob (cronjob.py): Time-based scheduling using cron expressions

  • MongoDBChangeStream (mongo_db_listener.py): Real-time MongoDB change detection

  • MSSQLChangeTrigger (mssql_change_trigger.py): SQL Server change tracking polling

See Triggers for detailed documentation.
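
Because main.py runs each trigger as an asyncio task and stops it on shutdown, the Trigger base class plausibly exposes async start and stop hooks, roughly:

# Plausible shape of the Trigger base class, inferred from how main.py
# starts and stops triggers; the actual interface may differ.
from abc import ABC, abstractmethod

class Trigger(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    async def start(self) -> None:
        """Run until stopped, invoking the wrapped ETL function when fired."""

    @abstractmethod
    async def stop(self) -> None:
        """Cancel pending work so the application can shut down cleanly."""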

Workflows (workflows/)

The workflows/ directory contains your ETL workflow definitions. Each workflow file defines one or more decorated functions, each of which becomes a trigger; every function encapsulates its own ETL logic and runs independently as an asyncio task.

Example workflow structure:

# src/dataflow/workflows/my_workflow.py
from dataflow.triggers.cronjob import cronjob
from dataflow.triggers.mongo_db_listener import mongodb_change_stream

@cronjob("0 * * * *", name="hourly_sync")
def sync_data():
    # Your ETL logic here
    pass

@cronjob("30 2 * * *", name="daily_cleanup")
def cleanup_data():
    # Your cleanup logic here
    pass

@mongodb_change_stream("mongodb://localhost:27017", "mydb", "mycollection", name="realtime_sync")
def realtime_sync():
    # Your real-time sync logic here
    pass

You can then set the WORKFLOW environment variable to my_workflow to run all of the triggers defined in that file.
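
Assuming the application is started through its main.py entry point, the invocation might look like this (the exact command depends on how the package is installed):

WORKFLOW=my_workflow python -m dataflow.main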

Utilities (utils.py)

The utils.py module provides helper functions for logging, configuration management, and common data transformations used across the codebase.

ETL Pattern

A typical Dataflow ETL job follows this pattern:

from datetime import datetime, timedelta

from dataflow.triggers.cronjob import cronjob
from dataflow.connection import MicrosoftSQLServerConnection, MongoDB
import pandas as pd

@cronjob("*/15 * * * *", name="etl_job")
def etl_job():
    # 1. EXTRACT - Connect to source and fetch data
    source = MicrosoftSQLServerConnection(
        server="source-server",
        database="source_db",
        username="user",
        password="pass"
    )
    source.connect()
    # For illustration, re-sync the last 15 minutes (matching the cron
    # schedule); a production job would persist a sync checkpoint instead.
    last_sync = datetime.now() - timedelta(minutes=15)
    raw_data = source.fetch("SELECT * FROM source_table WHERE updated > ?", (last_sync,))

    # 2. TRANSFORM - Process and clean data
    df = pd.DataFrame(raw_data)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['value'] = df['value'].astype(float)
    transformed = df.to_dict('records')

    # 3. LOAD - Insert/update destination
    dest = MongoDB(
        hostname="dest-server",
        database="dest_db",
        username="user",
        password="pass"
    )
    dest.connect()
    for record in transformed:
        dest.upsert("collection", filters={"id": record["id"]}, data=record)

    # 4. CLEANUP - Close connections
    source.disconnect()
    dest.disconnect()

Design Principles

Async-First

All I/O operations use async/await for non-blocking execution. Multiple triggers run concurrently without blocking each other.

Decorator Pattern

Functions decorated with @cronjob, @mongodb_change_stream, or @mssql_change_trigger automatically become Trigger instances.
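
A sketch of how such a decorator could work; the real cronjob decorator lives in triggers/cronjob.py and may differ in detail (the CronJob stand-in here is illustrative):

# Illustrative sketch of the decorator pattern; not the actual source.
from typing import Callable

class CronJob:  # stand-in for the real CronJob Trigger subclass
    def __init__(self, expression: str, name: str, func: Callable):
        self.expression = expression
        self.name = name
        self.func = func

def cronjob(expression: str, name: str):
    def decorator(func: Callable) -> CronJob:
        # The function is replaced by a Trigger instance, which main.py
        # can later discover when it scans the workflow module.
        return CronJob(expression, name, func)
    return decorator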

Security by Default

All database operations use parameterized queries to prevent injection attacks. NoSQL injection is prevented through operator filtering.
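
The ? placeholder in the ETL Pattern above keeps values out of the SQL string itself. For the NoSQL side, operator filtering might look like the following hypothetical sketch (the function name and placement are assumptions, not the actual connection.py code):

# Hypothetical illustration of NoSQL operator filtering.
def sanitize_filters(filters: dict) -> dict:
    """Drop keys starting with '$' so callers cannot inject MongoDB
    operators such as {"$where": ...} through a filter document."""
    return {k: v for k, v in filters.items() if not k.startswith("$")}

sanitize_filters({"id": 42, "$where": "1 == 1"})  # -> {"id": 42}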

Lazy Loading

Database drivers are imported only when connections are instantiated, reducing startup time and memory usage.
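
For example, a connection class can defer the driver import to connect(); pyodbc is shown here as an assumed SQL Server driver, and this sketch is illustrative rather than the actual connection.py code:

# Illustrative lazy import: the driver is imported inside connect(),
# not at module load time.
class MicrosoftSQLServerConnection:
    def __init__(self, server, database, username, password):
        self.server, self.database = server, database
        self.username, self.password = username, password
        self._conn = None

    def connect(self):
        import pyodbc  # imported only when a connection is actually opened
        self._conn = pyodbc.connect(
            "DRIVER={ODBC Driver 18 for SQL Server};"
            f"SERVER={self.server};DATABASE={self.database};"
            f"UID={self.username};PWD={self.password}"
        )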

Graceful Shutdown

On interrupt (SIGINT), all triggers are stopped cleanly and connections closed.
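
A minimal sketch of what this might look like, assuming the async start()/stop() trigger interface sketched earlier:

# Illustrative shutdown handling; the real main.py may differ.
import asyncio
import signal

async def main(triggers):
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    loop.add_signal_handler(signal.SIGINT, stop.set)  # Unix-only API

    tasks = [asyncio.create_task(t.start()) for t in triggers]
    await stop.wait()  # block until Ctrl+C

    # Stop each trigger cleanly, then wait for its task to finish.
    for trigger in triggers:
        await trigger.stop()
    await asyncio.gather(*tasks, return_exceptions=True)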