ZeptoDB keeps all hot data in memory. Before this work, a process crash lost everything since the last end-of-day HDB flush. For intra-day data — often impossible to re-ingest from upstream feeds — this was the highest product-readiness risk. Here’s how we fixed it.
Before
ACTIVE partitions (current hour) had zero disk representation. Crash = total intra-day data loss.
After
Periodic snapshots every 60s. Crash loses at most one snapshot interval. Recovery is automatic on restart.
| Scenario | Before | After |
|---|---|---|
| Crash, ACTIVE partition | All intra-day data lost | ≤ 60s of data lost |
| Crash, SEALED partition | Lost if not yet flushed | ≤ 60s of data lost |
| Graceful EOD shutdown | Preserved via HDB flush | Same (+ snapshot on stop) |
Normal operation (flush_loop every 1s):

```
├── do_flush_sealed()            [existing: SEALED → HDB on memory pressure]
└── if now - last_snap ≥ 60s
    └── do_snapshot()            [new: ALL partitions → snapshot_path]
```
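The 60s check is a comparison against the time of the last snapshot, made on every 1s tick of the loop. Here is a minimal sketch, assuming a monotonic `std::chrono::steady_clock`. `SnapshotTimer` and `count_snapshots` are hypothetical names, not ZeptoDB's code. The sketch simulates ticks instead of sleeping, so its behavior can be checked deterministically.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical sketch of the 60s check inside flush_loop(); the real
// flush manager code is not shown in the article.
using Clock = std::chrono::steady_clock;

struct SnapshotTimer {
    std::chrono::milliseconds interval;  // mirrors snapshot_interval_ms
    Clock::time_point last_snap;         // when the last snapshot was taken

    // Called on every flush_loop tick (about once per second).
    // Returns true when a snapshot is due and restarts the interval.
    bool due(Clock::time_point now) {
        assert(now >= last_snap);        // steady_clock never goes backwards
        if (now - last_snap >= interval) {
            last_snap = now;
            return true;
        }
        return false;
    }
};

// Simulates `ticks` one-second iterations of flush_loop without sleeping
// and returns how many times do_snapshot() would have run.
int count_snapshots(int ticks, std::chrono::milliseconds interval) {
    const Clock::time_point t0{};
    SnapshotTimer timer{interval, t0};
    int snapshots = 0;
    for (int tick = 1; tick <= ticks; ++tick) {
        // do_flush_sealed() would run here on every tick.
        if (timer.due(t0 + std::chrono::seconds(tick))) {
            ++snapshots;  // do_snapshot() would run here
        }
    }
    return snapshots;
}
```

In this sketch, `last_snap` is reset to the tick time rather than advanced by a fixed interval. As a result, a delayed snapshot does not trigger a burst of catch-up snapshots on the following ticks.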
Restart (ZeptoPipeline::start):

```
├── [if enable_recovery] HDBReader(recovery_snapshot_path)
│   └── for each symbol/hour → read columns → store_tick() × N rows
├── flush_manager->start()
└── drain_threads.start()        ← recovery completes BEFORE these start
```

The key insight: HDBWriter already had write_column_file() with LZ4 compression, directory creation, and the correct binary format. The only change needed was removing the sealed-state precondition.
```cpp
size_t snapshot_partition(const Partition& partition, const std::string& snapshot_dir);
```

Writes all columns to:
```
{snapshot_dir}/{symbol_id}/{hour_epoch}/price.bin
{snapshot_dir}/{symbol_id}/{hour_epoch}/volume.bin
{snapshot_dir}/{symbol_id}/{hour_epoch}/timestamp.bin
{snapshot_dir}/{symbol_id}/{hour_epoch}/msg_type.bin
```

Files are overwritten on each snapshot cycle, so snapshotting is idempotent by design. The files use the same LZ4-compressed binary format as a regular HDB flush.
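Here is a minimal sketch of the directory layout and the overwrite semantics, assuming `std::filesystem` and a plain uncompressed file of raw `int64_t` values. `column_path`, `write_column`, and `read_column` are hypothetical helpers. The real `write_column_file()` adds LZ4 compression and its own binary format, and this sketch does not reproduce either.

```cpp
#include <cassert>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Hypothetical helper: {snapshot_dir}/{symbol_id}/{hour_epoch}/{column}.bin
fs::path column_path(const fs::path& snapshot_dir, uint32_t symbol_id,
                     int64_t hour_epoch, const std::string& column) {
    return snapshot_dir / std::to_string(symbol_id) /
           std::to_string(hour_epoch) / (column + ".bin");
}

// Simplified stand-in for write_column_file(): raw int64 values with no LZ4
// and no header, so this is NOT ZeptoDB's on-disk format. std::ios::trunc
// replaces any previous file, so rerunning a snapshot just rewrites it.
void write_column(const fs::path& file, const std::vector<int64_t>& values) {
    fs::create_directories(file.parent_path());
    std::ofstream out(file, std::ios::binary | std::ios::trunc);
    out.write(reinterpret_cast<const char*>(values.data()),
              static_cast<std::streamsize>(values.size() * sizeof(int64_t)));
    if (!out) throw std::runtime_error("failed to write " + file.string());
}

// Reads back a file produced by write_column().
std::vector<int64_t> read_column(const fs::path& file) {
    const auto bytes = fs::file_size(file);
    assert(bytes % sizeof(int64_t) == 0);  // raw format: whole values only
    std::vector<int64_t> values(bytes / sizeof(int64_t));
    std::ifstream in(file, std::ios::binary);
    in.read(reinterpret_cast<char*>(values.data()),
            static_cast<std::streamsize>(values.size() * sizeof(int64_t)));
    if (!in) throw std::runtime_error("failed to read " + file.string());
    return values;
}
```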
New configuration fields:
```cpp
struct FlushConfig {
    bool        enable_auto_snapshot = false;
    uint32_t    snapshot_interval_ms = 60'000;  // 60s default
    std::string snapshot_path        = "";
    // ... existing fields unchanged
};
```

do_snapshot() iterates over all partitions via pm_.get_all_partitions() and calls snapshot_partition() for each non-empty one. The timer check runs inside the existing flush_loop(), so no new threads are needed.
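The do_snapshot() iteration can be sketched as follows. `Partition`, `PartitionState`, and the callback-style `snapshot_partition` parameter are hypothetical stand-ins for ZeptoDB's real types. The loop has no sealed-state check, so ACTIVE partitions are persisted too. Only empty partitions are skipped.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-ins: the real Partition and PartitionManager types are
// not shown in the article.
enum class PartitionState { ACTIVE, SEALED };

struct Partition {
    uint32_t symbol_id;
    int64_t hour_epoch;
    PartitionState state;
    std::size_t row_count;
};

// Mirrors the described do_snapshot(): visit every partition regardless of
// state (ACTIVE included) and skip empty ones. snapshot_partition is passed
// in so the sketch stays independent of any I/O; it returns bytes written.
std::size_t do_snapshot(
    const std::vector<std::shared_ptr<Partition>>& partitions,
    const std::function<std::size_t(const Partition&)>& snapshot_partition) {
    std::size_t total_bytes = 0;
    for (const auto& p : partitions) {
        assert(p != nullptr);
        if (p->row_count == 0) continue;  // nothing to persist
        total_bytes += snapshot_partition(*p);
    }
    return total_bytes;
}
```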
snapshot_now() provides a synchronous trigger for tests and manual operations.
Recovery runs inside ZeptoPipeline::start(), before drain threads are spawned:
```cpp
// Simplified recovery logic in start()
namespace fs = std::filesystem;

if (config_.enable_recovery && !config_.recovery_snapshot_path.empty()) {
    const fs::path snapshot_path = config_.recovery_snapshot_path;
    for (const auto& symbol_dir : fs::directory_iterator(snapshot_path)) {
        for (const auto& hour_dir : fs::directory_iterator(symbol_dir.path())) {
            const fs::path dir = hour_dir.path();
            auto timestamps = HDBReader::read_column<int64_t>(dir / "timestamp.bin");
            auto prices     = HDBReader::read_column<int64_t>(dir / "price.bin");
            auto volumes    = HDBReader::read_column<int64_t>(dir / "volume.bin");
            auto msg_types  = HDBReader::read_column<int32_t>(dir / "msg_type.bin");

            for (size_t i = 0; i < timestamps.size(); ++i) {
                store_tick({timestamps[i], prices[i], volumes[i], /* ... */});
            }
        }
    }
}
// THEN start drain threads
```

This gives a clean single-threaded replay window in which no locks are needed, and drain threads see a fully populated partition map when they start.
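A small model can illustrate the ordering guarantee. One thread replays data into a plain `std::map`, and only then are threads started that read it. `Tick`, `PartitionMap`, `replay`, and `start_readers` are hypothetical. The readers here never write, which is a simplification: real drain threads mutate partitions and rely on ZeptoDB's own synchronization once they are running. The sketch shows only that the replay itself needs no locks. This holds because threads are constructed after the replay, and the `std::thread` constructor synchronizes with the start of each new thread.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <thread>
#include <vector>

// Hypothetical model of the start() ordering: a plain map stands in for the
// partition map, and the "drain threads" only read it.
struct Tick {
    int64_t timestamp;
    int64_t price;
    int64_t volume;
};

using PartitionMap = std::map<uint32_t, std::vector<Tick>>;

// Phase 1: single-threaded replay. No locks, because no other thread exists yet.
void replay(PartitionMap& pm, uint32_t symbol_id,
            const std::vector<int64_t>& ts, const std::vector<int64_t>& px,
            const std::vector<int64_t>& vol) {
    assert(ts.size() == px.size() && ts.size() == vol.size());
    for (std::size_t i = 0; i < ts.size(); ++i) {
        pm[symbol_id].push_back({ts[i], px[i], vol[i]});
    }
}

// Phase 2: threads start only after replay. Thread construction synchronizes
// with each new thread, so every reader sees the fully populated map; these
// readers never write to it, so they need no mutex.
std::vector<std::size_t> start_readers(const PartitionMap& pm, int n_threads) {
    std::vector<std::size_t> seen(static_cast<std::size_t>(n_threads), 0);
    std::vector<std::thread> threads;
    for (int t = 0; t < n_threads; ++t) {
        threads.emplace_back([&pm, &seen, t] {
            std::size_t rows = 0;
            for (const auto& entry : pm) rows += entry.second.size();
            seen[t] = rows;  // each thread writes only its own slot
        });
    }
    for (auto& th : threads) th.join();
    return seen;
}
```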
To enable both features, point the snapshot writer and the recovery reader at the same directory:

```cpp
PipelineConfig config;
config.enable_recovery        = true;
config.recovery_snapshot_path = "/data/zeptodb/snapshots";

FlushConfig flush_config;
flush_config.enable_auto_snapshot = true;
flush_config.snapshot_interval_ms = 60'000;  // 60s RPO
flush_config.snapshot_path        = "/data/zeptodb/snapshots";
```

This works at all StorageMode levels: PURE_IN_MEMORY, TIERED, and PURE_ON_DISK.
| Decision | Rationale |
|---|---|
| Binary format (not Parquet) | Avoids Arrow/Parquet dependency in PURE_IN_MEMORY mode |
| Overwrite (not versioned) | Simplicity; snapshot is idempotent |
| Single-threaded recovery | No locks needed; drain threads start after recovery |
| Reuse HDBReader | Recovery path uses the same reader as HDB queries |
| Timer in existing flush_loop | No new threads; piggybacks on 1s check interval |
```
HDBTest.AutoSnapshot_CreatesFiles   PASSED (6 ms)
  → Insert 100 rows into ACTIVE partition
  → snapshot_now()
  → Assert price.bin exists

HDBTest.Recovery_ReloadsData        PASSED (6 ms)
  → Phase 1: Insert 50 rows, snapshot, destroy pipeline
  → Phase 2: New PURE_IN_MEMORY pipeline with enable_recovery=true → start()
  → Assert total_stored_rows() == 50
```

Total suite: 605/605 passing.