Subsection

Async I/O for Raster Processing: CLI Patterns

Raster batch pipelines stall at I/O: a synchronous loop over 500 cloud-optimized GeoTIFFs spends 90 % of its wall time blocked on network round-trips, not on pixel math. The fix is to decouple network and disk waits from GDAL compute using Python’s asyncio event loop — part of the broader Spatial Batch Processing & Async Workflows guide.

TL;DR

Use aiohttp to fetch rasters concurrently, buffer each file to a NamedTemporaryFile, then offload every rasterio call to asyncio.to_thread() so GDAL’s blocking C routines never stall the event loop. A semaphore caps concurrency; a single shared ClientSession reuses TCP connections across the entire batch.

Prerequisites

Requirement Version Notes
Python 3.9+ asyncio.to_thread() added in 3.9
aiohttp ≥ 3.8 async HTTP client
rasterio ≥ 1.3 GDAL-backed raster I/O
click ≥ 8.1 CLI argument parsing
tqdm ≥ 4.65 async-aware progress bars
GDAL ≥ 3.4 compiled with curl, libtiff, libjpeg
pip install aiohttp rasterio click tqdm
python -c "import rasterio; print(rasterio.__gdal_version__())"
ulimit -n 4096   # raise open-file-descriptor limit for concurrent handles

Verify GDAL driver availability for your target format:

import rasterio
print(rasterio.drivers.raster_driver_extensions())  # includes 'tif', 'jp2', 'vrt'

For CLI Subcommand Organization patterns that structure multi-step pipelines into composable commands, see the CLI Architecture guide.

Problem Framing

A naive synchronous pipeline processes each raster sequentially:

# slow: each fetch blocks until complete before the next starts
for url in urls:
    path = download(url)          # ~800 ms network wait
    result = process(path)        # ~120 ms GDAL decompression
    results.append(result)
# wall time ≈ 920 ms × N

With 500 tiles, that is over seven minutes of mostly idle time. The event loop approach cuts wall time to roughly the duration of the slowest batch — typically 15–25 seconds at MAX_CONCURRENT = 15 on a 100 Mbit connection.

The second pain point is memory. Loading entire GeoTIFFs for large mosaics causes RSS to grow linearly with batch size. The solution is windowed reads: GDAL reads only the requested pixel rectangle from disk or over the network range request. For broader coverage of memory-safe strategies see Memory Management for Large Datasets.

Architecture Overview

Async raster pipeline data-flow diagram URL list feeds a semaphore-controlled aiohttp fetch layer, which writes temporary files, then asyncio.to_thread offloads rasterio reads to a thread pool, and results are collected by asyncio.gather. URL list (text file) Semaphore asyncio.Semaphore (limit=15) aiohttp fetch ClientSession · TCP reuse iter_chunked(8192) NamedTemporaryFile suffix=".tif" Thread pool asyncio.to_thread() rasterio.open() + write asyncio.gather() List[Dict] results I/O phase compute phase

The pipeline deliberately separates the I/O phase (all downloads run concurrently behind the semaphore) from the compute phase (GDAL thread-pool workers). Mixing the two in a single coroutine serialises each download-then-process step and eliminates the concurrency benefit.

Step-by-Step Implementation

Step 1 — Concurrency controls and session setup

import asyncio
import tempfile
import os
from pathlib import Path
from typing import Any

import aiohttp

MAX_CONCURRENT = 15          # adjust to bandwidth and remote API quotas


def make_session(concurrency: int) -> aiohttp.ClientSession:
    """Return a shared ClientSession with DNS caching and bounded TCP pool."""
    connector = aiohttp.TCPConnector(
        limit=concurrency,
        limit_per_host=concurrency,
        ttl_dns_cache=300,       # cache DNS for 5 minutes
        ssl=False,               # set True for HTTPS-only remotes
    )
    timeout = aiohttp.ClientTimeout(total=120, connect=10)
    return aiohttp.ClientSession(connector=connector, timeout=timeout)

limit_per_host prevents a single slow remote from consuming the entire pool when the URL list spans multiple hosts (e.g. AWS S3 + a WMS tile server).

Step 2 — Async raster fetch with semaphore

