Distributing data across nodes is the easy part. Getting correct query results back is where it gets interesting. This post covers how ZeptoDB handles VWAP decomposition, distributed ORDER BY, and seven other correctness gaps we found and fixed in the scatter-gather query path.
VWAP(price, volume) = SUM(price × volume) / SUM(volume). When scattered to multiple nodes, each node computes a local VWAP. Naively averaging local VWAPs gives the wrong answer:
Node 1: 3 ticks, price=100, vol=10 → local VWAP = 100
Node 2: 2 ticks, price=200, vol=20 → local VWAP = 200
Wrong: (100 + 200) / 2 = 150 (each node weighted equally, ignoring volume)
Correct: (3×100×10 + 2×200×20) / (3×10 + 2×20) = (3000 + 8000) / (30 + 40) ≈ 157.14
The fix: decompose VWAP into its constituent sums before scattering.
The coordinator rewrites the scatter SQL using the same pattern as AVG:
Original: SELECT VWAP(price, volume) FROM trades
Scatter to each node: SELECT SUM(price * volume) AS sum_pv, SUM(volume) AS sum_v FROM trades
Coordinator merge: result = total_sum_pv / total_sum_v
This is implemented in build_avg_rewrite() with an is_vwap flag on AvgColInfo. The same pattern generalizes to any aggregate that can be expressed as a ratio of sums.
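A minimal C++ sketch of the coordinator-side merge, using the numbers from the example above. It assumes each node returns sum_pv and sum_v as doubles; VwapPartial and merge_vwap are hypothetical names for illustration, not ZeptoDB's build_avg_rewrite() code.

```cpp
#include <cassert>
#include <vector>

// Hypothetical partial result returned by one node for the rewritten scatter
// query: SELECT SUM(price * volume) AS sum_pv, SUM(volume) AS sum_v ...
struct VwapPartial {
    double sum_pv;  // SUM(price * volume) on this node
    double sum_v;   // SUM(volume) on this node
};

// Coordinator-side merge: add numerators and denominators separately, then
// divide once. Averaging per-node VWAPs instead would weight every node
// equally, regardless of how much volume it holds.
double merge_vwap(const std::vector<VwapPartial>& partials) {
    double total_pv = 0.0;
    double total_v = 0.0;
    for (const VwapPartial& p : partials) {
        total_pv += p.sum_pv;
        total_v += p.sum_v;
    }
    assert(total_v != 0.0 && "VWAP is undefined when total volume is zero");
    return total_pv / total_v;
}
```

With the partials from the example, merge_vwap({{3000, 30}, {8000, 40}}) returns about 157.14, whereas averaging the two local VWAPs gives 150.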
Each node applies ORDER BY and LIMIT locally, but concatenating sorted sublists doesn’t produce a sorted result. The coordinator now applies a final sort-and-truncate after merging:
Node 1 (ORDER BY price DESC LIMIT 3): [500, 300, 100]
Node 2 (ORDER BY price DESC LIMIT 3): [400, 200, 150]
Concat: [500, 300, 100, 400, 200, 150] ← unsorted!
Sort: [500, 400, 300, 200, 150, 100]
Limit 3: [500, 400, 300] ← correct
Applying LIMIT on each node is still safe: a row in the global top 3 must also be in the top 3 of the node that stores it, so local truncation never drops a row the final result needs. The post-merge step parses the original SQL for ORDER BY and LIMIT clauses, then applies std::sort + resize() as a final processing step in execute_sql().
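The post-merge step for a single DESC sort column can be sketched as follows. To keep it short, each row is reduced to one long long sort value, and merge_order_by_desc_limit is a hypothetical name; the post describes the real step as std::sort plus resize() inside execute_sql().

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

// Post-merge step for ORDER BY <col> DESC LIMIT n: concatenate the per-node
// results (each already sorted and truncated locally), re-sort globally,
// then keep only the first n rows.
std::vector<long long> merge_order_by_desc_limit(
        const std::vector<std::vector<long long>>& node_results,
        std::size_t limit) {
    std::vector<long long> merged;
    for (const auto& rows : node_results) {
        merged.insert(merged.end(), rows.begin(), rows.end());  // CONCAT
    }
    std::sort(merged.begin(), merged.end(), std::greater<long long>());
    if (merged.size() > limit) {
        merged.resize(limit);  // apply LIMIT only after the global sort
    }
    return merged;
}
```

When the merged list is much longer than the limit, std::partial_sort can order just the first n positions instead of sorting rows that are about to be discarded.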
We identified and fixed 9 distributed query correctness gaps:
| Gap | Strategy | Key Insight |
|---|---|---|
| VWAP | Decompose to SUM(p×v), SUM(v) | Ratio aggregates can’t be averaged |
| ORDER BY + LIMIT | Post-merge sort + truncate | Local sort ≠ global sort |
| HAVING | Strip from scatter, apply post-merge | Filter after global aggregation |
| DISTINCT | std::set dedup at coordinator | Cross-node duplicates possible |
| Window functions | Fetch-and-compute via temp pipeline | Needs full dataset context |
| FIRST / LAST | Fetch-and-compute + store_tick_direct() | Position-dependent aggregates |
| COUNT(DISTINCT) | Parser + executor + fetch-and-compute | Can’t sum local counts |
| Subquery / CTE | Detect → fetch-and-compute | Needs full dataset |
| Multi-column ORDER BY | Composite key sort in post-merge | Lexicographic comparison |
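The DISTINCT and COUNT(DISTINCT) rows both come down to cross-node duplicates. This sketch, with hypothetical function names and values, contrasts summing per-node distinct counts with deduplicating on the coordinator using std::set, the structure the table names for DISTINCT.

```cpp
#include <cstddef>
#include <set>
#include <vector>

// Wrong merge: count distinct values on each node, then add the counts.
// A value present on several nodes is counted once per node.
std::size_t sum_of_local_distinct_counts(
        const std::vector<std::vector<long long>>& node_values) {
    std::size_t total = 0;
    for (const auto& values : node_values) {
        total += std::set<long long>(values.begin(), values.end()).size();
    }
    return total;
}

// Correct merge: union the raw values on the coordinator, then count.
std::size_t global_distinct_count(
        const std::vector<std::vector<long long>>& node_values) {
    std::set<long long> seen;  // coordinator-side dedup, as for DISTINCT
    for (const auto& values : node_values) {
        seen.insert(values.begin(), values.end());
    }
    return seen.size();
}
```

With node values {100, 200, 300} and {200, 300, 400}, the first function returns 6 and the second returns 4: 200 and 300 are counted twice by the per-node sum.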
The coordinator uses three merge strategies based on query type:
CONCAT
Simple row concatenation for non-aggregate queries. Post-merge ORDER BY/LIMIT applied after.
SCALAR_AGG
For queries with only aggregate functions (SUM, AVG, VWAP). Partial results merged arithmetically.
MERGE_GROUP_BY
For GROUP BY queries. Groups from different nodes with the same key are merged.
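MERGE_GROUP_BY and the HAVING fix interact: partial aggregates must be combined per key before the filter runs. A sketch under assumed names (GroupPartial, merge_group_by_having) and an assumed query of the form SELECT symbol, SUM(volume) ... GROUP BY symbol HAVING SUM(volume) > N:

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical per-node row for: SELECT symbol, SUM(volume) ... GROUP BY symbol
struct GroupPartial {
    std::string key;  // GROUP BY key
    long long sum_v;  // node-local SUM(volume) for that key
};

// Combine rows with the same key across nodes (SUM partials simply add),
// then evaluate HAVING SUM(volume) > having_min_sum on the global totals.
std::map<std::string, long long> merge_group_by_having(
        const std::vector<std::vector<GroupPartial>>& node_results,
        long long having_min_sum) {
    std::map<std::string, long long> merged;
    for (const auto& rows : node_results) {
        for (const GroupPartial& row : rows) {
            merged[row.key] += row.sum_v;  // operator[] starts new keys at 0
        }
    }
    for (auto it = merged.begin(); it != merged.end();) {
        if (it->second > having_min_sum) {
            ++it;
        } else {
            it = merged.erase(it);  // group fails HAVING after global merge
        }
    }
    return merged;
}
```

If a key has SUM(volume) of 30 on one node and 40 on another, HAVING SUM(volume) > 50 would reject it on both nodes, yet the merged total of 70 passes. That is why HAVING is stripped from the scatter query.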
The fetch-and-compute fallback (used for window functions, CTE, FIRST/LAST, COUNT DISTINCT) pulls all raw data to the coordinator and runs the query locally. This is correct but expensive, since every raw row crosses the network and the query runs on a single node. The cost is acceptable for complex queries where decomposition isn't possible.
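A sketch of fetch-and-compute for the position-dependent cases: the coordinator gathers raw ticks from every node, restores global time order, and only then evaluates FIRST, LAST, or LAG. Tick and the helper names are hypothetical, and ordering by a ts column is an assumption about how tick position is defined.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical raw tick pulled from a node during fetch-and-compute.
struct Tick {
    long long ts;     // timestamp that defines row position
    long long price;
};

// Gather every node's ticks on the coordinator and restore global time
// order; stable_sort keeps gather order for equal timestamps.
std::vector<Tick> gather_in_time_order(const std::vector<std::vector<Tick>>& node_ticks) {
    std::vector<Tick> all;
    for (const auto& ticks : node_ticks) {
        all.insert(all.end(), ticks.begin(), ticks.end());
    }
    std::stable_sort(all.begin(), all.end(),
                     [](const Tick& a, const Tick& b) { return a.ts < b.ts; });
    return all;
}

long long first_price(const std::vector<Tick>& ordered) {
    assert(!ordered.empty());
    return ordered.front().price;
}

long long last_price(const std::vector<Tick>& ordered) {
    assert(!ordered.empty());
    return ordered.back().price;
}

// LAG(price, 1): previous row's price, or nullopt for the first row.
std::vector<std::optional<long long>> lag_price(const std::vector<Tick>& ordered) {
    std::vector<std::optional<long long>> out;
    out.reserve(ordered.size());
    for (std::size_t i = 0; i < ordered.size(); ++i) {
        if (i == 0) {
            out.push_back(std::nullopt);
        } else {
            out.push_back(ordered[i - 1].price);
        }
    }
    return out;
}
```

With ticks at timestamps 1 and 3 on one node and 2 and 4 on the other, LAG for the tick at timestamp 2 is the price from timestamp 1, which lives on a different node; no per-node computation can produce it.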
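A separate classification bug: SELECT * was misclassified as SCALAR_AGG. The is_star flag caused the aggregate-detection loop to skip the column, so the query appeared to contain only aggregates. The fix: treat agg == NONE as non-aggregate regardless of is_star, which routes SELECT * to CONCAT.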
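The SELECT * fix can be sketched as a classification function. The names agg, is_star, NONE, CONCAT, SCALAR_AGG, and MERGE_GROUP_BY come from the post; the struct layout, the has_group_by parameter, and the assumption that the old loop skipped star columns with a `continue` are hypothetical.

```cpp
#include <vector>

enum class AggKind { NONE, SUM, AVG, VWAP, COUNT };  // illustrative subset
enum class MergeStrategy { CONCAT, SCALAR_AGG, MERGE_GROUP_BY };

struct SelectColumn {
    AggKind agg = AggKind::NONE;
    bool is_star = false;  // true for the `*` in SELECT *
};

MergeStrategy classify(const std::vector<SelectColumn>& cols, bool has_group_by) {
    if (has_group_by) {
        return MergeStrategy::MERGE_GROUP_BY;
    }
    bool all_agg = !cols.empty();
    for (const SelectColumn& c : cols) {
        // Fixed: no `if (c.is_star) continue;` here. Skipping the star column
        // left nothing to contradict all_agg, so SELECT * looked aggregate-only.
        if (c.agg == AggKind::NONE) {
            all_agg = false;
        }
    }
    return all_agg ? MergeStrategy::SCALAR_AGG : MergeStrategy::CONCAT;
}
```

Because the check asks whether every column is an aggregate, any column it skips can only push the answer toward SCALAR_AGG; deciding on agg alone removes that bias.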
Several gaps remain open:

| Gap | Impact |
|---|---|
| Cancel propagation | Coordinator timeout should cancel remote RPCs |
| Partial failure policy | Some nodes fail: return partial result or error? |
| In-flight query safety | Node add/remove during scatter → race condition |
| Dual-write during migration | Data loss gap during partition move |
| AVG int64 truncation | AVG over int64 columns uses integer division and drops the fractional part |
| VWAP int64 overflow | SUM(price × volume) can overflow int64 |
These are tracked in the backlog. The precision issues (overflow, truncation) will be addressed when we add float column support.
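The two numeric gaps can be illustrated with hypothetical helpers over std::int64_t. The checked multiplication assumes non-negative price and volume; it is one possible guard, not the planned fix, which the post ties to float column support.

```cpp
#include <cstdint>
#include <limits>
#include <optional>

// AVG over int64 totals: integer division truncates toward zero.
std::int64_t avg_truncated(std::int64_t sum, std::int64_t count) {
    return sum / count;
}

// Converting to double before dividing keeps the fractional part.
double avg_as_double(std::int64_t sum, std::int64_t count) {
    return static_cast<double>(sum) / static_cast<double>(count);
}

// price * volume for one tick, returning nullopt instead of overflowing.
// Assumes price >= 0 and volume >= 0, as for trade data.
std::optional<std::int64_t> checked_pv(std::int64_t price, std::int64_t volume) {
    if (price != 0 && volume > std::numeric_limits<std::int64_t>::max() / price) {
        return std::nullopt;  // product would exceed INT64_MAX
    }
    return price * volume;
}
```

Using the VWAP example's totals, avg_truncated(11000, 70) yields 157 while avg_as_double(11000, 70) gives about 157.14. checked_pv guards a single product only; the running SUM across ticks needs the same kind of check.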
All 9 correctness fixes are verified with dedicated tests:
QueryCoordinator.TwoNodeRemote_DistributedVwap ✅ VWAP=157
QueryCoordinator.TwoNodeRemote_OrderByLimit ✅ DESC [500,400,300]
QueryCoordinator.TwoNodeRemote_DistributedHaving ✅ Post-merge filter
QueryCoordinator.TwoNodeRemote_DistributedDistinct ✅ 4 unique values
QueryCoordinator.TwoNodeRemote_DistributedWindowFunction ✅ Cross-node LAG
QueryCoordinator.TwoNodeRemote_DistributedFirstLast ✅ FIRST=100, LAST=500
QueryCoordinator.TwoNodeRemote_DistributedCountDistinct ✅ COUNT(DISTINCT)=4
QueryCoordinator.TwoNodeRemote_DistributedCTE ✅ CTE on full dataset
QueryCoordinator.TwoNodeRemote_MultiColumnOrderBy ✅ Composite sort
Total suite: 607 tests passing.