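# ZeptoDB C++ API Reference

Last updated: 2026-03-22

## Table of Contents

- Quick Start
- ZeptoPipeline
- QueryExecutor (SQL)
- PartitionManager & Partition
- TickMessage
- Timestamp utilities
- Auth — CancellationToken
- Quick Build Reference

## Quick Start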
### Complete example: ingest ticks, run SQL, read raw columns

```cpp
#include "zeptodb/core/pipeline.h"
#include "zeptodb/sql/executor.h"
#include "zeptodb/common/types.h"
#include <iostream>

int main() {
    using namespace zeptodb::core;
    using namespace zeptodb::sql;

    // 1. Create and start pipeline (pure in-memory)
    ZeptoPipeline pipeline;
    pipeline.start();

    // 2. Ingest ticks
    for (int i = 0; i < 1000; ++i) {
        TickMessage msg;
        msg.symbol_id = 1;
        msg.price     = 15000 + i * 10;  // 15000, 15010, ..., 24990
        msg.volume    = 100 + i;
        msg.recv_ts   = now_ns();
        pipeline.ingest_tick(msg);
    }
    pipeline.drain_sync();  // flush to column store synchronously

    // 3. Direct query (C++ API)
    auto r = pipeline.query_vwap(1);
    std::cout << "VWAP: " << r.value
              << " rows_scanned: " << r.rows_scanned << "\n";

    // 4. SQL query
    QueryExecutor exec{pipeline};
    exec.enable_parallel();

    auto result = exec.execute(
        "SELECT count(*), sum(volume), avg(price), vwap(price, volume) "
        "FROM trades WHERE symbol = 1"
    );

    if (!result.ok()) {
        std::cerr << "Error: " << result.error << "\n";
        return 1;
    }
    for (size_t i = 0; i < result.column_names.size(); ++i) {
        std::cout << result.column_names[i] << " = " << result.rows[0][i] << "\n";
    }
    std::cout << "Execution: " << result.execution_time_us << " μs\n";

    // 5. Zero-copy raw column access
    auto& pm = pipeline.partition_manager();
    auto parts = pm.get_partitions(1);
    for (auto* part : parts) {
        const int64_t* prices = part->get_column("price");
        size_t n = part->row_count();
        // get_column() returns nullptr for a missing column; also skip empty partitions
        if (prices == nullptr || n == 0) continue;
        std::cout << "Partition: " << n << " rows, "
                  << "first price = " << prices[0] << "\n";
    }

    pipeline.stop();
    return 0;
}
```

### 5-minute bar aggregation via SQL
The bucket width `300000000000` is 5 minutes expressed in nanoseconds (300 × 10⁹).

```cpp
auto result = exec.execute(R"sql(
    SELECT xbar(timestamp, 300000000000) AS bar,
           first(price)  AS open,
           max(price)    AS high,
           min(price)    AS low,
           last(price)   AS close,
           sum(volume)   AS volume
    FROM trades
    WHERE symbol = 1
    GROUP BY xbar(timestamp, 300000000000)
    ORDER BY bar ASC
)sql");

for (const auto& row : result.rows) {
    int64_t bar    = row[0];
    int64_t open   = row[1];
    int64_t high   = row[2];
    int64_t low    = row[3];
    int64_t close  = row[4];
    int64_t volume = row[5];
    std::cout << bar << " O=" << open << " H=" << high
              << " L=" << low << " C=" << close << " V=" << volume << "\n";
}
```

### Time-range query with partition pruning
```cpp
// Read the clock once so both bounds describe the same one-hour window
const int64_t to_ns   = now_ns();
const int64_t from_ns = to_ns - 3600LL * 1'000'000'000LL;  // last 1 hour

auto result = exec.execute(
    "SELECT vwap(price, volume), count(*) FROM trades "
    "WHERE symbol = 1 AND timestamp BETWEEN " +
    std::to_string(from_ns) + " AND " + std::to_string(to_ns));
```

## ZeptoPipeline
`#include "zeptodb/core/pipeline.h"` — Namespace: `zeptodb::core`

The top-level end-to-end pipeline: tick ingestion → column store → query execution.

### Construction
```cpp
#include "zeptodb/core/pipeline.h"
using namespace zeptodb::core;

// Default config (pure in-memory, 32 MB arena per partition)
ZeptoPipeline pipeline;

// Custom config
PipelineConfig cfg;
cfg.arena_size_per_partition = 64ULL * 1024 * 1024;  // 64 MB
cfg.drain_batch_size         = 512;
cfg.drain_sleep_us           = 5;
cfg.storage_mode             = StorageMode::TIERED;
cfg.hdb_base_path            = "/data/zepto_hdb";
ZeptoPipeline tiered_pipeline{cfg};
```

### Lifecycle
```cpp
pipeline.start();  // start background drain thread
pipeline.stop();   // flush queue + stop drain thread

// Sync drain — useful in tests without background thread
size_t drained       = pipeline.drain_sync();
size_t drained_batch = pipeline.drain_sync(/*max_items=*/1000);
```

### PipelineConfig fields
```cpp
struct PipelineConfig {
    size_t      arena_size_per_partition = 32ULL * 1024 * 1024;  // 32 MB
    size_t      drain_batch_size         = 256;
    uint32_t    drain_sleep_us           = 10;
    StorageMode storage_mode             = StorageMode::PURE_IN_MEMORY;
    std::string hdb_base_path            = "/tmp/zepto_hdb";
    FlushConfig flush_config{};  // tiered mode HDB flush settings
};
```

### StorageMode
```cpp
enum class StorageMode : uint8_t {
    PURE_IN_MEMORY = 0,  // HFT: no HDB, lowest latency
    TIERED         = 1,  // RDB (today) + HDB (historical) hybrid
    PURE_ON_DISK   = 2,  // Backtesting: HDB only
};
```

### Ingest
```cpp
#include "zeptodb/common/types.h"

TickMessage msg;
msg.symbol_id = 1;
msg.price     = 15000;     // scaled integer
msg.volume    = 100;
msg.recv_ts   = now_ns();  // nanosecond timestamp

// Lock-free, thread-safe — returns false if ring buffer is full
bool ok = pipeline.ingest_tick(msg);
```

### Direct queries
```cpp
// VWAP (all time)
QueryResult r = pipeline.query_vwap(symbol_id);
if (r.ok()) {
    double vwap = r.value;
    // ...
}

// VWAP (time range)
QueryResult r_range = pipeline.query_vwap(symbol_id, from_ns, to_ns);

// Row count
QueryResult r_count = pipeline.query_count(symbol_id);
int64_t count = r_count.ivalue;

// Filter + sum: sum(col) WHERE col > threshold
QueryResult r_sum = pipeline.query_filter_sum(symbol_id, "volume", 100);

// Total rows stored across all partitions
size_t total = pipeline.total_stored_rows();
```

### QueryResult
```cpp
struct QueryResult {
    enum class Type : uint8_t { VWAP, SUM, COUNT, ERROR };

    Type        type         = Type::ERROR;
    double      value        = 0.0;  // VWAP, AVG
    int64_t     ivalue       = 0;    // COUNT, SUM
    size_t      rows_scanned = 0;
    int64_t     latency_ns   = 0;
    std::string error_msg;

    bool ok() const { return type != Type::ERROR; }
};
```

### Statistics
```cpp
const PipelineStats& s = pipeline.stats();

s.ticks_ingested.load()          // total ticks received (queue push)
s.ticks_stored.load()            // ticks written to column store
s.ticks_dropped.load()           // dropped (ring buffer overflow)
s.queries_executed.load()
s.total_rows_scanned.load()
s.partitions_created.load()
s.last_ingest_latency_ns.load()
```

### Sub-component access
```cpp
PartitionManager& pm = pipeline.partition_manager();
TickPlant&        tp = pipeline.tick_plant();

// nullptr in PURE_IN_MEMORY mode
HDBReader*    hdb = pipeline.hdb_reader();
FlushManager* fm  = pipeline.flush_manager();
```

## QueryExecutor (SQL)
`#include "zeptodb/sql/executor.h"` — Namespace: `zeptodb::sql`

Parses SQL strings and executes them against a `ZeptoPipeline`.

### Construction
```cpp
#include "zeptodb/sql/executor.h"
using namespace zeptodb::sql;

// Default: serial execution, LocalQueryScheduler
QueryExecutor exec{pipeline};

// Custom scheduler injection (testing or distributed)
auto sched = std::make_unique<MyDistributedScheduler>(/* ... */);
QueryExecutor dist_exec{pipeline, std::move(sched)};
```

### Parallel execution
```cpp
// Enable parallel (auto = hardware_concurrency threads)
exec.enable_parallel();

// Enable with explicit settings
exec.enable_parallel(
    /*num_threads=*/8,
    /*row_threshold=*/100'000  // use serial for < 100k rows
);

exec.disable_parallel();

// Inspect current settings
const ParallelOptions& opts = exec.parallel_options();
opts.enabled        // bool
opts.num_threads    // size_t (0 = hardware_concurrency)
opts.row_threshold  // size_t
```

### Execute SQL
```cpp
QueryResultSet result = exec.execute(
    "SELECT vwap(price, volume), count(*) "
    "FROM trades WHERE symbol = 1");

if (!result.ok()) {
    std::cerr << "Error: " << result.error << "\n";
    return;
}

// Column names
for (const std::string& col : result.column_names) { /* ... */ }

// Rows — all values as int64
for (const std::vector<int64_t>& row : result.rows) {
    for (size_t i = 0; i < row.size(); ++i) {
        std::cout << result.column_names[i] << " = " << row[i] << "\n";
    }
}

std::cout << result.execution_time_us << " μs, "
          << result.rows_scanned << " rows scanned\n";
```

### Execute with cancellation token
```cpp
#include "zeptodb/auth/cancellation_token.h"
#include <chrono>
#include <thread>

zeptodb::auth::CancellationToken token;

// Cancel from another thread
std::thread canceller([&token] {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    token.cancel();
});

QueryResultSet result = exec.execute(sql, &token);
canceller.join();

if (!result.ok()) {
    // result.error == "Query cancelled"
}
```

### QueryResultSet
```cpp
struct QueryResultSet {
    std::vector<std::string>          column_names;
    std::vector<ColumnType>           column_types;
    std::vector<std::vector<int64_t>> rows;  // all values as int64

    double      execution_time_us = 0.0;
    size_t      rows_scanned      = 0;
    std::string error;  // empty if ok

    bool ok() const { return error.empty(); }
};
```

## PartitionManager & Partition
`#include "zeptodb/storage/partition_manager.h"` — Namespace: `zeptodb::storage`

### PartitionManager
```cpp
PartitionManager& pm = pipeline.partition_manager();

// Get or create partition for (symbol, timestamp)
// Creates a new partition if none exists for this (symbol, date_bucket)
Partition& part = pm.get_or_create(symbol_id, timestamp_ns);

// All partitions for a symbol (ordered by time)
std::vector<Partition*> parts = pm.get_partitions(symbol_id);

// Partitions overlapping [from_ns, to_ns] — O(partitions) with O(1) overlap check
std::vector<Partition*> range_parts = pm.get_partitions_for_time_range(
    symbol_id, from_ns, to_ns);

// Total partition count (all symbols)
size_t n = pm.partition_count();
```

### Partition
Direct read-only access to column data — zero copy.

```cpp
// Column data pointers (nullptr if column doesn't exist)
const int64_t* prices     = part.get_column("price");
const int64_t* volumes    = part.get_column("volume");
const int64_t* timestamps = part.get_column("timestamp");
size_t row_count = part.row_count();

// Partition key
const PartitionKey& key = part.key();
key.symbol_id  // SymbolId (int64)
key.date       // Date bucket (int64, nanoseconds floored to day)

// Time range binary search — O(log n) on sorted timestamp column
// Returns the half-open row range [begin_row, end_row);
// the square brackets below are structured-binding syntax, not the interval
auto [begin_row, end_row] = part.timestamp_range(from_ns, to_ns);

// O(1) overlap check using first/last row timestamps
bool overlaps = part.overlaps_time_range(from_ns, to_ns);
```

## TickMessage
`#include "zeptodb/common/types.h"`

```cpp
using SymbolId  = int64_t;
using Timestamp = int64_t;  // nanoseconds since Unix epoch

struct TickMessage {
    SymbolId  symbol_id = 0;
    int64_t   price     = 0;   // scaled integer (e.g. cents: 150.25 → 15025)
    int64_t   volume    = 0;
    Timestamp recv_ts   = 0;   // nanoseconds since epoch
    int64_t   bid       = 0;   // optional
    int64_t   ask       = 0;   // optional
    int64_t   extra[4]  = {};  // user-defined columns
};
```

## Timestamp utilities
`#include "zeptodb/common/types.h"`

```cpp
// Current nanosecond timestamp
Timestamp ts = now_ns();

// Convert from epoch seconds
Timestamp ts_from_s = 1711000000LL * 1'000'000'000LL;

// Convert from epoch milliseconds
Timestamp ts_from_ms = 1711000000000LL * 1'000'000LL;

// Nanosecond constants
constexpr int64_t NS_PER_US  = 1'000LL;
constexpr int64_t NS_PER_MS  = 1'000'000LL;
constexpr int64_t NS_PER_S   = 1'000'000'000LL;
constexpr int64_t NS_PER_MIN = 60'000'000'000LL;
constexpr int64_t NS_PER_H   = 3'600'000'000'000LL;
constexpr int64_t NS_PER_DAY = 86'400'000'000'000LL;
```

## Auth — CancellationToken
`#include "zeptodb/auth/cancellation_token.h"` — Namespace: `zeptodb::auth`

Used to cancel long-running queries from another thread.

```cpp
#include "zeptodb/auth/cancellation_token.h"

zeptodb::auth::CancellationToken token;

// In another thread:
token.cancel();        // signals cancellation
token.is_cancelled();  // bool — check from executor hot loop

// Reset for reuse
token.reset();
```

## Quick Build Reference
```bash
mkdir -p build && cd build
cmake .. -G Ninja \
  -DCMAKE_BUILD_TYPE=Release \
  -DCMAKE_C_COMPILER=clang-19 \
  -DCMAKE_CXX_COMPILER=clang++-19 \
  -DAPEX_USE_PARQUET=OFF \
  -DAPEX_USE_S3=OFF \
  -DAPEX_BUILD_PYTHON=OFF
ninja -j$(nproc)

# Run tests
./tests/zepto_tests
```

See also: SQL Reference · Python Reference · HTTP Reference