async def fetch_raster_to_temp(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
) -> str:
    """Stream a remote raster to a named temp file; return its path."""
    async with semaphore:
        async with session.get(url) as resp:
            resp.raise_for_status()
            # GDAL needs a real filename with the right extension for driver detection
            suffix = Path(url).suffix or ".tif"
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                async for chunk in resp.content.iter_chunked(65536):
                    tmp.write(chunk)
                return tmp.name

iter_chunked(65536) (64 KB) is a reasonable chunk size: small enough to keep memory bounded per connection, large enough to avoid excessive syscall overhead for files in the 10–500 MB range typical of remote sensing tiles.

Step 3 — Thread-safe rasterio processing

rasterio and GDAL release the Python GIL but do not yield to asyncio. Wrapping calls in asyncio.to_thread() keeps the event loop responsive.

import rasterio
from rasterio.windows import Window
from rasterio.crs import CRS


async def process_raster_async(
    temp_path: str,
    output_dir: Path,
    epsg: int = 4326,
    window: Window = Window(0, 0, 512, 512),
) -> dict[str, Any]:
    """Read a windowed tile, reproject if needed, write output GeoTIFF."""

    def _read_and_write() -> dict[str, Any]:
        target_crs = CRS.from_epsg(epsg)
        with rasterio.open(temp_path) as src:
            # Validate CRS before any pixel work
            if src.crs is None:
                raise ValueError(f"No CRS in {temp_path}; cannot reproject to EPSG:{epsg}")
            meta = {
                "crs": src.crs.to_epsg(),
                "width": src.width,
                "height": src.height,
                "count": src.count,
                "dtype": src.dtypes[0],
            }
            data = src.read(window=window)
            out_transform = src.window_transform(window)

        stem = Path(temp_path).stem
        out_path = output_dir / f"{stem}_w{window.col_off}_{window.row_off}.tif"
        with rasterio.open(
            str(out_path), "w",
            driver="GTiff",
            height=window.height,
            width=window.width,
            count=meta["count"],
            dtype=meta["dtype"],
            crs=target_crs,
            transform=out_transform,
            compress="lzw",
            tiled=True,
            blockxsize=256,
            blockysize=256,
        ) as dst:
            dst.write(data)

        os.unlink(temp_path)   # clean up immediately; do not accumulate temp files
        return {"output": str(out_path), "source_epsg": meta["crs"], "target_epsg": epsg}

    return await asyncio.to_thread(_read_and_write)

Setting tiled=True with 256×256 blocks produces a COG-compatible output that allows downstream HTTP range requests — preserving the same access pattern for the next pipeline stage.

Step 4 — Batch orchestration with separated fetch and process phases

from tqdm.asyncio import tqdm as async_tqdm


async def run_batch(
    urls: list[str],
    output_dir: Path,
    concurrency: int = MAX_CONCURRENT,
    epsg: int = 4326,
) -> list[dict[str, Any]]:
    semaphore = asyncio.Semaphore(concurrency)
    output_dir.mkdir(parents=True, exist_ok=True)

    async with make_session(concurrency) as session:
        # Phase 1: fetch all tiles concurrently
        fetch_tasks = [
            fetch_raster_to_temp(session, url, semaphore) for url in urls
        ]
        temp_paths: list[str] = await async_tqdm.gather(
            *fetch_tasks, desc="Fetching", unit="tile"
        )

        # Phase 2: process concurrently — downloads are complete before this starts
        process_tasks = [
            process_raster_async(p, output_dir, epsg=epsg)
            for p in temp_paths
        ]
        results = await async_tqdm.gather(
            *process_tasks, desc="Processing", unit="tile",
            return_exceptions=True,  # collect errors without aborting the batch
        )

    errors = [r for r in results if isinstance(r, Exception)]
    if errors:
        import logging
        for err in errors:
            logging.error("tile_failed", extra={"error": str(err)})

    return [r for r in results if not isinstance(r, Exception)]

return_exceptions=True on gather is critical for production: a single malformed tile or unreachable URL should not abort the other 499. Failed tiles are logged and excluded from the return value; the caller can inspect and retry them. Error handling in spatial pipelines covers structured retry and dead-letter patterns in more depth.

Step 5 — Click CLI entry point

import json
import sys
import click


