Subsection

Progress Tracking for Python GIS Batch Pipelines

Embedding visible, accurate progress state into a spatial batch job transforms a black-box process into a monitorable workflow — and is part of the broader Spatial Batch Processing & Async Workflows guide on building resilient Python GIS pipelines.

TL;DR

Decouple the rendering layer from the execution layer: workers advance a threading.Lock-guarded counter; the main thread drives the terminal UI. In asyncio pipelines, rich.progress.advance() is safe to call from coroutine context. Persist completion state to a JSON manifest so jobs can resume after a crash or spot-instance eviction.

Prerequisites

Requirement Version Notes
Python 3.9+ asyncio.to_thread() and stable concurrent.futures
rich ≥ 13.0 Multi-column progress dashboard, live display
tqdm ≥ 4.65 Lightweight bars, tqdm.asyncio support
rasterio ≥ 1.3 Raster I/O, windowed reads
pyogrio ≥ 0.6 Fast vector I/O (preferred over fiona)
click ≥ 8.1 CLI argument parsing and signal context
pip install rich tqdm rasterio pyogrio click
python -c "import rich; print(rich.__version__)"

For patterns that structure multi-stage pipelines into composable subcommands, see CLI Subcommand Organization, which covers how to wire progress state across nested command groups.

Problem Framing

A synchronous loop over 10 000 GeoPackage files gives no indication of throughput, remaining time, or which file caused a stall. When a GDAL driver blocks on a corrupt geometry ring or a remote COG fetch times out, the terminal sits silent for minutes before the job either crashes or continues without explanation. Operators kill what they cannot observe.

The same pipeline with accurate progress instrumentation shows: 3 412 / 10 000 files | 47 feat/s | ETA 3 m 22 s | 12 errors. That single line of terminal state reduces unnecessary interruptions, guides parallelism tuning, and provides the timestamp audit trail needed for post-mortems.

The core engineering challenge is that naive implementations break under concurrency. Calling tqdm.update() from multiple threads, writing to stdout from worker processes, or holding a display lock across a slow GDAL open all degrade throughput or produce corrupted output. The patterns below address each failure mode explicitly.


Progress Tracking Architecture Data-flow diagram showing how spatial task items flow through workers that advance a thread-safe counter, while the main thread reads that counter to update the Rich terminal UI and write a JSON checkpoint manifest. Task Queue files / tiles / features Worker Pool ThreadPoolExecutor Async Workers asyncio coroutines SafeCounter threading.Lock Rich UI main thread only Checkpoint JSON manifest .advance() sync / N tasks

Step-by-Step Implementation

Step 1 — Enumerate tasks before spawning workers

Pre-calculating the total before opening the worker pool gives the progress renderer an accurate denominator. Avoid deferring this calculation into the workers themselves, where race conditions can cause double-counting.

from pathlib import Path
import pyogrio

def enumerate_vector_tasks(input_dir: str) -> list[Path]:
    """Return all GeoPackage files in input_dir, sorted for reproducible ordering."""
    files = sorted(Path(input_dir).glob("*.gpkg"))
    if not files:
        raise FileNotFoundError(f"No .gpkg files found in {input_dir!r}")
    return files

Step 2 — Build a thread-safe counter

Python’s threading.Lock serialises access to a shared integer. Keep the lock held for the minimum time needed — just the counter increment, not the I/O operation.

import threading
from dataclasses import dataclass, field

@dataclass
class SafeCounter:
    total: int
    _completed: int = field(default=0, init=False)
    _failed: int = field(default=0, init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)

    def advance(self, failed: bool = False) -> None:
        with self._lock:
            self._completed += 1
            if failed:
                self._failed += 1

    @property
    def completed(self) -> int:
        with self._lock:
            return self._completed

    @property
    def failed(self) -> int:
        with self._lock:
            return self._failed

    @property
    def fraction(self) -> float:
        with self._lock:
            return min(self._completed / self.total, 1.0) if self.total else 0.0

Step 3 — Run workers and render from the main thread

The Rich live display must be driven exclusively from the main thread. Workers mutate the SafeCounter; as_completed() provides the synchronisation boundary where the main thread safely reads the counter and updates the UI.

import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.progress import (
    Progress, BarColumn, TextColumn,
    TimeRemainingColumn, MofNCompleteColumn
)
import pyogrio
import geopandas as gpd
from pathlib import Path

