Distributed Query Correctness: VWAP and ORDER BY Across Nodes

Distributing data across nodes is the easy part. Getting correct query results back is where it gets interesting. This post covers how ZeptoDB handles VWAP decomposition, distributed ORDER BY, and seven other correctness gaps we found and fixed in the scatter-gather query path.


Why VWAP breaks under scatter-gather

VWAP(price, volume) = SUM(price × volume) / SUM(volume). When a VWAP query is scattered to multiple nodes, each node computes a local VWAP, and naively averaging the local results gives the wrong answer:

```
Node 1: 3 ticks, price=100, volume=10   →  local VWAP = 100
Node 2: 2 ticks, price=200, volume=20   →  local VWAP = 200

Wrong:   (100 + 200) / 2 = 150
Correct: (3×100×10 + 2×200×20) / (3×10 + 2×20)
       = (3000 + 8000) / (30 + 40) = 11000 / 70 ≈ 157.14
```
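Written out in general form, the correct merge turns out to be a volume-weighted mean of the local VWAPs, so an unweighted average is only right when every node holds the same total volume. Here P_i and V_i are symbols introduced for this derivation, standing for node i's SUM(price × volume) and SUM(volume):

```latex
\begin{aligned}
\mathrm{VWAP} &= \frac{\sum_i P_i}{\sum_i V_i}
               = \frac{\sum_i V_i \, \mathrm{VWAP}_i}{\sum_i V_i},
\qquad \mathrm{VWAP}_i = \frac{P_i}{V_i} \\
\text{Example: } &\frac{30 \cdot 100 + 40 \cdot 200}{30 + 40} = \frac{11000}{70} \approx 157.14
\end{aligned}
```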

The fix: decompose VWAP into its constituent sums before scattering.


VWAP decomposition

The coordinator rewrites the scatter SQL using the same pattern it already uses for AVG (scatter partial sums, divide once at the coordinator):

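```
Original:
  SELECT VWAP(price, volume) FROM trades

Scatter SQL sent to each node:
  SELECT SUM(price * volume) AS sum_pv,
         SUM(volume)         AS sum_v
  FROM trades

Coordinator merge:
  total_sum_pv = sum of sum_pv over all nodes
  total_sum_v  = sum of sum_v over all nodes
  result       = total_sum_pv / total_sum_v
```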

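This is implemented in build_avg_rewrite(), with an is_vwap flag on AvgColInfo. The pattern generalizes to any aggregate that is a ratio of sums: scatter the numerator and denominator, add each across nodes, and divide once at the coordinator.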


Distributed ORDER BY and LIMIT

Each node applies ORDER BY and LIMIT locally, but concatenating the sorted per-node lists doesn't produce a globally sorted result. The coordinator therefore applies a final sort-and-truncate after merging:

```
Node 1 (ORDER BY price DESC LIMIT 3): [500, 300, 100]
Node 2 (ORDER BY price DESC LIMIT 3): [400, 200, 150]
Concat:  [500, 300, 100, 400, 200, 150]   ← not globally sorted
Sort:    [500, 400, 300, 200, 150, 100]
Limit 3: [500, 400, 300]                  ← correct
```

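The post-merge step parses the original SQL for its ORDER BY and LIMIT clauses, then applies std::sort followed by resize() as the final processing step in execute_sql(). Keeping the local LIMIT on each node is still safe: every row in the global top N is also in its own node's local top N, so no node needs to return more than N rows.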


All 9 gaps

We identified and fixed 9 distributed query correctness gaps:

| Gap | Strategy | Key insight |
|---|---|---|
| VWAP | Decompose into SUM(p×v), SUM(v) | Ratio aggregates can't be averaged |
| ORDER BY + LIMIT | Post-merge sort + truncate | Local sort ≠ global sort |
| HAVING | Strip from scatter, apply post-merge | Filter after global aggregation |
| DISTINCT | std::set dedup at coordinator | Cross-node duplicates possible |
| Window functions | Fetch-and-compute via temp pipeline | Needs full dataset context |
| FIRST / LAST | Fetch-and-compute + store_tick_direct() | Position-dependent aggregates |
| COUNT(DISTINCT) | Parser + executor + fetch-and-compute | Can't sum local counts |
| Subquery / CTE | Detect → fetch-and-compute | Needs full dataset |
| Multi-column ORDER BY | Composite key sort in post-merge | Lexicographic comparison |

The coordinator uses three merge strategies based on query type:

- CONCAT: simple row concatenation for non-aggregate queries. The post-merge ORDER BY/LIMIT step runs afterward.
- SCALAR_AGG: for queries with only aggregate functions (SUM, AVG, VWAP). Partial results are merged arithmetically.
- MERGE_GROUP_BY: for GROUP BY queries. Groups from different nodes with the same key are merged.

The fetch-and-compute fallback (used for window functions, subqueries and CTEs, FIRST/LAST, and COUNT(DISTINCT)) pulls all raw rows to the coordinator and runs the query there. This is correct but expensive, which is acceptable for complex queries where decomposition isn't possible.


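Bug fix: SELECT * classification

SELECT * was misclassified as SCALAR_AGG because the is_star flag caused the aggregate-detection loop to skip the column, so the query was never recognized as having a non-aggregate column. The fix: treat any column with agg == NONE as non-aggregate, regardless of is_star.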


Remaining gaps

| Gap | Impact |
|---|---|
| Cancel propagation | A coordinator timeout should cancel outstanding remote RPCs |
| Partial failure policy | When some nodes fail, should the query return a partial result or an error? |
| In-flight query safety | Adding or removing a node during scatter can race with the in-flight query |
| Dual-write during migration | Possible data-loss window while a partition is being moved |
| AVG int64 truncation | AVG computed with int64 integer division loses its fractional part |
| VWAP int64 overflow | SUM(price × volume) can overflow int64 |

These are tracked in the backlog. The precision issues (overflow, truncation) will be addressed when we add float column support.


Test coverage

All 9 correctness fixes are verified with dedicated tests:

```
QueryCoordinator.TwoNodeRemote_DistributedVwap            ✅ VWAP=157
QueryCoordinator.TwoNodeRemote_OrderByLimit               ✅ DESC [500,400,300]
QueryCoordinator.TwoNodeRemote_DistributedHaving          ✅ Post-merge filter
QueryCoordinator.TwoNodeRemote_DistributedDistinct        ✅ 4 unique values
QueryCoordinator.TwoNodeRemote_DistributedWindowFunction  ✅ Cross-node LAG
QueryCoordinator.TwoNodeRemote_DistributedFirstLast       ✅ FIRST=100, LAST=500
QueryCoordinator.TwoNodeRemote_DistributedCountDistinct   ✅ COUNT(DISTINCT)=4
QueryCoordinator.TwoNodeRemote_DistributedCTE             ✅ CTE on full dataset
QueryCoordinator.TwoNodeRemote_MultiColumnOrderBy         ✅ Composite sort
```

Total suite: 607 tests passing.