Triggers

Triggers define when and how your ETL workflows execute. Dataflow provides three types of triggers for different use cases, and multiple triggers can be defined within a single workflow (for example, a cron schedule alongside a MongoDB change stream).

Base Trigger Class

All triggers inherit from the abstract Trigger base class defined in src/dataflow/triggers/base.py:

class Trigger

Bases: ABC

Abstract base class for all Dataflow triggers.

A trigger defines when and how an ETL workflow function should be executed. Subclasses must implement all abstract methods and properties to provide specific triggering behavior.

All triggers follow a common lifecycle:

  1. Initialization: Configure the trigger with necessary parameters

  2. Start: Begin listening/scheduling via start()

  3. Execute: Run the wrapped function when trigger conditions are met

  4. Stop: Gracefully shutdown via stop()

Variables:
  • name (str) – Read-only property returning the trigger’s identifier.

  • is_running (bool) – Read-only property indicating if the trigger is active.

Example:

from dataflow.triggers.base import Trigger
import asyncio

class IntervalTrigger(Trigger):
    def __init__(self, func, interval_seconds, name=None):
        self._func = func
        self._interval = interval_seconds
        self._name = name or func.__name__
        self._running = False
        self._task = None

    @property
    def name(self) -> str:
        return self._name

    @property
    def is_running(self) -> bool:
        return self._running

    def start(self) -> asyncio.Task:
        self._running = True
        self._task = asyncio.create_task(self._loop())
        return self._task

    def stop(self) -> None:
        self._running = False
        if self._task:
            self._task.cancel()

    async def _loop(self):
        while self._running:
            await asyncio.sleep(self._interval)
            self._func()

See also

CronJob: Time-based trigger using cron expressions.
MongoDBChangeStream: Event-based trigger for MongoDB changes.
MSSQLChangeTrigger: Polling-based trigger for SQL Server changes.

abstract property is_running

Check whether the trigger is currently active.

Returns:

bool – True if the trigger has been started and is actively listening or scheduling, False otherwise.

abstract property name

Get the name identifier of this trigger.

Returns:

str – The trigger’s unique name, typically derived from the decorated function’s name or explicitly provided during creation.

abstractmethod start()

Start the trigger and begin execution scheduling.

This method initializes the trigger’s main loop and returns an asyncio Task that can be awaited or gathered with other triggers.

Returns:

asyncio.Task – The asyncio Task running the trigger’s main loop. This task will run until stop() is called or an unrecoverable error occurs.

Raises:

RuntimeError – If the trigger is already running.

Example:

trigger = MyTrigger(my_function, ...)

async def main():
    task = trigger.start()
    await task  # Run until stopped

asyncio.run(main())

abstractmethod stop()

Stop the trigger and cease execution scheduling.

This method signals the trigger to stop its main loop and releases any resources. It should be safe to call even if the trigger is not currently running.

After calling stop, is_running should return False.

Example:

trigger = MyTrigger(my_function, ...)
task = trigger.start()

# Later...
trigger.stop()  # Gracefully shutdown

CronJob Trigger

Location: src/dataflow/triggers/cronjob.py

The CronJob trigger executes functions on a schedule defined by cron expressions.

cronjob(schedule, *, timezone='UTC', name=None, enabled=True)

Decorator to create a CronJob trigger from a function.

The decorated function becomes a CronJob instance that can be started to run automatically based on the cron schedule. The original function can still be called directly.

Parameters:
  • schedule (str) – A cron expression defining the run frequency. The format is minute hour day month weekday.

  • timezone (str) – Timezone for the cron schedule. Defaults to “UTC”. Accepts any valid IANA timezone name (e.g., “Europe/Rome”, “America/New_York”).

  • name (Optional[str]) – Optional name for the cronjob. Defaults to the function name.

  • enabled (bool) – Whether the cronjob is enabled. Defaults to True. If False, calling CronJob.start() will return immediately without scheduling.

Returns:

Callable[[Callable], CronJob] – A decorator that wraps a function in a CronJob instance.
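
Because enabled is just a parameter to the decorator, it can be driven by configuration so a job ships disabled in some environments. A small illustrative pattern (the ENABLE_NIGHTLY_SYNC environment variable is hypothetical):

import os
from dataflow.triggers.cronjob import cronjob

# With enabled=False, start() returns immediately and nothing is scheduled.
@cronjob("0 3 * * *", enabled=os.getenv("ENABLE_NIGHTLY_SYNC", "false") == "true")
def nightly_sync():
    print("Running nightly sync...")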

Cron Expression Format

The schedule parameter accepts standard 5-field cron expressions:

┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sun-Sat)
│ │ │ │ │
* * * * *

Common Schedule Patterns

  • * * * * * - Every minute

  • */5 * * * * - Every 5 minutes

  • */15 * * * * - Every 15 minutes

  • 0 * * * * - Every hour (at minute 0)

  • 0 0 * * * - Daily at midnight

  • 0 0 * * 0 - Weekly on Sunday at midnight

  • 0 0 1 * * - First day of every month

  • 30 2 * * * - Daily at 2:30 AM

Example: Basic Usage

from dataflow.triggers.cronjob import cronjob

@cronjob("0 0 * * *", timezone="Europe/Rome")
def daily_cleanup():
    # Runs every day at midnight Rome time
    print("Cleaning up...")

# Start the scheduler
async def main():
    task = daily_cleanup.start()
    await task  # Run forever

Example: Async Function

@cronjob("*/10 * * * *")
async def fetch_data():
    # Async functions are fully supported
    await some_async_operation()

Example: Direct Invocation

@cronjob("0 * * * *")
def my_task():
    return "completed"

# The decorated function can still be called directly
result = my_task()  # Returns "completed"

Example: Starting and Stopping

import asyncio

@cronjob("*/5 * * * *", name="my_periodic_job")
def periodic_task():
    print("Running...")

async def main():
    # Start the scheduler
    task = periodic_task.start()

    # Let it run for an hour
    await asyncio.sleep(3600)

    # Stop the scheduler
    periodic_task.stop()

asyncio.run(main())

See also

CronJob: The underlying trigger class.

class CronJob

Bases: Trigger

A cron-based trigger that executes a function on a schedule using asyncio.

The CronJob class wraps a function and executes it automatically based on a cron expression schedule. It supports both synchronous and asynchronous functions, running sync functions in an executor to avoid blocking.

Parameters:
  • func (Callable) – The function to execute on schedule.

  • schedule (str) – A valid cron expression (5 fields: minute, hour, day, month, weekday).

  • timezone (str) – The timezone for scheduling. Defaults to “UTC”.

  • name (Optional[str]) – Optional name identifier. Defaults to the function name.

  • enabled (bool) – Whether the trigger should run when started. Defaults to True.

Raises:

ValueError – If the cron expression is invalid.

Example:

from dataflow.triggers.cronjob import CronJob

def my_task():
    print("Task executed")

job = CronJob(
    func=my_task,
    schedule="0 * * * *",  # Every hour
    timezone="Europe/Rome",
    name="hourly_task"
)

# Start the scheduler
task = job.start()

# Stop when done
job.stop()

See also

cronjob: Decorator to create a CronJob from a function.

__init__(func, schedule, timezone='UTC', name=None, enabled=True)

Initialize a new CronJob trigger.

Parameters:
  • func (Callable) – The function to execute on schedule.

  • schedule (str) – A valid cron expression defining when to run.

  • timezone (str) – The timezone for the schedule. Defaults to “UTC”.

  • name (Optional[str]) – Optional name for the trigger. Defaults to function name.

  • enabled (bool) – Whether the trigger is enabled. Defaults to True.

Raises:

ValueError – If the cron expression is invalid.

property is_running

Check if the trigger is currently running.

Returns:

bool – True if the scheduler loop is active, False otherwise.

property name

Get the name of this trigger.

Returns:

str – The trigger name.

property schedule

Get the cron schedule expression.

Returns:

str – The cron expression string.

start()

Start the cron scheduler as an asyncio task.

Creates and returns an asyncio task that runs the scheduling loop. The task will continue running until stop() is called.

Returns:

asyncio.Task – The asyncio Task running the scheduler loop.

Raises:

RuntimeError – If the cronjob is already running.

Example:

@cronjob("*/5 * * * *")
def my_task():
    pass

async def main():
    task = my_task.start()
    await asyncio.sleep(3600)  # Run for 1 hour
    my_task.stop()

asyncio.run(main())

stop()

Stop the cron scheduler.

Signals the scheduling loop to stop and cancels the running task. This method is safe to call even if the scheduler is not running.

Example:

@cronjob("0 * * * *")
def my_task():
    pass

task = my_task.start()
# ... later ...
my_task.stop()

Basic Usage

from dataflow.triggers.cronjob import cronjob

@cronjob("*/10 * * * *", name="my_job")
def my_scheduled_function():
    print("Running every 10 minutes")

Cron Expression Reference

┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sun-Sat)
│ │ │ │ │
* * * * *

Common patterns:

  • * * * * * - Every minute

  • */5 * * * * - Every 5 minutes

  • 0 * * * * - Every hour (at minute 0)

  • 0 0 * * * - Daily at midnight

  • 0 0 * * 0 - Weekly on Sunday at midnight

  • 0 0 1 * * - First day of each month

Async Support

CronJob supports both synchronous and asynchronous functions:

@cronjob("0 * * * *")
def sync_job():
    # Synchronous code
    pass

@cronjob("0 * * * *")
async def async_job():
    # Asynchronous code
    await some_async_operation()
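
CronJob is documented above as awaiting async callbacks directly and running synchronous ones in an executor so they cannot block the event loop. A minimal sketch of that dispatch pattern (illustrative only; _run_callback is a hypothetical helper, not part of the library):

import asyncio

async def _run_callback(func, *args):
    if asyncio.iscoroutinefunction(func):
        # Async callbacks are awaited on the event loop.
        await func(*args)
    else:
        # Sync callbacks go to the default thread-pool executor so a slow
        # job does not stall other scheduled triggers.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, func, *args)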

MongoDB Change Stream Trigger

Location: src/dataflow/triggers/mongo_db_listener.py

The MongoDBChangeStream trigger reacts to real-time changes in MongoDB collections using Change Streams.

mongodb_change_stream(collection, *, hostname='localhost', port=27017, database, username=None, password=None, ttl=False, auth_source='admin', fields=None, operations=None, name=None, enabled=True, full_document='updateLookup')

Decorator to create a MongoDB Change Stream trigger from a function.

The decorated function becomes a MongoDBChangeStream instance that listens for real-time document changes in a MongoDB collection.

Parameters:
  • collection (str) – Name of the MongoDB collection to watch.

  • hostname (str) – MongoDB server hostname. Defaults to “localhost”.

  • port (int) – MongoDB server port. Defaults to 27017.

  • database (str) – Name of the database containing the collection.

  • username (Optional[str]) – Username for MongoDB authentication. Defaults to None.

  • password (Optional[str]) – Password for MongoDB authentication. Defaults to None.

  • ttl (bool) – Time-to-live for the change stream cursor. Defaults to False.

  • auth_source (str) – Authentication database name. Defaults to “admin”.

  • fields (Optional[list[str]]) – List of field paths to watch for changes. If None, triggers on any field change. Supports nested paths using dot notation (e.g., “metadata.updated_at”).

  • operations (Optional[list[MongoDBOperation]]) – Operations to listen for. Options: “insert”, “update”, “replace”, “delete”. Defaults to all operations.

  • name (Optional[str]) – Name identifier for this trigger. Defaults to the function name.

  • enabled (bool) – Whether the trigger is enabled. Defaults to True.

  • full_document (str) – Specifies how much document data to return. “default” returns only the update delta (fullDocument is None); “updateLookup” looks up the full document after an update; “whenAvailable” returns the full document if available; “required” fails if the full document is not available.

Returns:

Callable[[Callable], MongoDBChangeStream] – A decorator that wraps a function in a MongoDBChangeStream instance.

Change Event Structure

The callback function receives a change event dictionary with the following structure:

{
    "operationType": "insert" | "update" | "replace" | "delete",
    "fullDocument": {...},       # Complete document (if available)
    "documentKey": {"_id": ...}, # Document identifier
    "updateDescription": {       # For update operations only
        "updatedFields": {...},  # Fields that were modified
        "removedFields": [...]   # Fields that were removed
    },
    "ns": {                      # Namespace info
        "db": "database_name",
        "coll": "collection_name"
    }
}

Example: Basic Usage

from dataflow.triggers.mongo_db_listener import mongodb_change_stream

@mongodb_change_stream(
    collection="orders",
    hostname="localhost",
    port=27017,
    database="shop"
)
async def on_order_change(change_event: dict):
    print(f"Order {change_event['documentKey']['_id']} was {change_event['operationType']}")

async def main():
    task = on_order_change.start()
    await task

Example: Filtering by Fields

@mongodb_change_stream(
    collection="products",
    database="inventory",
    fields=["price", "stock.quantity"],  # Only trigger on these fields
    operations=["update"]                 # Only on updates
)
async def on_price_change(event: dict):
    if "price" in event.get("updateDescription", {}).get("updatedFields", {}):
        new_price = event["updateDescription"]["updatedFields"]["price"]
        print(f"Price changed to {new_price}")

Example: Multiple Operations

@mongodb_change_stream(
    collection="users",
    database="auth",
    operations=["insert", "delete"]
)
async def on_user_lifecycle(event: dict):
    if event["operationType"] == "insert":
        print(f"New user: {event['fullDocument']['email']}")
    elif event["operationType"] == "delete":
        print(f"User deleted: {event['documentKey']['_id']}")

Example: Manual Testing

@mongodb_change_stream(collection="test", database="dev")
def handler(event):
    print(event)

# Test without actual database changes
handler({"operationType": "insert", "fullDocument": {"_id": "1"}})

See also

MongoDBChangeStream: The underlying trigger class.

Note

MongoDB Change Streams require a replica set or sharded cluster. Standalone MongoDB instances do not support Change Streams.
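
One quick way to check whether the server you are pointing at supports Change Streams is to ask it for its replica set status. An illustrative snippet, assuming pymongo is available:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
info = client.admin.command("hello")  # use "isMaster" on servers older than 4.4.2

# A replica set member reports its set name; a standalone server does not.
if "setName" in info:
    print(f"Replica set '{info['setName']}' detected: Change Streams supported")
else:
    print("Standalone server: Change Streams are not supported")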

class MongoDBChangeStream

Bases: Trigger

A trigger that listens to MongoDB Change Streams for document changes.

This trigger connects to a MongoDB collection and watches for real-time changes using Change Streams. When a matching change occurs, the wrapped function is called with the change event data.

Parameters:
  • func (Callable) – The function to execute when a change is detected. Should accept a single change_event dictionary parameter.

  • collection (str) – Name of the MongoDB collection to watch.

  • hostname (str) – MongoDB server hostname. Defaults to “localhost”.

  • port (int) – MongoDB server port. Defaults to 27017.

  • database (str) – Name of the database containing the collection.

  • username (Optional[str]) – Username for authentication. Defaults to None.

  • password (Optional[str]) – Password for authentication. Defaults to None.

  • ttl (Optional[int]) – Time-to-live for the change stream cursor. Defaults to None.

  • auth_source (Optional[str]) – Authentication database name. Defaults to None.

  • fields (Optional[list[str]]) – List of field paths to watch for changes. If None, triggers on any field change.

  • operations (Optional[list[MongoDBOperation]]) – List of operations to trigger on. Defaults to all operations: [“insert”, “update”, “replace”, “delete”].

  • name (Optional[str]) – Name identifier for this trigger. Defaults to function name.

  • enabled (bool) – Whether the trigger is enabled. Defaults to True.

  • full_document (str) – Specifies how much document data to return. One of “default”, “updateLookup”, “whenAvailable”, “required”. Defaults to “updateLookup”.

Example:

from dataflow.triggers.mongo_db_listener import MongoDBChangeStream

def handle_change(event):
    print(f"Document {event['documentKey']['_id']} was {event['operationType']}")

trigger = MongoDBChangeStream(
    func=handle_change,
    collection="users",
    hostname="localhost",
    port=27017,
    database="myapp",
    operations=["insert", "delete"]
)

# Start listening
task = trigger.start()

See also

mongodb_change_stream: Decorator to create a MongoDBChangeStream from a function.

Note

MongoDB Change Streams require a replica set or sharded cluster. Standalone MongoDB instances do not support Change Streams.

__init__(func, collection, *, hostname='localhost', port=27017, database, username=None, password=None, ttl=None, auth_source=None, fields=None, operations=None, name=None, enabled=True, full_document='updateLookup')

Initialize a new MongoDBChangeStream trigger.

Parameters:
  • func (Callable) – The function to execute when a change is detected.

  • collection (str) – Name of the MongoDB collection to watch.

  • hostname (str) – MongoDB server hostname. Defaults to “localhost”.

  • port (int) – MongoDB server port. Defaults to 27017.

  • database (str) – Name of the database containing the collection.

  • username (Optional[str]) – Username for authentication. Defaults to None.

  • password (Optional[str]) – Password for authentication. Defaults to None.

  • ttl (Optional[int]) – Time-to-live for the cursor. Defaults to None.

  • auth_source (Optional[str]) – Authentication database. Defaults to None.

  • fields (Optional[list[str]]) – Field paths to watch. Defaults to None (all fields).

  • operations (Optional[list[MongoDBOperation]]) – Operations to trigger on. Defaults to all operations.

  • name (Optional[str]) – Trigger name. Defaults to function name.

  • enabled (bool) – Whether enabled. Defaults to True.

  • full_document (str) – Document return mode. Defaults to “updateLookup”.

property collection

Get the MongoDB collection being watched.

Returns:

str – The collection name.

property is_running

Check if the trigger is currently running.

Returns:

bool – True if the change stream listener is active, False otherwise.

property name

Get the name of this trigger.

Returns:

str – The trigger name.

property operations

Get the list of operations this trigger responds to.

Returns:

list[MongoDBOperation] – List of operation types (insert, update, replace, delete).

start()

Start the change stream listener as an asyncio task.

Creates and returns an asyncio task that listens to the MongoDB change stream. The task will continue running until stop() is called.

Returns:

asyncio.Task – The asyncio Task running the listener loop.

Raises:

RuntimeError – If the change stream is already running.

Example:

@mongodb_change_stream(collection="orders", database="shop")
async def on_change(event):
    pass

async def main():
    task = on_change.start()
    await asyncio.sleep(3600)  # Listen for 1 hour
    on_change.stop()

asyncio.run(main())

stop()

Stop the change stream listener.

Signals the listening loop to stop and cancels the running task. This method is safe to call even if the listener is not running.

Example:

on_change.stop()

Basic Usage

from dataflow.triggers.mongo_db_listener import mongodb_change_stream

@mongodb_change_stream(
    collection="orders",
    hostname="localhost",
    port=27017,
    database="mydb",
    username="user",
    password="pass"
)
async def on_order_change(change_event: dict):
    print(f"Order changed: {change_event}")

Change Event Structure

The callback receives a change event dictionary:

{
    "operationType": "update",  # insert, update, replace, delete
    "fullDocument": {...},      # The complete document (if available)
    "updateDescription": {      # For updates only
        "updatedFields": {...},
        "removedFields": [...]
    },
    "documentKey": {"_id": ...}  # Document identifier
}

Filtering Changes

Filter by specific fields and operations:

@mongodb_change_stream(
    collection="orders",
    fields=["status", "payment.completed"],  # Only trigger on these fields
    operations=["update", "insert"],          # Only these operations
    ...
)
async def on_order_status_change(change_event: dict):
    pass

SQL Server Change Trigger

Location: src/dataflow/triggers/mssql_change_trigger.py

The MSSQLChangeTrigger uses SQL Server’s Change Tracking feature to detect table modifications through polling.

Note

Change Tracking must be enabled on the database and table in SQL Server.

mssql_change_trigger(table, *, schema='dbo', database=None, connection_string=None, fields=None, operations=None, name=None, enabled=True, track_columns=True, poll_interval=1.0)

Decorator to create a SQL Server Change Trigger from a function.

The decorated function becomes a MSSQLChangeTrigger instance that polls for table changes using SQL Server Change Tracking.

Parameters:
  • table (str) – Name of the table to watch for changes.

  • schema (str) – Schema name containing the table. Defaults to “dbo”.

  • database (Optional[str]) – Database name. If not provided, uses the default from the connection string.

  • connection_string (Optional[str]) – ODBC connection string for SQL Server. Example: “Driver={ODBC Driver 17 for SQL Server};Server=…;Database=…;UID=…;PWD=…”

  • fields (Optional[list[str]]) – List of column names to watch for changes. If None, triggers on any column change.

  • operations (Optional[list[MSSQLOperation]]) – Operations to trigger on. Options: “INSERT”, “UPDATE”, “DELETE”. Defaults to all operations.

  • name (Optional[str]) – Name identifier for this trigger. Defaults to the function name.

  • enabled (bool) – Whether the trigger is enabled. Defaults to True.

  • track_columns (bool) – Whether to track which specific columns changed. Defaults to True. When True, only triggers on watched columns.

  • poll_interval (float) – Seconds between polling for changes. Defaults to 1.0 second.

Returns:

Callable[[Callable], MSSQLChangeTrigger] – A decorator that wraps a function in a MSSQLChangeTrigger instance.

Enabling Change Tracking in SQL Server

Before using this trigger, you must enable Change Tracking on your database and table:

-- Enable at database level
ALTER DATABASE YourDatabase
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);

-- Enable for specific table
ALTER TABLE dbo.YourTable
ENABLE CHANGE_TRACKING
WITH (TRACK_COLUMNS_UPDATED = ON);

Change Event Structure

The callback function receives a change event dictionary with the following structure:

{
    "operation": "INSERT" | "UPDATE" | "DELETE",
    "table": "TableName",
    "schema": "dbo",
    "version": 12345,              # Change tracking version
    "new_values": {...},           # Current column values
    "changed_columns": [...]       # List of changed column names
}

Example: Basic Usage

from dataflow.triggers.mssql_change_trigger import mssql_change_trigger

@mssql_change_trigger(
    table="Orders",
    schema="Sales",
    database="AdventureWorks",
    connection_string="Driver={ODBC Driver 17 for SQL Server};..."
)
async def on_order_change(change_event: dict):
    print(f"Order {change_event['operation']}: {change_event['new_values']}")

async def main():
    task = on_order_change.start()
    await task

Example: Filtering by Columns

@mssql_change_trigger(
    table="Products",
    database="Inventory",
    fields=["Price", "StockQuantity"],  # Only trigger on these columns
    operations=["UPDATE"],               # Only on updates
    poll_interval=0.5                    # Poll every 500ms
)
async def on_price_change(event: dict):
    print(f"Product price/stock changed: {event['new_values']}")

Example: Multiple Operations

@mssql_change_trigger(
    table="AuditLog",
    operations=["INSERT", "DELETE"]
)
async def on_audit_event(event: dict):
    if event["operation"] == "INSERT":
        print(f"New audit entry: {event['new_values']}")
    elif event["operation"] == "DELETE":
        print("Audit entry deleted")

Example: Manual Testing

@mssql_change_trigger(table="Test", ...)
def handler(event):
    return event["operation"]

# Test without actual database changes
result = handler({
    "operation": "INSERT",
    "table": "Test",
    "schema": "dbo",
    "version": 1,
    "new_values": {"Id": 1, "Name": "Test"}
})

See also

MSSQLChangeTrigger: The underlying trigger class.

Note

This trigger uses polling rather than real-time events. The poll_interval parameter controls how frequently changes are checked. Lower values provide faster detection but increase database load.
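
As a rough illustration of what one polling pass over Change Tracking looks like, the sketch below queries CHANGETABLE for rows modified since the last seen version. This is an assumption-laden sketch, not the library’s implementation: pyodbc, the dbo.Orders table, and its Id key column are placeholders.

import time
import pyodbc  # assumed ODBC driver binding

conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};Server=...;Database=...;UID=...;PWD=...")
cursor = conn.cursor()

# Remember the current Change Tracking version; later polls only ask for
# rows that changed after this point.
last_version = cursor.execute("SELECT CHANGE_TRACKING_CURRENT_VERSION()").fetchval()

while True:
    rows = cursor.execute(
        """
        SELECT ct.SYS_CHANGE_OPERATION, ct.SYS_CHANGE_VERSION, t.*
        FROM CHANGETABLE(CHANGES dbo.Orders, ?) AS ct
        LEFT JOIN dbo.Orders AS t ON t.Id = ct.Id
        """,
        last_version,
    ).fetchall()
    for row in rows:
        # A real trigger would build a change_event dict here and call
        # the wrapped function with it.
        print(row.SYS_CHANGE_OPERATION, row.Id)
    if rows:
        last_version = max(row.SYS_CHANGE_VERSION for row in rows)
    time.sleep(1.0)  # corresponds to poll_interval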

class MSSQLChangeTrigger

Bases: Trigger

A trigger that polls SQL Server Change Tracking for table modifications.

This trigger connects to a SQL Server database and polls for changes using the Change Tracking feature. When matching changes are detected, the wrapped function is called with the change event data.

Parameters:
  • func (Callable) – The function to execute when a change is detected. Should accept a single change_event dictionary parameter.

  • table (str) – Name of the table to watch for changes.

  • schema (str) – Schema name containing the table. Defaults to “dbo”.

  • database (Optional[str]) – Database name. Uses connection default if None.

  • connection_string (Optional[str]) – ODBC connection string for SQL Server.

  • fields (Optional[list[str]]) – List of column names to watch. If None, triggers on any column change.

  • operations (Optional[list[MSSQLOperation]]) – Operations to trigger on. Defaults to all: [“INSERT”, “UPDATE”, “DELETE”].

  • name (Optional[str]) – Name identifier for this trigger. Defaults to function name.

  • enabled (bool) – Whether the trigger is enabled. Defaults to True.

  • track_columns (bool) – Whether to track which columns changed. Defaults to True.

  • poll_interval (float) – Seconds between polling for changes. Defaults to 1.0.

Example:

from dataflow.triggers.mssql_change_trigger import MSSQLChangeTrigger

def handle_change(event):
    print(f"Row {event['operation']}: {event['new_values']}")

trigger = MSSQLChangeTrigger(
    func=handle_change,
    table="Products",
    schema="dbo",
    database="Inventory",
    connection_string="Driver={ODBC Driver 17 for SQL Server};...",
    operations=["UPDATE"]
)

# Start polling
task = trigger.start()

See also

mssql_change_trigger: Decorator to create a trigger from a function.

Note

SQL Server Change Tracking must be enabled on the database and table. See the decorator documentation for SQL commands to enable it.

__init__(func, table, *, schema='dbo', database=None, connection_string=None, fields=None, operations=None, name=None, enabled=True, track_columns=True, poll_interval=1.0)

Initialize a new MSSQLChangeTrigger.

Parameters:
  • func (Callable) – The function to execute when a change is detected.

  • table (str) – Name of the table to watch.

  • schema (str) – Schema name. Defaults to “dbo”.

  • database (Optional[str]) – Database name. Defaults to None.

  • connection_string (Optional[str]) – ODBC connection string. Defaults to None.

  • fields (Optional[list[str]]) – Columns to watch. Defaults to None (all columns).

  • operations (Optional[list[MSSQLOperation]]) – Operations to trigger on. Defaults to all operations.

  • name (Optional[str]) – Trigger name. Defaults to function name.

  • enabled (bool) – Whether enabled. Defaults to True.

  • track_columns (bool) – Track changed columns. Defaults to True.

  • poll_interval (float) – Polling interval in seconds. Defaults to 1.0.

property database

Get the database name.

Returns:

Optional[str] – The database name, or None if using connection default.

property is_running

Check if the trigger is currently running.

Returns:

bool – True if the polling loop is active, False otherwise.

property name

Get the name of this trigger.

Returns:

str – The trigger name.

property operations

Get the list of operations this trigger responds to.

Returns:

list[MSSQLOperation] – List of operation types (INSERT, UPDATE, DELETE).

property schema

Get the schema containing the watched table.

Returns:

str – The schema name.

start()

Start the change trigger polling loop as an asyncio task.

Creates and returns an asyncio task that polls SQL Server for changes. The task will continue running until stop() is called.

Returns:

asyncio.Task – The asyncio Task running the polling loop.

Raises:

RuntimeError – If the change trigger is already running.

Example:

@mssql_change_trigger(table="Orders", ...)
async def on_change(event):
    pass

async def main():
    task = on_change.start()
    await asyncio.sleep(3600)  # Poll for 1 hour
    on_change.stop()

asyncio.run(main())

stop()

Stop the change trigger polling loop.

Signals the polling loop to stop and cancels the running task. This method is safe to call even if the trigger is not running.

Example:

on_change.stop()

property table

Get the table being watched.

Returns:

str – The table name.

Basic Usage

from dataflow.triggers.mssql_change_trigger import mssql_change_trigger

@mssql_change_trigger(
    table="Orders",
    connection_string="Driver={ODBC Driver 17 for SQL Server};Server=...;Database=...;UID=...;PWD=...",
    database="SalesDB"
)
async def on_order_change(change_event: dict):
    print(f"Order changed: {change_event}")

Change Event Structure

{
    "operation": "UPDATE",      # INSERT, UPDATE, DELETE
    "table": "Orders",
    "schema": "dbo",
    "version": 12345,           # Change tracking version
    "new_values": {...},        # Current column values
    "changed_columns": [...]    # Which columns changed (if track_columns=True)
}

Enabling Change Tracking in SQL Server

-- Enable at database level
ALTER DATABASE YourDatabase
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);

-- Enable for specific table
ALTER TABLE dbo.YourTable
ENABLE CHANGE_TRACKING
WITH (TRACK_COLUMNS_UPDATED = ON);
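
Before starting the trigger, it can be worth confirming that the table is actually tracked. An illustrative check, assuming pyodbc is available and using the same YourTable name as the SQL above:

import pyodbc

conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};Server=...;Database=...;UID=...;PWD=...")
tracked = conn.cursor().execute(
    """
    SELECT COUNT(*)
    FROM sys.change_tracking_tables ctt
    JOIN sys.tables t ON t.object_id = ctt.object_id
    WHERE t.name = ?
    """,
    "YourTable",
).fetchval()
print("Change Tracking enabled" if tracked else "Change Tracking NOT enabled for YourTable")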

Starting and Stopping Triggers

All decorated functions become Trigger instances with start() and stop() methods:

import asyncio
from dataflow.triggers.cronjob import cronjob

@cronjob("*/5 * * * *", name="my_job")
def my_job():
    pass

async def main():
    # Start the trigger
    task = my_job.start()

    # Run for some time
    await asyncio.sleep(3600)

    # Stop the trigger
    my_job.stop()

asyncio.run(main())

Multiple Triggers in a Workflow

Define multiple triggers in a single workflow file:

# src/dataflow/workflows/my_workflow.py
from dataflow.triggers.cronjob import cronjob
from dataflow.triggers.mongo_db_listener import mongodb_change_stream

@cronjob("0 * * * *", name="hourly_sync")
def hourly_sync():
    pass

@cronjob("0 0 * * *", name="daily_cleanup")
def daily_cleanup():
    pass

@mongodb_change_stream(collection="events", ...)
async def on_event(change):
    pass

When running this workflow, all triggers will execute concurrently.
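
For example, a runner can collect the Task returned by each trigger’s start() and gather them so the jobs run side by side. A minimal sketch (the actual Dataflow runner may differ; the import path mirrors the workflow file above):

import asyncio
from dataflow.workflows.my_workflow import hourly_sync, daily_cleanup, on_event

async def main():
    # Each start() returns an asyncio.Task; gathering them keeps all
    # triggers running concurrently in one event loop.
    tasks = [hourly_sync.start(), daily_cleanup.start(), on_event.start()]
    try:
        await asyncio.gather(*tasks)
    finally:
        for trigger in (hourly_sync, daily_cleanup, on_event):
            trigger.stop()

asyncio.run(main())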