def _validate_one(fp: Path, counter: SafeCounter) -> tuple[Path, bool, str]:
    """Worker: open a GeoPackage, validate geometry rings, return result."""
    try:
        gdf: gpd.GeoDataFrame = pyogrio.read_dataframe(fp, use_arrow=True)
        # Buffer(0) repairs self-intersections and validates ring orientation
        gdf["geometry"] = gdf.geometry.buffer(0)
        counter.advance(failed=False)
        return (fp, True, "")
    except Exception as exc:
        counter.advance(failed=True)
        return (fp, False, str(exc))

def run_vector_validation(input_dir: str, max_workers: int = 4) -> int:
    """Validate all GeoPackage files; return POSIX exit code (0 = clean, 1 = errors)."""
    files = enumerate_vector_tasks(input_dir)
    counter = SafeCounter(total=len(files))

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeRemainingColumn(),
        transient=False,
    ) as ui:
        task_id = ui.add_task("Validating vectors", total=len(files))

        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {pool.submit(_validate_one, fp, counter): fp for fp in files}

            for future in as_completed(futures):
                # Sync UI from main thread only — never from workers
                ui.update(task_id, completed=counter.completed)
                fp, ok, err = future.result()
                if not ok:
                    ui.console.log(f"[yellow]WARN[/yellow] {fp.name}: {err}")

        ui.update(task_id, completed=counter.total)

    failed = counter.failed
    if failed:
        print(f"{failed}/{counter.total} files failed validation.", flush=True)
        return 1
    return 0

Step 4 — Async-compatible progress for I/O pipelines

When pipelines rely on async I/O for raster processing — fetching cloud-optimized GeoTIFFs, querying feature servers, or streaming tile APIs — synchronous workers are replaced by coroutines. rich.progress.advance() is thread-safe and non-blocking, so it can be called directly from coroutine context without wrapping.

import asyncio
import aiohttp
import rasterio
from rich.progress import Progress, TaskID
from pathlib import Path

EPSG_4326 = 4326

async def _fetch_and_validate_cog(
    session: aiohttp.ClientSession,
    url: str,
    out_dir: Path,
    progress: Progress,
    task_id: TaskID,
    semaphore: asyncio.Semaphore,
) -> tuple[str, bool]:
    """Fetch a COG URL, verify CRS is EPSG:4326, write to out_dir."""
    async with semaphore:
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                data = await resp.read()

            dest = out_dir / Path(url).name
            dest.write_bytes(data)

            # Offload blocking GDAL open to thread pool
            def _check_crs() -> bool:
                with rasterio.open(dest) as src:
                    return src.crs and src.crs.to_epsg() == EPSG_4326

            ok = await asyncio.to_thread(_check_crs)
            progress.advance(task_id)
            return (url, ok)
        except Exception:
            progress.advance(task_id)
            return (url, False)

async def batch_fetch_cogs(urls: list[str], out_dir: Path, concurrency: int = 10) -> list[tuple[str, bool]]:
    """Fetch a list of COG URLs concurrently with a bounded semaphore."""
    out_dir.mkdir(parents=True, exist_ok=True)
    semaphore = asyncio.Semaphore(concurrency)

    with Progress() as progress:
        task_id = progress.add_task("Fetching COGs", total=len(urls))
        async with aiohttp.ClientSession() as session:
            tasks = [
                _fetch_and_validate_cog(session, url, out_dir, progress, task_id, semaphore)
                for url in urls
            ]
            results = await asyncio.gather(*tasks, return_exceptions=False)

    return list(results)

Step 5 — Persist checkpoint state for long jobs

Geospatial jobs that process hundreds of gigabytes of imagery or millions of vector features run for hours. Spot-instance evictions, OOM kills, and manual interruptions (SIGINT) are routine. A checkpoint manifest written at regular intervals lets the pipeline resume from the last safe state rather than restarting from scratch. See implementing checkpointing for interrupted spatial batches for a full treatment of atomic writes, idempotent task execution, and state reconciliation.

import json
import os
import time
from pathlib import Path

CHECKPOINT_INTERVAL = 50  # tasks between writes

class CheckpointManager:
    def __init__(self, manifest_path: Path):
        self.path = manifest_path
        self._completed: set[str] = set()
        self._failed: dict[str, str] = {}
        self._since_flush = 0
        if manifest_path.exists():
            self._load()

    def _load(self) -> None:
        data = json.loads(self.path.read_text())
        self._completed = set(data.get("completed", []))
        self._failed = data.get("failed", {})

    def record(self, key: str, error: str | None = None) -> None:
        if error:
            self._failed[key] = error
        else:
            self._completed.add(key)
        self._since_flush += 1
        if self._since_flush >= CHECKPOINT_INTERVAL:
            self.flush()

    def flush(self) -> None:
        """Atomic write: tmp file then os.replace() to avoid corrupt state on crash."""
        tmp = self.path.with_suffix(".tmp")
        payload = {
            "completed": sorted(self._completed),
            "failed": self._failed,
            "flushed_at": time.time(),
        }
        tmp.write_text(json.dumps(payload, indent=2))
        os.replace(tmp, self.path)
        self._since_flush = 0

    def is_done(self, key: str) -> bool:
        return key in self._completed

    @property
    def completed_count(self) -> int:
        return len(self._completed)

