Skip to content

ZeptoDB Python API Reference

Last updated: 2026-05-27

Two Python interfaces are available:

| Interface | Description | Use case |
|---|---|---|
| `zeptodb` | Low-level pybind11 C++ binding | In-process, maximum performance |
| `zepto_py` | High-level Python package | Pandas/Polars/Arrow interop, HTTP client |


End-to-end: ingest from polars, query via SQL, export to pandas

Section titled “End-to-end: ingest from polars, query via SQL, export to pandas”
# End-to-end in-process flow: start a Pipeline, ingest a polars DataFrame,
# run SQL into pandas, read a column as numpy, then stop the pipeline.
import zeptodb
import polars as pl
from zepto_py import from_polars, query_to_pandas
# 1. Start pipeline
pipeline = zeptodb.Pipeline()
pipeline.start()
# 2. Build a polars DataFrame and ingest (zero-copy Arrow path)
df = pl.DataFrame({
"symbol": [1, 1, 1, 2, 2],
"price": [15000, 15010, 15020, 20000, 20010],
"volume": [100, 150, 200, 80, 90],
})
from_polars(df, pipeline,
symbol_col="symbol", price_col="price", volume_col="volume")
# drain() processes the queued ticks synchronously before querying.
pipeline.drain()
# 3. Run SQL and get a pandas DataFrame
result = query_to_pandas(
pipeline,
"SELECT symbol, vwap(price, volume) AS vwap, sum(volume) AS vol "
"FROM trades GROUP BY symbol ORDER BY symbol"
)
print(result)
# vwap = sum(price * volume) / sum(volume) per symbol:
# symbol 1: 6,755,500 / 450 = 15012.222; symbol 2: 3,400,900 / 170 = 20005.294
# symbol vwap vol
# 0 1 15012.222 450
# 1 2 20005.294 170
# 4. Zero-copy numpy column access
import numpy as np
prices = pipeline.get_column(symbol=1, name="price") # np.ndarray[int64], ~522ns
print(prices) # [15000, 15010, 15020]
pipeline.stop()
# Query a running zepto_server over HTTP, bucketing symbol-1 trades into bars
# with first/last price and total volume per bar.
# 300000000000 ns = 300 s = 5 minutes (timestamps are nanoseconds).
import zepto_py as zepto
# Connect to running zepto_server
db = zepto.connect("localhost", 8123)
# SQL → pandas
df = db.query_pandas(
"SELECT xbar(timestamp, 300000000000) AS bar, "
"first(price) AS open, last(price) AS close, sum(volume) AS vol "
"FROM trades WHERE symbol = 1 "
"GROUP BY xbar(timestamp, 300000000000) ORDER BY bar"
)
print(df)
# AI memory and prompt cache over HTTP. Embeddings are supplied by the caller;
# ZeptoDB stores and ranks them but does not compute them.
import zepto_py as zepto
db = zepto.connect("localhost", 8123)
# Store a pinned preference memory for user u1 and get back its memory_id.
memory_id = db.memory.put(
"User prefers concise answers.",
embedding=[1.0, 0.0],
tenant_id="tenant_a",
namespace="agent",
user_id="u1",
session_id="s1",
type="preference",
token_count=5,
importance=2.0,
pinned=True,
)
# Similarity search over memories, filtered by tenant / namespace / user.
matches = db.memory.search(
[1.0, 0.0],
tenant_id="tenant_a",
namespace="agent",
user_id="u1",
limit=5,
)
# Ranked memories for the query embedding, kept within a 128-token budget.
context = db.memory.get_context(
[1.0, 0.0],
tenant_id="tenant_a",
namespace="agent",
token_budget=128,
)
# Cache a prompt/response pair for later lookup.
cache_id = db.cache.store(
"Summarize the latest task",
"Short task summary",
embedding=[0.9, 0.1],
tenant_id="tenant_a",
namespace="agent",
)
# The lookup prompt differs in case/whitespace from the stored one. An exact
# prompt match is tried first, then a semantic match on the embedding.
hit = db.cache.lookup(
" summarize THE latest task ",
embedding=[0.88, 0.12],
tenant_id="tenant_a",
namespace="agent",
)

High-throughput ingest with StreamingSession

Section titled “High-throughput ingest with StreamingSession”
# High-throughput batched ingest. `pipeline` is a started zeptodb.Pipeline.
import pandas as pd
from zepto_py import StreamingSession
sess = StreamingSession(pipeline, batch_size=50_000, error_mode="skip")
# Ingest a large DataFrame with progress display.
# pd.DataFrame requires every column to have the same length, so all three
# columns are derived from a single row count.
n_rows = 1_000_000
big_df = pd.DataFrame({
    "symbol": [1] * n_rows,
    "price": range(15_000, 15_000 + 15 * n_rows, 15),  # exactly n_rows values
    "volume": [100] * n_rows,
})
sess.ingest_pandas(big_df, show_progress=True,
                   symbol_col="symbol", price_col="price", volume_col="volume")
# Ingested 1,000,000 rows in 1.82s (549,451 rows/sec)
stats = sess.stats()
print(f"Ingested {stats.rows_ingested:,} rows at {stats.throughput:,.0f} rows/sec")
# In-process Arrow export and DuckDB querying. `pipeline` is a zeptodb.Pipeline
# that already holds ticks for symbol 1.
import pyarrow as pa
from zepto_py import ArrowSession
sess = ArrowSession(pipeline)
# Export to Arrow (zero-copy)
table = sess.to_arrow(symbol=1)
print(table.schema) # timestamp: int64, price: int64, volume: int64
# Query with DuckDB directly on ZeptoDB data
conn = sess.to_duckdb(symbol=1, table_name="trades")
df = conn.execute(
"SELECT avg(price), stddev(price) FROM trades"
).fetchdf()
print(df)

The zeptodb module is the low-level C++ binding built with pybind11. Build with cmake -DAPEX_BUILD_PYTHON=ON.

# zeptodb.Pipeline: configuration, lifecycle, ingest paths and built-in aggregates.
import zeptodb
# Default config (pure in-memory, 32 MB arena per partition)
pipeline = zeptodb.Pipeline()
# Custom config
pipeline = zeptodb.Pipeline(config=zeptodb.PipelineConfig(
arena_size=32 * 1024 * 1024,
drain_batch_size=256,
storage_mode=zeptodb.StorageMode.PURE_IN_MEMORY,
# storage_mode=zeptodb.StorageMode.TIERED,
# storage_mode=zeptodb.StorageMode.PURE_ON_DISK,
))
pipeline.start() # start background drain thread
pipeline.stop() # flush queue + stop drain thread
pipeline.drain() # synchronous drain — useful in tests without background thread
# Single tick
pipeline.ingest(symbol=1, price=15000, volume=100)
pipeline.ingest(symbol=1, price=15010, volume=50, timestamp=1711000000000000000)
# Batch ingest — vectorized, single C++ call, no Python loop
import numpy as np
syms = np.array([1, 1, 1], dtype=np.int64)
prices = np.array([15000, 15010, 15020], dtype=np.int64)
vols = np.array([100, 50, 75], dtype=np.int64)
pipeline.ingest_batch(syms, prices, vols)
# Float batch ingest — auto scale float64 → int64 in C++
prices_f = np.array([150.00, 150.10, 150.20], dtype=np.float64)
vols_f = np.array([100.0, 50.0, 75.0], dtype=np.float64)
pipeline.ingest_float_batch(syms, prices_f, vols_f, price_scale=100.0)
# Table-aware ingest (Stage B, devlog 084) — lands rows in a specific
# CREATE TABLE table so that SELECT FROM other_table sees 0 rows.
pipeline.execute("CREATE TABLE trades (symbol INT64, price INT64, volume INT64, timestamp TIMESTAMP_NS)")
pipeline.ingest_batch(syms, prices, vols, table_name="trades")
# Unknown table raises ValueError:
# pipeline.ingest_batch(syms, prices, vols, table_name="nope") # ValueError
result = pipeline.vwap(symbol=1) # → float
# t0 / t1 are placeholders: nanosecond timestamps bounding the time range.
result = pipeline.vwap(symbol=1, from_ns=t0, to_ns=t1)
result = pipeline.count(symbol=1) # → int
result = pipeline.sum(symbol=1, col="volume") # → int

Returns a numpy view into ZeptoDB’s internal column buffer — no copy, ~522ns.

# Zero-copy numpy views of a symbol's columns, plus the pipeline counters.
prices = pipeline.get_column(symbol=1, name="price") # np.ndarray[int64]
volumes = pipeline.get_column(symbol=1, name="volume") # np.ndarray[int64]
timestamps = pipeline.get_column(symbol=1, name="timestamp") # np.ndarray[int64]
assert prices.base is not None # it's a view
assert not prices.flags['OWNDATA'] # zero-copy confirmed
stats = pipeline.stats()
stats.ticks_ingested # int — total ticks received
stats.ticks_stored # int — written to column store
stats.ticks_dropped # int — dropped (queue overflow)
stats.queries_executed # int
stats.total_rows_scanned # int
stats.partitions_created # int
stats.last_ingest_latency_ns # int

Participate in a ZeptoDB cluster as an in-process node. After enable_cluster_routing(), every ingest* / INSERT call dispatches via the consistent-hash PartitionRouter — to the local pipeline when self_id owns the symbol, otherwise forwarded over TCP RPC to the owning peer.

# Signature of zeptodb.Pipeline.enable_cluster_routing (shown without self).
def enable_cluster_routing(
self_id: int,
peers: list[tuple[int, str, int]], # (node_id, host, http_port)
remove_self_from_ring: bool = True,
rpc_timeout_ms: int = 2000,
) -> None

Parameters:

| Name | Type | Description |
|---|---|---|
| `self_id` | `int` | This node’s ID in the cluster. |
| `peers` | `list[tuple[int, str, int]]` | `(node_id, host, http_port)` for every remote storage pod. Each tuple must have exactly three elements; a wrong shape raises `TypeError` / `ValueError` before any internal state is touched. |
| `remove_self_from_ring` | `bool` | `True` (default) matches the stateless ingest-node pattern (Option A, devlog 113): the hash ring never picks self, so every tick is forwarded. `False` means this pipeline is a full cluster node that also owns a slice of the ring. |
| `rpc_timeout_ms` | `int` | Per-peer TCP RPC timeout. Used for every `TcpRpcClient` in the peer pool. |

Port convention: peer RPC port = peer HTTP port + 100 (same as zepto_http_server and zepto_ingest_node). You pass the HTTP port; the binding derives the RPC port internally.

Idempotent. Calling twice tears down the prior wiring (adapter → peer RPC map → coordinator) cleanly, then rebuilds. Argument parsing happens before teardown, so a bad peer tuple on the second call leaves the first wiring intact.

Example — in-process cluster front-door:

# In-process front door: self_id is not in the ring, so every tick is
# forwarded over TCP RPC to the storage pod that owns its symbol.
import zeptodb
p = zeptodb.Pipeline()
p.start()
# Forward every tick to one of two storage pods.
p.enable_cluster_routing(
self_id=99999, # not in the ring → always forward
peers=[
(0, "storage-0.zeptodb-headless", 8123),
(1, "storage-1.zeptodb-headless", 8123),
],
remove_self_from_ring=True,
rpc_timeout_ms=2000,
)
# SQL INSERT and ingest() now route via PartitionRouter.
p.execute("CREATE TABLE trades (symbol INT64, price INT64, volume INT64, timestamp TIMESTAMP_NS)")
# syms / prices / vols: int64 numpy arrays as built in the ingest_batch example.
p.ingest_batch(syms, prices, vols, table_name="trades")

Example — full cluster node:

# Node 0 stays in the ring: ticks whose symbols it owns are stored locally,
# all others are forwarded to node-1 / node-2.
p.enable_cluster_routing(
self_id=0,
peers=[(1, "node-1", 8123), (2, "node-2", 8123)],
remove_self_from_ring=False, # self owns a slice of the ring
)

Errors:

  • TypeError — a peers element is not a tuple, or self_id / port is not castable to uint32_t / uint16_t.
  • ValueError — a peers tuple does not have exactly 3 elements.
  • RuntimeError — after successful wiring, an ingest fails because the ring is empty (no peers, self removed) or every peer is unreachable. Same error type as the pre-existing “queue full” path.

# In-process SQL execution against a zeptodb.Pipeline via QueryExecutor.
from zeptodb.sql import QueryExecutor
executor = QueryExecutor(pipeline)
# Parallel execution
executor.enable_parallel()  # auto thread count
executor.enable_parallel(num_threads=8, row_threshold=100_000)
executor.disable_parallel()
# Execute SQL
result = executor.execute("SELECT vwap(price, volume) FROM trades WHERE symbol = 1")
# Result fields
result.column_names  # list[str]
result.rows  # list[list[int]] — all values as int64
result.execution_time_us  # float
result.rows_scanned  # int
result.error  # str — empty if ok
result.ok()  # bool
# Iterate results. Select the columns you intend to read, then resolve each
# column's position once instead of calling list.index() for every row
# (index() raises ValueError if the column is not in the result).
result = executor.execute("SELECT symbol, price FROM trades WHERE symbol = 1")
if result.ok():
    sym_idx = result.column_names.index("symbol")
    price_idx = result.column_names.index("price")
    for row in result.rows:
        sym_id = row[sym_idx]
        price = row[price_idx]
else:
    print(result.error)

Connects to a running zepto_server on port 8123 (ClickHouse-compatible HTTP API). The connection exposes SQL/dataframe helpers plus memory and cache wrappers for the AI memory HTTP endpoints.

# HTTP client for a running zepto_server: SQL into dataframes, ingest, DDL.
import zepto_py as zepto
from zepto_py import ZeptoConnection
# Connect
db = zepto.connect("localhost", 8123)
# or equivalently:
db = ZeptoConnection(host="localhost", port=8123)
# Health check
db.ping()  # → True if server is up
# SQL → pandas DataFrame
df = db.query_pandas("SELECT symbol, avg(price) FROM trades GROUP BY symbol")
# Returns: pd.DataFrame
# SQL → polars DataFrame
df_pl = db.query_polars("SELECT symbol, avg(price) FROM trades GROUP BY symbol")
# Returns: pl.DataFrame
# SQL → numpy dict
arrays = db.query_numpy("SELECT price, volume FROM trades WHERE symbol = 1")
# Returns: dict[str, np.ndarray]
# Ingest pandas → HTTP
# ticks_df / df_polars are placeholders for your own pandas / polars
# DataFrames with symbol, price and volume columns. The aggregate query
# results above do not have price/volume columns.
db.ingest_pandas(ticks_df,
                 symbol_col="symbol",
                 price_col="price",
                 volume_col="volume")
# Ingest pandas into a specific table (devlog 088).
# The target table must already exist — see create_table() below.
db.ingest_pandas(ticks_df, table_name="my_trades")  # INSERT INTO my_trades ...
# Ingest polars → HTTP
db.ingest_polars(df_polars)
db.ingest_polars(df_polars, table_name="my_trades")
# DDL convenience helpers (devlog 088) — wrappers over execute() / query()
db.create_table(
    "my_trades",
    [("symbol", "INT64"), ("price", "INT64"),
     ("volume", "INT64"), ("timestamp", "INT64")],
    if_not_exists=True,
)
db.list_tables()  # → ['my_trades', ...]
db.drop_table("my_trades", if_exists=True)

The high-level HTTP client mirrors the /api/ai/* endpoints. Embeddings are client-supplied lists of floats; ZeptoDB stores and ranks them but does not create embeddings or call LLM providers.

Stores or updates a memory object and returns its memory_id.

# content: the memory text. embedding: a client-supplied vector (example value).
# The other keyword values presumably show the defaults — confirm against zepto_py.
memory_id = db.memory.put(
content,
embedding=[1.0, 0.0],
memory_id="",
namespace="default",
tenant_id="",
user_id="",
session_id="",
agent_id="",
type="memory",
metadata_json="{}",
token_count=0,
importance=0.0,
expires_at_ns=0,
pinned=False,
)

Returns the matches array from /api/ai/memories/search.

# query_embedding: a client-supplied vector (example value). The empty-string
# filters presumably mean "no filter" — confirm against zepto_py.
matches = db.memory.search(
query_embedding=[1.0, 0.0],
namespace="default",
tenant_id="",
user_id="",
session_id="",
agent_id="",
type="",
limit=10,
)

Returns ranked memories plus token_count under the requested budget.

# Returns ranked memories and their token_count, kept within token_budget.
# query_embedding is a client-supplied vector (example value).
context = db.memory.get_context(
query_embedding=[1.0, 0.0],
token_budget=256,
namespace="default",
tenant_id="",
user_id="",
session_id="",
agent_id="",
type="",
limit=10,
)

Stores a prompt/response cache entry and returns its cache_id.

# prompt / response: the pair to cache; returns the entry's cache_id.
# embedding is a client-supplied vector (example value).
cache_id = db.cache.store(
prompt,
response,
embedding=[1.0, 0.0],
cache_id="",
namespace="default",
tenant_id="",
metadata_json="{}",
token_count=0,
expires_at_ns=0,
)

Performs exact prompt lookup first, then semantic lookup using the supplied embedding and threshold.

# Exact prompt match is tried first. If it misses, a semantic match on
# `embedding` is attempted, governed by semantic_threshold.
hit = db.cache.lookup(
prompt,
embedding=[1.0, 0.0],
namespace="default",
tenant_id="",
semantic_threshold=0.92,
)

Runnable examples live in examples/agent_memory/:

  • provider_cache.py — application-level exact/semantic cache before a mock provider call.
  • langgraph_memory.py — LangGraph-style retrieve/call/remember node functions without requiring LangGraph at runtime.
  • production_agent_demo.py — production-shaped turn flow that retrieves context, checks the cache, calls a provider on miss, stores the result, and records AgentOps telemetry.
  • agent_attached_timeseries_demo.py — five vertical scenarios that seed live time-series tables and related MemoryRecord context for finance, IoT, observability, robotics, and game/live-ops agents.
  • agentops_schema.py — canonical agent_runs, retrieval_events, cache_events, llm_calls, and tool_calls table definitions.

The examples use deterministic client-side embeddings and mock model/provider functions, so they can run without API keys. Replace the embedding and provider functions with your production model stack.

Optional adapters in examples/agent_memory/adapters.py connect the same flow to installed provider/framework SDKs without adding hard dependencies:

  • OpenAIResponsesAdapter(model=...)
  • AnthropicMessagesAdapter(model=...)
  • build_langgraph_app(db, model, embed)

| Method | Signature | Purpose |
|---|---|---|
| `create_table` | `create_table(name, columns, if_not_exists=False)` | `columns` is a `list[tuple[str, str]]` of `(col_name, sql_type)`; issues `CREATE TABLE [IF NOT EXISTS] name (...)` |
| `drop_table` | `drop_table(name, if_exists=False)` | Issues `DROP TABLE [IF EXISTS] name` |
| `list_tables` | `list_tables() -> list[str]` | Runs `SHOW TABLES` and returns the first column of every row |
| `ingest_pandas` | `ingest_pandas(df, ..., table_name="ticks")` | The `table_name` kwarg (default `"ticks"` for backward compatibility) selects the destination table for the generated `INSERT` statements |
| `ingest_polars` | `ingest_polars(df, ..., table_name="ticks")` | Passes `table_name` through to `ingest_pandas` |

Identifier validation (devlog 089). All caller-supplied identifiers are validated before interpolation into SQL — invalid names raise ValueError and never hit the wire.

| Arg | Regex | Methods that validate it |
|---|---|---|
| table / column name | `^[A-Za-z_][A-Za-z0-9_]*$` | `create_table`, `drop_table`, `ingest_pandas`, `ingest_polars` |
| column type string | `^[A-Za-z0-9_]+$` | `create_table` |
# Invalid identifiers are rejected client-side with ValueError before any
# SQL is sent to the server.
db.create_table("ticks; DROP TABLE x; --", [("a", "INT64")])
# ValueError: Invalid SQL identifier: "ticks; DROP TABLE x; --"
db.create_table("safe", [("col; DROP", "INT64")])
# ValueError: Invalid SQL identifier: "col; DROP"

ingest_pandas also SQL-escapes single quotes inside string values (standard '' doubling), so DataFrame string values such as it's are inserted correctly.


Standalone converters. Requires a live zeptodb.Pipeline (C++ binding) object — no HTTP.

from zepto_py import (
from_pandas, from_polars, from_arrow,
to_pandas, to_polars,
query_to_pandas, query_to_polars,
)

Vectorized — uses df[col].to_numpy(copy=False) followed by a single ingest_batch() call. No Python row loop.

# pandas → in-process Pipeline (no HTTP).
from_pandas(
df, # pd.DataFrame
pipeline, # zeptodb.Pipeline
symbol_col="symbol", # column name for symbol id
price_col="price", # column name for price
volume_col="volume", # column name for volume
timestamp_col="timestamp", # optional; omit to use now_ns()
price_scale=1.0, # float → int64: value * price_scale
)

Zero-copy via Arrow buffer — Series.to_numpy() returns Arrow buffer directly for numeric columns without nulls.

# polars / Arrow → Pipeline, then Pipeline → pandas / polars (per symbol or via SQL).
from_polars(
df, # pl.DataFrame
pipeline,
symbol_col="symbol",
price_col="price",
volume_col="volume",
price_scale=1.0,
)
from_arrow(
table, # pa.Table
pipeline,
symbol_col="symbol",
price_col="price",
volume_col="volume",
)
# All columns for a symbol → pandas
df = to_pandas(pipeline, symbol=1)
# Returns: pd.DataFrame with columns: timestamp, price, volume, ...
# All columns for a symbol → polars
df = to_polars(pipeline, symbol=1)
# SQL → pandas
df = query_to_pandas(pipeline,
"SELECT avg(price), sum(volume) FROM trades WHERE symbol = 1")
# SQL → polars
df = query_to_polars(pipeline,
"SELECT symbol, sum(volume) FROM trades GROUP BY symbol")

| Method | Throughput |
|---|---|
| `from_polars()` | ~3.3M rows/sec |
| `from_pandas()` | ~2M rows/sec |
| `from_arrow()` | ~3M rows/sec |

Zero-copy Arrow table exchange and DuckDB registration.

# ArrowSession: zero-copy Arrow exchange with an in-process zeptodb.Pipeline.
# `table`, `batch`, `sym_array`, `px_array` and `vol_array` are placeholders
# for your own Arrow objects.
from zepto_py import ArrowSession
import pyarrow as pa
sess = ArrowSession(pipeline)
# Arrow Table → ZeptoDB
sess.ingest_arrow(
    table,  # pa.Table
    symbol_col="symbol",
    price_col="price",
    volume_col="volume",
)
# Arrow RecordBatch → ZeptoDB
sess.ingest_record_batch(batch)  # pa.RecordBatch
# Per-column Arrow arrays → ZeptoDB (raw columnar)
sess.ingest_arrow_columnar(
    symbols=sym_array,  # pa.Array[int64]
    prices=px_array,  # pa.Array[int64]
    volumes=vol_array,  # pa.Array[int64]
)
# ZeptoDB → Arrow Table (zero-copy)
table = sess.to_arrow(symbol=1)
# Returns: pa.Table with schema: timestamp: int64, price: int64, volume: int64
# ZeptoDB → Arrow RecordBatchReader (streaming/lazy)
reader = sess.to_record_batch_reader(symbol=1)
for batch in reader:
    print(batch.num_rows)
# ZeptoDB → polars (via Arrow buffer — true zero-copy)
df = sess.to_polars_zero_copy(symbol=1)
# Returns: pl.DataFrame sharing Arrow buffer — no copy
# ZeptoDB → DuckDB in-memory table
conn = sess.to_duckdb(symbol=1, table_name="trades")
result = conn.execute("SELECT avg(price) FROM trades").fetchdf()
# Schema
arrow_schema = sess.get_schema()  # → pa.Schema
# Build an Arrow schema from ZeptoDB column names; call the same name that is imported.
from zepto_py.arrow import zeptodb_schema_to_arrow
schema = zeptodb_schema_to_arrow(["timestamp", "price", "volume"])

zepto_py.streaming — high-throughput ingest

Section titled “zepto_py.streaming — high-throughput ingest”

Batch ingest from pandas/polars/generators with progress and error handling.

# StreamingSession: batched ingest from pandas, polars or any iterator of dicts.
# `pipeline` is a zeptodb.Pipeline; `df` / `df_polars` are your own DataFrames.
from zepto_py import StreamingSession
sess = StreamingSession(
    pipeline,
    batch_size=50_000,  # rows per C++ ingest_batch() call
    error_mode="skip",  # see error modes below
)
sess.ingest_pandas(
    df,
    show_progress=True,  # print throughput stats at end
    symbol_col="symbol",
    price_col="price",
    volume_col="volume",
)
# → Ingested 1,000,000 rows in 1.82s (549,451 rows/sec)
sess.ingest_polars(
    df_polars,
    show_progress=True,
)


def tick_generator():
    """Yield one ``{"symbol", "price", "volume"}`` dict per tick.

    ``external_feed`` is a placeholder for your own iterable of rows that
    expose ``.sym``, ``.px`` and ``.vol`` attributes.
    """
    for row in external_feed:
        yield {"symbol": row.sym, "price": row.px, "volume": row.vol}


sess.ingest_iter(
    tick_generator(),
    show_progress=True,
    total=1_000_000,  # optional: total for progress display
)
stats = sess.stats()
stats.rows_ingested  # int
stats.rows_skipped  # int
stats.batches  # int
stats.elapsed_sec  # float
stats.throughput  # float (rows/sec)

| Mode | Behavior |
|---|---|
| `"skip"` | Skip bad rows, continue ingesting |
| `"raise"` | Raise an exception on the first error |
| `"warn"` | Print a warning to stderr, continue |

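| From \ To | pandas | polars | numpy | Arrow | DuckDB | HTTP |
|---|---|---|---|---|---|---|
| ZeptoDB (in-proc) | `to_pandas()` | `to_polars()` | `get_column()` | `to_arrow()` | `to_duckdb()` | — |
| ZeptoDB (HTTP) | `query_pandas()` | `query_polars()` | `query_numpy()` | — | — | `POST /` |

| Into ZeptoDB from | In-process | HTTP |
|---|---|---|
| pandas | `from_pandas()` | `ingest_pandas()` |
| polars | `from_polars()` | `ingest_polars()` |
| Arrow | `ingest_arrow()` | — |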

See also: SQL Reference · C++ Reference · HTTP Reference