To implement checkpointing for interrupted spatial batches, persist a lightweight, atomic state file that maps each spatial asset (file path, feature ID, or tile coordinate) to a completion flag, and wrap your processing loop in a signal-aware try/except block that flushes the state after every successful chunk. On restart, deserialize the state, filter out completed items, and resume from the exact offset. This eliminates redundant I/O, prevents partial writes in spatial formats like GeoPackage or Shapefile, and integrates cleanly with CLI toolchains.
Spatial workloads are inherently I/O heavy and frequently interrupted by OOM kills, network drops, or manual SIGINT. Without a deterministic resume mechanism, you waste compute cycles and risk corrupting spatial indexes or leaving orphaned .cpg/.shx sidecars. Effective checkpointing sits at the core of Spatial Batch Processing & Async Workflows and becomes production-ready when paired with structured Progress Tracking in Batch Jobs. Below is a battle-tested pattern for Python GIS CLI tools.
Core Implementation Pattern
The following implementation uses JSON for human-readable state inspection, atomic file replacement to prevent corruption on crash, and POSIX signal handlers for graceful interruption. It assumes a directory of GeoPackage files but adapts trivially to raster tiles, vector features, or database cursors.
import json
import os
import sys
import signal
import logging
from pathlib import Path
from typing import Dict, List, Set
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
CHECKPOINT_FILE = "spatial_batch_state.json"
class SpatialCheckpoint:
def __init__(self, path: str = CHECKPOINT_FILE):
self.path = Path(path)
self.state: Dict[str, bool] = self._load()
self.interrupted = False
# Register graceful shutdown handlers
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
def _load(self) -> Dict[str, bool]:
if not self.path.exists():
return {}
try:
with open(self.path, "r") as f:
return json.load(f)
except json.JSONDecodeError:
logger.warning("Corrupted checkpoint file. Starting fresh.")
return {}
def _save(self) -> None:
# Atomic write: write to .tmp then replace to prevent partial JSON on crash
tmp = self.path.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump(self.state, f, indent=2)
# os.replace guarantees atomicity across POSIX and Windows
os.replace(str(tmp), str(self.path))
def _handle_signal(self, signum: int, frame) -> None:
logger.info(f"Signal {signum} received. Flushing checkpoint...")
self.interrupted = True
self._save()
def mark_complete(self, asset_id: str) -> None:
self.state[asset_id] = True
# Flush immediately for safety; batch flushing is an option for high-throughput loops
self._save()
def get_pending(self, all_assets: List[str]) -> List[str]:
"""Filter out already-processed assets."""
return [a for a in all_assets if not self.state.get(a, False)]
Why This Works
- Atomic Persistence: Writing to a
.tmpfile and callingos.replace()prevents half-written JSON if the process crashes mid-flush. See Python’s os.replace documentation for cross-platform guarantees. - Signal Awareness: Registering
SIGINTandSIGTERMallowsCtrl+Cor orchestrator shutdowns to trigger a clean flush before exit. - Idempotent State: The
Dict[str, bool]structure is append-only. Re-processing a completed asset is a no-op, making the system safe for retries.
Integrating with the Processing Loop
The checkpoint class is only useful when tightly coupled to your execution loop. Below is a minimal, production-ready pattern that resumes exactly where it left off.
def process_spatial_batch(asset_paths: List[str]) -> None:
checkpoint = SpatialCheckpoint()
pending = checkpoint.get_pending(asset_paths)
logger.info(f"Resuming batch: {len(pending)} assets remaining out of {len(asset_paths)}")
for asset in pending:
if checkpoint.interrupted:
logger.info("Graceful shutdown requested. Exiting loop.")
break
try:
# Replace with your actual spatial processing logic (e.g., GDAL, GeoPandas, rasterio)
# _process_geopackage(asset)
logger.info(f"Processing: {asset}")
# Mark complete only after successful execution
checkpoint.mark_complete(asset)
except Exception as e:
logger.error(f"Failed {asset}: {e}")
# Optional: move to a dead-letter queue or skip
continue
if not checkpoint.interrupted:
logger.info("Batch completed successfully.")
Key Design Decisions
- Flush Frequency: Flushing after every asset guarantees zero rework on crash but adds I/O overhead. For high-throughput pipelines, batch flushes every
Nitems or use a write-ahead log (WAL) pattern. - Asset Identification: Use stable identifiers (absolute paths, UUIDs, or tile coordinates). Avoid relative paths or ephemeral database cursors.
- Error Isolation: Catch exceptions per asset. A single malformed GeoPackage shouldn’t abort the entire batch.
Spatial Format & Corruption Safeguards
Spatial file formats behave differently under interruption, and checkpointing must account for their internal structures:
- GeoPackage (SQLite-backed): Supports explicit transactions. Wrap your write logic in
BEGIN TRANSACTION/COMMITto ensure atomic feature insertion. If interrupted mid-transaction, SQLite rolls back cleanly, but your checkpoint must only mark the asset complete afterCOMMITsucceeds. Refer to the GDAL GeoPackage driver documentation for transaction best practices. - Shapefiles: Not transactional. Interrupted writes leave orphaned
.shp,.shx, or.dbfsidecars. Always write to a temporary directory, validate geometry, then move the complete trio to the target path before marking the checkpoint. - Cloud-Optimized Formats (GeoParquet, Zarr, Cloud-Optimized GeoTIFF): Designed for append/partial reads. Checkpoint at the chunk or tile level rather than the file level to maximize parallelism.
Production Hardening & Async Integration
When scaling beyond single-node CLI tools, adapt the pattern for distributed environments:
- State Storage: Replace local JSON with Redis, DynamoDB, or PostgreSQL. Use
SETNXorINSERT ... ON CONFLICT DO NOTHINGto prevent race conditions across workers. - Lease-Based Locking: Assign a time-bound lease per asset to prevent duplicate processing if a worker crashes after claiming a task but before updating state.
- Async/Queue Integration: Pair this checkpoint pattern with Celery, RQ, or AWS Step Functions. The checkpoint file becomes your source of truth for job reconciliation, while the queue handles scheduling and retries.
For teams building resilient pipelines, this deterministic resume logic forms the foundation of reliable Spatial Batch Processing & Async Workflows. When combined with structured metrics (duration per chunk, error rates, bytes processed), it enables accurate Progress Tracking in Batch Jobs without sacrificing throughput or risking data corruption.
Quick Validation Checklist
os.replace)- after successful write/commit
Implementing checkpointing for interrupted spatial batches transforms fragile, all-or-nothing scripts into resilient, production-grade data pipelines. Start with the atomic JSON pattern above, validate against your target format’s transaction model, and scale to distributed state stores as your workload grows.