Triggers

Triggers define when and how your ETL workflows execute. Dataflow provides multiple trigger types for different use cases, and several triggers can be defined within a single workflow.

Base Trigger Class

All triggers inherit from the abstract Trigger base class defined in src/dataflow/triggers/base.py. The base class defines the common start() and stop() lifecycle methods (see Starting and Stopping Triggers below).
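The authoritative definition lives in base.py; as a rough, illustrative sketch (the constructor signature and attribute names here are assumptions, only start() and stop() are documented behavior):

```python
from abc import ABC, abstractmethod
import logging

class Trigger(ABC):
    """Illustrative sketch of the trigger interface, not the real class."""

    def __init__(self, name: str):
        self.name = name
        # Each trigger gets a logger named after it (see Logger Configuration).
        self.logger = logging.getLogger(name)

    @abstractmethod
    def start(self):
        """Begin executing the trigger (e.g. schedule its task)."""

    @abstractmethod
    def stop(self):
        """Stop the trigger and release any resources it holds."""
```

Concrete triggers (CronJob, MongoDBChangeStream, Startup) implement start() and stop() with their own scheduling logic.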

CronJob Trigger

Location: src/dataflow/triggers/cronjob.py

The CronJob trigger executes functions on a schedule defined by cron expressions.

Basic Usage

from dataflow.triggers.cronjob import cronjob

@cronjob("*/10 * * * *", name="my_job")
def my_scheduled_function(logger):
    logger.info("Running every 10 minutes")

Logger Injection

All functions decorated with @cronjob automatically receive a logger parameter. This logger is pre-configured with the trigger’s name and can be used for logging within your scheduled function:

@cronjob("0 0 * * *", name="daily_report")
def generate_daily_report(logger):
    logger.info("Starting daily report generation")
    try:
        # Your logic here
        logger.debug("Processing data...")
        logger.info("Report generated successfully")
    except Exception as e:
        logger.error(f"Failed to generate report: {e}")
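Mechanically, logger injection can be pictured as a decorator that builds a logger from the trigger's name and passes it to your function; a minimal stand-alone sketch (not Dataflow's actual implementation):

```python
import functools
import logging

def with_logger(name=None):
    """Toy decorator mirroring how @cronjob supplies `logger`:
    it creates a logger named after the trigger (or the function)
    and injects it as the first argument."""
    def decorator(func):
        logger = logging.getLogger(name or func.__name__)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(logger, *args, **kwargs)
        return wrapper
    return decorator

@with_logger(name="demo_job")
def demo_job(logger):
    # The caller never passes `logger`; the decorator does.
    return logger.name
```

Calling demo_job() with no arguments yields a logger named "demo_job", which is why your decorated functions declare a logger parameter they never supply themselves.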

Cron Expression Reference

┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sun-Sat)
│ │ │ │ │
* * * * *

Common patterns:

  • * * * * * - Every minute

  • */5 * * * * - Every 5 minutes

  • 0 * * * * - Every hour (at minute 0)

  • 0 0 * * * - Daily at midnight

  • 0 0 * * 0 - Weekly on Sunday at midnight

  • 0 0 1 * * - First day of each month
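To make the matching semantics concrete, here is a tiny matcher for cron fields. It covers only the patterns above (`*`, `*/n` steps, and literal numbers); real cron parsers, including whatever Dataflow uses internally, also handle ranges (a-b) and lists (a,b,c):

```python
def field_matches(expr: str, value: int) -> bool:
    """True if a single cron field matches `value`.
    Supports '*', '*/n' steps, and literal numbers only."""
    if expr == "*":
        return True
    if expr.startswith("*/"):
        return value % int(expr[2:]) == 0
    return value == int(expr)

def cron_matches(expr: str, minute: int, hour: int) -> bool:
    """Check the minute and hour fields of a 5-field expression."""
    fields = expr.split()
    return field_matches(fields[0], minute) and field_matches(fields[1], hour)
```

For example, "*/5 * * * *" matches minute 10 but not minute 7, and "0 0 * * *" matches only midnight.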

Async Support

CronJob supports both synchronous and asynchronous functions. The logger is injected in both cases:

@cronjob("0 * * * *")
def sync_job(logger):
    # Synchronous code
    logger.info("Sync job executed")

@cronjob("0 * * * *")
async def async_job(logger):
    # Asynchronous code
    logger.info("Async job started")
    await some_async_operation()
    logger.info("Async job completed")
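A scheduler can support both styles by inspecting the callable; one common pattern (a sketch of the general technique, not necessarily what CronJob does) is to await coroutine functions directly and push synchronous ones to a thread so they don't block the event loop:

```python
import asyncio

async def dispatch(func, *args):
    """Run `func` whether it is sync or async.
    Sync callables go to a worker thread via asyncio.to_thread."""
    if asyncio.iscoroutinefunction(func):
        return await func(*args)
    return await asyncio.to_thread(func, *args)

def sync_task(x):
    return x * 2

async def async_task(x):
    return x + 1
```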

MongoDB Change Stream Trigger

Location: src/dataflow/triggers/mongo_change_listener.py

The MongoDBChangeStream trigger reacts to real-time changes in MongoDB collections using Change Streams.

Basic Usage

from dataflow.triggers.mongo_change_listener import mongodb_change_stream

@mongodb_change_stream(
    collection="orders",
    host="localhost",
    port=27017,
    database="mydb",
    username="user",
    password="pass"
)
async def on_order_change(change_event: dict, logger):
    logger.info(f"Order changed: {change_event['operationType']}")

Logger Injection

All functions decorated with @mongodb_change_stream automatically receive a logger parameter as the second argument (after the change_event). This logger is pre-configured with the trigger’s name:

@mongodb_change_stream(
    collection="orders",
    database="mydb",
    host="localhost"
)
async def on_order_change(change_event: dict, logger):
    logger.info(f"Received {change_event['operationType']} event")
    try:
        # Process the change event
        doc_id = change_event['documentKey']['_id']
        logger.debug(f"Processing document {doc_id}")
        # Your logic here
        logger.info(f"Successfully processed document {doc_id}")
    except Exception as e:
        logger.error(f"Failed to process change event: {e}")

Change Event Structure

The callback receives a change event dictionary:

{
    "operationType": "update",  # insert, update, replace, delete
    "fullDocument": {...},      # The complete document (if available)
    "updateDescription": {      # For updates only
        "updatedFields": {...},
        "removedFields": [...]
    },
    "documentKey": {"_id": ...}  # Document identifier
}
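Note that fullDocument is typically absent on delete events (and on plain updates unless full-document lookup is enabled on the stream), so handlers should read the event defensively. A small helper illustrating the pattern:

```python
def summarize_change(event: dict) -> str:
    """Build a short log line from a change event, tolerating
    missing fields such as fullDocument on deletes."""
    op = event.get("operationType", "unknown")
    doc_id = event.get("documentKey", {}).get("_id")
    if op == "update":
        changed = sorted(event.get("updateDescription", {}).get("updatedFields", {}))
        return f"{op} on {doc_id}: fields {changed}"
    if event.get("fullDocument") is None:
        return f"{op} on {doc_id}: no document body"
    return f"{op} on {doc_id}"
```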

Filtering Changes

Filter by specific fields and operations:

@mongodb_change_stream(
    collection="orders",
    database="mydb",
    host="localhost",
    fields=["status", "payment.completed"],  # Only trigger on these fields
    operations=["update", "insert"]          # Only these operations
)
async def on_order_status_change(change_event: dict, logger):
    logger.info(f"Order status changed for document {change_event['documentKey']['_id']}")
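A filter like this maps naturally onto a $match stage in the change stream's aggregation pipeline. The pipeline Dataflow actually builds may differ, but the translation looks roughly like:

```python
def build_match_stage(fields=None, operations=None) -> dict:
    """Illustrative translation of fields/operations filters into a
    $match stage for a MongoDB change stream pipeline."""
    conditions = {}
    if operations:
        conditions["operationType"] = {"$in": list(operations)}
    if fields:
        # Match updates that touched any of the listed fields,
        # including dotted paths like "payment.completed".
        conditions["$or"] = [
            {f"updateDescription.updatedFields.{f}": {"$exists": True}}
            for f in fields
        ]
    return {"$match": conditions}
```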

Startup Trigger

Location: src/dataflow/triggers/startup.py

The Startup trigger executes a function once when the application starts. This is useful for initialization tasks like warming up caches, loading configuration, or running database migrations.

Basic Usage

from dataflow.triggers.startup import startup

@startup(name="initialize_cache")
def initialize_cache(logger):
    logger.info("Cache initialized on startup")

@startup(name="load_config")
async def load_config(logger):
    # Async functions are also supported
    logger.info("Loading remote configuration")
    config = await fetch_remote_config()
    logger.info("Configuration loaded successfully")
    return config

Logger Injection

All functions decorated with @startup automatically receive a logger parameter. This logger is pre-configured with the trigger’s name and can be used for logging during initialization:

@startup(name="database_migrations")
async def run_migrations(logger):
    logger.info("Starting database migrations")
    try:
        await apply_migrations()
        logger.info("Database migrations completed successfully")
    except Exception as e:
        logger.error(f"Migration failed: {e}")
        raise

Starting and Stopping Triggers

All decorated functions become Trigger instances with start() and stop() methods:

import asyncio
from dataflow.triggers.cronjob import cronjob

@cronjob("*/5 * * * *", name="my_job")
def my_job(logger):
    logger.info("Job is running")

async def main():
    # Start the trigger
    task = my_job.start()

    # Run for some time
    await asyncio.sleep(3600)

    # Stop the trigger
    my_job.stop()

asyncio.run(main())
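The lifecycle above can be reproduced with a toy trigger: start() schedules a background task, stop() signals it to exit. This uses a fixed interval instead of real cron parsing, and is only an illustration of the start/stop contract:

```python
import asyncio

class IntervalTrigger:
    """Toy trigger demonstrating the start()/stop() lifecycle."""

    def __init__(self, interval: float, func):
        self.interval = interval
        self.func = func
        self._stopped = asyncio.Event()

    def start(self):
        # Schedule the run loop as a background task on the running loop.
        return asyncio.create_task(self._run())

    async def _run(self):
        while not self._stopped.is_set():
            self.func()
            try:
                # Sleep until the next tick, but wake early on stop().
                await asyncio.wait_for(self._stopped.wait(), timeout=self.interval)
            except asyncio.TimeoutError:
                pass

    def stop(self):
        self._stopped.set()
```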

Multiple Triggers in a Workflow

Define multiple triggers in a single workflow file:

# src/dataflow/workflows/my_workflow.py
from dataflow.triggers.cronjob import cronjob
from dataflow.triggers.mongo_change_listener import mongodb_change_stream
from dataflow.triggers.startup import startup

@startup(name="init")
def initialize(logger):
    logger.info("Initializing workflow")

@cronjob("0 * * * *", name="hourly_sync")
def hourly_sync(logger):
    logger.info("Running hourly sync")

@cronjob("0 0 * * *", name="daily_cleanup")
def daily_cleanup(logger):
    logger.info("Running daily cleanup")

@mongodb_change_stream(collection="events", database="mydb", host="localhost")
async def on_event(change, logger):
    logger.info(f"Event received: {change['operationType']}")

When running this workflow, all triggers will execute concurrently.
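The concurrency model can be pictured with plain asyncio tasks sharing one event loop; here the triggers are stand-in coroutines rather than the real decorated functions:

```python
import asyncio

async def run_all(*coros):
    """Run several trigger-like coroutines concurrently."""
    return await asyncio.gather(*coros)

async def startup_like():
    return "init done"      # runs once, like @startup

async def cron_like():
    await asyncio.sleep(0)  # yields to the loop, like a scheduled tick
    return "tick"
```

Because everything runs on one loop, a long-running synchronous body in any one trigger would delay the others; keep blocking work short or move it off the loop.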

Logger Configuration

Each trigger automatically creates its own logger instance configured with the trigger’s name. The logger parameter is automatically injected into all decorated functions, providing:

  • Automatic naming: Logger is named after the trigger (or custom name if provided)

  • Standard Python logging: Uses the standard Python logging module

  • Consistent interface: All trigger types receive the logger the same way

You can configure the logging level and format in your application’s logging configuration:

import logging

# Configure logging for all triggers
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Or configure specific trigger loggers
logging.getLogger("my_cronjob").setLevel(logging.DEBUG)