@click.command()
@click.argument("url_file", type=click.Path(exists=True, dir_okay=False))
@click.option(
    "--output-dir", "-o",
    default="./output",
    type=click.Path(),
    show_default=True,
    help="Directory for processed windowed tiles.",
)
@click.option(
    "--concurrency", "-c",
    default=15,
    type=click.IntRange(1, 64),
    show_default=True,
    envvar="RASTER_CONCURRENCY",
    help="Max simultaneous network requests.",
)
@click.option(
    "--epsg",
    default=4326,
    type=int,
    show_default=True,
    envvar="RASTER_TARGET_EPSG",
    help="Target CRS for output tiles (EPSG code).",
)
@click.option(
    "--json-log", is_flag=True, default=False,
    help="Emit structured JSON summary to stdout.",
)
def cli(url_file: str, output_dir: str, concurrency: int, epsg: int, json_log: bool):
    """Async CLI: fetch and process GeoTIFFs from URL_FILE concurrently."""
    urls = Path(url_file).read_text().splitlines()
    urls = [u.strip() for u in urls if u.strip()]

    if not urls:
        click.echo("No URLs found in input file.", err=True)
        sys.exit(1)

    click.echo(f"Processing {len(urls)} tiles → {output_dir} (EPSG:{epsg})", err=True)
    results = asyncio.run(
        run_batch(urls, Path(output_dir), concurrency=concurrency, epsg=epsg)
    )

    summary = {"processed": len(results), "failed": len(urls) - len(results)}
    if json_log:
        click.echo(json.dumps(summary))
    else:
        click.echo(
            f"Done: {summary['processed']} tiles written, {summary['failed']} failed.",
            err=True,
        )
    sys.exit(0 if summary["failed"] == 0 else 1)


if __name__ == "__main__":
    cli()

The envvar parameters on --concurrency and --epsg follow the layered configuration pattern: environment variables override defaults, CLI flags override environment variables. This matches the precedence chain documented in Configuration File Management.

Configuration Integration

For persistent settings, drop a raster_pipeline.yaml alongside the script:

# raster_pipeline.yaml
concurrency: 20
epsg: 32633          # UTM zone 33N for European datasets
output_dir: ./tiles

Load it in cli() before processing flags:

import yaml

def load_config(path: Path = Path("raster_pipeline.yaml")) -> dict:
    if path.exists():
        return yaml.safe_load(path.read_text()) or {}
    return {}

Pass values as default_map to Click so YAML values sit between built-in defaults and CLI flags:

ctx.default_map = load_config()

The full YAML config pattern — including environment variable interpolation and schema validation — is covered in Managing YAML Configs for Geospatial CLI Workflows.

Error Handling and Gotchas

CRS is None

Remote tiles from undocumented WMS endpoints or older Landsat archives often lack embedded CRS metadata. rasterio.open() returns src.crs = None rather than raising. Always check before calling .to_epsg().

Fix: raise a domain-specific ValueError inside _read_and_write; the outer gather(return_exceptions=True) captures it without killing the batch.

Block misalignment on windowed writes

If window.height or window.width exceeds the actual raster dimensions, GDAL silently clips the read — the returned array is smaller than expected, causing shape mismatches when calling dst.write(data).

Fix: clamp the window with src.window(*src.bounds) intersected against your requested window before reading.

GDAL driver not available

rasterio.errors.RasterioIOError: No such file or directory on a .jp2 file means your GDAL build lacks OpenJPEG support.

Fix: install gdal-bin with --with-openjpeg or use pip install gdal[jp2openjpeg] wheels. Check at startup:

from rasterio.drivers import raster_driver_extensions
assert "jp2" in raster_driver_extensions(), "GDAL OpenJPEG driver not available"

Memory exhaustion with concurrent large tiles

Each asyncio.to_thread() worker holds the decompressed tile in RAM until dst.write() completes. At 15 concurrent 500 MB tiles, peak RSS exceeds 7 GB.

Fix: reduce concurrency or use rasterio windowed reads with smaller Window dimensions to process tiles in 256×256 chunks. See Memory Management for Large Datasets for producer-consumer queue patterns that bound peak memory independently of batch size.

aiohttp ResourceWarning on shutdown

ResourceWarning: Unclosed client session appears when ClientSession is created outside an async with block or the event loop exits before cleanup.

Fix: always use async with make_session(concurrency) as session: as shown in Step 4. Never store the session as a module-level global.

Verification