Configuration Integration

Progress verbosity and checkpointing behaviour should follow the site’s layered config precedence: compiled defaults → YAML config file → environment variables → CLI flags. This ensures the same pipeline binary behaves correctly in an interactive developer session, a CI runner, and a headless cloud batch job.

import os
import click

DEFAULTS = {
    "workers": 4,
    "checkpoint_interval": 50,
    "quiet": False,
}

@click.command()
@click.option("--input-dir", required=True, type=click.Path(exists=True))
@click.option("--workers", default=None, type=int,
              help="Override BATCH_WORKERS env var or config default.")
@click.option("--checkpoint-dir", default=".checkpoints", show_default=True,
              type=click.Path(), help="Directory for JSON manifest files.")
@click.option("--quiet", is_flag=True, default=False,
              help="Suppress progress rendering; structured logs only.")
def validate_cmd(input_dir: str, workers: int | None, checkpoint_dir: str, quiet: bool) -> None:
    """Validate all GeoPackage files in INPUT_DIR."""
    resolved_workers = workers or int(os.environ.get("BATCH_WORKERS", DEFAULTS["workers"]))
    is_tty = not quiet and os.sys.stdout.isatty()

    checkpoint = CheckpointManager(
        Path(checkpoint_dir) / "validate_manifest.json"
    )

    # Skip already-completed tasks if resuming
    all_files = enumerate_vector_tasks(input_dir)
    pending = [f for f in all_files if not checkpoint.is_done(str(f))]
    click.echo(
        f"Tasks: {len(all_files)} total, {checkpoint.completed_count} already done, "
        f"{len(pending)} pending.",
        err=True,
    )

    if is_tty:
        exit_code = run_vector_validation_with_checkpoint(
            pending, resolved_workers, checkpoint
        )
    else:
        exit_code = run_vector_validation_quiet(pending, resolved_workers, checkpoint)

    raise SystemExit(exit_code)

Environment variables supported:

Variable Default Effect
BATCH_WORKERS 4 Thread/process pool size
NO_COLOR unset Disables ANSI output (honoured by Rich)
BATCH_CHECKPOINT_DIR .checkpoints Directory for manifest files
BATCH_CHECKPOINT_INTERVAL 50 Tasks between checkpoint flushes

Error Handling and Gotchas

GDAL driver lock on corrupted GeoTIFF blocks a thread indefinitely

GDAL’s internal error handling in certain drivers (notably GTiff and JPEG 2000) can block a worker thread for minutes when encountering a truncated file header. The symptom is a progress bar that stops advancing even though other workers are running.

Mitigation: set GDAL_HTTP_TIMEOUT and GDAL_HTTP_MAX_RETRY for remote reads, and wrap rasterio.open() in concurrent.futures.wait() with an explicit timeout:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import rasterio
from pathlib import Path

def open_with_timeout(fp: Path, timeout_s: float = 30.0):
    with ThreadPoolExecutor(max_workers=1) as ex:
        future = ex.submit(rasterio.open, fp)
        done, _ = wait([future], timeout=timeout_s)
        if not done:
            raise TimeoutError(f"rasterio.open timed out after {timeout_s}s: {fp}")
        return future.result()

CRS mismatch silently produces wrong geometry after buffer

geopandas.GeoDataFrame.buffer(0) operates in the coordinate units of the current CRS. Calling it on a GeoDataFrame in geographic coordinates (EPSG:4326) produces buffers in degrees, not metres. Always re-project to a suitable projected CRS before any metric geometry operation:

import pyogrio, geopandas as gpd
from pathlib import Path

def validate_and_reproject(fp: Path, target_epsg: int = 32633) -> gpd.GeoDataFrame:
    gdf: gpd.GeoDataFrame = pyogrio.read_dataframe(fp, use_arrow=True)
    if gdf.crs is None:
        raise ValueError(f"No CRS defined in {fp.name}")
    gdf = gdf.to_crs(epsg=target_epsg)    # EPSG:32633 — UTM zone 33N
    gdf["geometry"] = gdf.geometry.buffer(0)
    return gdf

Progress bar leaves ghost artifacts when a worker raises an uncaught exception

