Database Connections
Dataflow provides a common connection layer for several database systems: Microsoft SQL Server, MongoDB (synchronous and asynchronous clients), and SQLite.
All connection classes are defined in src/dataflow/connection.py.
Connection Base Class
All connection classes inherit from the abstract Connection base class.
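The definition of the base class is not reproduced on this page. As a hedged illustration only, an abstract base that declares the two lifecycle methods every example below calls, connect() and disconnect(), could be written with Python's abc module. The exact set of abstract methods in dataflow's Connection is an assumption, and InMemoryConnection is a hypothetical toy subclass.

```python
from abc import ABC, abstractmethod


class Connection(ABC):
    """Hypothetical sketch of an abstract connection base class.

    Only the lifecycle methods shared by every example on this page are
    declared abstract; query methods differ per backend.
    """

    @abstractmethod
    def connect(self) -> None:
        """Open the underlying driver connection."""

    @abstractmethod
    def disconnect(self) -> None:
        """Close the underlying driver connection."""


class InMemoryConnection(Connection):
    """Toy subclass (hypothetical) that satisfies the abstract contract."""

    def __init__(self) -> None:
        self.is_connected = False

    def connect(self) -> None:
        self.is_connected = True

    def disconnect(self) -> None:
        self.is_connected = False


conn = InMemoryConnection()
conn.connect()
print(conn.is_connected)  # True
conn.disconnect()
```

Because both methods are abstract, calling Connection() directly raises TypeError; only subclasses that implement them can be instantiated.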
Microsoft SQL Server
Connects to Microsoft SQL Server databases using pymssql.
Basic Usage
```python
from dataflow.connection import MicrosoftSQLServerConnection

conn = MicrosoftSQLServerConnection(
    server="my-server.database.windows.net",
    port=1433,
    database="mydb",
    username="user",
    password="password",
)
conn.connect()

# Fetch data with a parameterized query
results = conn.fetch(
    "SELECT * FROM orders WHERE status = %s AND created_at > %s",
    ("pending", "2024-01-01"),
)

# Execute a statement
conn.execute(
    "UPDATE orders SET status = %s WHERE id = %s",
    ("completed", 123),
)

# Batch execute
conn.execute_many(
    "INSERT INTO logs (message) VALUES (%s)",
    [("Log 1",), ("Log 2",), ("Log 3",)],
    batch_size=100,
)

conn.disconnect()
```
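How execute_many uses batch_size is not spelled out here. A reasonable reading is that rows are sent in chunks of at most batch_size per round trip. The sketch below illustrates that chunking with Python's built-in sqlite3 module so it runs without a SQL Server; the helper execute_in_batches is hypothetical, and sqlite3 uses ? placeholders where pymssql uses %s.

```python
import sqlite3
from itertools import islice
from typing import Iterable, Sequence


def execute_in_batches(
    conn: sqlite3.Connection,
    sql: str,
    rows: Iterable[Sequence],
    batch_size: int = 100,
) -> int:
    """Hypothetical helper: run executemany() over successive chunks of rows.

    Each chunk is committed separately. Returns the number of batches sent.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    batches = 0
    while True:
        chunk = list(islice(iterator, batch_size))  # next batch_size rows, or fewer
        if not chunk:
            break
        conn.executemany(sql, chunk)
        conn.commit()
        batches += 1
    return batches


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE logs (message TEXT)")
sent = execute_in_batches(
    db,
    "INSERT INTO logs (message) VALUES (?)",
    [(f"Log {i}",) for i in range(250)],
    batch_size=100,
)
print(sent)  # 3
```

Committing after each chunk keeps transactions small, but a failure partway through leaves earlier chunks applied. Whether dataflow's execute_many commits per batch or once at the end is not stated on this page.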
MongoDB (Synchronous)
Synchronous MongoDB client using pymongo.
Basic Usage
```python
from dataflow.connection import MongoDB

mongo = MongoDB(
    host="localhost",
    port=27017,
    database="analytics",
    username="user",
    password="password",
)
mongo.connect()

# Fetch documents
orders = mongo.fetch("orders", filters={"status": "pending"})

# Insert documents
mongo.insert("orders", data=[
    {"product": "Widget", "quantity": 10},
    {"product": "Gadget", "quantity": 5},
])

# Upsert (update or insert)
mongo.upsert(
    "orders",
    filters={"order_id": "ORD-123"},
    data={"order_id": "ORD-123", "status": "shipped"},
)

# Delete documents
mongo.delete("orders", filters={"status": "cancelled"})

mongo.disconnect()
```
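This page does not show how upsert is implemented. In pymongo the usual way to express it is collection.update_one(filters, {"$set": data}, upsert=True): the first matching document has the fields in data set, and if nothing matches, a new document is built from the filter's equality fields plus data. To make those semantics concrete without a running server, the pure-Python model below mimics them for plain equality filters. It is an illustration, not dataflow's code, and upsert_in_memory is a hypothetical name.

```python
from typing import Any


def upsert_in_memory(
    collection: list[dict[str, Any]],
    filters: dict[str, Any],
    data: dict[str, Any],
) -> str:
    """Model update_one(filters, {"$set": data}, upsert=True) on a list of dicts.

    Supports plain equality filters only. Returns "updated" or "inserted".
    """
    for doc in collection:
        if all(doc.get(key) == value for key, value in filters.items()):
            doc.update(data)  # like $set: overwrite only the given fields
            return "updated"
    # No match: the new document combines the filter's equality fields with data.
    collection.append({**filters, **data})
    return "inserted"


orders = [{"order_id": "ORD-123", "status": "pending", "quantity": 10}]
print(upsert_in_memory(orders, {"order_id": "ORD-123"}, {"status": "shipped"}))  # updated
print(upsert_in_memory(orders, {"order_id": "ORD-999"}, {"status": "pending"}))  # inserted
```

Like update_one, the model touches at most one document; fields not named in data (quantity above) are left unchanged.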
MongoDB (Asynchronous)
Asynchronous MongoDB client using motor for non-blocking operations.
Basic Usage
```python
import asyncio

from dataflow.connection import MongoDBAsync


async def main():
    mongo = MongoDBAsync(
        host="localhost",
        port=27017,
        database="analytics",
        username="user",
        password="password",
    )
    mongo.connect()

    # Async fetch
    orders = await mongo.fetch("orders", filters={"status": "pending"})

    # Async insert
    await mongo.insert("orders", data=[{"product": "Widget"}])

    # Async upsert
    await mongo.upsert(
        "orders",
        filters={"order_id": "ORD-123"},
        data={"status": "shipped"},
    )

    await mongo.disconnect()


asyncio.run(main())
```
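The practical gain from a non-blocking client is that independent queries can be in flight at the same time. A minimal sketch, assuming MongoDBAsync.fetch is a coroutine with the signature used above: the hypothetical helper fetch_concurrently awaits several fetches together with asyncio.gather, and a hypothetical stub client stands in for a real server so the example runs on its own.

```python
import asyncio
from typing import Any


async def fetch_concurrently(
    mongo: Any, queries: list[tuple[str, dict[str, Any]]]
) -> list[Any]:
    """Run several fetch() calls concurrently; results keep the input order."""
    return await asyncio.gather(
        *(mongo.fetch(collection, filters=filters) for collection, filters in queries)
    )


class StubAsyncMongo:
    """Hypothetical stand-in that simulates network latency with asyncio.sleep."""

    async def fetch(self, collection: str, filters: dict[str, Any]) -> list[dict[str, Any]]:
        await asyncio.sleep(0.05)  # pretend round trip to the server
        return [{"collection": collection, **filters}]


async def demo() -> list[Any]:
    return await fetch_concurrently(
        StubAsyncMongo(),
        [("orders", {"status": "pending"}), ("customers", {"tier": "gold"})],
    )


results = asyncio.run(demo())
print(results)
```

With the stub, the two simulated round trips overlap, so the whole call takes roughly one delay instead of two. asyncio.gather returns results in the order the coroutines were passed, regardless of which finishes first.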
SQLite
Local SQLite database for caching and lightweight storage.
Basic Usage
```python
from dataflow.connection import SQLiteConnection

db = SQLiteConnection(database_path="./cache.db")
db.connect()

# Execute schema creation
db.execute("""
    CREATE TABLE IF NOT EXISTS sync_cache (
        id TEXT PRIMARY KEY,
        last_sync TIMESTAMP
    )
""")

# Fetch data
cache = db.fetch("SELECT * FROM sync_cache WHERE id = ?", ("item-1",))

# Insert data
db.insert("sync_cache", data=[{"id": "item-1", "last_sync": "2024-01-01"}])

# Upsert data
db.upsert(
    "sync_cache",
    filters={"id": "item-1"},
    data={"id": "item-1", "last_sync": "2024-01-02"},
)

# Delete data
db.delete("sync_cache", filters={"id": "item-1"})

db.disconnect()
```
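The SQL that SQLiteConnection.upsert generates is not shown. One plausible translation for a table keyed on id is SQLite's INSERT ... ON CONFLICT ... DO UPDATE clause (available since SQLite 3.24.0). The sketch below runs that statement with Python's standard sqlite3 module against an in-memory database; it illustrates the semantics and is not the wrapper's actual query.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE IF NOT EXISTS sync_cache (id TEXT PRIMARY KEY, last_sync TIMESTAMP)"
)

# Insert the row, or update last_sync if a row with this id already exists.
# "excluded" refers to the row that the INSERT attempted to add.
UPSERT_SQL = """
INSERT INTO sync_cache (id, last_sync) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE SET last_sync = excluded.last_sync
"""

db.execute(UPSERT_SQL, ("item-1", "2024-01-01"))  # no existing row: inserts
db.execute(UPSERT_SQL, ("item-1", "2024-01-02"))  # conflict on id: updates
db.commit()

print(db.execute("SELECT id, last_sync FROM sync_cache").fetchall())
# [('item-1', '2024-01-02')]
```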
Security Features
All connection classes implement safeguards against injection attacks.
SQL Injection Prevention
SQL Server and SQLite connections use parameterized queries. The placeholder style follows the underlying driver: %s for SQL Server (pymssql) and ? for SQLite.
```python
# Safe: the value is passed as a parameter, never formatted into the SQL text
conn.fetch("SELECT * FROM users WHERE id = %s", (user_id,))

# Never do this: user_id becomes part of the SQL and can inject arbitrary clauses
conn.fetch(f"SELECT * FROM users WHERE id = {user_id}")  # DON'T DO THIS
```
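To see why the f-string version is dangerous, the self-contained sketch below uses Python's built-in sqlite3 module (so the placeholder is ? rather than %s) with a crafted input. The users table and its rows are made up for illustration.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE users (id TEXT, name TEXT)")
db.executemany("INSERT INTO users VALUES (?, ?)", [("1", "alice"), ("2", "bob")])

user_id = "1' OR '1'='1"  # attacker-controlled input

# Unsafe: the input becomes SQL, so the injected OR clause matches every row.
unsafe_sql = f"SELECT name FROM users WHERE id = '{user_id}'"
unsafe_rows = db.execute(unsafe_sql).fetchall()

# Safe: the input is bound as one value and compared literally against id.
safe_rows = db.execute("SELECT name FROM users WHERE id = ?", (user_id,)).fetchall()

print(sorted(unsafe_rows))  # [('alice',), ('bob',)]
print(safe_rows)            # []
```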
NoSQL Injection Prevention
MongoDB connections block operators that run server-side JavaScript:
```python
# These operators are blocked:
# $where, $function, $accumulator

# Safe usage
mongo.fetch("collection", filters={"status": "active"})

# This raises an error
mongo.fetch("collection", filters={"$where": "..."})  # Blocked
```
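This page does not show how the check is implemented or which exception it raises. A minimal sketch, assuming a recursive scan of the filter document: the hypothetical validate_filters walks nested dicts and lists, so a blocked operator hidden inside $and or $or is still caught, and raises ValueError (the exception type is also an assumption).

```python
from typing import Any

BLOCKED_OPERATORS = frozenset({"$where", "$function", "$accumulator"})


def validate_filters(node: Any) -> None:
    """Raise ValueError if a blocked operator appears anywhere in node."""
    if isinstance(node, dict):
        for key, value in node.items():
            if key in BLOCKED_OPERATORS:
                raise ValueError(f"operator {key!r} is not allowed")
            validate_filters(value)  # check nested documents
    elif isinstance(node, (list, tuple)):
        for item in node:
            validate_filters(item)  # check arrays such as $and / $or clauses


validate_filters({"status": "active"})  # passes silently
validate_filters({"$and": [{"status": "active"}, {"qty": {"$gt": 5}}]})  # passes

try:
    validate_filters({"$or": [{"status": "active"}, {"$where": "sleep(1000)"}]})
except ValueError as exc:
    print(exc)  # operator '$where' is not allowed
```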
Type Validation
All parameters are validated before being passed to database drivers:
- Query parameters must be of allowed types (str, int, float, bool, datetime, None).
- Complex objects are rejected to prevent serialization attacks.
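A minimal sketch of such a check, assuming it is a plain isinstance test against the types listed above. The function name validate_params and the choice of TypeError are assumptions, not the library's API.

```python
from datetime import datetime
from typing import Any, Sequence

# Types listed above as allowed; None is handled separately.
ALLOWED_TYPES = (str, int, float, bool, datetime)


def validate_params(params: Sequence[Any]) -> None:
    """Raise TypeError if any query parameter is not an allowed scalar type."""
    for index, value in enumerate(params):
        if value is not None and not isinstance(value, ALLOWED_TYPES):
            raise TypeError(
                f"parameter {index} has disallowed type {type(value).__name__}"
            )


validate_params(("pending", 123, 4.5, True, datetime(2024, 1, 1), None))  # OK

try:
    validate_params(("pending", {"nested": "dict"}))
except TypeError as exc:
    print(exc)  # parameter 1 has disallowed type dict
```

Two details of isinstance are worth knowing here: bool is a subclass of int, so listing it is redundant but harmless, and a plain datetime.date would be rejected because date is the parent class of datetime, not a subclass.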
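Connection Lifecycle
Always ensure connections are properly closed, preferably using try/finally. Giving each connection its own try/finally guarantees that every connection that was opened is closed, even if a later connect() or disconnect() call fails:
```python
from dataflow.connection import MicrosoftSQLServerConnection, MongoDB


def my_etl_job():
    source = MicrosoftSQLServerConnection(...)
    dest = MongoDB(...)

    source.connect()
    try:
        dest.connect()
        try:
            data = source.fetch("SELECT * FROM my_table")
            dest.insert("collection", data=data)
        finally:
            dest.disconnect()  # runs only if dest.connect() succeeded
    finally:
        source.disconnect()  # runs even if anything above raised
```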
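The try/finally pattern can be packaged once and reused. The sketch below is a hypothetical helper, not part of dataflow: connected() is a contextlib-based context manager that calls connect() on entry and disconnect() on exit, and it works with any object exposing those two methods. Listing several managers in one with statement closes them in reverse order, and a connection whose connect() failed is never disconnected. A stub class stands in for the real connections so the example runs on its own.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol, TypeVar


class SupportsConnect(Protocol):
    def connect(self) -> None: ...
    def disconnect(self) -> None: ...


C = TypeVar("C", bound=SupportsConnect)


@contextmanager
def connected(conn: C) -> Iterator[C]:
    """Hypothetical helper: connect on entry, always disconnect on exit."""
    conn.connect()
    try:
        yield conn
    finally:
        conn.disconnect()


class StubConnection:
    """Stand-in that records lifecycle calls instead of talking to a database."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    def connect(self) -> None:
        self.log.append(f"connect {self.name}")

    def disconnect(self) -> None:
        self.log.append(f"disconnect {self.name}")


events: list[str] = []
with connected(StubConnection("source", events)) as source, \
        connected(StubConnection("dest", events)) as dest:
    pass  # fetch from source and insert into dest here

print(events)
# ['connect source', 'connect dest', 'disconnect dest', 'disconnect source']
```

For a number of connections only known at runtime, contextlib.ExitStack with enter_context(connected(conn)) gives the same guarantees.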