Skip to content

pipeloom

pipeloom

pipeloom

Reusable scaffolding for ETL-style, multi-threaded pipelines with:

  • One SQLite writer thread in WAL mode
  • Workers publishing progress/results over a Queue
  • Rich progress (sticky overall + transient per-task)
  • Typer CLI

Top-level exports are provided for convenience so you can do:

from pipeloom import (
    run_pipeline,
    SQLiteWriter,
    TaskDef,
    MsgTaskStarted,  # or alias: MsgTaskStart
    MsgTaskProgress,
    MsgTaskFinished,
    SENTINEL,
    make_overall_progress,
    make_task_progress,
    preregister_task_bars,
    console,
    logger
)

MsgTaskFinished dataclass

MsgTaskFinished(task_id: int, status: str, finished_at: str, result: str | None = None, message: str = '')

Final status + optional result payload for a completed or failed task.

Attributes:

Name Type Description
task_id int

Unique identifier for the task.

status str

Final status of the task ("done" | "error" | "cancelled").

finished_at str

ISO 8601 UTC string (avoid tz-naive datetimes over queues).

result str | None

Optional result payload for the completed task.

message str

Optional status message for the completed task.

MsgTaskProgress dataclass

MsgTaskProgress(task_id: int, step: int, total: int, message: str = '')

Incremental progress signal. The writer translates this into a fractional progress column and updates the Rich bar.

Attributes:

Name Type Description
task_id int

Unique identifier for the task.

step int

Current progress step (1-based).

total int

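Total number of progress steps.

message str

Optional free-form message accompanying the progress update (defaults to an empty string).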

MsgTaskStarted dataclass

MsgTaskStarted(task_id: int, name: str, started_at: str)

Signal that a task has begun. Posted by a worker thread as soon as the task is admitted for work.

Attributes:

Name Type Description
task_id int

Unique identifier for the task.

name str

Display-friendly name shown in logs and progress UI.

started_at str

ISO 8601 UTC string (avoid tz-naive datetimes over queues).

SQLiteWriter

SQLiteWriter(db_path: Path, msg_q: Queue[Msg], *, wal: bool = True, store_task_status: bool = True, task_progress: Progress | None = None, task_bar_map: dict[int, TaskID] | None = None)

Bases: Thread

Dedicated thread that exclusively writes to SQLite and manages per-task bars.

Parameters:

Name Type Description Default
db_path Path

Path to the SQLite database file (or ':memory:').

required
msg_q Queue[Msg]

Thread-safe queue from which this writer consumes message objects.

required
wal bool

Whether to enable WAL on file-backed databases.

True
store_task_status bool

Toggle persistence of task status into the task_runs table.

True
task_progress Progress | None

Rich Progress manager used for per-task bars (transient).

None
task_bar_map dict[int, TaskID] | None

Pre-registered mapping: task_id -> Rich TaskID (prevents render races).

None
Source code in pipeloom/writer.py
def __init__(
    self,
    db_path: Path,
    msg_q: queue.Queue[Msg],
    *,
    wal: bool = True,
    store_task_status: bool = True,
    task_progress: Progress | None = None,
    task_bar_map: dict[int, TaskID] | None = None,
) -> None:
    """
    Record the writer's configuration; no database connection is opened here.

    Args:
        db_path (Path): Path to the SQLite database file (or ':memory:').
        msg_q (queue.Queue[Msg]): Queue from which this writer consumes messages.
        wal (bool): Whether to enable WAL on file-backed databases.
        store_task_status (bool): Toggle persistence of task status.
        task_progress (Progress | None): Rich Progress manager for per-task bars.
        task_bar_map (dict[int, TaskID] | None): Pre-registered mapping
            task_id -> Rich TaskID. It is copied, so the caller's dict is
            never mutated by this object.
    """
    # The thread is created as a daemon thread.
    super().__init__(daemon=True)

    # Where to write and where messages come from.
    self.msg_q = msg_q
    self.db_path = db_path

    # Behaviour toggles.
    self._store = store_task_status
    self._use_wal = wal

    # Progress UI: keep a private copy of the pre-registered bars (empty if none given).
    self._progress = task_progress
    self._progress_tasks: dict[int, TaskID] = dict(task_bar_map) if task_bar_map else {}

    # Runtime state: `_conn` starts unset; `_stop_flag` starts cleared.
    self._stop_flag = threading.Event()
    self._conn: sqlite3.Connection | None = None

run

run() -> None

Lifecycle:

  • open connection
  • initialize schema (optional)
  • consume messages until SENTINEL arrives
  • checkpoint and close

Source code in pipeloom/writer.py
def run(self) -> None:
    """
    Lifecycle:
    - open connection
    - initialize schema (optional)
    - consume messages until SENTINEL arrives
    - checkpoint and close

    Cleanup is best-effort: ANALYZE and the WAL checkpoint may fail without
    preventing the connection from being closed. An exception raised by the
    connection setup or by a message handler propagates out of this method
    after cleanup has run.
    """
    try:
        self._conn = dbmod.connect(self.db_path, wal=self._use_wal)
        dbmod.init_schema(self._conn, store_task_status=self._store)
        LOG.info("DB writer started → %s (WAL=%s)", self.db_path, self._use_wal)

        while not self._stop_flag.is_set():
            try:
                item = self.msg_q.get(
                    timeout=0.5,
                )  # small timeout to enable graceful exit
            except queue.Empty:
                continue

            if item is SENTINEL:
                # Account for the sentinel itself before leaving the loop.
                self.msg_q.task_done()
                break

            if isinstance(item, MsgTaskStarted):
                self._on_started(item)
            elif isinstance(item, MsgTaskProgress):
                self._on_progress(item)
            elif isinstance(item, MsgTaskFinished):
                self._on_finished(item)
            else:
                LOG.warning("Unknown message: %r", type(item))

            self.msg_q.task_done()

    finally:
        # Best-effort cleanup & checkpoint
        conn = self._conn
        if conn is not None:
            try:
                conn.execute("ANALYZE;")
            except Exception:
                LOG.debug("ANALYZE failed", exc_info=True)

            try:
                with _Suppress(sqlite3.OperationalError):
                    dbmod.wal_checkpoint(conn, "TRUNCATE")
            finally:
                # Close in its own guarded step: a failed checkpoint must not
                # skip the close and leak the connection.
                self._conn = None
                with _Suppress(sqlite3.OperationalError):
                    conn.close()

        LOG.info("DB writer stopped")

run_pipeline

run_pipeline(tasks: Iterable[TTask], db_path: Path = Path('./pipeloom.db'), *, workers: int | None = None, wal: bool = True, store_task_status: bool = True, worker_fn: Callable[[TTask, Queue[Msg]], None]) -> None

Execute a workload using a single-writer SQLite backend.

Parameters

tasks : Iterable[TTask]
    Opaque task objects (functions, dataclasses, dicts, etc.). The engine does not inspect them; worker_fn knows how to run each.
db_path : Path
    SQLite DB for pipeline metadata/observability.
workers : int | None
    Max concurrent workers. Defaults to min(len(tasks), CPU count) when possible.
wal : bool
    Enable SQLite WAL mode for better concurrency.
store_task_status : bool
    Maintain the task_runs table.
worker_fn : Callable[[TTask, queue.Queue[Msg]], None]
    Invoked per task. Communicate via msg_q.

Notes

The writer thread is shut down before the per-task progress context exits, so the final frame shows only “All tasks 100%”.

Source code in pipeloom/engine.py
def run_pipeline(
    tasks: Iterable[TTask],
    db_path: Path = Path("./pipeloom.db"),
    *,
    workers: int | None = None,
    wal: bool = True,
    store_task_status: bool = True,
    worker_fn: Callable[[TTask, queue.Queue[Msg]], None],
) -> None:
    """
    Execute a workload using a single-writer SQLite backend.

    Parameters
    ----------
    tasks : Iterable[TTask]
        Opaque task objects (functions, dataclasses, dicts, etc.). The engine
        does not inspect them; `worker_fn` knows how to run each.
    db_path : Path
        SQLite DB for pipeline metadata/observability.
    workers : int | None
        Max concurrent workers. Defaults to min(len(tasks), CPU count) when possible.
    wal : bool
        Enable SQLite WAL mode for better concurrency.
    store_task_status : bool
        Maintain `task_runs` table.
    worker_fn : Callable[[TTask, queue.Queue[Msg]], None]
        Invoked per task. Communicate via `msg_q`.

    Raises
    ------
    Exception
        Whatever a task raised; the first failure seen while collecting
        results is re-raised to the caller after the writer has been stopped.

    Notes
    -----
    The writer thread is shut down *before* the per-task progress context exits,
    so the final frame shows only “All tasks 100%”.

    When called from the main thread, a temporary SIGINT handler is installed
    for the duration of the run and the previous handler is restored on exit.
    """
    all_tasks = list(tasks)
    n = len(all_tasks)
    workers = _pick_workers(all_tasks, workers)

    # Sized message queue; scale with worker count
    msg_q: queue.Queue[Msg] = queue.Queue(maxsize=max(64, workers * 8))

    # Gentle shutdown
    stop_event = threading.Event()

    def handle_sigint(signum, frame):  # type: ignore[override]
        LOG.warning("SIGINT received; finishing in-flight tasks, then exiting…")
        stop_event.set()

    overall_p = make_overall_progress()
    task_p = make_task_progress()

    # Monotonic clock: elapsed time must not be skewed by wall-clock changes.
    start = time.monotonic()
    writer: SQLiteWriter | None = None
    futures = []
    # Handler that was active before ours; stays None when nothing needs restoring.
    previous_sigint = None

    try:
        # Install handler only in main thread
        try:
            if threading.current_thread() is threading.main_thread():
                previous_sigint = signal.signal(signal.SIGINT, handle_sigint)
        except Exception as e:
            LOG.debug("signal handler not installed: %r", e)

        with overall_p:
            overall = overall_p.add_task("[cyan]All tasks", total=n)
            with task_p:
                bar_map = preregister_task_bars(task_p, n)

                try:
                    # Start writer after bars exist
                    writer = SQLiteWriter(
                        db_path=db_path,
                        msg_q=msg_q,
                        wal=wal,
                        store_task_status=store_task_status,
                        task_progress=task_p,
                        task_bar_map=bar_map,
                    )
                    writer.start()

                    with ThreadPoolExecutor(
                        max_workers=workers,
                        thread_name_prefix="worker",
                    ) as ex:
                        futures = [ex.submit(worker_fn, t, msg_q) for t in all_tasks]

                        try:
                            for fut in as_completed(futures):
                                if stop_event.is_set():
                                    # Cancel whatever hasn't started yet
                                    ex.shutdown(wait=False, cancel_futures=True)
                                    break
                                # Surface exceptions here; let caller see failures
                                _ = fut.result()
                                overall_p.advance(overall, 1)
                        except KeyboardInterrupt:
                            LOG.warning("KeyboardInterrupt; requesting graceful stop…")
                            stop_event.set()
                            ex.shutdown(wait=False, cancel_futures=True)
                            # Drain any completed futures to surface exceptions
                            for fut in futures:
                                if fut.done():
                                    try:
                                        _ = fut.result()
                                    except (CancelledError, Exception) as e:
                                        LOG.error("Task error during shutdown: %s", e)
                finally:
                    # Always signal writer and join. This happens while the
                    # per-task progress context is still alive, so the writer's
                    # last bar updates land before that context exits.
                    if writer is not None and writer.is_alive():
                        try:
                            msg_q.put(SENTINEL)
                        except Exception as e:
                            LOG.debug("Failed to enqueue SENTINEL: %s", e)
                        writer.join(timeout=30)

                    # Ensure last UI refresh while task progress context is alive
                    task_p.refresh()

    finally:
        # Give the process its original Ctrl-C behaviour back; otherwise the
        # handler above would keep swallowing SIGINT after this call returns.
        if previous_sigint is not None:
            try:
                signal.signal(signal.SIGINT, previous_sigint)
            except Exception as e:
                LOG.debug("signal handler not restored: %r", e)

        LOG.info("Elapsed: %.2fs", time.monotonic() - start)

make_overall_progress

make_overall_progress() -> Progress

Create the overall persistent progress manager.

This remains visible after completion and is ideal for a single “All tasks: 100%” summary line once per-task bars disappear.

Returns:

Name Type Description
Progress Progress

The Rich Progress instance managing overall progress.

Source code in pipeloom/progress.py
def make_overall_progress() -> Progress:
    """
    Build the persistent "overall" progress manager.

    It is created non-transient, so its last frame stays on screen and can serve
    as the single “All tasks: 100%” summary line once per-task bars disappear.

    Returns:
        Progress: The Rich Progress instance managing overall progress.
    """
    separator = "•"
    columns = (
        SpinnerColumn(),
        "[progress.description]{task.description}",
        BarColumn(),
        TaskProgressColumn(),
        MofNCompleteColumn(),
        separator,
        TimeElapsedColumn(),
        separator,
        TimeRemainingColumn(),
    )
    options = {
        # keep final frame visible
        "transient": False,
        "console": console,
        # throttle repaints to avoid log spam jitter
        "refresh_per_second": 8,
        # disabled whenever the console is not a terminal
        "disable": not console.is_terminal,
    }
    return Progress(*columns, **options)

make_task_progress

make_task_progress() -> Progress

Create the transient per-task progress manager.

This hides when the context exits so your terminal remains tidy.

Returns:

Name Type Description
Progress Progress

The Rich Progress instance managing task bars.

Source code in pipeloom/progress.py
def make_task_progress() -> Progress:
    """
    Build the transient progress manager used for per-task bars.

    Because it is transient, its bars are hidden once the context exits, which
    keeps the terminal tidy.

    Returns:
        Progress: The Rich Progress instance managing task bars.
    """
    description_column = "[progress.description]{task.description}"
    # Disabled whenever the console is not a terminal.
    progress_disabled = not console.is_terminal

    task_progress = Progress(
        SpinnerColumn(),
        description_column,
        BarColumn(),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        console=console,
        disable=progress_disabled,
        refresh_per_second=8,
        transient=True,  # auto-hide on exit
    )
    return task_progress

preregister_task_bars

preregister_task_bars(task_progress: Progress, num_tasks: int) -> dict[int, TaskID]

Pre-create one Rich bar per task and return a mapping {task_id -> TaskID}.

Parameters:

Name Type Description Default
task_progress Progress

The Rich Progress instance managing task bars.

required
num_tasks int

The total number of tasks to create bars for.

required

Returns:

Type Description
dict[int, TaskID]

dict[int, TaskID]: A mapping of task IDs to their Rich TaskIDs.

Why up-front?

If a worker publishes a MsgTaskProgress before the writer has created the bar, updates can be lost. Pre-registering eliminates this race completely.

Source code in pipeloom/progress.py
def preregister_task_bars(task_progress: Progress, num_tasks: int) -> dict[int, TaskID]:
    """
    Create every per-task Rich bar ahead of time.

    Task ids are 1-based: bars are registered for ids 1..num_tasks, in order,
    each labelled "task-<id>" with a total of 100.

    Args:
        task_progress (Progress): The Rich Progress instance managing task bars.
        num_tasks (int): The total number of tasks to create bars for.

    Returns:
        dict[int, TaskID]: A mapping of task IDs to their Rich TaskIDs.

    Why up-front?
    -------------
    If a worker publishes a MsgTaskProgress before the writer has created the bar,
    updates can be lost. Pre-registering eliminates this race completely.
    """
    task_numbers = range(1, num_tasks + 1)
    return {
        task_no: task_progress.add_task(f"[bold]task-{task_no}", total=100)
        for task_no in task_numbers
    }

setup_logging

setup_logging(verbose: int, log_file: Path | None = None) -> None

Configure logging.

Parameters:

Name Type Description Default
verbose int

0 = WARNING, 1 = INFO, 2+ = DEBUG

required
log_file Path | None

If provided, a plain (non-Rich) file handler is added for CI/grep.

None
Source code in pipeloom/__init__.py
def setup_logging(verbose: int, log_file: Path | None = None) -> None:
    """
    Configure logging for the package.

    A Rich console handler is always configured; a plain-text file handler is
    added on request.

    Args:
        verbose (int): 0 = WARNING, 1 = INFO, 2+ = DEBUG
        log_file (Path | None): If provided, a plain (non-Rich) file handler is added for CI/grep.
    """
    # Translate the verbosity count into a logging level.
    if verbose <= 0:
        level = logging.WARNING
    elif verbose == 1:
        level = logging.INFO
    else:
        level = logging.DEBUG

    # IMPORTANT: the console handler shares `console` with Progress.
    rich_handler = RichHandler(
        console=console,
        markup=True,
        rich_tracebacks=True,
        show_level=True,
        show_path=False,
        show_time=False,
    )
    handlers: list[logging.Handler] = [rich_handler]

    if log_file:
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setLevel(level)
        plain_formatter = logging.Formatter(
            fmt="%(asctime)s %(levelname)s [%(name)s] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        file_handler.setFormatter(plain_formatter)
        handlers.append(file_handler)

    logging.basicConfig(level=level, format="%(message)s", handlers=handlers)
    logger.setLevel(level)