
Data Durability: Intra-Day Snapshots and Crash Recovery

ZeptoDB keeps all hot data in memory. Before this work, a process crash lost everything since the last end-of-day HDB flush. For intra-day data — often impossible to re-ingest from upstream feeds — this was the highest product-readiness risk. Here’s how we fixed it.


Before

ACTIVE partitions (current hour) had zero disk representation. Crash = total intra-day data loss.

After

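With auto-snapshot enabled, every partition is written to disk every 60s by default. A crash loses at most one snapshot interval of data, and recovery runs automatically on restart when enable_recovery is set.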

| Scenario | Before | After |
|---|---|---|
| Crash, ACTIVE partition | All intra-day data lost | ≤ 60s of data lost |
| Crash, SEALED partition | Lost if not yet flushed | ≤ 60s of data lost |
| Graceful EOD shutdown | Preserved via HDB flush | Same (+ snapshot on stop) |

Normal operation (flush_loop every 1s):
├── do_flush_sealed()          [existing: SEALED → HDB on memory pressure]
└── if now - last_snap ≥ 60s
    └── do_snapshot()          [new: ALL partitions → snapshot_path]

Restart (ZeptoPipeline::start):
├── [if enable_recovery] HDBReader(recovery_snapshot_path)
│   └── for each symbol/hour → read columns → store_tick() × N rows
├── flush_manager->start()
└── drain_threads.start()      ← recovery completes BEFORE these start
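The timer branch in the diagram above can be sketched as a small helper that the 1s flush loop polls. This is a minimal illustration, not ZeptoDB's actual code: `SnapshotTimer` and its `due()` method are hypothetical names, and the use of `std::chrono::steady_clock` is an assumption.

```cpp
#include <chrono>

// Hypothetical helper (not part of ZeptoDB): tracks when the last snapshot
// ran and reports whether a new one is due.
class SnapshotTimer {
public:
    using Clock = std::chrono::steady_clock;

    SnapshotTimer(std::chrono::milliseconds interval, Clock::time_point start)
        : interval_(interval), last_snap_(start) {}

    // Returns true and rearms the timer once at least one full interval has
    // elapsed since the last snapshot; otherwise returns false.
    bool due(Clock::time_point now) {
        if (now - last_snap_ < interval_) {
            return false;
        }
        last_snap_ = now;
        return true;
    }

private:
    std::chrono::milliseconds interval_;
    Clock::time_point last_snap_;
};
```

Inside a loop that wakes every second, the call would look like `if (timer.due(Clock::now())) do_snapshot();`. Passing the current time in as a parameter keeps the check testable without sleeping.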

The key insight: HDBWriter already had write_column_file() with LZ4 compression, directory creation, and correct binary format. The only change needed was removing the sealed-state precondition.

include/zeptodb/storage/hdb_writer.h
size_t snapshot_partition(const Partition& partition,
                          const std::string& snapshot_dir);

Writes all columns to:

{snapshot_dir}/{symbol_id}/{hour_epoch}/price.bin
{snapshot_dir}/{symbol_id}/{hour_epoch}/volume.bin
{snapshot_dir}/{symbol_id}/{hour_epoch}/timestamp.bin
{snapshot_dir}/{symbol_id}/{hour_epoch}/msg_type.bin

Files are overwritten on each snapshot cycle — idempotent by design. Same LZ4-compressed binary format as regular HDB flush.
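To make the layout concrete, here is a minimal sketch that builds the four column paths for one partition. The helper names (`snapshot_partition_dir`, `snapshot_column_paths`) and the integer types chosen for `symbol_id` and `hour_epoch` are assumptions for illustration; only the directory layout and the column names come from the listing above.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <string>

namespace fs = std::filesystem;

// Column names taken from the snapshot layout described above.
inline constexpr std::array<const char*, 4> kSnapshotColumns = {
    "price", "volume", "timestamp", "msg_type"};

// Hypothetical helper: {snapshot_dir}/{symbol_id}/{hour_epoch}
fs::path snapshot_partition_dir(const fs::path& snapshot_dir,
                                std::uint32_t symbol_id,
                                std::int64_t hour_epoch) {
    return snapshot_dir / std::to_string(symbol_id) / std::to_string(hour_epoch);
}

// Hypothetical helper: one path per column file inside the partition directory.
std::array<fs::path, 4> snapshot_column_paths(const fs::path& snapshot_dir,
                                              std::uint32_t symbol_id,
                                              std::int64_t hour_epoch) {
    const fs::path dir = snapshot_partition_dir(snapshot_dir, symbol_id, hour_epoch);
    std::array<fs::path, 4> paths;
    for (std::size_t i = 0; i < kSnapshotColumns.size(); ++i) {
        paths[i] = dir / (std::string(kSnapshotColumns[i]) + ".bin");
    }
    return paths;
}
```

Because the path depends only on the symbol and the hour, each snapshot cycle targets the same files, which is what makes overwriting them idempotent.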


New configuration fields:

struct FlushConfig {
    bool        enable_auto_snapshot = false;
    uint32_t    snapshot_interval_ms = 60'000;  // 60s default
    std::string snapshot_path        = "";
    // ... existing fields unchanged
};

do_snapshot() iterates all partitions via pm_.get_all_partitions() and calls snapshot_partition() for each non-empty one. The timer check runs inside the existing flush_loop() — no new threads needed.

snapshot_now() provides a synchronous trigger for tests and manual operations.
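The iteration described above might look roughly like the following sketch. `PartitionStub` and `snapshot_all` are hypothetical stand-ins rather than ZeptoDB types, the writer is passed in as a callable so the example stays self-contained, and the return value is assumed to be a row count (the source does not say what `snapshot_partition()` returns).

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Stand-in for ZeptoDB's Partition; only the row count matters for this sketch.
struct PartitionStub {
    int symbol_id;
    std::size_t rows;
};

// Hypothetical equivalent of do_snapshot(): invokes write_one for every
// non-empty partition and returns the sum of the values it reports.
std::size_t snapshot_all(
    const std::vector<PartitionStub>& partitions,
    const std::function<std::size_t(const PartitionStub&)>& write_one) {
    std::size_t total = 0;
    for (const auto& p : partitions) {
        if (p.rows == 0) {
            continue;  // empty partitions produce no snapshot files
        }
        total += write_one(p);
    }
    return total;
}
```

A synchronous trigger in the style of `snapshot_now()` could call the same function directly, which keeps the timer path and the manual path on one code path.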


Recovery runs inside ZeptoPipeline::start(), before drain threads are spawned:

// Simplified recovery logic in start()
if (config_.enable_recovery && !config_.recovery_snapshot_path.empty()) {
    // Layout: {recovery_snapshot_path}/{symbol_id}/{hour_epoch}/<column>.bin
    for (const auto& symbol_dir : fs::directory_iterator(config_.recovery_snapshot_path)) {
        for (const auto& hour_dir : fs::directory_iterator(symbol_dir.path())) {
            const fs::path dir = hour_dir.path();
            auto timestamps = HDBReader::read_column<int64_t>(dir / "timestamp.bin");
            auto prices     = HDBReader::read_column<int64_t>(dir / "price.bin");
            auto volumes    = HDBReader::read_column<int64_t>(dir / "volume.bin");
            auto msg_types  = HDBReader::read_column<int32_t>(dir / "msg_type.bin");
            // Replay every snapshotted row into the in-memory partition map.
            for (size_t i = 0; i < timestamps.size(); ++i) {
                store_tick({timestamps[i], prices[i], volumes[i], ...});
            }
        }
    }
}
// THEN start drain threads

This gives a clean single-threaded replay window — no locks needed. Drain threads see a fully populated partition map when they start.
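The replay loop indexes every column by the length of the timestamp column, so it relies on all four columns having the same number of rows. A defensive variant could check that before replaying anything. The sketch below is one way to do it under stated assumptions: `TickStub`, `ColumnsStub`, and `replay_columns` are hypothetical names, and the real tick message has fields not shown in the source.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in for the tick message; the real type has more fields than shown here.
struct TickStub {
    std::int64_t timestamp;
    std::int64_t price;
    std::int64_t volume;
    std::int32_t msg_type;
};

// Stand-in for the four decoded column files of one partition.
struct ColumnsStub {
    std::vector<std::int64_t> timestamps;
    std::vector<std::int64_t> prices;
    std::vector<std::int64_t> volumes;
    std::vector<std::int32_t> msg_types;
};

// Replays one partition row by row through store_tick. Returns false and
// emits nothing if the column lengths disagree.
template <typename Sink>
bool replay_columns(const ColumnsStub& c, Sink&& store_tick) {
    const std::size_t n = c.timestamps.size();
    if (c.prices.size() != n || c.volumes.size() != n || c.msg_types.size() != n) {
        return false;
    }
    for (std::size_t i = 0; i < n; ++i) {
        store_tick(TickStub{c.timestamps[i], c.prices[i], c.volumes[i], c.msg_types[i]});
    }
    return true;
}
```

Skipping a mismatched partition is only one possible policy. Whether to skip, truncate to the shortest column, or abort startup depends on how the snapshot writer behaves when it is interrupted, which the source does not describe.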


PipelineConfig config;
config.enable_recovery = true;
config.recovery_snapshot_path = "/data/zeptodb/snapshots";
FlushConfig flush_config;
flush_config.enable_auto_snapshot = true;
flush_config.snapshot_interval_ms = 60'000; // 60s RPO
flush_config.snapshot_path = "/data/zeptodb/snapshots";

Snapshots and recovery work in all three StorageMode settings: PURE_IN_MEMORY, TIERED, and PURE_ON_DISK.


| Decision | Rationale |
|---|---|
| Binary format (not Parquet) | Avoids Arrow/Parquet dependency in PURE_IN_MEMORY mode |
| Overwrite (not versioned) | Simplicity; snapshot is idempotent |
| Single-threaded recovery | No locks needed; drain threads start after recovery |
| Reuse HDBReader | Recovery path uses the same reader as HDB queries |
| Timer in existing flush_loop | No new threads; piggybacks on 1s check interval |

HDBTest.AutoSnapshot_CreatesFiles PASSED (6 ms)
    → Insert 100 rows into ACTIVE partition
    → snapshot_now()
    → Assert price.bin exists

HDBTest.Recovery_ReloadsData PASSED (6 ms)
    → Phase 1: Insert 50 rows, snapshot, destroy pipeline
    → Phase 2: New PURE_IN_MEMORY pipeline with enable_recovery=true
    → start() → Assert total_stored_rows() == 50

Total suite: 605/605 passing.