Chunked Vector Data Reading for Spatial Pipelines
Stream multi-gigabyte vector datasets into a predictable memory footprint using pyogrio offset/limit cursor reads, memory-aware chunk sizing, and incremental GeoParquet output — without ever materializing the full GeoDataFrame.
This page is part of the Spatial Batch Processing & Async Workflows guide.
Prerequisites
- Python 3.9+ — required for
__future__annotations,matchstatements, and stableasyncio.to_thread()semantics - Core libraries:
pyogrio>=0.7.0,geopandas>=1.0.0,pyarrow>=14.0,typer>=0.9.0,psutil>=5.9.0 - System backend: GDAL 3.4+ compiled with Parquet/Arrow support. Verify with
python -c "import pyogrio; print(pyogrio.list_drivers().get('Parquet'))" - Hardware: 8 GB RAM minimum; NVMe storage for shapefile/GeoPackage I/O; 4+ cores for downstream spatial joins
pip install "pyogrio>=0.7" "geopandas>=1.0" "pyarrow>=14" typer psutil
Confirm Arrow and Parquet driver availability before running any batch:
import pyogrio
drivers = pyogrio.list_drivers()
assert drivers.get("Parquet") in ("rw", "r"), (
"GDAL Parquet driver missing — rebuild GDAL with Arrow/Parquet support"
)
print(f"pyogrio {pyogrio.__version__} ready, Parquet driver: {drivers['Parquet']}")
Problem Framing
A standard geopandas.read_file("admin_boundaries_world.gpkg") call materialises the entire dataset into a single GeoDataFrame. At 50 million polygon features from an OpenStreetMap extract, that call allocates 12–40 GB of RAM, thrashes the page cache, and terminates with MemoryError on any commodity server. The failure is not gradual — it is a hard crash at the point of read, with no partial progress and no recovery path.
The same pattern fails for dense IoT telemetry files, continental-scale address datasets, and high-resolution cadastral boundaries. The fix is not “buy more RAM” — it is a cursor-based streaming approach where features are ingested, transformed, and flushed to disk in fixed-size batches, keeping the working set constant regardless of dataset size.
Step-by-Step Implementation
Step 1 — Inspect Metadata Without Loading Features
pyogrio.read_info() returns lightweight metadata (row count, CRS EPSG code, geometry type, bounding box, attribute schema) by querying the OGR layer header. No features are decoded.
import pyogrio
from pathlib import Path
INPUT = Path("data/osm_buildings_europe.gpkg")
info = pyogrio.read_info(str(INPUT))
print(f"Features : {info['features']:,}")
print(f"CRS : {info.get('crs', 'unknown')}")
print(f"Geometry : {info.get('geometry_type', 'unknown')}")
print(f"Fields : {info.get('fields', [])}")
The features key gives the total row count that drives all subsequent chunking arithmetic. If this field returns None (some remote OGR sources), fall back to a test read of 1,000 rows and extrapolate from file size.
Step 2 — Calculate a Memory-Aware Chunk Size
Chunk sizing is not a configuration constant — it is a function of geometry complexity, attribute width, and available RAM. Dense multipolygon features can consume 50× more memory per row than point features with identical schemas.
import psutil
def sample_row_footprint_mb(path: str, sample_rows: int = 5_000) -> float:
"""Measure per-row RSS cost by reading a small sample."""
import os
proc = psutil.Process(os.getpid())
before = proc.memory_info().rss
sample = pyogrio.read_dataframe(path, rows=slice(0, sample_rows), use_arrow=True)
after = proc.memory_info().rss
del sample
return (after - before) / (1024 ** 2) / sample_rows # MB per row
def calculate_chunk_size(
path: str,
total_rows: int,
safety_factor: float = 0.6,
) -> int:
"""Derive chunk size from live memory measurement."""
available_mb = psutil.virtual_memory().available / (1024 ** 2)
row_mb = sample_row_footprint_mb(path)
if row_mb <= 0:
row_mb = 0.001 # fallback: 1 KB/row estimate
safe_rows = int((available_mb * safety_factor) / row_mb)
# Clamp between 5,000 (prevent tiny chunks) and 500,000 (prevent OOM)
return max(5_000, min(safe_rows, 500_000))
Step 3 — Iterate with an Offset/Limit Cursor
pyogrio.read_dataframe() accepts a rows parameter as a Python slice(offset, offset + count). This maps to OGR’s SetNextByIndex / GetNextFeature cursor under the hood, so only the requested window is decoded and transferred into memory.
Setting use_arrow=True bypasses Python object construction entirely — features flow through Apache Arrow columnar buffers before conversion to GeoDataFrame, reducing peak RSS by 30–50% on attribute-heavy datasets.
import pyogrio
from geopandas import GeoDataFrame
def iter_chunks(path: str, chunk_size: int, total_rows: int):
"""Yield GeoDataFrames in offset/limit windows."""
offset = 0
while offset < total_rows:
chunk: GeoDataFrame = pyogrio.read_dataframe(
path,
rows=slice(offset, offset + chunk_size),
use_arrow=True, # Arrow-backed read: 30–50% less RSS
)
if chunk.empty:
break
yield offset, chunk
offset += len(chunk)
Driver compatibility note: The Shapefile (.shp) driver does not support random-seek offsets. If your input is a shapefile, convert it to GeoPackage (EPSG-preserving, single-file) or FlatGeobuf first:
ogr2ogr -f GPKG output.gpkg input.shp
Step 4 — Validate Schema on Every Chunk
Real-world vector datasets sourced from multiple municipalities, data vendors, or sensor streams frequently contain attribute drift — columns added, renamed, or dropped between spatial partitions. Detecting this early prevents silent data loss in downstream queries.
from typing import Optional
import logging
logger = logging.getLogger("chunked_reader")
def validate_chunk_schema(
chunk,
expected_cols: Optional[set],
expected_crs: Optional[str],
) -> tuple[set, str]:
"""Assert column set and CRS stability; return updated expectations."""
actual_cols = set(chunk.columns)
actual_crs = chunk.crs.to_epsg() if chunk.crs else None
if expected_cols is None:
return actual_cols, actual_crs # first chunk sets the baseline
missing = expected_cols - actual_cols
extra = actual_cols - expected_cols
if missing:
logger.warning(f"Schema drift — missing columns: {missing}")
if extra:
logger.info(f"Schema drift — new columns: {extra}")
if actual_crs != expected_crs:
logger.warning(
f"CRS mismatch: expected EPSG:{expected_crs}, got EPSG:{actual_crs} — reprojecting"
)
chunk = chunk.to_crs(epsg=int(expected_crs))
return expected_cols, expected_crs
Step 5 — Write Incrementally to GeoParquet
GeoParquet files are immutable once written — to_parquet() rewrites the entire file on each call, so calling it inside the chunk loop would O(n²) disk I/O. The correct pattern uses pyarrow.parquet.write_to_dataset() to append each chunk as a new row-group file in a partitioned output directory. The full directory reads as a single logical dataset.
import pyarrow.parquet as pq
import pyarrow as pa
def write_chunk_to_dataset(
chunk,
output_dir: str,
partition_cols: list[str] | None = None,
) -> None:
"""Append one chunk to a partitioned GeoParquet dataset."""
table = pa.Table.from_pandas(chunk, preserve_index=False)
pq.write_to_dataset(
table,
root_path=output_dir,
partition_cols=partition_cols or [],
existing_data_behavior="overwrite_or_ignore",
use_legacy_dataset=False,
)
For spatial partitioning (e.g., grouping by country code or UTM zone), add the partition column to the GeoDataFrame before this call:
chunk["country_iso"] = chunk.geometry.apply(
lambda g: reverse_geocode_country(g.centroid) # your lookup here
)
write_chunk_to_dataset(chunk, "output/buildings/", partition_cols=["country_iso"])
Step 6 — Checkpoint the Cursor Offset
Long-running ingestion jobs are interrupted by disk-full errors, OOM kills, and network timeouts. Persisting the last successfully written offset to a .state file enables deterministic resume without reprocessing completed chunks.
import json
from pathlib import Path
STATE_FILE = Path(".ingest_state.json")
def load_checkpoint() -> int:
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text()).get("offset", 0)
return 0
def save_checkpoint(offset: int) -> None:
STATE_FILE.write_text(json.dumps({"offset": offset}))
def clear_checkpoint() -> None:
STATE_FILE.unlink(missing_ok=True)
Configuration Integration
Chunked ingestion plugs directly into a layered config stack. Defaults live in code, a YAML config overrides them, environment variables override the YAML, and CLI flags take final precedence. This mirrors the pattern described in Configuration File Management for Geospatial CLI Workflows.
import os
import yaml
from pathlib import Path
DEFAULTS = {
"chunk_size": None, # None → calculate from memory
"max_memory_mb": 4096.0,
"output_format": "geoparquet",
"log_level": "INFO",
}
def load_config(config_path: Path | None) -> dict:
cfg = dict(DEFAULTS)
if config_path and config_path.exists():
with config_path.open() as f:
cfg.update(yaml.safe_load(f) or {})
# Environment variable overrides (INGEST_ prefix)
for key in cfg:
env_key = f"INGEST_{key.upper()}"
if env_key in os.environ:
cfg[key] = type(cfg[key])(os.environ[env_key]) if cfg[key] is not None else os.environ[env_key]
return cfg
The CLI flag layer (implemented with Argument Parsing with Typer) then overrides any value from the YAML/env layer when the user explicitly provides it.
Data-Flow Diagram
The diagram below shows the full pipeline: metadata inspection feeds chunk sizing, the offset cursor iterates chunks through schema validation and atomic writes, with the checkpoint file enabling resume on failure.
Full Production CLI
The following typer CLI assembles all six steps into a production-ready ingestion command with structured logging, POSIX exit codes, and resume-from-checkpoint support.
from __future__ import annotations
import json
import logging
import sys
import time
from pathlib import Path
from typing import Optional
import psutil
import pyarrow as pa
import pyarrow.parquet as pq
import pyogrio
import typer
from geopandas import GeoDataFrame
app = typer.Typer(add_completion=False)
logger = logging.getLogger("chunked_reader")
# ── Checkpointing ─────────────────────────────────────────────────────────────
def _state_path(output_dir: Path) -> Path:
return output_dir.parent / f".{output_dir.name}_state.json"
def _load_checkpoint(output_dir: Path) -> int:
p = _state_path(output_dir)
return json.loads(p.read_text()).get("offset", 0) if p.exists() else 0
def _save_checkpoint(output_dir: Path, offset: int) -> None:
_state_path(output_dir).write_text(json.dumps({"offset": offset}))
def _clear_checkpoint(output_dir: Path) -> None:
_state_path(output_dir).unlink(missing_ok=True)
# ── Memory helpers ─────────────────────────────────────────────────────────────
def _rss_mb() -> float:
return psutil.Process().memory_info().rss / (1024 ** 2)
def _sample_row_footprint_mb(path: str, n: int = 5_000) -> float:
before = _rss_mb()
s = pyogrio.read_dataframe(path, rows=slice(0, n), use_arrow=True)
after = _rss_mb()
del s
return max((after - before) / n, 1e-6)
def _auto_chunk_size(path: str, safety: float = 0.60) -> int:
avail_mb = psutil.virtual_memory().available / (1024 ** 2)
row_mb = _sample_row_footprint_mb(path)
return max(5_000, min(int(avail_mb * safety / row_mb), 500_000))
# ── Schema / CRS validation ────────────────────────────────────────────────────
def _validate(
chunk: GeoDataFrame,
expected_cols: Optional[set],
expected_epsg: Optional[int],
) -> tuple[GeoDataFrame, set, Optional[int]]:
cols = set(chunk.columns)
epsg = chunk.crs.to_epsg() if chunk.crs else None
if expected_cols is None:
return chunk, cols, epsg
missing = expected_cols - cols
if missing:
logger.warning("Schema drift — missing columns: %s", missing)
if epsg != expected_epsg and expected_epsg is not None:
logger.warning("CRS mismatch EPSG:%s → re-projecting to EPSG:%s", epsg, expected_epsg)
chunk = chunk.to_crs(epsg=expected_epsg)
return chunk, expected_cols, expected_epsg
# ── Incremental GeoParquet write ───────────────────────────────────────────────
def _write_chunk(chunk: GeoDataFrame, output_dir: str) -> None:
table = pa.Table.from_pandas(chunk, preserve_index=False)
pq.write_to_dataset(
table,
root_path=output_dir,
existing_data_behavior="overwrite_or_ignore",
use_legacy_dataset=False,
)
# ── CLI command ────────────────────────────────────────────────────────────────
@app.command()
def ingest(
input_path: Path = typer.Argument(..., exists=True, dir_okay=False, help="Source vector file (.gpkg, .fgb, .geojson)"),
output_dir: Path = typer.Argument(..., file_okay=False, help="Output GeoParquet dataset directory"),
chunk_size: Optional[int] = typer.Option(None, "--chunk-size", "-c", help="Rows per batch (auto if omitted)"),
resume: bool = typer.Option(False, "--resume", "-r", help="Resume from last checkpoint"),
) -> None:
"""Stream a large vector file to GeoParquet in memory-bounded chunks."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
datefmt="%H:%M:%S",
)
output_dir.mkdir(parents=True, exist_ok=True)
# Step 1 — metadata
try:
info = pyogrio.read_info(str(input_path))
except Exception as exc:
logger.error("Cannot read metadata from %s: %s", input_path, exc)
raise typer.Exit(code=1)
total_rows: int = info.get("features") or 0
if total_rows == 0:
logger.error("Dataset has 0 features or row count is unavailable.")
raise typer.Exit(code=1)
logger.info("Features: %s | CRS: %s", f"{total_rows:,}", info.get("crs", "unknown"))
# Step 2 — chunk size
effective_chunk = chunk_size or _auto_chunk_size(str(input_path))
logger.info("Chunk size: %s rows", f"{effective_chunk:,}")
# Step 6 — resume from checkpoint
start_offset = _load_checkpoint(output_dir) if resume else 0
if start_offset:
logger.info("Resuming from offset %s", f"{start_offset:,}")
offset = start_offset
chunks_done = 0
expected_cols: Optional[set] = None
expected_epsg: Optional[int] = None
t0 = time.perf_counter()
while offset < total_rows:
rss_before = _rss_mb()
# Step 3 — cursor read
try:
chunk: GeoDataFrame = pyogrio.read_dataframe(
str(input_path),
rows=slice(offset, offset + effective_chunk),
use_arrow=True,
)
except Exception as exc:
logger.error("Read failed at offset %s: %s", offset, exc)
raise typer.Exit(code=1)
if chunk.empty:
break
# Step 4 — schema validation
chunk, expected_cols, expected_epsg = _validate(chunk, expected_cols, expected_epsg)
# Step 5 — incremental write
try:
_write_chunk(chunk, str(output_dir))
except Exception as exc:
logger.error("Write failed at offset %s: %s", offset, exc)
raise typer.Exit(code=1)
offset += len(chunk)
chunks_done += 1
_save_checkpoint(output_dir, offset) # persist before next iteration
rss_delta = _rss_mb() - rss_before
pct = 100 * offset / total_rows
logger.info(
"Chunk %d | %s/%s rows (%.1f%%) | ΔRSS %+.1f MB",
chunks_done, f"{offset:,}", f"{total_rows:,}", pct, rss_delta,
)
elapsed = time.perf_counter() - t0
logger.info("Done. %d chunks in %.2fs → %s", chunks_done, elapsed, output_dir)
_clear_checkpoint(output_dir)
raise typer.Exit(code=0)
if __name__ == "__main__":
app()
Error Handling & Gotchas
Shapefile driver does not support random seek
The Shapefile OGR driver lacks offset-based cursor positioning. Calling read_dataframe("file.shp", rows=slice(50000, 100000)) silently reads from the start, returning the wrong rows. Fix: convert to GeoPackage or FlatGeobuf before chunking:
ogr2ogr -f FlatGeobuf output.fgb input.shp -progress
FlatGeobuf is preferred for pure streaming (no spatial index overhead on write); GeoPackage is preferred when you need attribute indexes on the output.
GeoDataFrame.to_parquet() inside a loop is O(n²)
Each to_parquet() call rewrites the entire Parquet file from scratch. On 500 chunks of 100,000 rows each, this is 50 million rows of rewrite work. Fix: use pyarrow.parquet.write_to_dataset() as shown in Step 5 — it appends one row-group file per chunk call, keeping each write O(chunk_size).
CRS not persisted across chunks in some formats
GeoJSON files do not store CRS in the OGR layer header — read_info() returns None for CRS, and read_dataframe() returns a GeoDataFrame with crs=None. Fix: capture the CRS from the first chunk, assert it on all subsequent chunks, and explicitly assign chunk = chunk.set_crs(epsg=4326) when processing GeoJSON.
Memory growth from Arrow object caching
pyarrow caches schema metadata across reads in some versions. If RSS grows monotonically across chunks even with del chunk, call pa.default_cpu_memory_pool().release_unused() after each write to return unused Arrow buffers to the OS.
GDAL virtual file system for remote sources
For S3 or HTTP-hosted vector files, prefix paths with /vsicurl/ or /vsis3/. Set GDAL_HTTP_MAX_RETRY=3 and GDAL_HTTP_RETRY_DELAY=2 as environment variables before importing pyogrio to add automatic retry on transient network failures.
Verification
After a successful ingestion run, verify the output with pyarrow and geopandas:
import pyarrow.parquet as pq
import geopandas as gpd
# Check row count matches the source
ds = pq.ParquetDataset("output/buildings/")
total_written = ds.read().num_rows
print(f"Rows written: {total_written:,}")
assert total_written == expected_total, f"Row count mismatch: {total_written} vs {expected_total}"
# Spot-check geometry validity on the last partition file
files = sorted(ds.files)
last_chunk = gpd.read_parquet(files[-1])
invalid = (~last_chunk.geometry.is_valid).sum()
print(f"Invalid geometries in last chunk: {invalid}")
assert invalid == 0, "Geometry validation failed — check source data"
# Confirm CRS is preserved
print(f"Output CRS: {last_chunk.crs}")
Check that the CLI exits with code 0 on success:
python ingest_cli.py data/osm_buildings_europe.gpkg output/buildings/
echo "Exit code: $?" # expected: 0
For CI pipelines, assert the checkpoint file is removed on clean completion:
test ! -f output/.buildings_state.json && echo "Checkpoint cleaned up — OK"
Performance Notes
| Scenario | Chunk size | Typical peak RSS | Throughput |
|---|---|---|---|
| Point features, few attributes | 200,000 rows | ~400 MB | ~600,000 rows/s |
| Polygon buildings, 15 attributes | 50,000 rows | ~800 MB | ~80,000 rows/s |
| Dense multipolygon admin boundaries | 10,000 rows | ~1.2 GB | ~18,000 rows/s |
| GeoJSON (no Arrow path) | 20,000 rows | ~1.8 GB | ~12,000 rows/s |
The use_arrow=True flag accounts for the 30–50% RSS reduction in the polygon and multipolygon rows above. Enabling it is essentially free — there is no throughput penalty on pyogrio 0.7+.
For CPU-bound downstream work (spatial joins, coordinate reprojection, geometry simplification), decouple ingestion from compute: the ingestion loop writes raw chunks to a queue directory, and a pool of worker processes consumes them independently. This pattern is detailed in Multiprocessing Geospatial Tasks.
For raster counterparts — where the cursor concept maps to window-based tile reads rather than row offsets — see Async I/O for Raster Processing. The Memory Management for Large Datasets page covers cross-format strategies for keeping RSS bounded across both vector and raster pipelines.
If the ingestion job fails mid-run, the error handling strategies in Error Handling in Spatial Pipelines cover retry semantics, structured failure logging, and partial-result recovery.
FAQ
How do I choose chunk size for a large shapefile?
Convert the shapefile to GeoPackage first (see the “Shapefile driver” gotcha above), then let _auto_chunk_size() sample 5,000 rows and measure the RSS delta. For point datasets you will typically land at 150,000–300,000 rows/chunk; for dense polygon datasets, expect 5,000–25,000. Never set a hard 100,000-row default without profiling — it will OOM on multipolygon datasets.
Can I write chunked output to a single GeoParquet file?
No. Parquet files are immutable once written — appending is not possible at the file level. Use pyarrow.parquet.write_to_dataset() to write each chunk as a separate row-group file in a directory. geopandas.read_parquet("output/buildings/") and pyarrow.parquet.ParquetDataset("output/buildings/") both treat the directory as a single logical table.
Why does my RSS keep growing even though I delete each chunk?
Arrow caches memory pool allocations internally. After del chunk, call pyarrow.default_cpu_memory_pool().release_unused() to return unused Arrow buffers to the OS. Also check for references held by logging formatters or exception tracebacks — Python will not GC an object while any reference to it is live.
What POSIX exit codes should a chunked ingestion CLI return?
Follow the convention established in the Spatial Batch Processing & Async Workflows guide: 0 for success, 1 for runtime error (schema drift, write failure, CRS mismatch), 2 for bad arguments (missing path, invalid chunk size), 3 for partial completion (checkpoint exists but run did not finish cleanly).
How do I stream from S3 without downloading the full file?
Prefix the path with /vsis3/bucket/key and set AWS_NO_SIGN_REQUEST=YES for public buckets (or configure standard AWS credential env vars for private ones). GDAL’s /vsis3/ virtual filesystem streams the file over HTTP range requests, so pyogrio’s offset/limit cursor will issue byte-range reads rather than downloading the entire file. Set GDAL_HTTP_MAX_RETRY=3 to handle transient S3 throttling.
Related
- Spatial Batch Processing & Async Workflows — parent guide covering the full pipeline from ingestion to output
- Memory Management for Large Datasets — RSS budgeting, swap avoidance, and cross-format memory strategies
- Multiprocessing Geospatial Tasks — offloading CPU-bound spatial joins and reprojections to worker pools
- Error Handling in Spatial Pipelines — retry logic, structured failure logs, and partial-result recovery