Processing 100,000 GeoJSON files with Python asyncio requires decoupling disk I/O from CPU-bound JSON parsing. The bottleneck is rarely raw disk bandwidth — it is uncontrolled concurrency that exhausts file descriptors, triggers memory thrashing, or blocks the event loop during deserialization. A bounded asyncio.Semaphore, a capped asyncio.Queue, and batched output writes keep peak memory under 200 MB while delivering 3–5x throughput over a synchronous script.
This page is part of the Async I/O for Raster Processing guide, which sits inside the broader Spatial Batch Processing & Async Workflows reference.
Prerequisites
| Requirement | Detail |
|---|---|
| Python | 3.10+ (asyncio.TaskGroup available; asyncio.to_thread stable since 3.9) |
aiofiles |
pip install aiofiles — non-blocking file I/O via the default thread pool |
orjson (optional) |
pip install orjson — 2–3x faster JSON parsing for large payloads |
| OS file-descriptor limit | ulimit -n 65536 before running; persistent via /etc/security/limits.conf |
No GDAL installation is needed for pure GeoJSON work. If your pipeline feeds validated features into a rasterio or pyogrio stage, see Chunked Vector Data Reading for downstream integration patterns.
Pipeline Architecture
The three-stage design below keeps every layer independently bounded, so a slow writer never stalls readers and a slow disk never queues unbounded paths in memory.
Unlike Async I/O for Raster Processing — which requires chunked binary streaming and GDAL bindings — vector GeoJSON processing is text-heavy and thrives on pure-Python async I/O with minimal C-extension overhead.
Complete Working Implementation
The script below is self-contained. It discovers all .geojson files under input_dir, validates their RFC 7946 FeatureCollection structure, rounds coordinates to six decimal places (EPSG:4326), and writes merged output in 500-feature batches.
#!/usr/bin/env python3
"""
Async GeoJSON batch processor — handles 100k+ files on a single event loop.
Usage:
python process_geojson.py /data/raw /data/processed
Requirements:
pip install aiofiles
ulimit -n 65536 (before running)
"""
import asyncio
import json
import logging
import sys
from pathlib import Path
from typing import Optional
import aiofiles
from aiofiles.os import makedirs
# ── Tuning knobs ─────────────────────────────────────────────────────────────
MAX_CONCURRENCY = 80 # NVMe: try 150; HDD: try 20; watch `iotop -a`
BATCH_SIZE = 500 # features per output file; tune for downstream tooling
MAX_RETRIES = 3 # exponential back-off on transient I/O errors
# ─────────────────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(message)s",
stream=sys.stderr,
)
log = logging.getLogger(__name__)
async def read_geojson(path: Path, sem: asyncio.Semaphore) -> Optional[dict]:
"""Open and parse a single GeoJSON file, gated by the shared semaphore."""
async with sem: # ← backpressure: blocks when 80 ops in flight
for attempt in range(MAX_RETRIES):
try:
async with aiofiles.open(path, encoding="utf-8") as fh:
raw = await fh.read()
return json.loads(raw) # swap for orjson.loads for ~2x speed
except (json.JSONDecodeError, UnicodeDecodeError) as exc:
log.warning("Parse error %s (attempt %d): %s", path.name, attempt + 1, exc)
return None # unrecoverable; skip file
except OSError as exc:
delay = 0.1 * (2 ** attempt)
log.warning(
"I/O error %s (attempt %d): %s — retrying in %.1fs",
path.name, attempt + 1, exc, delay,
)
await asyncio.sleep(delay)
log.error("Giving up on %s after %d retries", path.name, MAX_RETRIES)
return None
def validate_and_transform(data: dict) -> Optional[dict]:
"""
Validate RFC 7946 FeatureCollection structure and normalise coordinates.
Rounds Point coordinates to 6 decimal places (~0.11 m precision at equator
in EPSG:4326). Extend here for CRS coercion, bbox clipping, or schema
checks specific to your dataset.
"""
if not isinstance(data, dict) or data.get("type") != "FeatureCollection":
return None
for feature in data.get("features", []):
geom = feature.get("geometry") or {}
if geom.get("type") == "Point":
coords = geom.get("coordinates", [])
# coordinates: [longitude, latitude] per RFC 7946 §3.1.2
geom["coordinates"] = [round(c, 6) for c in coords]
return data
async def _flush_batch(output_dir: Path, batch: list, batch_index: int) -> None:
"""Serialise a list of GeoJSON features to a single FeatureCollection file."""
out_path = output_dir / f"batch_{batch_index:06d}.geojson"
payload = json.dumps(
{"type": "FeatureCollection", "features": batch},
ensure_ascii=False,
separators=(",", ":"), # compact JSON; saves ~15% on large batches
)
async with aiofiles.open(out_path, mode="w", encoding="utf-8") as fh:
await fh.write(payload)
log.info("Wrote %d features → %s", len(batch), out_path.name)
async def batch_writer(
output_dir: Path,
results: asyncio.Queue,
batch_size: int = BATCH_SIZE,
) -> None:
"""Drain the results queue and flush to disk in fixed-size batches."""
await makedirs(str(output_dir), exist_ok=True)
pending: list = []
index = 0
while True:
item = await results.get()
if item is None: # sentinel: all workers have finished
if pending:
await _flush_batch(output_dir, pending, index)
break
# Flatten individual features out of their source FeatureCollections
pending.extend(item.get("features", []))
if len(pending) >= batch_size:
await _flush_batch(output_dir, pending[:batch_size], index)
pending = pending[batch_size:]
index += 1
async def worker(
path: Path,
sem: asyncio.Semaphore,
results: asyncio.Queue,
) -> None:
"""Read → validate → enqueue one GeoJSON file."""
data = await read_geojson(path, sem)
if data is None:
return
transformed = validate_and_transform(data)
if transformed is not None:
await results.put(transformed)
async def main(input_dir: Path, output_dir: Path) -> int:
paths = sorted(input_dir.rglob("*.geojson"))
if not paths:
log.error("No .geojson files found under %s", input_dir)
return 2 # POSIX exit code 2 = bad arguments / no input
log.info("Discovered %d files. MAX_CONCURRENCY=%d", len(paths), MAX_CONCURRENCY)
sem = asyncio.Semaphore(MAX_CONCURRENCY)
results = asyncio.Queue(maxsize=BATCH_SIZE * 2) # cap queue depth to 1000 features
writer_task = asyncio.create_task(batch_writer(output_dir, results))
try:
await asyncio.gather(*(worker(p, sem, results) for p in paths))
finally:
await results.put(None) # signal the writer to flush and exit
await writer_task
log.info("Done.")
return 0
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Async GeoJSON batch processor")
parser.add_argument("input_dir", type=Path, help="Directory tree of .geojson files")
parser.add_argument("output_dir", type=Path, help="Destination for merged output files")
args = parser.parse_args()
sys.exit(asyncio.run(main(args.input_dir, args.output_dir)))
Step Annotations
-
asyncio.Semaphore(MAX_CONCURRENCY)— the concurrency gate. Everyaiofiles.open()call is wrapped inasync with sem. When 80 file handles are open, new workers suspend until one closes. On HDDs, lower this to 20; on NVMe SSDs you can raise it to 150. Watchiotop -aand back off ifiowaitexceeds 60%. -
asyncio.Queue(maxsize=BATCH_SIZE * 2)— backpressure on the results side. Without amaxsize, all 100k validated payloads accumulate in RAM before the writer flushes any of them. Capping at 1000 (BATCH_SIZE * 2) means the writer must consume a batch before workers can enqueue more, bounding peak memory to roughly1000 × avg_file_size. -
json.loads(raw)vsorjson.loads(raw). The stdlibjson.loadsreleases the GIL during C-level parsing, so it is safe on the event loop for files up to ~50 KB. If your GeoJSON files are larger (dense LineString or Polygon geometries), swap inorjson.loads— it is 2–3x faster and has the same call signature. For files above 200 KB with heavy validation loops, move parsing toloop.run_in_executor(None, orjson.loads, raw). -
round(c, 6)coordinate normalisation — EPSG:4326 precision. Six decimal places gives ~0.11 m precision at the equator, which exceeds the accuracy of most field GPS devices. If your downstream tool ispyogrioorgeopandas, apply the coordinate transform there instead to avoid double-serialisation overhead. -
pending.extend(item.get("features", []))— flattening FeatureCollections. Source files are each aFeatureCollection. Extendingpendingwith their individual features — rather than nesting collections — produces output files that any GIS tool can ingest directly without an extra unwrap step. -
sys.exit(asyncio.run(main(...)))— POSIX exit codes.mainreturns0on success and2when no input files are found. Any uncaught exception propagates throughasyncio.runand exits with code1. These follow the POSIX convention (0 = OK, 1 = runtime error, 2 = usage / bad input) used throughout Spatial Batch Processing & Async Workflows.
Named Gotcha: OSError 24 — Too Many Open Files
The most common failure when scaling above ~200 concurrent tasks is:
OSError: [Errno 24] Too many open files: '/data/raw/tile_00042.geojson'
Root cause: Linux defaults to 1024 open file descriptors per process. Each aiofiles.open() holds one until the async with block exits.
Fix (two steps, both required):
# 1. Raise the soft limit for the current shell session
ulimit -n 65536
# 2. Verify (should print 65536)
ulimit -n
For persistent configuration add to /etc/security/limits.conf:
* soft nofile 65536
* hard nofile 65536
Then keep MAX_CONCURRENCY to at most half the soft limit (e.g. 32768). In practice, 80–200 concurrent opens is the throughput sweet spot on most storage — raising it further yields diminishing returns while increasing per-process overhead.
Verification
After the script completes, confirm output integrity with:
# Count total output files
ls /data/processed/*.geojson | wc -l
# Spot-check one output file is valid GeoJSON
python3 - <<'EOF'
import json, pathlib, sys
p = sorted(pathlib.Path("/data/processed").glob("*.geojson"))[0]
data = json.loads(p.read_text())
assert data["type"] == "FeatureCollection", "Not a FeatureCollection"
assert isinstance(data["features"], list), "features is not a list"
print(f"{p.name}: {len(data['features'])} features — OK")
EOF
# Cross-check total feature count against input (requires jq)
# Input total:
find /data/raw -name "*.geojson" -exec jq '.features | length' {} \; | awk '{s+=$1} END{print "Input features:", s}'
# Output total:
find /data/processed -name "*.geojson" -exec jq '.features | length' {} \; | awk '{s+=$1} END{print "Output features:", s}'
The two feature counts should match (or differ only by the number of files that failed validation — check stderr for Parse error or Giving up lines to reconcile any gap).
FAQ
Why create one big asyncio.gather call for 100k tasks instead of a worker-pool pattern?
For tasks that are mostly waiting on I/O (not CPU), asyncio.gather over coroutines is efficient because suspended coroutines are cheap (a few hundred bytes each, not OS threads). The asyncio.Semaphore ensures that only MAX_CONCURRENCY tasks are actually holding open file handles at any moment. If you prefer an explicit worker-pool pattern (useful when tasks have heavy CPU phases), replace asyncio.gather with asyncio.Queue-fed workers and asyncio.TaskGroup.
How do I handle GeoJSON files that are actually newline-delimited GeoJSON (GeoJSONSeq / RFC 8142)?
Newline-delimited GeoJSON (.geojsonl or .ndjson) stores one Feature per line, not a FeatureCollection. Replace the json.loads(raw) call with a list comprehension over lines: [json.loads(line) for line in raw.splitlines() if line.strip()]. Then adjust validate_and_transform to accept a list of Feature objects rather than a FeatureCollection dict. For large .geojsonl files, stream line-by-line with async for line in fh to avoid loading the entire file into memory.
Should I use asyncio.to_thread or loop.run_in_executor for JSON parsing?
asyncio.to_thread(json.loads, raw) is the idiomatic Python 3.9+ form — it wraps run_in_executor with the default ThreadPoolExecutor and is slightly more readable. Both are functionally identical. Use either when parsing payloads above ~50 KB or when your validation logic includes regex matching, geometry coordinate walks, or schema checks that keep the CPU busy for more than ~1 ms per file.
Related
- Async I/O for Raster Processing — the parent guide covering event-loop safety, GDAL bridge patterns, and semaphore sizing for binary raster formats
- Chunked Vector Data Reading — how to feed validated GeoJSON features into pyogrio or geopandas in memory-safe chunks
- Error Handling in Spatial Pipelines — structured logging, retry strategies, and exit-code conventions for the kind of pipeline built on this page