3.48x on 8 threads
GROUP BY scaling across partitions: 1.87x on 2 threads, 3.48x on 8. Serial fallback for small data.
ZeptoDB’s query executor started as a single-threaded loop over partitions. Parallelizing it was straightforward — the harder problem was designing an abstraction that scales from multi-core to multi-node without rewriting the executor. This post covers the QueryScheduler pattern and the parallel execution results.
“The goal is not to implement multi-node right now, but to set up a structure where nodes can be added later without modifying QueryExecutor code.”
The key insight: where execution happens (local threads vs. remote nodes) should be invisible to the query executor. The executor describes what to compute; the scheduler decides where.
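    class QueryScheduler {
    public:
        virtual ~QueryScheduler() = default;  // schedulers are owned through a base pointer

        // Distribute work fragments to workers and collect one partial result per fragment.
        virtual vector<PartialAggResult> scatter(const vector<QueryFragment>&) = 0;
        // Merge the partial results into a single result.
        virtual PartialAggResult gather(vector<PartialAggResult>&&) = 0;
        virtual size_t worker_count() const = 0;
        virtual string scheduler_type() const = 0;
    };

Two operations: scatter distributes work fragments to workers, and gather merges partial results. The executor doesn’t know if workers are threads or remote nodes.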
    QueryExecutor
     │
     └── QueryScheduler (abstract)
          ├── LocalQueryScheduler        ← thread pool, implemented now
          └── DistributedQueryScheduler  ← UCX transport, future

    // Default: local thread pool
    QueryExecutor ex(pipeline);

    // Testing or distributed: inject any scheduler
    QueryExecutor ex(pipeline, std::make_unique<DistributedQueryScheduler>(nodes));

The executor’s parallel paths call scatter() and gather(), so the implementation is swappable at construction time.
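To make the injection point concrete, here is a minimal sketch of an executor that only talks to the abstract interface, plus a test scheduler that runs every fragment inline. The fields of `QueryFragment` and `PartialAggResult`, and the names `InlineScheduler`, `SketchExecutor`, `run_sum`, and `backend`, are assumptions made for illustration; they are not ZeptoDB's actual types.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins for the real ZeptoDB types.
struct QueryFragment { std::vector<int64_t> values; };
struct PartialAggResult { int64_t sum = 0; int64_t count = 0; };

class QueryScheduler {
public:
    virtual ~QueryScheduler() = default;
    virtual std::vector<PartialAggResult> scatter(const std::vector<QueryFragment>&) = 0;
    virtual PartialAggResult gather(std::vector<PartialAggResult>&&) = 0;
    virtual std::size_t worker_count() const = 0;
    virtual std::string scheduler_type() const = 0;
};

// Test double (hypothetical): aggregates each fragment on the calling thread.
class InlineScheduler : public QueryScheduler {
public:
    std::vector<PartialAggResult> scatter(const std::vector<QueryFragment>& frags) override {
        std::vector<PartialAggResult> out;
        out.reserve(frags.size());
        for (const QueryFragment& f : frags) {
            PartialAggResult r;
            for (int64_t v : f.values) { r.sum += v; ++r.count; }
            out.push_back(r);
        }
        return out;
    }
    PartialAggResult gather(std::vector<PartialAggResult>&& parts) override {
        PartialAggResult total;
        for (const PartialAggResult& p : parts) {
            total.sum += p.sum;
            total.count += p.count;
        }
        return total;
    }
    std::size_t worker_count() const override { return 1; }
    std::string scheduler_type() const override { return "inline"; }
};

// Sketch of an executor: it describes what to compute and never asks where it runs.
class SketchExecutor {
public:
    explicit SketchExecutor(std::unique_ptr<QueryScheduler> s) : scheduler_(std::move(s)) {}

    PartialAggResult run_sum(const std::vector<QueryFragment>& frags) {
        return scheduler_->gather(scheduler_->scatter(frags));
    }
    std::string backend() const { return scheduler_->scheduler_type(); }

private:
    std::unique_ptr<QueryScheduler> scheduler_;
};
```

Swapping `InlineScheduler` for a thread-pool or network-backed scheduler would leave `SketchExecutor` untouched, which is the property the pattern is after.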
Not every query benefits from parallelism. The ParallelScanExecutor selects the execution mode based on data shape:
    total_rows < 100K             → SERIAL    (thread overhead > benefit)
    num_partitions ≥ num_threads  → PARTITION (one partition per thread)
    otherwise                     → CHUNKED   (split rows within partition)

PARTITION mode is the common case for time-series workloads: data is already partitioned by symbol, and each partition can be aggregated independently.
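The selection rules can be sketched as a small pure function. The names `ScanMode`, `select_mode`, and `kSerialRowThreshold` are hypothetical, and the single-thread check is an assumption added here so that a one-thread pool never takes a parallel path; the real ParallelScanExecutor may organize this differently.

```cpp
#include <cstddef>

enum class ScanMode { Serial, Partition, Chunked };

// Threshold from the rules above: below 100K rows, thread overhead outweighs the benefit.
constexpr std::size_t kSerialRowThreshold = 100000;

ScanMode select_mode(std::size_t total_rows,
                     std::size_t num_partitions,
                     std::size_t num_threads) {
    // Assumption: with zero or one thread there is nothing to parallelize.
    if (num_threads <= 1 || total_rows < kSerialRowThreshold) return ScanMode::Serial;
    // Enough partitions to keep every thread busy with whole partitions.
    if (num_partitions >= num_threads) return ScanMode::Partition;
    // Fewer partitions than threads: split rows inside each partition.
    return ScanMode::Chunked;
}
```

Under these rules, 1M rows in 2 partitions on 8 threads would select `ScanMode::Chunked`, while the same rows in 8 or more partitions would select `ScanMode::Partition`.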
A subtle bug: if num_threads == 1, the parallel entry point calls the serial path, which calls the parallel entry point again. The fix is a guard at the top:
    if (pool_raw_ == nullptr || pool_raw_->num_threads() <= 1) {
        return exec_group_agg_serial(stmt, partitions);  // direct serial call
    }

Each worker produces a PartialAggResult, a partial aggregate that can be merged:
    Thread 0: partitions[0..2] → PartialAggResult { sum=1500, count=300 }
    Thread 1: partitions[3..5] → PartialAggResult { sum=2200, count=450 }
                        ↓ gather()
    Final: { sum=3700, count=750, avg=4.93 }

For GROUP BY, each partial result contains a map of group_key → partial_aggregate. The gather step merges maps by key.
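A minimal sketch of that merge-by-key step, assuming each group carries only a sum and a count. `GroupAgg`, `GroupMap`, and `merge_groups` are hypothetical names, and the sketch uses a separate per-group struct rather than the real PartialAggResult layout.

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical per-group aggregate; the real PartialAggResult likely has more fields.
struct GroupAgg {
    int64_t sum = 0;
    int64_t count = 0;
    // The average is derived from the merged totals, never merged directly.
    double avg() const { return count == 0 ? 0.0 : static_cast<double>(sum) / count; }
};

using GroupMap = std::unordered_map<int64_t, GroupAgg>;

// Merge per-worker maps: a group seen by several workers has its sums and counts added.
GroupMap merge_groups(const std::vector<GroupMap>& partials) {
    GroupMap merged;
    for (const GroupMap& part : partials) {
        for (const auto& kv : part) {
            GroupAgg& dst = merged[kv.first];  // zero-initialized on first use
            dst.sum += kv.second.sum;
            dst.count += kv.second.count;
        }
    }
    return merged;
}
```

Keeping sum and count separate until the end is what makes the aggregate mergeable: averaging the two per-thread averages would weight them equally even though the threads saw different row counts.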
GROUP BY partials need a map<int64_t, PartialAggResult> member inside PartialAggResult, but at that point the value type is the struct itself, which is still an incomplete type. The fix: shared_ptr indirection.
    // Won't compile: PartialAggResult is incomplete
    std::unordered_map<int64_t, PartialAggResult> group_partials;

    // Fix: shared_ptr breaks the incomplete type cycle
    std::unordered_map<int64_t, std::shared_ptr<PartialAggResult>> group_partials;

1M rows, 2 symbols, varying thread counts:
| Query | 1T | 2T | 4T | 8T |
|---|---|---|---|---|
| GROUP BY symbol sum(volume) | 0.862ms | 0.460ms | 0.398ms | 0.248ms |
| Speedup vs. 1T | 1.00x | 1.87x | 2.16x | 3.48x |
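The speedup figures are each timing divided into the single-thread time. A short sketch of that arithmetic, with parallel efficiency (speedup divided by thread count) added as an extra derived metric that the original table does not report; `speedup` and `efficiency` are hypothetical helper names.

```cpp
// Speedup relative to the single-thread run: T1 / Tn.
double speedup(double t1_ms, double tn_ms) { return t1_ms / tn_ms; }

// Parallel efficiency: the fraction of ideal linear scaling actually achieved.
double efficiency(double t1_ms, double tn_ms, int threads) {
    return speedup(t1_ms, tn_ms) / threads;
}
```

With the timings in the table, efficiency works out to roughly 94% at 2 threads, 54% at 4 threads, and 43% at 8 threads.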
Single-partition queries (WHERE symbol = X) automatically fall through to the serial path — no thread pool overhead for queries that can’t benefit.
The scatter/gather API itself adds ~0.033ms per round — negligible compared to query execution time.
The architecture is designed so that switching from local to distributed execution requires zero changes to QueryExecutor:
    Current (single node)              Future (multi-node)
    ─────────────────────              ────────────────────
    QueryExecutor                      QueryExecutor  ← unchanged
     └── LocalQueryScheduler            └── DistributedQueryScheduler
          └── WorkerPool (jthread)           ├── UCX transport (node A)
                                             ├── UCX transport (node B)
                                             └── ...

The distributed scheduler will:
- Serialize QueryFragment via FlatBuffers, send to nodes via UCX
- Receive PartialAggResult from each node, merge locally

PartialAggResult::serialize() / deserialize() stubs are already in place. The UCX transport backend is already implemented as part of the cluster layer.
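As a rough illustration of what a serialize() / deserialize() pair has to guarantee, here is a round-trip sketch over a plain byte buffer. It does not use FlatBuffers or UCX; the two-field `PartialAggResult`, the buffer layout, and the `deserialize` signature are all assumptions chosen to keep the example self-contained.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical flat result; the real PartialAggResult and its wire format may differ.
struct PartialAggResult {
    int64_t sum = 0;
    int64_t count = 0;

    // Pack both fields into a byte buffer (host byte order, for brevity).
    std::vector<uint8_t> serialize() const {
        std::vector<uint8_t> buf(2 * sizeof(int64_t));
        std::memcpy(buf.data(), &sum, sizeof(int64_t));
        std::memcpy(buf.data() + sizeof(int64_t), &count, sizeof(int64_t));
        return buf;
    }

    // Inverse of serialize(); returns false if the buffer has the wrong size.
    static bool deserialize(const std::vector<uint8_t>& buf, PartialAggResult& out) {
        if (buf.size() != 2 * sizeof(int64_t)) return false;
        std::memcpy(&out.sum, buf.data(), sizeof(int64_t));
        std::memcpy(&out.count, buf.data() + sizeof(int64_t), sizeof(int64_t));
        return true;
    }
};
```

A real wire format would also need a fixed byte order and versioning across nodes, which is part of what a schema-based format such as FlatBuffers is meant to handle.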
3.48x on 8 threads
GROUP BY scaling across partitions: 1.87x on 2 threads, 3.48x on 8. Serial fallback for small data.
Zero executor changes
The QueryScheduler dependency-injection pattern means multi-node is a scheduler swap, not a rewrite.
Automatic mode selection
SERIAL / PARTITION / CHUNKED chosen based on data shape. No manual tuning needed.