# Dataflow Documentation
Dataflow is a Python framework for building ETL (Extract, Transform, Load) pipelines. It provides a decorator-based approach to defining data workflows, with support for multiple trigger types and database connections.
## Key Features
- **Decorator-based workflow definition**: define ETL jobs with simple decorators such as `@cronjob`
- **Multiple trigger types**: time-based scheduling, MongoDB change streams, and SQL Server change tracking (see the change-stream sketch after this list)
- **Multi-database support**: SQL Server, MongoDB, and SQLite out of the box
- **Async-first architecture**: non-blocking I/O for high-performance data processing
- **Built-in security**: SQL injection prevention, NoSQL injection protection, and type validation
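Time-based triggers like `@cronjob` appear in the Quick Example below. For the MongoDB trigger type, the underlying mechanism is MongoDB's change streams. The following is a minimal sketch of that mechanism using `pymongo` directly; the connection string and names are placeholders, and this is not Dataflow's trigger API, which this page does not document:

```python
# Minimal change-stream consumer with pymongo; Dataflow's MongoDB
# trigger builds on this mechanism. Change streams require a replica
# set or sharded cluster; standalone servers don't support them.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder URI
orders = client["analytics"]["orders"]  # placeholder database/collection

# watch() blocks and yields one change document per insert/update/delete
with orders.watch() as stream:
    for change in stream:
        print(change["operationType"], change.get("fullDocument"))
```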
## Quick Example
```python
from dataflow.triggers.cronjob import cronjob
from dataflow.connection import MongoDB, MicrosoftSQLServerConnection

@cronjob("*/10 * * * *", timezone="UTC", name="sync_orders")
def sync_orders():
    # Extract from SQL Server
    sql_conn = MicrosoftSQLServerConnection(server="...", database="...")
    sql_conn.connect()

    # Transform and load into MongoDB
    mongo = MongoDB(hostname="...", database="analytics")
    mongo.connect()

    try:
        orders = sql_conn.fetch("SELECT * FROM orders WHERE status = 'pending'")
        mongo.upsert("orders", filters={"order_id": ...}, data=orders)
    finally:
        # Release both connections even if the extract or load step raises
        sql_conn.disconnect()
        mongo.disconnect()
```
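The cron expression `"*/10 * * * *"` fires the job every ten minutes; `timezone="UTC"` keeps the schedule independent of the host's local time, and `name` identifies the job. The `try`/`finally` block ensures both connections are released even when a step fails mid-run.

Note that the query above is a static string. The SQL injection prevention listed under Key Features typically rests on parameterized queries; this page does not show Dataflow's own parameter-passing syntax for `fetch()`, but the underlying pattern, sketched here with `pyodbc` (the connection string is a placeholder), looks like this:

```python
# Parameterized-query pattern that SQL injection prevention relies on,
# shown with pyodbc directly rather than Dataflow's fetch() API.
import pyodbc

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};SERVER=...;DATABASE=...;"  # placeholder DSN
)
cursor = conn.cursor()

status = "pending"  # untrusted input is bound as a parameter, never interpolated
cursor.execute("SELECT * FROM orders WHERE status = ?", (status,))
rows = cursor.fetchall()
conn.close()
```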