Database Connections

Dataflow provides abstraction layers for connecting to multiple database systems. All connections are defined in src/dataflow/connection.py.

Connection Base Class

All connections inherit from the abstract Connection base class:

class Connection

Bases: BaseModel, ABC

Abstract base class for database connections.

This class defines the interface that all database connection implementations must follow. It inherits from both Pydantic’s BaseModel for configuration validation and Python’s ABC for abstract method enforcement.

All subclasses must implement the connect and disconnect methods.

abstractmethod connect()

Establish a connection to the database.

This method must be implemented by subclasses to handle the specific connection logic for each database type.

Raises:

RuntimeError – If the connection cannot be established.

abstractmethod disconnect()

Close the database connection.

This method must be implemented by subclasses to properly clean up resources and close any open connections or cursors.

model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
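To make the contract concrete, here is a minimal sketch of a subclass. It uses a stand-in base class built only on abc so that it runs without Pydantic; ConnectionSketch, InMemoryConnection, and Incomplete are hypothetical names and are not part of Dataflow.

```python
from abc import ABC, abstractmethod


class ConnectionSketch(ABC):
    """Stand-in for Connection (assumption: the real class also inherits BaseModel)."""

    @abstractmethod
    def connect(self) -> None:
        """Open the connection; raise RuntimeError on failure."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release any resources held by the connection."""


class InMemoryConnection(ConnectionSketch):
    """Hypothetical subclass that only tracks whether it is connected."""

    def __init__(self) -> None:
        self.connected = False

    def connect(self) -> None:
        self.connected = True

    def disconnect(self) -> None:
        self.connected = False


class Incomplete(ConnectionSketch):
    """Omits disconnect(), so ABC refuses to instantiate it."""

    def connect(self) -> None:
        pass


conn = InMemoryConnection()
conn.connect()
is_connected = conn.connected
conn.disconnect()

try:
    Incomplete()
    instantiation_failed = False
except TypeError:
    instantiation_failed = True
```

The last few lines show the enforcement in practice: a subclass missing either abstract method raises TypeError at instantiation time rather than when the method is first called.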

Microsoft SQL Server

Connects to Microsoft SQL Server databases using pymssql.

class MicrosoftSQLServerConnection

Bases: Connection

Microsoft SQL Server database connection implementation.

Provides methods for connecting to and querying Microsoft SQL Server databases using the pymssql library. Supports parameterized queries to prevent SQL injection.

Variables:
  • host (str) – The hostname or IP address of the SQL Server. Defaults to “localhost”.

  • port (str) – The port number for the SQL Server connection. Defaults to “1433”.

  • database (str) – The name of the database to connect to.

  • username (str) – The username for authentication.

  • password (str) – The password for authentication.

Example

Basic connection and query:

conn = MicrosoftSQLServerConnection(
    host="localhost",
    port="1433",
    database="mydb",
    username="sa",
    password="password"
)
conn.connect()
results = conn.fetch("SELECT * FROM users")
conn.disconnect()

__init__(**data)

Initialize the SQL Server connection.

Parameters:

**data – Keyword arguments for connection configuration including host, port, database, username, and password.

connect()

Establish a connection to the SQL Server database.

Creates a new connection and cursor for executing queries.

Raises:

AssertionError – If the connection fails to establish.

database
disconnect()

Close the SQL Server connection and cursor.

Safely closes the cursor and connection if they exist, and sets the internal references to None.

execute(query, params=None)

Execute a SQL query that modifies data.

Executes the query and commits the transaction. If an error occurs, the transaction is rolled back.

Parameters:
  • query – The SQL query string to execute (INSERT, UPDATE, DELETE, etc.).

  • params – Optional tuple of parameters for the query.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If parameters are invalid.

  • Exception – Re-raises any exception after rolling back the transaction.

execute_many(query, params_list, batch_size=1000)

Execute a SQL query multiple times with different parameters.

Executes the query in batches for better performance with large datasets.

Parameters:
  • query – The SQL query string to execute.

  • params_list – A list of parameter tuples, one for each execution.

  • batch_size – The number of operations per batch. Defaults to 1000.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If params_list is not a list or parameters are invalid.

  • Exception – Re-raises any exception after rolling back the transaction.
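The batching and rollback behaviour described above can be sketched as follows. This is not the Dataflow implementation: it uses the standard-library sqlite3 driver as a stand-in for pymssql (so the placeholder is ? rather than %s), the helper names iter_batches and execute_many_sketch are hypothetical, and committing once after all batches is an assumption.

```python
import sqlite3
from typing import Iterator, Sequence


def iter_batches(items: Sequence[tuple], batch_size: int) -> Iterator[Sequence[tuple]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def execute_many_sketch(conn, query, params_list, batch_size=1000):
    """Run query once per parameter tuple, batch by batch, in one transaction."""
    if not isinstance(params_list, list):
        raise TypeError("params_list must be a list")
    cursor = conn.cursor()
    try:
        for batch in iter_batches(params_list, batch_size):
            cursor.executemany(query, batch)
        conn.commit()  # assumption: a single commit after the last batch
    except Exception:
        conn.rollback()  # undo every batch sent so far, then re-raise
        raise
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (message TEXT NOT NULL)")
conn.commit()

execute_many_sketch(
    conn,
    "INSERT INTO logs (message) VALUES (?)",
    [("Log 1",), ("Log 2",), ("Log 3",)],
    batch_size=2,
)
row_count = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
batch_sizes = [len(batch) for batch in iter_batches([(1,), (2,), (3,)], 2)]
```

With three parameter tuples and batch_size=2, the driver receives one batch of two and one batch of one; a failure in any batch rolls back the rows from earlier batches as well.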

fetch(query, params=None)

Execute a SELECT query and return results as a list of dictionaries.

Parameters:
  • query – The SQL query string to execute.

  • params – Optional tuple of parameters for the query.

Returns:

A list of dictionaries where each dictionary represents a row with column names as keys.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If parameters are invalid.

host
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

password
port
username
classmethod validate_port(v)

Validate and convert the port to a string.

Parameters:

v – The port value to validate, can be int or str.

Returns:

The port as a string.
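A port validator of this kind might look like the sketch below. The function name normalize_port is hypothetical, and the range check is an assumption added for illustration; the source only states that int or str input is converted to a string.

```python
def normalize_port(value) -> str:
    """Return the port as a string, accepting int or str input."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, str)):
        raise TypeError("port must be an int or str")
    port = int(value)  # raises ValueError for non-numeric strings
    if not 1 <= port <= 65535:  # assumption: enforce the valid TCP port range
        raise ValueError("port must be between 1 and 65535")
    return str(port)


from_int = normalize_port(1433)
from_str = normalize_port("1433")
```

Both calls produce the same string, which is why the constructor can accept port=1433 or port="1433" interchangeably.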

Basic Usage

from dataflow.connection import MicrosoftSQLServerConnection

conn = MicrosoftSQLServerConnection(
    host="my-server.database.windows.net",
    port=1433,
    database="mydb",
    username="user",
    password="password"
)

conn.connect()

# Fetch data with parameterized query
results = conn.fetch(
    "SELECT * FROM orders WHERE status = %s AND created_at > %s",
    ("pending", "2024-01-01")
)

# Execute a statement
conn.execute(
    "UPDATE orders SET status = %s WHERE id = %s",
    ("completed", 123)
)

# Batch execute
conn.execute_many(
    "INSERT INTO logs (message) VALUES (%s)",
    [("Log 1",), ("Log 2",), ("Log 3",)],
    batch_size=100
)

conn.disconnect()

MongoDB (Synchronous)

Synchronous MongoDB client using pymongo.

class MongoDB

Bases: Connection

MongoDB database connection implementation.

Provides methods for connecting to and querying MongoDB databases using the pymongo library. Includes protection against NoSQL injection attacks by blocking dangerous operators.

Variables:
  • host (str) – The hostname or IP address of the MongoDB server. Defaults to “localhost”.

  • port (int) – The port number for the MongoDB connection. Defaults to 27017.

  • database (str) – The name of the database to connect to.

  • username (str | None) – Optional username for authentication.

  • password (str | None) – Optional password for authentication.

  • tls (bool) – Whether to use TLS encryption. Defaults to False.

  • auth_source (str) – The authentication database. Defaults to “admin”.

Example

Basic connection and query:

conn = MongoDB(
    host="localhost",
    port=27017,
    database="mydb",
    username="user",
    password="password"
)
conn.connect()
results = conn.fetch("users", {"active": True})
conn.disconnect()

__init__(**data)

Initialize the MongoDB connection.

Parameters:

**data – Keyword arguments for connection configuration including host, port, database, username, password, tls, and auth_source.

auth_source
connect()

Establish a connection to the MongoDB database.

Constructs a connection URI from the configured parameters and creates a MongoClient instance.

Raises:

RuntimeError – If the client or database connection fails.

database
delete(collection, filters, batch_size=1000, must_exist=True)

Delete documents from a collection in batches.

Performs delete operations using bulk_write with DeleteOne operations.

Parameters:
  • collection – The name of the collection to delete from.

  • filters – A list of filter dictionaries to match documents for deletion.

  • batch_size – The number of operations per batch. Defaults to 1000.

  • must_exist – If True, raises an error if the collection doesn’t exist. Defaults to True.

Returns:

A list of BulkWriteResult objects, one per batch.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If filters is not a list.

  • ValueError – If collection doesn’t exist or dangerous operators are used.

disconnect()

Close the MongoDB connection.

Safely closes the client connection and sets internal references to None.

fetch(collection, query, projection=None, must_exist=True)

Retrieve documents from a collection.

Parameters:
  • collection – The name of the collection to query.

  • query – The query filter dictionary.

  • projection – Optional projection to limit returned fields.

  • must_exist – If True, raises an error if the collection doesn’t exist. Defaults to True.

Returns:

A cursor over the matching documents.

Raises:
  • RuntimeError – If not connected to the database.

  • ValueError – If collection doesn’t exist or dangerous operators are used.

  • TypeError – If query or projection is not a dictionary.

host
insert(collection, documents, batch_size=1000, must_exist=True)

Insert documents into a collection in batches.

Parameters:
  • collection – The name of the collection to insert into.

  • documents – A list of document dictionaries to insert.

  • batch_size – The number of documents per batch. Defaults to 1000.

  • must_exist – If True, raises an error if the collection doesn’t exist. Defaults to True.

Returns:

A list of InsertManyResult objects, one per batch.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If documents is not a list.

  • ValueError – If collection doesn’t exist when required.

model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

password
port
tls
upsert(collection, filters, updates, batch_size=1000, must_exist=True)

Update or insert documents in a collection in batches.

Performs upsert operations using bulk_write with UpdateOne operations. Automatically sets updated_at and created_at timestamps.

Parameters:
  • collection – The name of the collection to upsert into.

  • filters – A list of filter dictionaries to match documents.

  • updates – A list of update dictionaries with fields to set.

  • batch_size – The number of operations per batch. Defaults to 1000.

  • must_exist – If True, raises an error if the collection doesn’t exist. Defaults to True.

Returns:

A list of BulkWriteResult objects, one per batch.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If filters or updates is not a list.

  • ValueError – If filters and updates have different lengths, collection doesn’t exist, or dangerous operators are used.
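The pairing of filters with updates, and the timestamp handling, can be sketched with plain dictionaries. How Dataflow actually sets the timestamps is not shown in this reference; the use of $set for updated_at and $setOnInsert for created_at below is an assumption, and build_upsert_pairs is a hypothetical helper.

```python
from datetime import datetime, timezone


def build_upsert_pairs(filters, updates):
    """Pair each filter with an update document carrying timestamps."""
    if not isinstance(filters, list) or not isinstance(updates, list):
        raise TypeError("filters and updates must be lists")
    if len(filters) != len(updates):
        raise ValueError("filters and updates must have the same length")

    now = datetime.now(timezone.utc)
    pairs = []
    for flt, fields in zip(filters, updates):
        update_doc = {
            # Applied on every upsert (assumption).
            "$set": {**fields, "updated_at": now},
            # Applied only when the upsert inserts a new document (assumption).
            "$setOnInsert": {"created_at": now},
        }
        pairs.append((flt, update_doc))
    return pairs


pairs = build_upsert_pairs(
    [{"order_id": "ORD-123"}],
    [{"status": "shipped"}],
)
```

Each resulting pair could then be wrapped in pymongo's UpdateOne(filter, update, upsert=True) and sent through bulk_write in slices of batch_size.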

username
classmethod validate_port(v)

Validate and convert the port to an integer.

Parameters:

v – The port value to validate, can be int or str.

Returns:

The port as an integer.

Basic Usage

from dataflow.connection import MongoDB

mongo = MongoDB(
    host="localhost",
    port=27017,
    database="analytics",
    username="user",
    password="password"
)

mongo.connect()

# Fetch documents
orders = mongo.fetch("orders", query={"status": "pending"})

# Insert documents
mongo.insert("orders", documents=[
    {"product": "Widget", "quantity": 10},
    {"product": "Gadget", "quantity": 5}
])

# Upsert (update or insert): filters and updates are parallel lists
mongo.upsert(
    "orders",
    filters=[{"order_id": "ORD-123"}],
    updates=[{"order_id": "ORD-123", "status": "shipped"}]
)

# Delete documents: one filter per delete operation
mongo.delete("orders", filters=[{"status": "cancelled"}])

mongo.disconnect()

MongoDB (Asynchronous)

Asynchronous MongoDB client using motor for non-blocking operations.

class MongoDBAsync

Bases: Connection

Asynchronous MongoDB database connection implementation.

Provides async methods for connecting to and querying MongoDB databases using the Motor library. Includes protection against NoSQL injection attacks by blocking dangerous operators.

Variables:
  • hostname (str) – The hostname or IP address of the MongoDB server.

  • port (int) – The port number for the MongoDB connection.

  • database (str) – The name of the database to connect to.

  • username (str | None) – Optional username for authentication.

  • password (str | None) – Optional password for authentication.

  • tls (bool) – Whether to use TLS encryption. Defaults to False.

  • auth_source (str) – The authentication database. Defaults to “admin”.

Example

Basic async connection and query:

conn = MongoDBAsync(
    hostname="localhost",
    port=27017,
    database="mydb"
)
conn.connect()
results = await conn.fetch("users", {"active": True})
conn.disconnect()

__init__(**data)

Initialize the async MongoDB connection.

Parameters:

**data – Keyword arguments for connection configuration including hostname, port, database, username, password, tls, and auth_source.

auth_source
connect()

Establish a connection to the MongoDB database.

Creates an AsyncIOMotorClient instance with the configured parameters.

Raises:

RuntimeError – If the client or database connection fails.

database
async delete(collection, filters, batch_size=1000, must_exist=True)

Delete documents from a collection asynchronously in batches.

Performs delete operations using bulk_write with DeleteOne operations.

Parameters:
  • collection – The name of the collection to delete from.

  • filters – A list of filter dictionaries to match documents for deletion.

  • batch_size – The number of operations per batch. Defaults to 1000.

  • must_exist – If True, raises an error if the collection doesn’t exist. Defaults to True.

Returns:

A list of BulkWriteResult objects, one per batch.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If filters is not a list.

  • ValueError – If collection doesn’t exist or dangerous operators are used.

disconnect()

Close the async MongoDB connection.

Safely closes the client connection and sets internal references to None.

async fetch(collection, query, projection=None, must_exist=True)

Retrieve documents from a collection asynchronously.

Parameters:
  • collection – The name of the collection to query.

  • query – The query filter dictionary.

  • projection – Optional projection to limit returned fields.

  • must_exist – If True, raises an error if the collection doesn’t exist. Defaults to True.

Returns:

A list of matching documents.

Raises:
  • RuntimeError – If not connected to the database.

  • ValueError – If collection doesn’t exist or dangerous operators are used.

  • TypeError – If query or projection is not a dictionary.

async get_collection(collection, must_exist=True)

Get a collection from the connected database asynchronously.

Parameters:
  • collection – The name of the collection to retrieve.

  • must_exist – If True, raises an error if the collection doesn’t exist. Defaults to True.

Returns:

The MongoDB collection object.

Raises:
  • RuntimeError – If not connected to the database.

  • ValueError – If collection name is invalid or doesn’t exist when required.

hostname
async insert(collection, documents, batch_size=1000, must_exist=True)

Insert documents into a collection asynchronously in batches.

Parameters:
  • collection – The name of the collection to insert into.

  • documents – A list of document dictionaries to insert.

  • batch_size – The number of documents per batch. Defaults to 1000.

  • must_exist – If True, raises an error if the collection doesn’t exist. Defaults to True.

Returns:

A list of InsertManyResult objects, one per batch.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If documents is not a list.

  • ValueError – If collection doesn’t exist when required.

model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

password
port
tls
async upsert(collection, filters, updates, batch_size=1000, must_exist=True)

Update or insert documents asynchronously in batches.

Performs upsert operations using bulk_write with UpdateOne operations.

Parameters:
  • collection – The name of the collection to upsert into.

  • filters – A list of filter dictionaries to match documents.

  • updates – A list of update dictionaries with fields to set.

  • batch_size – The number of operations per batch. Defaults to 1000.

  • must_exist – If True, raises an error if the collection doesn’t exist. Defaults to True.

Returns:

A list of BulkWriteResult objects, one per batch.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If filters or updates is not a list.

  • ValueError – If filters and updates have different lengths, collection doesn’t exist, or dangerous operators are used.

username
classmethod validate_port(v)

Validate and convert the port to an integer.

Parameters:

v – The port value to validate, can be int or str.

Returns:

The port as an integer.

Basic Usage

from dataflow.connection import MongoDBAsync
import asyncio

async def main():
    mongo = MongoDBAsync(
        hostname="localhost",
        port=27017,
        database="analytics",
        username="user",
        password="password"
    )

    # connect() is documented above as a regular (non-async) method
    mongo.connect()

    # Async fetch
    orders = await mongo.fetch("orders", query={"status": "pending"})

    # Async insert
    await mongo.insert("orders", documents=[{"product": "Widget"}])

    # Async upsert: filters and updates are parallel lists
    await mongo.upsert(
        "orders",
        filters=[{"order_id": "ORD-123"}],
        updates=[{"status": "shipped"}]
    )

    # disconnect() is documented above as a regular (non-async) method
    mongo.disconnect()

asyncio.run(main())

SQLite

Local SQLite database for caching and lightweight storage.

class SQLiteConnection

Bases: Connection

SQLite database connection implementation.

Provides methods for connecting to and querying SQLite databases using the built-in sqlite3 library. Supports parameterized queries to prevent SQL injection.

Variables:

database_path (str) – The file path to the SQLite database file.

Example

Basic connection and query:

conn = SQLiteConnection(database_path="/path/to/database.db")
conn.connect()
results = conn.fetch("SELECT * FROM users WHERE id = ?", (1,))
conn.disconnect()

__init__(**data)

Initialize the SQLite connection.

Parameters:

**data – Keyword arguments for connection configuration, primarily database_path.

connect()

Establish a connection to the SQLite database.

Creates parent directories if they don’t exist and opens a connection to the database file. Enables multi-threaded access.

Raises:

AssertionError – If the connection fails to establish.
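The connection steps described above can be sketched with the standard library. Mapping "multi-threaded access" to sqlite3's check_same_thread=False is an assumption, and open_sqlite is a hypothetical helper rather than the Dataflow method.

```python
import sqlite3
import tempfile
from pathlib import Path


def open_sqlite(database_path: str) -> sqlite3.Connection:
    """Create missing parent directories, then open the database file."""
    path = Path(database_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Assumption: allow the connection to be used from threads other than
    # the one that created it.
    return sqlite3.connect(str(path), check_same_thread=False)


with tempfile.TemporaryDirectory() as tmp:
    db_file = Path(tmp) / "nested" / "dir" / "cache.db"
    conn = open_sqlite(str(db_file))
    conn.execute("CREATE TABLE t (id INTEGER)")
    conn.close()
    file_created = db_file.exists()
```

Note that check_same_thread=False only disables the driver's thread check; callers that share one connection across threads still need to serialize their writes.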

database_path
delete(query, params_list, batch_size=1000)

Delete records from the database in batches.

Parameters:
  • query – The DELETE SQL query string.

  • params_list – A list of parameter tuples, one for each deletion.

  • batch_size – The number of deletions per batch. Defaults to 1000.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If params_list is not a list or parameters are invalid.

  • Exception – Re-raises any exception after rolling back the transaction.

disconnect()

Close the SQLite connection and cursor.

Safely closes the cursor and connection if they exist, and sets the internal references to None.

ensure_connected()

Ensure the cursor and connection are established.

Returns:

A tuple containing the cursor and connection objects.

Raises:

RuntimeError – If the cursor or connection is not established.

execute(query, params=None)

Execute a SQL query that modifies data.

Executes the query and commits the transaction. If an error occurs, the transaction is rolled back.

Parameters:
  • query – The SQL query string to execute (INSERT, UPDATE, DELETE, etc.).

  • params – Optional tuple of parameters for the query.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If parameters are invalid.

  • Exception – Re-raises any exception after rolling back the transaction.

fetch(query, params=None)

Execute a SELECT query and return results as a list of dictionaries.

Parameters:
  • query – The SQL query string to execute.

  • params – Optional tuple of parameters for the query.

Returns:

A list of dictionaries where each dictionary represents a row with column names as keys.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If parameters are invalid.
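One common way to produce a list of dictionaries from a DB-API cursor is to read the column names from cursor.description. The sketch below shows that approach with sqlite3; fetch_as_dicts is a hypothetical helper, and the source does not say how Dataflow builds its rows.

```python
import sqlite3


def fetch_as_dicts(conn, query, params=None):
    """Run a SELECT and return each row as a {column: value} dictionary."""
    cursor = conn.cursor()
    try:
        cursor.execute(query, params or ())
        # description holds one 7-item tuple per column; item 0 is the name.
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(1, "Ada"), (2, "Lin")],
)
rows = fetch_as_dicts(conn, "SELECT id, name FROM users WHERE id = ?", (1,))
```

Here rows is a one-element list whose single dictionary has the keys id and name.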

insert(query, params_list, batch_size=1000)

Insert records into the database in batches.

Parameters:
  • query – The INSERT SQL query string.

  • params_list – A list of parameter tuples, one for each row to insert.

  • batch_size – The number of rows per batch. Defaults to 1000.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If params_list is not a list or parameters are invalid.

  • Exception – Re-raises any exception after rolling back the transaction.

model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

upsert(query, params_list, batch_size=1000)

Update or insert records in batches.

Uses SQLite’s INSERT OR REPLACE or similar syntax provided in the query.

Parameters:
  • query – The upsert SQL query string (e.g., INSERT OR REPLACE).

  • params_list – A list of parameter tuples, one for each operation.

  • batch_size – The number of operations per batch. Defaults to 1000.

Raises:
  • RuntimeError – If not connected to the database.

  • TypeError – If params_list is not a list or parameters are invalid.

  • Exception – Re-raises any exception after rolling back the transaction.
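Because upsert takes the SQL text from the caller, it helps to see what an INSERT OR REPLACE statement does. The sketch below uses sqlite3 directly rather than the Dataflow class; the table and values are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sync_cache (id TEXT PRIMARY KEY, last_sync TEXT)")

upsert_sql = "INSERT OR REPLACE INTO sync_cache (id, last_sync) VALUES (?, ?)"

# First pass inserts two new rows.
conn.executemany(upsert_sql, [("item-1", "2024-01-01"), ("item-2", "2024-01-01")])
# Second pass hits the existing primary key and replaces that row.
conn.executemany(upsert_sql, [("item-1", "2024-01-02")])
conn.commit()

rows = conn.execute("SELECT id, last_sync FROM sync_cache ORDER BY id").fetchall()
```

INSERT OR REPLACE deletes the conflicting row and inserts a new one, so any column left out of the statement falls back to its default. When existing column values must be preserved, SQLite 3.24 and later also accept INSERT ... ON CONFLICT(id) DO UPDATE SET ....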

classmethod validate_database_path(v)

Validate and convert the database path to a string.

Parameters:

v – The database path value to validate, can be Path or str.

Returns:

The database path as a string.

Basic Usage

from dataflow.connection import SQLiteConnection

db = SQLiteConnection(database_path="./cache.db")
db.connect()

# Execute schema creation
db.execute("""
    CREATE TABLE IF NOT EXISTS sync_cache (
        id TEXT PRIMARY KEY,
        last_sync TIMESTAMP
    )
""")

# Fetch data
cache = db.fetch("SELECT * FROM sync_cache WHERE id = ?", ("item-1",))

# Insert data (one parameter tuple per row)
db.insert(
    "INSERT INTO sync_cache (id, last_sync) VALUES (?, ?)",
    [("item-1", "2024-01-01")]
)

# Upsert data
db.upsert(
    "INSERT OR REPLACE INTO sync_cache (id, last_sync) VALUES (?, ?)",
    [("item-1", "2024-01-02")]
)

# Delete data
db.delete("DELETE FROM sync_cache WHERE id = ?", [("item-1",)])

db.disconnect()

Security Features

All connection classes implement security measures to prevent injection attacks.

SQL Injection Prevention

SQL Server and SQLite connections use parameterized queries:

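# Safe - values are passed separately from the SQL text and bound by the driver
# (SQL Server via pymssql uses %s placeholders; SQLite uses ?)
conn.fetch("SELECT * FROM users WHERE id = %s", (user_id,))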

# Never do this - vulnerable to injection
conn.fetch(f"SELECT * FROM users WHERE id = {user_id}")  # DON'T DO THIS

NoSQL Injection Prevention

MongoDB connections block dangerous operators:

# These operators are blocked:
# $where, $function, $accumulator

# Safe usage
mongo.fetch("collection", query={"status": "active"})

# This would raise an error if attempted
mongo.fetch("collection", query={"$where": "..."})  # Blocked
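A check of this kind has to look inside nested filters, because a blocked operator can be hidden under $or, $and, or an embedded document. The sketch below is one possible implementation, not the Dataflow code; assert_safe_query and BLOCKED_OPERATORS are hypothetical names, and the operator set is taken from the list above.

```python
BLOCKED_OPERATORS = frozenset({"$where", "$function", "$accumulator"})


def assert_safe_query(value) -> None:
    """Raise ValueError if a blocked operator appears anywhere in the query."""
    if isinstance(value, dict):
        for key, nested in value.items():
            if key in BLOCKED_OPERATORS:
                raise ValueError(f"Operator {key!r} is not allowed")
            assert_safe_query(nested)
    elif isinstance(value, (list, tuple)):
        for item in value:
            assert_safe_query(item)
    # Scalars (str, int, None, ...) cannot contain operators.


# A plain equality filter passes silently.
assert_safe_query({"status": "active"})

# A blocked operator nested inside $or is still caught.
try:
    assert_safe_query({"$or": [{"status": "active"}, {"$where": "..."}]})
    nested_blocked = False
except ValueError:
    nested_blocked = True
```

Checking only the top-level keys would miss the second case, which is why the walk is recursive.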

Type Validation

All parameters are validated before being passed to database drivers:

  • Query parameters must be of allowed types (str, int, float, bool, datetime, None)

  • Complex objects are rejected to prevent serialization attacks
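The allow-list described above can be sketched as a small validator. The function name validate_params and the exact error messages are hypothetical; only the list of allowed types comes from this page.

```python
from datetime import datetime

# Types listed above as acceptable query parameters.
ALLOWED_TYPES = (str, int, float, bool, datetime, type(None))


def validate_params(params):
    """Return params unchanged if every value has an allowed type."""
    if params is None:
        return ()
    if not isinstance(params, tuple):
        raise TypeError("params must be a tuple")
    for index, value in enumerate(params):
        if not isinstance(value, ALLOWED_TYPES):
            raise TypeError(
                f"parameter {index} has unsupported type {type(value).__name__}"
            )
    return params


accepted = validate_params(("pending", 123, 4.5, True, None, datetime(2024, 1, 1)))

try:
    validate_params(({"nested": "object"},))
    rejected = False
except TypeError:
    rejected = True
```

Scalars pass through untouched, while a dictionary, list, or arbitrary object is rejected before it reaches the driver.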

Connection Lifecycle

Always ensure connections are properly closed, preferably using try/finally:

from dataflow.connection import MicrosoftSQLServerConnection, MongoDB

def my_etl_job():
    source = MicrosoftSQLServerConnection(...)
    dest = MongoDB(...)

    try:
        source.connect()
        dest.connect()

        data = source.fetch("SELECT * FROM table")
        dest.insert("collection", documents=data)

    finally:
        source.disconnect()
        dest.disconnect()
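In the try/finally form, an exception raised by the first disconnect() call would skip the second one. One alternative is contextlib.ExitStack, which runs every registered cleanup even if an earlier one fails. The sketch below uses a hypothetical FakeConnection that only records calls, so it runs without a database.

```python
from contextlib import ExitStack


class FakeConnection:
    """Hypothetical stand-in that logs connect/disconnect calls."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def connect(self):
        self.log.append(f"{self.name} connect")

    def disconnect(self):
        self.log.append(f"{self.name} disconnect")


def run_job(source, dest):
    """Connect both ends and guarantee each one is disconnected."""
    with ExitStack() as stack:
        source.connect()
        stack.callback(source.disconnect)  # registered only after connect succeeds
        dest.connect()
        stack.callback(dest.disconnect)
        # ... fetch from source and insert into dest here ...


log = []
run_job(FakeConnection("source", log), FakeConnection("dest", log))
```

Callbacks run in reverse registration order, so dest is disconnected before source. A connection whose connect() call failed is never registered, and therefore never disconnected.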