If an unhandled exception escapes the with Progress() context manager, Rich’s live display may not restore the terminal cursor. Always re-raise exceptions after the with Progress() block exits, not from inside it:

# BAD — exception escapes the context manager mid-render
with Progress() as p:
    task = p.add_task("...", total=100)
    raise RuntimeError("oops")   # Rich may leave cursor hidden

# GOOD — collect exceptions and raise after the display closes
errors: list[Exception] = []
with Progress() as p:
    task = p.add_task("...", total=100)
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as exc:
            errors.append(exc)
        p.advance(task)

if errors:
    raise ExceptionGroup("batch errors", errors)

For structured logging patterns that complement progress tracking, see error handling in spatial pipelines, which covers JSON log emission, POSIX exit codes, and retry strategies across synchronous and async execution models.

Verification

After a batch run, confirm the implementation is working correctly at three levels:

1. Exit code check

python validate.py --input-dir ./sample_data --workers 4
echo "Exit: $?"
# Expected: 0 (all passed) or 1 (some failed)

2. Checkpoint manifest inspection

python -c "
import json, pathlib
m = json.loads(pathlib.Path('.checkpoints/validate_manifest.json').read_text())
print(f'Completed: {len(m[\"completed\"])}')
print(f'Failed: {len(m[\"failed\"])}')
print(f'Sample completed: {m[\"completed\"][:3]}')
"

3. Structured log cross-check

# Count ERROR lines in the JSON log and compare to manifest failed count
python -c "
import json, pathlib
lines = pathlib.Path('batch.log').read_text().splitlines()
errors = [json.loads(l) for l in lines if l.strip() and json.loads(l).get('level') == 'ERROR']
print(f'Log errors: {len(errors)}')
"

Expected output on a clean dataset:

Tasks: 500 total, 0 already done, 500 pending.
Validating vectors ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 500/500 0:00:00
Exit: 0
Completed: 500
Failed: 0
Log errors: 0

Performance Notes

Progress instrumentation adds measurable overhead. At high throughput, the choices below make the difference between a negligible tax and a bottleneck:

Pattern Overhead Recommendation
progress.advance() per feature 5–15 µs per call Batch-advance every 1 000 features
Checkpoint flush per task Full JSON serialisation per task Flush every 50–100 tasks or 30 s
Rendering from worker threads ANSI write contention Never; drive UI from main thread only
tqdm in non-TTY context Repeated string formatting Pass disable=not sys.stdout.isatty()

For memory footprint: the SafeCounter above uses ~200 bytes. The CheckpointManager holds a Python set of completed keys in memory — at 1 M tasks with 64-character keys this is roughly 64 MB. For very large task counts, store completed keys in a SQLite table and query with SELECT COUNT(*) instead of holding the full set in a set.

For deeper parallelism considerations, including how to combine thread-pool progress tracking with process-based concurrency, see memory management for large datasets and the discussion of multiprocessing.Manager proxies in chunked vector data reading.

FAQ

Why does updating a Rich progress bar from a worker thread cause garbled output?

Rich’s live display uses an internal lock, but calling advance() from multiple worker threads while the main thread renders causes interleaved ANSI cursor writes. The correct pattern is to advance a plain threading.Lock-guarded counter in workers and call progress.update() from the main thread only, at as_completed() boundaries.

How often should I flush progress state to the checkpoint manifest?

Every 50–100 completed tasks or every 30 seconds, whichever comes first. Use an atomic write: write to a .tmp file first, then os.replace() it into place so a crash never leaves a corrupt manifest. Flushing on every task serialises I/O and negates the throughput benefit of the worker pool.

Can tqdm and Rich be used together in the same pipeline?

Avoid mixing them in the same terminal session — both manipulate ANSI cursor positioning and will produce garbled output. Rich is preferable for multi-task dashboards with ETA, memory, and throughput columns. tqdm integrates well with pandas iteration via tqdm.pandas() and is lighter weight when you need a single bar. Pick one per process.

How do I suppress progress output in CI or when piping to a log aggregator?

Check sys.stdout.isatty() at startup or honour the NO_COLOR environment variable. Pass disable=True to tqdm, or construct rich.console.Console(force_terminal=False). This prevents orphaned ANSI codes in log files and avoids breaking CI log viewers.

What is the overhead of rich.progress on a high-throughput vector pipeline?

Calling progress.advance() on every feature costs roughly 5–15 µs. At 1 M features per second this adds 5–15 s of pure overhead. Advancing in chunks of 1 000 features — progress.advance(1000) after each chunk — reduces the overhead to negligible while keeping ETA rendering accurate to within a second.