pipeloom
Lightweight Python framework for orchestrating concurrent tasks with a single-writer persistence model and live progress tracking.
Why pipeloom?
Many workflows need concurrency, persistence, and monitoring, but existing solutions can be burdensome to set up, maintain, and integrate with your workflow. pipeloom simplifies this with a message-driven, single-writer model that is safe, observable, and extensible.
Who is it for?
- Data engineers needing lightweight pipelines without Airflow/Kubernetes.
- Analysts automating ETL locally or on servers.
- Developers who want concurrency and persistence without database-lock headaches.
Quickstart
Run the demo locally:
Or with Docker:
Example output (Rich progress bars):

Features
- Concurrent task orchestration → Run many jobs at once safely.
- Single-writer persistence → No more SQLite "database locked" errors.
- Live progress tracking → Always know what’s running, failed, or complete.
- Declarative API → Define tasks and pipelines with minimal boilerplate.
- CLI-first → Run pipelines from the terminal with a Typer-powered CLI.
Design Principles
- All SQLite access happens in one thread (SQLiteWriter).
- Two progress managers:
  - Overall (remains after completion).
  - Per-task (clears on finish).
- Workers never write to the DB directly; they send typed messages to the writer.
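The two progress managers differ only in how long their state lives. The sketch below models that bookkeeping in plain Python, without Rich. ProgressState and its on_* methods are hypothetical names chosen for illustration and are not pipeloom's API.

```python
from dataclasses import dataclass, field


@dataclass
class ProgressState:
    """Hypothetical model of the overall and per-task progress managers."""
    total_tasks: int
    completed: int = 0                                        # overall: never reset
    per_task: dict[str, float] = field(default_factory=dict)  # live per-task entries

    def on_started(self, task_id: str) -> None:
        # A per-task entry appears when its task starts.
        self.per_task[task_id] = 0.0

    def on_progress(self, task_id: str, fraction: float) -> None:
        self.per_task[task_id] = fraction

    def on_finished(self, task_id: str) -> None:
        # The per-task entry is cleared on finish; the overall count persists.
        self.per_task.pop(task_id, None)
        self.completed += 1


state = ProgressState(total_tasks=2)
state.on_started("a")
state.on_progress("a", 0.5)
state.on_started("b")
state.on_finished("a")
print(state.completed, sorted(state.per_task))  # 1 ['b']
```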
Architecture
```mermaid
flowchart LR
    subgraph App["Your App / CLI (Typer)"]
        CLI["CLI (Typer)"]
        Engine["Engine (orchestrator)"]
    end
    subgraph Work["Worker Threads (N)"]
        W1["Worker #1"]
        Wn["Worker #N"]
    end
    subgraph Writer["SQLite Writer Thread"]
        Q["Queue[object]"]
        SW["SQLiteWriter (single sqlite3.Connection)"]
    end
    subgraph DB["SQLite Database"]
        WAL["db-wal / db-shm"]
        Main["main.db"]
    end
    CLI --> Engine
    Engine -->|ThreadPoolExecutor| Work
    Work -->|Messages| Q
    Engine -->|start| SW
    SW -->|consume| Q
    SW -->|INSERT/UPDATE| DB
    DB <--> WAL
```
Why this works:
Only the writer thread touches SQLite. WAL mode allows concurrent reads during writes. Workers communicate via typed messages.
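The same pattern can be sketched with the standard library alone, without assuming anything about pipeloom's internals. One thread owns the only sqlite3.Connection and drains a queue.Queue. Workers only enqueue tuples. A sentinel object stops the writer once every worker has finished. The events table and the _STOP sentinel are illustrative names.

```python
import os
import queue
import sqlite3
import tempfile
import threading

_STOP = object()  # sentinel: tells the writer thread to shut down


def writer_loop(db_path: str, q: queue.Queue) -> None:
    # The only sqlite3.Connection is created, used, and closed in this thread.
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")  # readers can query while this thread writes
    conn.execute("CREATE TABLE IF NOT EXISTS events(worker INTEGER, n INTEGER)")
    while True:
        item = q.get()
        if item is _STOP:
            break
        worker_id, n = item
        conn.execute("INSERT INTO events VALUES (?, ?)", (worker_id, n))
        conn.commit()
    conn.close()


def worker(worker_id: int, q: queue.Queue) -> None:
    # Workers never touch SQLite; they only enqueue plain messages.
    for n in range(3):
        q.put((worker_id, n))


db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
q: queue.Queue = queue.Queue()
writer = threading.Thread(target=writer_loop, args=(db_path, q))
writer.start()

workers = [threading.Thread(target=worker, args=(i, q)) for i in range(4)]
for t in workers:
    t.start()
for t in workers:
    t.join()
q.put(_STOP)  # enqueued last, after every worker message
writer.join()

reader = sqlite3.connect(db_path)  # a separate connection used only for reading
count = reader.execute("SELECT COUNT(*) FROM events").fetchone()[0]
reader.close()
print(count)  # 12
```

Because the sentinel is enqueued only after all workers have been joined, it is guaranteed to be the last item, so the writer never exits with messages still pending.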
Using pipeloom in Your ETL
- Define tasks with Task(...).
- Provide a worker_fn(task, msg_q) that emits:
  - MsgTaskStarted
  - MsgTaskProgress
  - MsgTaskFinished
- Run:
Pass store_task_status=False if you don’t want the task_runs table.
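The message contract can be made concrete with a sketch of a worker_fn. Task, MsgTaskStarted, MsgTaskProgress and MsgTaskFinished are defined here as hypothetical stand-ins with assumed fields (task_id, items, total, done, status). pipeloom's real classes may carry different fields, so check its source before reusing this shape.

```python
import queue
from dataclasses import dataclass


# Hypothetical stand-ins for pipeloom's Task and message classes.
@dataclass(frozen=True)
class Task:
    task_id: str
    items: tuple[int, ...]


@dataclass(frozen=True)
class MsgTaskStarted:
    task_id: str
    total: int


@dataclass(frozen=True)
class MsgTaskProgress:
    task_id: str
    done: int


@dataclass(frozen=True)
class MsgTaskFinished:
    task_id: str
    status: str


def worker_fn(task: Task, msg_q: queue.Queue) -> None:
    # Started -> one Progress per item -> Finished, all sent to the writer's queue.
    msg_q.put(MsgTaskStarted(task.task_id, total=len(task.items)))
    for done, item in enumerate(task.items, start=1):
        _ = item * 2  # placeholder for the real work on one item
        msg_q.put(MsgTaskProgress(task.task_id, done=done))
    msg_q.put(MsgTaskFinished(task.task_id, status="done"))


msg_q: queue.Queue = queue.Queue()
worker_fn(Task("t1", (10, 20, 30)), msg_q)

msgs = []
while not msg_q.empty():
    msgs.append(msg_q.get_nowait())
print([type(m).__name__ for m in msgs])
```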
Extending pipeloom
Custom worker logic
Workers perform the actual work, then publish messages.
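A worker should usually report failures as messages rather than let exceptions escape. With ThreadPoolExecutor, an exception raised inside a submitted callable is stored on its Future and goes unobserved unless something calls result(). The sketch assumes a hypothetical MsgTaskFinished with status and error fields and uses a toy parse_number job as the "actual work".

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class MsgTaskFinished:
    """Hypothetical stand-in; pipeloom's real message may differ."""
    task_id: str
    status: str
    error: Optional[str] = None


def parse_number(raw: str) -> int:
    # The "actual work" (a toy example): raises ValueError on bad input.
    return int(raw)


def worker_fn(task_id: str, raw: str, msg_q: queue.Queue) -> None:
    try:
        parse_number(raw)
    except Exception as exc:
        # Publish the failure; otherwise it would sit unobserved on the Future.
        msg_q.put(MsgTaskFinished(task_id, status="failed", error=repr(exc)))
    else:
        msg_q.put(MsgTaskFinished(task_id, status="done"))


msg_q: queue.Queue = queue.Queue()
inputs = {"a": "1", "b": "oops", "c": "3"}
with ThreadPoolExecutor(max_workers=2) as pool:
    for task_id, raw in inputs.items():
        pool.submit(worker_fn, task_id, raw, msg_q)
# Leaving the with-block waits for every submitted job to finish.

results = {}
while not msg_q.empty():
    m = msg_q.get_nowait()
    results[m.task_id] = m.status
print(sorted(results.items()))  # [('a', 'done'), ('b', 'failed'), ('c', 'done')]
```

This keeps the writer as the single place where outcomes, including failures, are recorded.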
Custom message types
Add domain-specific messages and handle them in the writer.
Example: domain-specific upsert:
```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MsgUpsertRecord:
    table: str
    key: str
    payload: dict
```
Emit from worker:
Handle in writer:
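```python
# In the writer's message-dispatch loop, add a branch:
elif isinstance(item, MsgUpsertRecord):
    self._on_upsert(item)

# And a handler method on the writer class:
def _on_upsert(self, m: MsgUpsertRecord) -> None:
    # This example always targets the users table; m.table is not consulted.
    self._conn.execute(
        "INSERT INTO users(key, name, active) VALUES (?, ?, ?) "
        "ON CONFLICT(key) DO UPDATE SET name=excluded.name, active=excluded.active",
        (m.key, m.payload["name"], m.payload["active"]),
    )
    self._conn.commit()
```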
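The following self-contained sketch combines the dispatch branch and the handler, running the upsert end to end against an in-memory database. MiniWriter is a hypothetical, trimmed-down writer and is not pipeloom's SQLiteWriter. It assumes a users table with key as its primary key. The ON CONFLICT ... DO UPDATE syntax requires SQLite 3.24 or newer.

```python
import queue
import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class MsgUpsertRecord:
    table: str
    key: str
    payload: dict


class MiniWriter:
    """Hypothetical writer that owns the connection and dispatches messages."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS users(key TEXT PRIMARY KEY, name TEXT, active INTEGER)"
        )

    def handle(self, item: object) -> None:
        if isinstance(item, MsgUpsertRecord):
            self._on_upsert(item)
        else:
            raise TypeError(f"unhandled message: {item!r}")

    def _on_upsert(self, m: MsgUpsertRecord) -> None:
        self._conn.execute(
            "INSERT INTO users(key, name, active) VALUES (?, ?, ?) "
            "ON CONFLICT(key) DO UPDATE SET name=excluded.name, active=excluded.active",
            (m.key, m.payload["name"], m.payload["active"]),
        )
        self._conn.commit()


msg_q: queue.Queue = queue.Queue()
# Emitted by a worker: the second message updates the same key.
msg_q.put(MsgUpsertRecord("users", "u1", {"name": "Ada", "active": True}))
msg_q.put(MsgUpsertRecord("users", "u1", {"name": "Ada L.", "active": False}))

writer = MiniWriter(sqlite3.connect(":memory:"))
while not msg_q.empty():
    writer.handle(msg_q.get_nowait())

rows = writer._conn.execute("SELECT key, name, active FROM users").fetchall()
print(rows)  # [('u1', 'Ada L.', 0)]
```

The second message updates the existing row instead of failing on the primary key. sqlite3 stores the Python booleans as 1 and 0.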
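Observability
Keep the task_runs table enabled (that is, do not pass store_task_status=False) unless you have a strong reason not to. It is the persisted record of each task's status.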