Confirm the pipeline output is correct before promoting to production:

# 1. Run on a small test batch and check exit code
echo "https://example.com/tile_001.tif" > /tmp/test_urls.txt
python raster_pipeline.py /tmp/test_urls.txt --output-dir /tmp/out --json-log
# expected stdout: {"processed": 1, "failed": 0}
# expected exit code: 0

# 2. Verify CRS of output tiles
python - <<'EOF'
import rasterio, pathlib
for p in pathlib.Path("/tmp/out").glob("*.tif"):
    with rasterio.open(p) as src:
        print(p.name, src.crs.to_epsg(), src.width, src.height, src.count)
EOF

# 3. Checksum pixel data reproducibility
python - <<'EOF'
import rasterio, hashlib, pathlib
for p in sorted(pathlib.Path("/tmp/out").glob("*.tif")):
    with rasterio.open(p) as src:
        raw = src.read().tobytes()
    print(p.name, hashlib.sha256(raw).hexdigest()[:16])
EOF

Expected: CRS matches --epsg value; pixel SHA-256 is identical across repeated runs (idempotent).

For structured JSON log output, redirect stderr with 2>pipeline.log and inspect with jq '.error // "ok"' pipeline.log. The logging patterns used in Logging Spatial Transformation Results to Structured JSON apply directly here.

Performance Notes

Metric Synchronous baseline Async (15 workers) Notes
Wall time (100 tiles, ~50 MB each) ~820 s ~55–90 s 8–15x speedup typical on 100 Mbit uplink
Peak RSS ~500 MB ~1.2 GB concurrent decompression; reduce window size to control
CPU utilisation ~12 % ~45–60 % thread-pool workers use multiple cores for GDAL decompression
Open file descriptors ~10 ~60–80 ensure ulimit -n ≥ 4096

Tuning guidance:

  • If network utilisation plateaus below bandwidth capacity, increase the semaphore limit in steps of 5 and re-measure.
  • If CPU utilisation hits 100 % before the network saturates, the bottleneck is GDAL decompression. Switch to a ProcessPoolExecutor for compute-heavy transforms and coordinate the two pools using a bounded asyncio queue.
  • For batches of 10,000+ tiles, chunk the URL list into pages of 200–500, process each page with run_batch(), and write a checkpoint file after each page. The checkpoint pattern for interrupted spatial batches is covered in Implementing Checkpointing for Interrupted Spatial Batches.

FAQ

Why does rasterio block the asyncio event loop?

rasterio is a Cython wrapper around GDAL’s C library. GDAL file opens, header reads, and pixel decompression all release the Python GIL — so they do not block threads — but they do not yield to the asyncio event loop. They are blocking OS calls. asyncio.to_thread() moves them to a thread-pool worker so the loop can continue scheduling fetch coroutines.

What semaphore limit should I use for a COG batch?

Start at 10–20 and observe two metrics: network bytes/sec and asyncio event-loop lag. Cloud-optimised GeoTIFFs served via HTTP range requests generate many small requests; the bottleneck is usually outbound bandwidth or the remote host’s per-IP rate limit, not local CPU. If the remote returns 429 Too Many Requests, halve the limit and add jittered exponential backoff.

Can I combine asyncio fetching with multiprocessing for heavy transforms?

Yes. Run asyncio for all I/O phases via run_batch(), collect the list of local temp paths, then pass them to a multiprocessing.Pool for pixel-level transforms outside the event loop. Do not spawn subprocess workers inside a running loop — the interaction between fork() and the event loop is unsafe. See Multiprocessing Geospatial Tasks for pool-based patterns.

How do I handle GDAL driver not available inside an async coroutine?

Catch rasterio.errors.RasterioIOError inside the _read_and_write closure and re-raise as a domain-specific exception. The outer asyncio.gather(return_exceptions=True) collects it without aborting the batch. Log the offending URL with structured JSON so you can identify driver gaps post-run.

Is aiofiles better than NamedTemporaryFile for buffering rasters?

No. GDAL requires a seekable, named file handle with the correct extension for driver detection. aiofiles writes to named files asynchronously, but streaming chunks via aiohttp’s iter_chunked() into a standard NamedTemporaryFile is equivalent in practice — the write is sequential and the bottleneck is network, not the file write itself. aiofiles would add a dependency without measurable benefit here.