
pipeloom.writer

pipeloom.writer

writer.py

The single SQLite writer thread. It:

  • Opens and owns the only SQLite connection (configured for WAL).
  • Consumes message objects from a thread-safe Queue.
  • Writes task status/progress to SQLite (optional).
  • Updates/removes Rich per-task progress bars safely.
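Turning on WAL for a file-backed database takes a single pragma. The sketch below is a hypothetical stand-in for the `dbmod.connect(db_path, wal=...)` call that `run()` makes. pipeloom's real helper is not shown on this page and may configure more than this. The `connect` name and its checks are assumptions for illustration.

```python
from __future__ import annotations

import sqlite3
from pathlib import Path


def connect(db_path: Path | str, *, wal: bool = True) -> sqlite3.Connection:
    """Hypothetical stand-in for dbmod.connect (not pipeloom's actual code).

    Opens a connection and switches file-backed databases to WAL mode.
    """
    conn = sqlite3.connect(str(db_path))
    if wal and str(db_path) != ":memory:":
        # The pragma returns the journal mode actually in effect afterwards.
        mode = conn.execute("PRAGMA journal_mode=WAL;").fetchone()[0]
        if mode.lower() != "wal":
            conn.close()
            raise RuntimeError(f"could not enable WAL (journal_mode={mode!r})")
    return conn
```

In-memory databases are skipped because they cannot use WAL. `PRAGMA journal_mode` reports `memory` for them.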

Why a single writer?

SQLite permits only one writer at a time, so concurrent writers simply take turns on the database write lock (and may hit SQLITE_BUSY while waiting). A single connection owned by a single thread is therefore the least surprising, most reliable approach. It also avoids cross-thread connection misuse, which is a common source of bugs.
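Python's `sqlite3` module enforces part of this on its own. A connection opened with the default `check_same_thread=True` raises `sqlite3.ProgrammingError` when another thread uses it. The minimal demonstration below is independent of pipeloom. It shows the failure mode that a single owning thread avoids by construction.

```python
import sqlite3
import threading

conn = sqlite3.connect(":memory:")  # created (and therefore owned) by the main thread
errors: list[Exception] = []


def use_from_other_thread() -> None:
    try:
        conn.execute("SELECT 1")  # same connection, different thread
    except sqlite3.ProgrammingError as exc:
        errors.append(exc)


t = threading.Thread(target=use_from_other_thread)
t.start()
t.join()
print(type(errors[0]).__name__)  # ProgrammingError
```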

SQLiteWriter

SQLiteWriter(db_path: Path, msg_q: Queue[Msg], *, wal: bool = True, store_task_status: bool = True, task_progress: Progress | None = None, task_bar_map: dict[int, TaskID] | None = None)

Bases: Thread

Dedicated thread that exclusively writes to SQLite and manages per-task bars.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| db_path | Path | Path to the SQLite database file (or ':memory:'). | required |
| msg_q | Queue[Msg] | Thread-safe queue from which this writer consumes message objects. | required |
| wal | bool | Whether to enable WAL on file-backed databases. | True |
| store_task_status | bool | Toggle persistence of task status into the task_runs table. | True |
| task_progress | Progress \| None | Rich Progress manager used for per-task bars (transient). | None |
| task_bar_map | dict[int, TaskID] \| None | Pre-registered mapping: task_id -> Rich TaskID (prevents render races). | None |
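`msg_q` carries three message kinds plus a shutdown sentinel. `run()` tells them apart with `isinstance` checks and an identity test (`item is SENTINEL`). The class names below come from the source. Their fields, and the `kind` helper, are hypothetical, since the real definitions are not shown on this page.

```python
from __future__ import annotations

import queue
from dataclasses import dataclass
from typing import Union


# Hypothetical message shapes: field names are illustrative, not pipeloom's.
@dataclass(frozen=True)
class MsgTaskStarted:
    task_id: int
    name: str


@dataclass(frozen=True)
class MsgTaskProgress:
    task_id: int
    completed: int
    total: int


@dataclass(frozen=True)
class MsgTaskFinished:
    task_id: int
    ok: bool


Msg = Union[MsgTaskStarted, MsgTaskProgress, MsgTaskFinished]

# A unique object, compared by identity, that tells the writer to stop.
SENTINEL = object()

msg_q: queue.Queue[object] = queue.Queue()


def kind(item: object) -> str:
    """Classify a queue item the way the writer's isinstance chain does."""
    if item is SENTINEL:
        return "stop"
    if isinstance(item, MsgTaskStarted):
        return "started"
    if isinstance(item, MsgTaskProgress):
        return "progress"
    if isinstance(item, MsgTaskFinished):
        return "finished"
    return "unknown"  # the writer logs a warning for these
```

Frozen dataclasses suit this pattern: a message cannot be mutated by the producer after it has been handed to the writer thread.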
Source code in pipeloom/writer.py
```python
def __init__(
    self,
    db_path: Path,
    msg_q: queue.Queue[Msg],
    *,
    wal: bool = True,
    store_task_status: bool = True,
    task_progress: Progress | None = None,
    task_bar_map: dict[int, TaskID] | None = None,
) -> None:
    super().__init__(daemon=True)
    self.db_path = db_path
    self.msg_q = msg_q
    self._use_wal = wal
    self._store = store_task_status
    self._progress = task_progress
    self._progress_tasks: dict[int, TaskID] = dict(task_bar_map or {})
    self._conn: sqlite3.Connection | None = None
    self._stop_flag = threading.Event()
```
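Because the writer is created with `daemon=True`, the interpreter will not wait for it at exit. Python's documentation notes that daemon threads are stopped abruptly at shutdown, so resources such as database transactions may not be released properly. The cleanup in `run()` (ANALYZE, checkpoint, close) is therefore only reliable if the owner stops the writer explicitly. The sketch below assumes the owner does this by enqueuing the sentinel and joining. `DemoWriter` and `shutdown` are hypothetical stand-ins, not pipeloom APIs.

```python
import queue
import threading

SENTINEL = object()  # hypothetical stand-in for pipeloom's SENTINEL


class DemoWriter(threading.Thread):
    """Hypothetical daemon thread whose cleanup lives in `finally`."""

    def __init__(self, q: queue.Queue) -> None:
        super().__init__(daemon=True)
        self.q = q
        self.cleaned_up = False

    def run(self) -> None:
        try:
            while True:
                item = self.q.get()
                self.q.task_done()
                if item is SENTINEL:
                    break
        finally:
            self.cleaned_up = True  # stands in for ANALYZE / checkpoint / close


def shutdown(writer: threading.Thread, q: queue.Queue, timeout: float = 10.0) -> None:
    """Ask the writer to stop via the sentinel, then wait for its cleanup."""
    q.put(SENTINEL)
    writer.join(timeout)
    if writer.is_alive():
        raise TimeoutError("writer did not stop in time")
```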

run

run() -> None

Lifecycle:

  • Open the connection.
  • Initialize the schema (honoring store_task_status).
  • Consume messages until SENTINEL arrives or the stop flag is set.
  • Run ANALYZE, checkpoint the WAL, and close.
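The consume step combines two exit paths: the sentinel for an orderly stop, and a `get()` timeout so the loop periodically re-checks a stop flag. A standalone sketch of that loop follows. `consume` and its `handle` callback are hypothetical names. One deliberate choice here is that `task_done()` sits in a `finally`, so every dequeued item is acknowledged even on `break` or a handler error. Without that, a producer blocked in `q.join()` could wait forever.

```python
from __future__ import annotations

import queue
import threading
from collections.abc import Callable

SENTINEL = object()  # hypothetical stand-in for pipeloom's SENTINEL


def consume(q: queue.Queue, stop: threading.Event, handle: Callable[[object], None]) -> int:
    """Dispatch items from q until SENTINEL arrives or `stop` is set.

    Returns the number of non-sentinel items handled.
    """
    handled = 0
    while not stop.is_set():
        try:
            item = q.get(timeout=0.5)  # wake up periodically to re-check `stop`
        except queue.Empty:
            continue
        try:
            if item is SENTINEL:
                break
            handle(item)
            handled += 1
        finally:
            q.task_done()  # one task_done() per get(), even on break or error
    return handled


q: queue.Queue = queue.Queue()
stop = threading.Event()
seen: list[object] = []
counts: list[int] = []

worker = threading.Thread(target=lambda: counts.append(consume(q, stop, seen.append)))
worker.start()
for n in (1, 2, 3):
    q.put(n)
q.put(SENTINEL)
q.join()  # returns once every item has been marked done
worker.join()
print(seen, counts)  # [1, 2, 3] [3]
```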

Source code in pipeloom/writer.py
```python
def run(self) -> None:
    """
    Lifecycle:
    - open connection
    - initialize schema (honoring store_task_status)
    - consume messages until SENTINEL arrives (or the stop flag is set)
    - run ANALYZE, checkpoint the WAL, and close
    """
    try:
        self._conn = dbmod.connect(self.db_path, wal=self._use_wal)
        dbmod.init_schema(self._conn, store_task_status=self._store)
        LOG.info("DB writer started → %s (WAL=%s)", self.db_path, self._use_wal)

        while not self._stop_flag.is_set():
            try:
                # Small timeout so the loop re-checks the stop flag regularly.
                item = self.msg_q.get(timeout=0.5)
            except queue.Empty:
                continue

            try:
                if item is SENTINEL:
                    break

                if isinstance(item, MsgTaskStarted):
                    self._on_started(item)
                elif isinstance(item, MsgTaskProgress):
                    self._on_progress(item)
                elif isinstance(item, MsgTaskFinished):
                    self._on_finished(item)
                else:
                    LOG.warning("Unknown message: %r", type(item))
            finally:
                # Acknowledge every dequeued item, even on SENTINEL or a
                # handler error, so producers blocked in msg_q.join() resume.
                self.msg_q.task_done()

    finally:
        # Best-effort cleanup & checkpoint
        if self._conn:
            try:
                self._conn.execute("ANALYZE;")
            except Exception:
                LOG.debug("ANALYZE failed", exc_info=True)

            with _Suppress(sqlite3.OperationalError):
                dbmod.wal_checkpoint(self._conn, "TRUNCATE")
            # Close outside the suppress block so a failed checkpoint
            # cannot leak the connection.
            self._conn.close()
            self._conn = None

        LOG.info("DB writer stopped")
```
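The final checkpoint uses SQLite's `PRAGMA wal_checkpoint(TRUNCATE)`, which copies WAL content back into the main database file and truncates the `-wal` file to zero bytes. The pragma returns one row: a busy flag, the number of pages in the WAL, and the number of pages checkpointed. The helpers below are hypothetical stand-ins for `dbmod.wal_checkpoint` and the close step. Their names and the mode validation are assumptions, not pipeloom's code. The close happens in `finally`, so a failed checkpoint never leaks the connection.

```python
from __future__ import annotations

import sqlite3

_CHECKPOINT_MODES = frozenset({"PASSIVE", "FULL", "RESTART", "TRUNCATE"})


def wal_checkpoint(conn: sqlite3.Connection, mode: str = "TRUNCATE") -> tuple[int, int, int]:
    """Hypothetical stand-in for dbmod.wal_checkpoint.

    Returns SQLite's (busy, log_pages, checkpointed_pages) row.
    """
    mode = mode.upper()
    if mode not in _CHECKPOINT_MODES:  # validate: the mode is interpolated into SQL
        raise ValueError(f"unknown checkpoint mode: {mode!r}")
    busy, log_pages, done_pages = conn.execute(f"PRAGMA wal_checkpoint({mode});").fetchone()
    return busy, log_pages, done_pages


def checkpoint_and_close(conn: sqlite3.Connection) -> None:
    """Best-effort checkpoint; the connection is closed even if it fails."""
    try:
        wal_checkpoint(conn, "TRUNCATE")
    except sqlite3.OperationalError:
        pass  # e.g. the database is locked; still close below
    finally:
        conn.close()
```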