pipeloom¶
Reusable scaffolding for ETL-style, multi-threaded pipelines with:
- One SQLite writer thread in WAL mode
- Workers publishing progress/results over a Queue
- Rich progress (sticky overall + transient per-task)
- Typer CLI
Top-level exports are provided for convenience so you can do:
    from pipeloom import (
        run_pipeline,
        SQLiteWriter,
        TaskDef,
        MsgTaskStarted,  # or alias: MsgTaskStart
        MsgTaskProgress,
        MsgTaskFinished,
        SENTINEL,
        make_overall_progress,
        make_task_progress,
        preregister_task_bars,
        console,
        logger,
    )
MsgTaskFinished dataclass ¶
MsgTaskFinished(task_id: int, status: str, finished_at: str, result: str | None = None, message: str = '')
Final status + optional result payload for a completed or failed task.
Attributes:
| Name | Type | Description |
|---|---|---|
| task_id | int | Unique identifier for the task. |
| status | str | Final status of the task: "done", "error", or "cancelled". |
| finished_at | str | ISO 8601 UTC string (avoid tz-naive datetimes over queues). |
| result | str \| None | Optional result payload for the completed task. |
| message | str | Optional status message for the completed task. |
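As an illustration of the field layout and the timestamp convention above, here is a minimal sketch. `FinishedMsg` and `utc_now_iso` are hypothetical stand-ins written for this example, not pipeloom's own classes; the stand-in only mirrors the documented signature.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class FinishedMsg:
    """Hypothetical stand-in mirroring the documented MsgTaskFinished fields."""

    task_id: int
    status: str  # "done", "error", or "cancelled"
    finished_at: str  # ISO 8601 UTC string
    result: Optional[str] = None
    message: str = ""


def utc_now_iso() -> str:
    # A timezone-aware UTC timestamp, serialized so it is safe to pass over a queue.
    return datetime.now(timezone.utc).isoformat()


ok = FinishedMsg(task_id=1, status="done", finished_at=utc_now_iso(), result="42 rows")
failed = FinishedMsg(task_id=2, status="error", finished_at=utc_now_iso(), message="timeout")
```

Because the string carries an explicit UTC offset, a consumer can parse it back with `datetime.fromisoformat` and get a timezone-aware value.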
MsgTaskProgress dataclass ¶
Incremental progress signal. The writer translates this into a fractional progress column and updates the Rich bar.
Attributes:
| Name | Type | Description |
|---|---|---|
| task_id | int | Unique identifier for the task. |
| step | int | Current progress step (1-based). |
| total | int | Total number of progress steps. |
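The translation from `step` and `total` into a fraction can be sketched as follows. `progress_fraction` is a hypothetical helper for illustration; how the writer actually guards against a zero `total` or an out-of-range `step` is not documented here, so the clamping is an assumption.

```python
def progress_fraction(step: int, total: int) -> float:
    """Convert a 1-based step counter into a fraction in [0.0, 1.0]."""
    if total <= 0:
        # Assumption: an unknown or empty total is reported as no progress.
        return 0.0
    # Clamp so an out-of-range step never produces a fraction outside [0, 1].
    return min(max(step, 0), total) / total
```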
MsgTaskStarted dataclass ¶
Signal that a task has begun. Posted by a worker thread as soon as the task is admitted for work.
Attributes:
| Name | Type | Description |
|---|---|---|
| task_id | int | Unique identifier for the task. |
| name | str | Display-friendly name shown in logs and progress UI. |
| started_at | str | ISO 8601 UTC string (avoid tz-naive datetimes over queues). |
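To show how the three message types fit together, here is a sketch of a worker that posts started, progress, and finished messages to a queue. The classes `Started`, `Progress`, `Finished`, and `DemoTask`, and the function `demo_worker`, are hypothetical stand-ins that mirror the documented fields; they are not imported from pipeloom.

```python
import queue
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class Started:  # stand-in for MsgTaskStarted
    task_id: int
    name: str
    started_at: str


@dataclass
class Progress:  # stand-in for MsgTaskProgress
    task_id: int
    step: int
    total: int


@dataclass
class Finished:  # stand-in for MsgTaskFinished
    task_id: int
    status: str
    finished_at: str
    result: Optional[str] = None
    message: str = ""


@dataclass
class DemoTask:  # hypothetical task object; the engine treats tasks as opaque
    task_id: int
    name: str
    items: List[str] = field(default_factory=list)


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()


def demo_worker(task: DemoTask, msg_q: queue.Queue) -> None:
    """Post a start message, one progress message per item, then a final status."""
    msg_q.put(Started(task.task_id, task.name, _now()))
    try:
        for step, _item in enumerate(task.items, start=1):
            # Real work on _item would happen here.
            msg_q.put(Progress(task.task_id, step, len(task.items)))
        msg_q.put(Finished(task.task_id, "done", _now(), result=f"{len(task.items)} items"))
    except Exception as exc:
        msg_q.put(Finished(task.task_id, "error", _now(), message=str(exc)))


q: queue.Queue = queue.Queue()
demo_worker(DemoTask(7, "load-users", ["a", "b", "c"]), q)
messages = [q.get_nowait() for _ in range(q.qsize())]
```

The worker never touches the database or the progress bars directly; everything it has to report travels through the queue.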
SQLiteWriter ¶
SQLiteWriter(db_path: Path, msg_q: Queue[Msg], *, wal: bool = True, store_task_status: bool = True, task_progress: Progress | None = None, task_bar_map: dict[int, TaskID] | None = None)
Bases: Thread
Dedicated thread that exclusively writes to SQLite and manages per-task bars.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| db_path | Path | Path to the SQLite database file (or ':memory:'). | required |
| msg_q | Queue[Msg] | Thread-safe queue from which this writer consumes message objects. | required |
| wal | bool | Whether to enable WAL on file-backed databases. | True |
| store_task_status | bool | Toggle persistence of task status into the database. | True |
| task_progress | Progress \| None | Rich Progress manager used for per-task bars (transient). | None |
| task_bar_map | dict[int, TaskID] \| None | Pre-registered mapping: task_id -> Rich TaskID (prevents render races). | None |
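The `wal` parameter is described as applying to file-backed databases. The following standard-library sketch shows why: SQLite accepts a WAL request for a database file but keeps an in-memory database in its own journal mode. The helper name `journal_mode_after_wal_request` is hypothetical, and this is not the writer's actual code.

```python
import sqlite3
import tempfile
from pathlib import Path


def journal_mode_after_wal_request(target: str) -> str:
    """Ask SQLite for WAL and return the journal mode it actually reports."""
    conn = sqlite3.connect(target)
    try:
        return conn.execute("PRAGMA journal_mode=WAL;").fetchone()[0]
    finally:
        conn.close()


with tempfile.TemporaryDirectory() as tmp:
    file_mode = journal_mode_after_wal_request(str(Path(tmp) / "demo.db"))  # "wal"

memory_mode = journal_mode_after_wal_request(":memory:")  # "memory"
```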
Source code in pipeloom/writer.py
run ¶
Lifecycle:
- open connection
- initialize schema (optional)
- consume messages until SENTINEL arrives
- checkpoint and close
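The lifecycle above can be sketched with the standard library alone. `DemoWriter`, the `STOP` sentinel, and the `events` table are hypothetical names for this example; the real writer's schema and message handling are not shown on this page.

```python
import queue
import sqlite3
import tempfile
import threading
from pathlib import Path

STOP = object()  # hypothetical stand-in for SENTINEL


class DemoWriter(threading.Thread):
    """Single thread that owns the SQLite connection and performs all writes."""

    def __init__(self, db_path: Path, msg_q: queue.Queue) -> None:
        super().__init__(daemon=True)
        self.db_path = db_path
        self.msg_q = msg_q

    def run(self) -> None:
        # Open the connection inside run() so it is created on the writer thread.
        conn = sqlite3.connect(str(self.db_path))
        try:
            conn.execute("PRAGMA journal_mode=WAL;")
            conn.execute("CREATE TABLE IF NOT EXISTS events (task_id INTEGER, note TEXT)")
            while True:
                msg = self.msg_q.get()  # blocks until a worker posts something
                if msg is STOP:
                    break
                conn.execute("INSERT INTO events VALUES (?, ?)", msg)
                conn.commit()
            # Fold the WAL file back into the main database before closing.
            conn.execute("PRAGMA wal_checkpoint(TRUNCATE);")
        finally:
            conn.close()


with tempfile.TemporaryDirectory() as tmp:
    db = Path(tmp) / "demo.db"
    q: queue.Queue = queue.Queue()
    writer = DemoWriter(db, q)
    writer.start()
    q.put((1, "started"))
    q.put((1, "finished"))
    q.put(STOP)
    writer.join()

    check = sqlite3.connect(str(db))
    row_count = check.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    check.close()
```

Keeping every write on one thread means workers never contend for the database lock; they only need a thread-safe queue.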
Source code in pipeloom/writer.py
run_pipeline ¶
run_pipeline(tasks: Iterable[TTask], db_path: Path = Path('./pipeloom.db'), *, workers: int | None = None, wal: bool = True, store_task_status: bool = True, worker_fn: Callable[[TTask, Queue[Msg]], None]) -> None
Execute a workload using a single-writer SQLite backend.
Parameters¶
tasks : Iterable[TTask]
    Opaque task objects (functions, dataclasses, dicts, etc.). The engine
    does not inspect them; worker_fn knows how to run each.
db_path : Path
    SQLite DB for pipeline metadata/observability.
workers : int | None
    Max concurrent workers. Defaults to min(len(tasks), CPU count) when possible.
wal : bool
    Enable SQLite WAL mode for better concurrency.
store_task_status : bool
    Maintain task_runs table.
worker_fn : Callable[[TTask, queue.Queue[Msg]], None]
    Invoked per task. Communicate via msg_q.
Notes¶
The writer thread is shut down before the per-task progress context exits, so the final frame shows only “All tasks 100%”.
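One way to read the documented `workers` default, min(len(tasks), CPU count) "when possible", is sketched below. `resolve_workers` is a hypothetical helper; falling back to the CPU count when the iterable has no length is an assumption, not confirmed behavior of `run_pipeline`.

```python
import os
from typing import Iterable, Optional


def resolve_workers(tasks: Iterable[object], workers: Optional[int] = None) -> int:
    """Pick a worker count: explicit value if given, else min(len(tasks), CPU count)."""
    if workers is not None:
        return max(1, workers)
    cpu = os.cpu_count() or 1
    try:
        n = len(tasks)  # type: ignore[arg-type]
    except TypeError:
        # Assumption: generators and other unsized iterables fall back to CPU count.
        return cpu
    return max(1, min(n, cpu))
```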
Source code in pipeloom/engine.py
make_overall_progress ¶
Create the overall persistent progress manager.
This remains visible after completion and is ideal for a single “All tasks: 100%” summary line once per-task bars disappear.
Returns:
| Name | Type | Description |
|---|---|---|
| Progress | Progress | The Rich Progress instance managing overall progress. |
Source code in pipeloom/progress.py
make_task_progress ¶
Create the transient per-task progress manager.
This hides when the context exits so your terminal remains tidy.
Returns:
| Name | Type | Description |
|---|---|---|
| Progress | Progress | The Rich Progress instance managing task bars. |
Source code in pipeloom/progress.py
preregister_task_bars ¶
Pre-create one Rich bar per task and return a mapping {task_id -> TaskID}.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_progress | Progress | The Rich Progress instance managing task bars. | required |
| num_tasks | int | The total number of tasks to create bars for. | required |
Returns:
| Type | Description |
|---|---|
| dict[int, TaskID] | A mapping of task IDs to their Rich TaskIDs. |
Why up-front?¶
If a worker publishes a MsgTaskProgress before the writer has created the bar, updates can be lost. Pre-registering eliminates this race completely.
Source code in pipeloom/progress.py
setup_logging ¶
Configure logging.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| verbose | int | 0 = WARNING, 1 = INFO, 2+ = DEBUG | required |
| log_file | Path \| None | If provided, a plain (non-Rich) file handler is added for CI/grep. | None |
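The verbosity mapping in the table can be sketched with the standard `logging` module. `level_for` and `demo_setup_logging` are hypothetical names, and the logger name and format string are assumptions; pipeloom's own `setup_logging` may attach different handlers.

```python
import logging
from pathlib import Path
from typing import Optional


def level_for(verbose: int) -> int:
    """Map a verbosity count to a logging level: 0 = WARNING, 1 = INFO, 2+ = DEBUG."""
    if verbose <= 0:
        return logging.WARNING
    if verbose == 1:
        return logging.INFO
    return logging.DEBUG


def demo_setup_logging(verbose: int, log_file: Optional[Path] = None) -> logging.Logger:
    log = logging.getLogger("pipeloom_demo")  # hypothetical logger name
    log.setLevel(level_for(verbose))
    if log_file is not None:
        # Plain text handler, with no terminal styling, so the file is easy to grep.
        handler = logging.FileHandler(log_file, encoding="utf-8")
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        log.addHandler(handler)
    return log
```

A CLI would typically feed `verbose` from a repeated `-v` flag, so `-vv` selects DEBUG.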