Database Connections

Dataflow provides abstraction layers for connecting to multiple database systems. All connection classes are defined in src/dataflow/connection.py.

Connection Base Class

All connections inherit from the abstract Connection base class.
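The base class listing is not reproduced in this section. As a rough sketch only, an abstract base of this kind could look like the following. The method set shown here (connect and disconnect, taken from the usage examples in this document) and the toy InMemoryConnection subclass are assumptions for illustration, not the actual contents of src/dataflow/connection.py.

```python
from abc import ABC, abstractmethod


class Connection(ABC):
    """Hypothetical sketch of an abstract connection interface."""

    @abstractmethod
    def connect(self) -> None:
        """Open the underlying driver connection."""

    @abstractmethod
    def disconnect(self) -> None:
        """Close the underlying driver connection."""


class InMemoryConnection(Connection):
    """Toy subclass, used only to show how the interface is filled in."""

    def __init__(self) -> None:
        self.connected = False

    def connect(self) -> None:
        self.connected = True

    def disconnect(self) -> None:
        self.connected = False
```

Because the base class is abstract, instantiating Connection directly raises TypeError. Only subclasses that implement every abstract method can be created.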

Microsoft SQL Server

Connects to Microsoft SQL Server databases using pymssql.

Basic Usage

from dataflow.connection import MicrosoftSQLServerConnection

conn = MicrosoftSQLServerConnection(
    server="my-server.database.windows.net",
    port=1433,
    database="mydb",
    username="user",
    password="password"
)

conn.connect()

# Fetch data with parameterized query
results = conn.fetch(
    "SELECT * FROM orders WHERE status = %s AND created_at > %s",
    ("pending", "2024-01-01")
)

# Execute a statement
conn.execute(
    "UPDATE orders SET status = %s WHERE id = %s",
    ("completed", 123)
)

# Batch execute
conn.execute_many(
    "INSERT INTO logs (message) VALUES (%s)",
    [("Log 1",), ("Log 2",), ("Log 3",)],
    batch_size=100
)

conn.disconnect()

MongoDB (Synchronous)

Synchronous MongoDB client using pymongo.

Basic Usage

from dataflow.connection import MongoDB

mongo = MongoDB(
    host="localhost",
    port=27017,
    database="analytics",
    username="user",
    password="password"
)

mongo.connect()

# Fetch documents
orders = mongo.fetch("orders", filters={"status": "pending"})

# Insert documents
mongo.insert("orders", data=[
    {"product": "Widget", "quantity": 10},
    {"product": "Gadget", "quantity": 5}
])

# Upsert (update or insert)
mongo.upsert(
    "orders",
    filters={"order_id": "ORD-123"},
    data={"order_id": "ORD-123", "status": "shipped"}
)

# Delete documents
mongo.delete("orders", filters={"status": "cancelled"})

mongo.disconnect()

MongoDB (Asynchronous)

Asynchronous MongoDB client using motor for non-blocking operations.

Basic Usage

from dataflow.connection import MongoDBAsync
import asyncio

async def main():
    mongo = MongoDBAsync(
        host="localhost",
        port=27017,
        database="analytics",
        username="user",
        password="password"
    )

    mongo.connect()

    # Async fetch
    orders = await mongo.fetch("orders", filters={"status": "pending"})

    # Async insert
    await mongo.insert("orders", data=[{"product": "Widget"}])

    # Async upsert
    await mongo.upsert(
        "orders",
        filters={"order_id": "ORD-123"},
        data={"status": "shipped"}
    )

    await mongo.disconnect()

asyncio.run(main())

SQLite

Local SQLite database for caching and lightweight storage.

Basic Usage

from dataflow.connection import SQLiteConnection

db = SQLiteConnection(database_path="./cache.db")
db.connect()

# Execute schema creation
db.execute("""
    CREATE TABLE IF NOT EXISTS sync_cache (
        id TEXT PRIMARY KEY,
        last_sync TIMESTAMP
    )
""")

# Fetch data
cache = db.fetch("SELECT * FROM sync_cache WHERE id = ?", ("item-1",))

# Insert data
db.insert("sync_cache", data=[{"id": "item-1", "last_sync": "2024-01-01"}])

# Upsert data
db.upsert(
    "sync_cache",
    filters={"id": "item-1"},
    data={"id": "item-1", "last_sync": "2024-01-02"}
)

# Delete data
db.delete("sync_cache", filters={"id": "item-1"})

db.disconnect()

Security Features

All connection classes implement security measures to prevent injection attacks.

SQL Injection Prevention

SQL Server and SQLite connections use parameterized queries. The placeholder is %s for SQL Server and ? for SQLite, as in the usage examples above:

# Safe - parameters are escaped
conn.fetch("SELECT * FROM users WHERE id = %s", (user_id,))

# Unsafe - user input is interpolated directly into the SQL text
conn.fetch(f"SELECT * FROM users WHERE id = {user_id}")  # DON'T DO THIS
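To see the difference concretely, here is a small self-contained demonstration. It uses Python's standard sqlite3 module directly rather than the Dataflow classes, so it runs without any setup. The table, rows, and malicious input are made up for the example.

```python
import sqlite3

# In-memory database, so the example leaves nothing behind.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE users (id TEXT, name TEXT)")
db.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("1", "alice"), ("2", "bob")],
)

malicious_id = "1' OR '1'='1"

# Parameterized: the value is bound by the driver and compared as a
# plain string, so it matches no row.
safe_rows = db.execute(
    "SELECT name FROM users WHERE id = ?", (malicious_id,)
).fetchall()

# String formatting: the input becomes part of the SQL text and turns
# the WHERE clause into a condition that is always true.
unsafe_rows = db.execute(
    f"SELECT name FROM users WHERE id = '{malicious_id}'"
).fetchall()

db.close()
```

Here safe_rows is empty, while unsafe_rows contains every row in the table.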

NoSQL Injection Prevention

MongoDB connections block dangerous operators:

# These operators are blocked:
# $where, $function, $accumulator

# Safe usage
mongo.fetch("collection", filters={"status": "active"})

# This would raise an error if attempted
mongo.fetch("collection", filters={"$where": "..."})  # Blocked
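One way such a check could work is a recursive walk over the filter document before it reaches the driver. The sketch below is an assumption about the approach, not Dataflow's actual implementation. The function name check_filters and the ValueError are hypothetical, and the real block list may contain more operators than the three named above.

```python
from typing import Any

# Operators named in this section; the real block list may be longer.
BLOCKED_OPERATORS = {"$where", "$function", "$accumulator"}


def check_filters(value: Any) -> None:
    """Raise ValueError if a blocked operator appears anywhere in the filter."""
    if isinstance(value, dict):
        for key, nested in value.items():
            if key in BLOCKED_OPERATORS:
                raise ValueError(f"Operator {key!r} is not allowed in filters")
            check_filters(nested)
    elif isinstance(value, (list, tuple)):
        # Operators such as $or and $and carry lists of sub-filters.
        for item in value:
            check_filters(item)


# A plain equality filter passes silently.
check_filters({"status": "active"})
```

The recursion matters because a blocked operator can be nested, for example inside an $or list, where a top-level key check alone would miss it.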

Type Validation

All parameters are validated before being passed to database drivers:

  • Query parameters must be of allowed types (str, int, float, bool, datetime, None)

  • Complex objects are rejected to prevent serialization attacks
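The rule above can be sketched as an allow-list check. This is a minimal illustration under assumptions: the name validate_params and the choice of TypeError are hypothetical, and the actual validation in Dataflow may differ, for example in how it treats subclasses.

```python
from datetime import datetime
from typing import Any, Sequence

# The allowed types listed above; type(None) covers None values.
ALLOWED_TYPES = (str, int, float, bool, datetime, type(None))


def validate_params(params: Sequence[Any]) -> tuple:
    """Return params as a tuple, or raise TypeError on a disallowed type."""
    for index, value in enumerate(params):
        if not isinstance(value, ALLOWED_TYPES):
            raise TypeError(
                f"Parameter {index} has unsupported type {type(value).__name__}"
            )
    return tuple(params)


# Scalars, datetimes, and None are accepted.
checked = validate_params(["pending", 123, 4.5, True, datetime(2024, 1, 1), None])
```

Passing a list, dict, or arbitrary object as a parameter value raises TypeError before anything reaches the driver.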

Connection Lifecycle

Always ensure connections are properly closed, preferably using try/finally:

from dataflow.connection import MicrosoftSQLServerConnection, MongoDB

def my_etl_job():
    source = MicrosoftSQLServerConnection(...)
    dest = MongoDB(...)

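    # Each connect() sits just before its own try block, so disconnect()
    # runs only for connections that were actually opened, and a failure
    # while closing dest cannot skip closing source.
    source.connect()
    try:
        dest.connect()
        try:
            data = source.fetch("SELECT * FROM table")
            dest.insert("collection", data=data)
        finally:
            dest.disconnect()
    finally:
        source.disconnect()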
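The try/finally pattern can also be wrapped in a small context manager so that cleanup is not repeated in every job. The helper open_connection below is a hypothetical addition, not part of Dataflow. It assumes only that a connection object has connect() and disconnect() methods, and FakeConnection is a stand-in that records calls so the example runs without a database.

```python
from contextlib import contextmanager


@contextmanager
def open_connection(conn):
    """Connect on entry and always disconnect on exit."""
    conn.connect()
    try:
        yield conn
    finally:
        conn.disconnect()


class FakeConnection:
    """Stand-in that records calls; not a Dataflow class."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def connect(self):
        self.log.append(f"{self.name}.connect")

    def disconnect(self):
        self.log.append(f"{self.name}.disconnect")


log = []
try:
    with open_connection(FakeConnection("source", log)) as source, \
            open_connection(FakeConnection("dest", log)) as dest:
        raise RuntimeError("job failed")  # simulate a failure mid-job
except RuntimeError:
    pass
```

Even though the job fails, both connections are closed, in reverse order of opening: log ends with dest.disconnect followed by source.disconnect.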