Dataflow Documentation

Dataflow is a Python framework for building ETL (Extract, Transform, Load) pipelines quickly and efficiently. It provides a decorator-based approach to defining data workflows, with support for multiple trigger types and database connections.

Key Features

  • Decorator-based workflow definition: Define ETL jobs using simple decorators like @cronjob

  • Multiple trigger types: Time-based scheduling, MongoDB change streams, and SQL Server change tracking (see the sketch after this list)

  • Multi-database support: SQL Server, MongoDB, and SQLite out of the box

  • Async-first architecture: Non-blocking I/O for high-performance data processing

  • Built-in security: SQL injection prevention, NoSQL injection protection, and type validation
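
As a rough illustration of a non-cron trigger, a MongoDB change-stream job might look like the sketch below. Only @cronjob appears in this documentation, so the on_change decorator, its import path, and its parameters are hypothetical, as is the async def handler suggested by the async-first architecture.

from dataflow.triggers.change_stream import on_change  # hypothetical import path

# Hypothetical sketch: the decorator name, its parameters, and async
# handler support are assumptions; only @cronjob is documented here.
@on_change(database="analytics", collection="orders", name="watch_orders")
async def watch_orders(event):
    # operationType and documentKey are standard MongoDB change event fields
    print(event["operationType"], event.get("documentKey"))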

Quick Example

from dataflow.triggers.cronjob import cronjob
from dataflow.connection import MongoDB, MicrosoftSQLServerConnection

@cronjob("*/10 * * * *", timezone="UTC", name="sync_orders")
def sync_orders():
    # Extract from SQL Server (the "..." placeholders stand in for real connection details)
    sql_conn = MicrosoftSQLServerConnection(server="...", database="...")
    sql_conn.connect()
    try:
        orders = sql_conn.fetch("SELECT * FROM orders WHERE status = 'pending'")
    finally:
        sql_conn.disconnect()

    # Transform and Load to MongoDB
    mongo = MongoDB(hostname="...", database="analytics")
    mongo.connect()
    try:
        mongo.upsert("orders", filters={"order_id": ...}, data=orders)
    finally:
        mongo.disconnect()
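
The cron expression "*/10 * * * *" fires every ten minutes, and timezone="UTC" evaluates that schedule in UTC. The try/finally blocks ensure both connections are closed even if the fetch or upsert raises.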