
Parallel Query Engine: From Single-Core to Multi-Node

ZeptoDB’s query executor started as a single-threaded loop over partitions. Parallelizing it was straightforward — the harder problem was designing an abstraction that scales from multi-core to multi-node without rewriting the executor. This post covers the QueryScheduler pattern and the parallel execution results.


“The goal is not to implement multi-node right now, but to set up a structure where nodes can be added later without modifying QueryExecutor code.”

The key insight: where execution happens (local threads vs. remote nodes) should be invisible to the query executor. The executor describes what to compute; the scheduler decides where.


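// Abstract scheduler: the executor describes what to compute,
// the implementation decides where (local threads or remote nodes).
class QueryScheduler {
public:
    virtual ~QueryScheduler() = default;
    // Distribute work fragments to workers and collect their partial results.
    virtual std::vector<PartialAggResult> scatter(const std::vector<QueryFragment>&) = 0;
    // Merge partial results into a single result.
    virtual PartialAggResult gather(std::vector<PartialAggResult>&&) = 0;
    virtual size_t worker_count() const = 0;
    virtual std::string scheduler_type() const = 0;
};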

Two core operations: scatter distributes work fragments to workers, and gather merges their partial results. The executor doesn't know whether the workers are threads or remote nodes.
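To make the contract concrete, here is a minimal sketch of a thread-backed scheduler. It assumes simplified stand-ins that are not ZeptoDB's real types: a QueryFragment that is just a slice of values to sum, and a PartialAggResult holding only sum and count. The class name AsyncLocalScheduler is hypothetical; it launches one std::async task per fragment, whereas a real local scheduler would reuse a fixed worker pool.

```cpp
#include <cstddef>
#include <cstdint>
#include <future>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins for the engine's types.
struct QueryFragment {
    std::vector<int64_t> values;  // rows this fragment must aggregate
};

struct PartialAggResult {
    int64_t sum = 0;
    int64_t count = 0;
};

class QueryScheduler {
public:
    virtual ~QueryScheduler() = default;
    virtual std::vector<PartialAggResult> scatter(const std::vector<QueryFragment>&) = 0;
    virtual PartialAggResult gather(std::vector<PartialAggResult>&&) = 0;
    virtual std::size_t worker_count() const = 0;
    virtual std::string scheduler_type() const = 0;
};

// Sketch of a local scheduler: one std::async task per fragment.
class AsyncLocalScheduler : public QueryScheduler {
public:
    std::vector<PartialAggResult> scatter(const std::vector<QueryFragment>& frags) override {
        std::vector<std::future<PartialAggResult>> futures;
        futures.reserve(frags.size());
        for (const auto& f : frags) {
            // f stays alive until every future is joined below.
            futures.push_back(std::async(std::launch::async, [&f] {
                PartialAggResult r;
                for (int64_t v : f.values) { r.sum += v; ++r.count; }
                return r;
            }));
        }
        std::vector<PartialAggResult> partials;
        partials.reserve(futures.size());
        for (auto& fut : futures) partials.push_back(fut.get());  // wait for each worker
        return partials;
    }

    PartialAggResult gather(std::vector<PartialAggResult>&& partials) override {
        PartialAggResult total;
        for (const auto& p : partials) { total.sum += p.sum; total.count += p.count; }
        return total;
    }

    // Hardware thread count; may be 0 if the platform cannot report it.
    std::size_t worker_count() const override { return std::thread::hardware_concurrency(); }
    std::string scheduler_type() const override { return "local-async"; }
};
```

Nothing in scatter() or gather() leaks the fact that workers are threads, which is exactly what lets a remote implementation sit behind the same interface.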

QueryExecutor
└── QueryScheduler (abstract)
    ├── LocalQueryScheduler        ← thread pool, implemented now
    └── DistributedQueryScheduler  ← UCX transport, future

// Default: local thread pool
QueryExecutor local_ex(pipeline);
// Testing or distributed: inject any scheduler
QueryExecutor dist_ex(pipeline, std::make_unique<DistributedQueryScheduler>(nodes));

The executor’s parallel paths call scatter() and gather() — the implementation is swappable at construction time.
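A sketch of how the constructor injection can work, under assumptions: the post doesn't show the real constructor signatures, so Pipeline, LocalStubScheduler, and CountingScheduler below are hypothetical, and the types are reduced to a single integer. The executor only talks to the QueryScheduler interface, so a test can substitute a scheduler that records how it was used.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical simplified types.
struct QueryFragment { int64_t value = 0; };
struct PartialAggResult { int64_t sum = 0; };
struct Pipeline {};  // placeholder for the real pipeline object

class QueryScheduler {
public:
    virtual ~QueryScheduler() = default;
    virtual std::vector<PartialAggResult> scatter(const std::vector<QueryFragment>&) = 0;
    virtual PartialAggResult gather(std::vector<PartialAggResult>&&) = 0;
    virtual std::size_t worker_count() const = 0;
    virtual std::string scheduler_type() const = 0;
};

// Serial stand-in for the default local scheduler.
class LocalStubScheduler : public QueryScheduler {
public:
    std::vector<PartialAggResult> scatter(const std::vector<QueryFragment>& frags) override {
        std::vector<PartialAggResult> out;
        for (const auto& f : frags) out.push_back({f.value});
        return out;
    }
    PartialAggResult gather(std::vector<PartialAggResult>&& parts) override {
        PartialAggResult total;
        for (const auto& p : parts) total.sum += p.sum;
        return total;
    }
    std::size_t worker_count() const override { return 1; }
    std::string scheduler_type() const override { return "local"; }
};

// Test double: same behavior as the stub, but counts scatter() calls.
class CountingScheduler : public LocalStubScheduler {
public:
    int scatter_calls = 0;
    std::vector<PartialAggResult> scatter(const std::vector<QueryFragment>& frags) override {
        ++scatter_calls;
        return LocalStubScheduler::scatter(frags);
    }
    std::string scheduler_type() const override { return "counting"; }
};

class QueryExecutor {
public:
    // Default: the executor builds its own local scheduler.
    explicit QueryExecutor(Pipeline p)
        : QueryExecutor(p, std::make_unique<LocalStubScheduler>()) {}
    // Injection: any QueryScheduler implementation can be supplied.
    QueryExecutor(Pipeline p, std::unique_ptr<QueryScheduler> s)
        : pipeline_(p), scheduler_(std::move(s)) {}

    // The executor describes the work; the scheduler decides where it runs.
    int64_t sum(const std::vector<QueryFragment>& frags) {
        return scheduler_->gather(scheduler_->scatter(frags)).sum;
    }
    const QueryScheduler& scheduler() const { return *scheduler_; }

private:
    Pipeline pipeline_;
    std::unique_ptr<QueryScheduler> scheduler_;
};
```

Because ownership moves into the executor as a std::unique_ptr, a test keeps a raw pointer to the double before handing it over and inspects it afterwards.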


Not every query benefits from parallelism. The ParallelScanExecutor selects the execution mode based on data shape:

total_rows < 100K → SERIAL (thread overhead > benefit)
num_partitions ≥ num_threads → PARTITION (one partition per thread)
otherwise → CHUNKED (split rows within partition)

The PARTITION mode is the common case for time-series workloads — data is already partitioned by symbol, and each partition can be aggregated independently.
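The three rules can be read as an ordered decision function. This is a sketch, not ZeptoDB's code: the function name choose_mode is hypothetical, and "100K" is assumed to mean 100,000 rows.

```cpp
#include <cstddef>

// Execution modes named in the post.
enum class ExecMode { SERIAL, PARTITION, CHUNKED };

// Assumption: "100K" read as 100,000 rows; the real threshold may differ.
constexpr std::size_t kSerialRowThreshold = 100'000;

// Hypothetical helper mirroring the three rules, checked in order.
ExecMode choose_mode(std::size_t total_rows, std::size_t num_partitions,
                     std::size_t num_threads) {
    if (total_rows < kSerialRowThreshold) return ExecMode::SERIAL;  // thread overhead > benefit
    if (num_partitions >= num_threads) return ExecMode::PARTITION;  // one partition per thread
    return ExecMode::CHUNKED;                                       // split rows within partitions
}
```

Because the row-count check comes first, a small table stays serial no matter how many partitions it has.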

A subtle bug: if num_threads == 1, the parallel entry point calls the serial path, which calls back into the parallel entry point, so the two recurse indefinitely. The fix is a guard at the top of the parallel entry point:

if (pool_raw_ == nullptr || pool_raw_->num_threads() <= 1) {
    return exec_group_agg_serial(stmt, partitions);  // direct serial call
}
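A self-contained sketch of the guarded entry point, under simplifying assumptions: WorkerPool, GroupAggExecutor, and exec_group_agg are hypothetical names, the "aggregate" is a plain sum over int64 partitions, and the parallel path spawns std::thread objects per call instead of dispatching to a real pool. What matters is the shape: the guard runs before any dispatch, and the serial implementation never calls back into the parallel one.

```cpp
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical pool stand-in: only the thread count matters for the guard.
struct WorkerPool {
    std::size_t threads;
    std::size_t num_threads() const { return threads; }
};

class GroupAggExecutor {
public:
    explicit GroupAggExecutor(WorkerPool* pool) : pool_raw_(pool) {}

    // Parallel entry point: a missing or single-thread pool goes
    // straight to the serial implementation.
    int64_t exec_group_agg(const std::vector<std::vector<int64_t>>& partitions) {
        if (pool_raw_ == nullptr || pool_raw_->num_threads() <= 1) {
            return exec_group_agg_serial(partitions);  // direct serial call
        }
        const std::size_t n = pool_raw_->num_threads();
        std::vector<int64_t> partial(n, 0);  // one slot per thread, no sharing
        std::vector<std::thread> workers;
        for (std::size_t t = 0; t < n; ++t) {
            workers.emplace_back([&, t] {
                // Strided assignment: thread t handles partitions t, t+n, t+2n, ...
                for (std::size_t p = t; p < partitions.size(); p += n) {
                    partial[t] += sum_partition(partitions[p]);
                }
            });
        }
        for (auto& w : workers) w.join();
        return std::accumulate(partial.begin(), partial.end(), int64_t{0});
    }

    // Serial implementation: must never re-enter exec_group_agg().
    int64_t exec_group_agg_serial(const std::vector<std::vector<int64_t>>& partitions) {
        int64_t total = 0;
        for (const auto& part : partitions) total += sum_partition(part);
        return total;
    }

private:
    static int64_t sum_partition(const std::vector<int64_t>& rows) {
        return std::accumulate(rows.begin(), rows.end(), int64_t{0});
    }
    WorkerPool* pool_raw_;
};
```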

Each worker produces a PartialAggResult — a partial aggregate that can be merged:

Thread 0: partitions[0..2] → PartialAggResult { sum=1500, count=300 }
Thread 1: partitions[3..5] → PartialAggResult { sum=2200, count=450 }
↓ gather()
Final: { sum=3700, count=750, avg=4.93 }

For GROUP BY, each partial result contains a map of group_key → partial_aggregate. The gather step merges maps by key.
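The scalar merge in the example above can be sketched as follows. PartialAgg is a hypothetical, reduced version of PartialAggResult that carries only sum and count. The key design point is that AVG is derived after merging, from the combined sum and count.

```cpp
#include <cstdint>
#include <vector>

// Simplified partial aggregate: sum and count merge by addition;
// AVG is derived only after the merge.
struct PartialAgg {
    int64_t sum = 0;
    int64_t count = 0;

    void merge(const PartialAgg& other) {
        sum += other.sum;
        count += other.count;
    }
    double avg() const {
        return count == 0 ? 0.0 : static_cast<double>(sum) / static_cast<double>(count);
    }
};

// gather(): fold every worker's partial into one result.
PartialAgg gather(const std::vector<PartialAgg>& partials) {
    PartialAgg total;
    for (const auto& p : partials) total.merge(p);
    return total;
}
```

With the numbers from the example, gather({{1500, 300}, {2200, 450}}) yields sum 3700, count 750, and avg() of about 4.93. Averaging the per-thread averages instead would give (5.00 + 4.89) / 2, about 4.94, which is wrong because the threads saw different row counts.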

Storing GROUP BY partials inside PartialAggResult needs a member like map<int64_t, PartialAggResult>, but inside its own definition the struct is still an incomplete type, and the standard library does not guarantee that its map containers accept an incomplete value type. The fix is shared_ptr indirection:

// May not compile: PartialAggResult is still incomplete here
std::unordered_map<int64_t, PartialAggResult> group_partials;
// Fix: shared_ptr<T> allows an incomplete T, breaking the cycle
std::unordered_map<int64_t, std::shared_ptr<PartialAggResult>> group_partials;
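Fleshed out, a self-referencing partial with a key-by-key merge might look like this. It is a sketch with only sum and count as aggregate fields; the real struct likely carries more. One design choice worth noting: merge() copies into a freshly allocated slot for new keys rather than sharing the other side's pointer, so partials from different workers never alias each other.

```cpp
#include <cstdint>
#include <memory>
#include <unordered_map>

// Simplified sketch: the struct refers to itself through shared_ptr,
// which is allowed even though PartialAggResult is incomplete here.
struct PartialAggResult {
    int64_t sum = 0;
    int64_t count = 0;
    std::unordered_map<int64_t, std::shared_ptr<PartialAggResult>> group_partials;

    // Merge another partial into this one, key by key for GROUP BY.
    void merge(const PartialAggResult& other) {
        sum += other.sum;
        count += other.count;
        for (const auto& [key, part] : other.group_partials) {
            if (!part) continue;                   // skip empty slots
            auto& slot = group_partials[key];      // creates a null slot for new keys
            if (!slot) slot = std::make_shared<PartialAggResult>();
            slot->merge(*part);                    // recursive merge handles nested groups
        }
    }
};
```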

1M rows, 2 symbols, varying thread counts:

Query                          1T        2T        4T        8T
GROUP BY symbol sum(volume)    0.862 ms  0.460 ms  0.398 ms  0.248 ms
Speedup vs. 1T                 baseline  1.87x     2.16x     3.48x
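The speedup row is the single-thread time divided by the p-thread time. Dividing speedup by the thread count gives parallel efficiency, a standard way to read such a table; the efficiency figures below are derived from the timings above, not separately measured.

```latex
S_p = \frac{T_1}{T_p}, \qquad E_p = \frac{S_p}{p}

S_2 = \frac{0.862}{0.460} \approx 1.87, \quad E_2 \approx 0.94
S_4 = \frac{0.862}{0.398} \approx 2.16, \quad E_4 \approx 0.54
S_8 = \frac{0.862}{0.248} \approx 3.48, \quad E_8 \approx 0.43
```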

Single-partition queries (WHERE symbol = X) automatically fall through to the serial path — no thread pool overhead for queries that can’t benefit.

The scatter/gather API itself adds about 0.033 ms per round. That is roughly 4% of the single-threaded query time above (0.862 ms), though closer to 13% of the 8-thread time (0.248 ms).


The architecture is designed so that switching from local to distributed execution requires zero changes to QueryExecutor:

Current (single node)          Future (multi-node)
─────────────────────          ────────────────────
QueryExecutor                  QueryExecutor                 ← unchanged
└── LocalQueryScheduler        └── DistributedQueryScheduler
    └── WorkerPool (jthread)       ├── UCX transport (node A)
                                   ├── UCX transport (node B)
                                   └── ...

The distributed scheduler will:

  • scatter: Serialize QueryFragment via FlatBuffers, send to nodes via UCX
  • gather: Receive PartialAggResult from each node, merge locally

PartialAggResult::serialize() / deserialize() stubs are already in place, and the UCX transport backend is provided by the cluster layer.
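The round-trip contract those stubs must satisfy can be sketched with a hand-rolled encoding. This is not FlatBuffers (which the post plans for QueryFragment) and not ZeptoDB's stub: it is a hypothetical fixed-width layout for a reduced PartialAggResult with only sum and count, and it assumes every node shares the same byte order.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Simplified partial result; the real one also carries GROUP BY partials.
struct PartialAggResult {
    int64_t sum = 0;
    int64_t count = 0;

    // Encode as two fixed-width fields in host byte order
    // (assumes all nodes share endianness; a real wire format would pin it).
    std::vector<uint8_t> serialize() const {
        std::vector<uint8_t> buf(2 * sizeof(int64_t));
        std::memcpy(buf.data(), &sum, sizeof(sum));
        std::memcpy(buf.data() + sizeof(sum), &count, sizeof(count));
        return buf;
    }

    // Decode into out; returns false if the buffer has the wrong size.
    static bool deserialize(const std::vector<uint8_t>& buf, PartialAggResult& out) {
        if (buf.size() != 2 * sizeof(int64_t)) return false;
        std::memcpy(&out.sum, buf.data(), sizeof(out.sum));
        std::memcpy(&out.count, buf.data() + sizeof(out.sum), sizeof(out.count));
        return true;
    }
};
```

Whatever format is used, the property to test is the same: deserialize(serialize(x)) reproduces x, so the gather step on the coordinating node can merge remote partials exactly as it merges local ones.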

3.48x on 8 threads

GROUP BY across partitions keeps scaling through 8 threads (3.48x, about 43% parallel efficiency). Serial fallback for small data.

Zero executor changes

QueryScheduler DI pattern means multi-node is a scheduler swap, not a rewrite.

Automatic mode selection

SERIAL / PARTITION / CHUNKED chosen based on data shape. No manual tuning needed.

