ZeptoDB Python API Reference
Last updated: 2026-05-27
Two Python interfaces are available:
| Interface | Description | Use case |
|---|---|---|
| zeptodb | Low-level pybind11 C++ binding | In-process, maximum performance |
| zepto_py | High-level Python package | Pandas/Polars/Arrow interop, HTTP client |
Table of Contents
- zeptodb — pybind11 binding
- zepto_py.connection — HTTP client
- Agent Memory and Cache
- zepto_py.dataframe — bulk ingest/export
- zepto_py.arrow — Arrow / DuckDB interop
- zepto_py.streaming — high-throughput ingest
- Interoperability Matrix
Quick Start
End-to-end: ingest from polars, query via SQL, export to pandas

    import zeptodb
    import polars as pl
    from zepto_py import from_polars, query_to_pandas

    # 1. Start pipeline
    pipeline = zeptodb.Pipeline()
    pipeline.start()

    # 2. Build a polars DataFrame and ingest (zero-copy Arrow path)
    df = pl.DataFrame({
        "symbol": [1, 1, 1, 2, 2],
        "price":  [15000, 15010, 15020, 20000, 20010],
        "volume": [100, 150, 200, 80, 90],
    })
    from_polars(df, pipeline, symbol_col="symbol", price_col="price", volume_col="volume")
    pipeline.drain()

    # 3. Run SQL and get a pandas DataFrame
    result = query_to_pandas(
        pipeline,
        "SELECT symbol, vwap(price, volume) AS vwap, sum(volume) AS vol "
        "FROM trades GROUP BY symbol ORDER BY symbol"
    )
    print(result)
    #    symbol       vwap  vol
    # 0       1  15012.222  450
    # 1       2  20005.294  170

    # 4. Zero-copy numpy column access
    import numpy as np
    prices = pipeline.get_column(symbol=1, name="price")  # np.ndarray[int64], ~522ns
    print(prices)  # [15000 15010 15020]

    pipeline.stop()

HTTP client quick start

    import zepto_py as zepto

    # Connect to running zepto_server
    db = zepto.connect("localhost", 8123)

    # SQL → pandas: 5-minute bars (300000000000 ns = 300 s)
    df = db.query_pandas(
        "SELECT xbar(timestamp, 300000000000) AS bar, "
        "first(price) AS open, last(price) AS close, sum(volume) AS vol "
        "FROM trades WHERE symbol = 1 "
        "GROUP BY xbar(timestamp, 300000000000) ORDER BY bar"
    )
    print(df)

Agent memory and cache

    import zepto_py as zepto

    db = zepto.connect("localhost", 8123)

    # Store a memory with a client-supplied embedding
    memory_id = db.memory.put(
        "User prefers concise answers.",
        embedding=[1.0, 0.0],
        tenant_id="tenant_a",
        namespace="agent",
        user_id="u1",
        session_id="s1",
        type="preference",
        token_count=5,
        importance=2.0,
        pinned=True,
    )

    # Rank stored memories against a query embedding
    matches = db.memory.search(
        [1.0, 0.0],
        tenant_id="tenant_a",
        namespace="agent",
        user_id="u1",
        limit=5,
    )

    # Ranked memories that fit within a token budget
    context = db.memory.get_context(
        [1.0, 0.0],
        tenant_id="tenant_a",
        namespace="agent",
        token_budget=128,
    )

    # Store a prompt/response pair in the cache
    cache_id = db.cache.store(
        "Summarize the latest task",
        "Short task summary",
        embedding=[0.9, 0.1],
        tenant_id="tenant_a",
        namespace="agent",
    )

    # Look the prompt up again (different case and whitespace, nearby embedding)
    hit = db.cache.lookup(
        "  summarize THE latest task ",
        embedding=[0.88, 0.12],
        tenant_id="tenant_a",
        namespace="agent",
    )

High-throughput ingest with StreamingSession

    import pandas as pd
    from zepto_py import StreamingSession

    # `pipeline` is a started zeptodb.Pipeline (see the end-to-end example above)
    sess = StreamingSession(pipeline, batch_size=50_000, error_mode="skip")

    # Ingest a large DataFrame with progress display
    n_rows = 1_000_000
    big_df = pd.DataFrame({
        "symbol": [1] * n_rows,
        "price":  range(15000, 15000 + 15 * n_rows, 15),  # exactly n_rows values
        "volume": [100] * n_rows,
    })
    sess.ingest_pandas(big_df, show_progress=True,
                       symbol_col="symbol", price_col="price", volume_col="volume")
    # Ingested 1,000,000 rows in 1.82s (549,451 rows/sec)

    stats = sess.stats()
    print(f"Ingested {stats.rows_ingested:,} rows at {stats.throughput:,.0f} rows/sec")

Arrow / DuckDB interop

    import pyarrow as pa
    from zepto_py import ArrowSession

    sess = ArrowSession(pipeline)

    # Export to Arrow (zero-copy)
    table = sess.to_arrow(symbol=1)
    print(table.schema)  # timestamp: int64, price: int64, volume: int64

    # Query with DuckDB directly on ZeptoDB data
    conn = sess.to_duckdb(symbol=1, table_name="trades")
    df = conn.execute(
        "SELECT avg(price), stddev(price) FROM trades"
    ).fetchdf()
    print(df)

zeptodb — pybind11 binding
The zeptodb module is the low-level C++ binding built with pybind11.
Build with cmake -DAPEX_BUILD_PYTHON=ON.
zeptodb.Pipeline
Construction

    import zeptodb

    # Default config (pure in-memory, 32 MB arena per partition)
    pipeline = zeptodb.Pipeline()

    # Custom config
    pipeline = zeptodb.Pipeline(config=zeptodb.PipelineConfig(
        arena_size=32 * 1024 * 1024,
        drain_batch_size=256,
        storage_mode=zeptodb.StorageMode.PURE_IN_MEMORY,
        # storage_mode=zeptodb.StorageMode.TIERED,
        # storage_mode=zeptodb.StorageMode.PURE_ON_DISK,
    ))

Lifecycle

    pipeline.start()   # start background drain thread
    pipeline.stop()    # flush queue + stop drain thread
    pipeline.drain()   # synchronous drain, useful in tests without a background thread

Ingest

    # Single tick
    pipeline.ingest(symbol=1, price=15000, volume=100)
    pipeline.ingest(symbol=1, price=15010, volume=50, timestamp=1711000000000000000)

    # Batch ingest: vectorized, single C++ call, no Python loop
    import numpy as np
    syms   = np.array([1, 1, 1], dtype=np.int64)
    prices = np.array([15000, 15010, 15020], dtype=np.int64)
    vols   = np.array([100, 50, 75], dtype=np.int64)
    pipeline.ingest_batch(syms, prices, vols)

    # Float batch ingest: auto scale float64 → int64 in C++
    prices_f = np.array([150.00, 150.10, 150.20], dtype=np.float64)
    vols_f   = np.array([100.0, 50.0, 75.0], dtype=np.float64)
    pipeline.ingest_float_batch(syms, prices_f, vols_f, price_scale=100.0)

    # Table-aware ingest (Stage B, devlog 084): lands rows in a specific
    # CREATE TABLE table so that SELECT FROM other_table sees 0 rows.
    pipeline.execute("CREATE TABLE trades (symbol INT64, price INT64, volume INT64, timestamp TIMESTAMP_NS)")
    pipeline.ingest_batch(syms, prices, vols, table_name="trades")
    # Unknown table raises ValueError:
    # pipeline.ingest_batch(syms, prices, vols, table_name="nope")  # ValueError

Direct queries (C++ execution)

    # t0 and t1 are nanosecond timestamps bounding the query window
    result = pipeline.vwap(symbol=1)                          # → float
    result = pipeline.vwap(symbol=1, from_ns=t0, to_ns=t1)
    result = pipeline.count(symbol=1)                         # → int
    result = pipeline.sum(symbol=1, col="volume")             # → int

Zero-copy column export
Returns a numpy view into ZeptoDB’s internal column buffer: no copy, ~522ns.

    prices     = pipeline.get_column(symbol=1, name="price")      # np.ndarray[int64]
    volumes    = pipeline.get_column(symbol=1, name="volume")     # np.ndarray[int64]
    timestamps = pipeline.get_column(symbol=1, name="timestamp")  # np.ndarray[int64]

    assert prices.base is not None       # it's a view
    assert not prices.flags['OWNDATA']   # zero-copy confirmed

Statistics

    stats = pipeline.stats()
    stats.ticks_ingested          # int: total ticks received
    stats.ticks_stored            # int: written to column store
    stats.ticks_dropped           # int: dropped (queue overflow)
    stats.queries_executed        # int
    stats.total_rows_scanned      # int
    stats.partitions_created      # int
    stats.last_ingest_latency_ns  # int

Cluster routing (devlog 114 — P8-I5)
Participate in a ZeptoDB cluster as an in-process node. After
enable_cluster_routing(), every ingest* / INSERT call dispatches
via the consistent-hash PartitionRouter — to the local pipeline when
self_id owns the symbol, otherwise forwarded over TCP RPC to the
owning peer.

    def enable_cluster_routing(
        self_id: int,
        peers: list[tuple[int, str, int]],   # (node_id, host, http_port)
        remove_self_from_ring: bool = True,
        rpc_timeout_ms: int = 2000,
    ) -> None

Parameters:
| Name | Type | Description |
|---|---|---|
| self_id | int | This node’s ID in the cluster. |
| peers | list[tuple[int, str, int]] | (node_id, host, http_port) for every remote storage pod. Each tuple must have exactly three elements; a wrong shape raises TypeError / ValueError before any internal state is touched. |
| remove_self_from_ring | bool | True (default) matches the stateless ingest-node pattern (Option A, devlog 113): the hash ring never picks self, so every tick is forwarded. False means this pipeline is a full cluster node that also owns a slice of the ring. |
| rpc_timeout_ms | int | Per-peer TCP RPC timeout in milliseconds. Used for every TcpRpcClient in the peer pool. |
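
To make the effect of remove_self_from_ring concrete, here is a minimal consistent-hash sketch in pure Python. It is not ZeptoDB's PartitionRouter: the MD5-based hash, the virtual-node count, and the helper names `build_ring` and `owner_of` are assumptions chosen only for illustration.

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    # Stable 64-bit hash (assumption: the real router's hash is not documented here).
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")


def build_ring(node_ids, vnodes=64):
    """Return a sorted list of (hash, node_id) points, `vnodes` points per node."""
    return sorted((_hash(f"{n}:{v}"), n) for n in node_ids for v in range(vnodes))


def owner_of(ring, symbol: int) -> int:
    """The first ring point clockwise from the symbol's hash owns the symbol."""
    if not ring:
        raise RuntimeError("ring is empty")
    idx = bisect.bisect_left(ring, (_hash(str(symbol)), -1))
    return ring[idx % len(ring)][1]  # wrap around past the last point


self_id, peer_ids = 0, [1, 2]

# remove_self_from_ring=True: only peers are on the ring, so every symbol is forwarded.
stateless = build_ring(peer_ids)
# remove_self_from_ring=False: self owns a slice and would handle those symbols locally.
full_node = build_ring([self_id] + peer_ids)

forwarded = [owner_of(stateless, s) for s in range(1000)]
local = [s for s in range(1000) if owner_of(full_node, s) == self_id]
print(self_id in forwarded, len(local) > 0)
```

With self removed, the owner is always a peer; with self on the ring, some share of the symbols resolves to the local node.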
Port convention: peer RPC port = peer HTTP port + 100 (same as
zepto_http_server and zepto_ingest_node). You pass the HTTP port;
the binding derives the RPC port internally.
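
The port convention can be written out as a one-line helper. This is a sketch only: `rpc_port_for` is a hypothetical name, and the range check is an assumption about what a sensible caller would verify, not documented binding behavior.

```python
RPC_PORT_OFFSET = 100  # convention stated above: RPC port = HTTP port + 100


def rpc_port_for(http_port: int) -> int:
    """Derive a peer's RPC port from its HTTP port (hypothetical helper)."""
    rpc_port = http_port + RPC_PORT_OFFSET
    if not 0 < http_port <= 65535 or rpc_port > 65535:
        raise ValueError(f"port out of range: {http_port}")
    return rpc_port


print(rpc_port_for(8123))  # 8223
```

A peer listed as `("storage-0", 8123)` would therefore be reached over RPC on port 8223.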
Idempotent. Calling twice tears down the prior wiring (adapter → peer RPC map → coordinator) cleanly, then rebuilds. Argument parsing happens before teardown, so a bad peer tuple on the second call leaves the first wiring intact.
Example (in-process cluster front-door):

    import zeptodb

    p = zeptodb.Pipeline()
    p.start()

    # Forward every tick to one of two storage pods.
    p.enable_cluster_routing(
        self_id=99999,                       # not in the ring → always forward
        peers=[
            (0, "storage-0.zeptodb-headless", 8123),
            (1, "storage-1.zeptodb-headless", 8123),
        ],
        remove_self_from_ring=True,
        rpc_timeout_ms=2000,
    )

    # SQL INSERT and ingest() now route via PartitionRouter.
    # syms, prices, vols are the int64 numpy arrays from the Ingest section.
    p.execute("CREATE TABLE trades (symbol INT64, price INT64, volume INT64, timestamp TIMESTAMP_NS)")
    p.ingest_batch(syms, prices, vols, table_name="trades")

Example (full cluster node):

    p.enable_cluster_routing(
        self_id=0,
        peers=[(1, "node-1", 8123), (2, "node-2", 8123)],
        remove_self_from_ring=False,         # self owns a slice of the ring
    )

Errors:

- TypeError: a peers element is not a tuple, or self_id / port is not castable to uint32_t / uint16_t.
- ValueError: a peers tuple does not have exactly 3 elements.
- RuntimeError: after successful wiring, an ingest fails because the ring is empty (no peers, self removed) or every peer is unreachable. Same error type as the pre-existing “queue full” path.
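
The "parse first, then tear down" behavior and the error types listed above can be sketched in plain Python. `parse_peers` and `RoutingState` are hypothetical stand-ins written for this illustration; the real checks happen inside the C++ binding.

```python
def parse_peers(peers):
    """Validate every (node_id, host, http_port) tuple before touching any state."""
    parsed = []
    for peer in peers:
        if not isinstance(peer, tuple):
            raise TypeError(f"peer must be a tuple, got {type(peer).__name__}")
        if len(peer) != 3:
            raise ValueError(f"peer tuple must have exactly 3 elements, got {len(peer)}")
        node_id, host, http_port = peer
        if not isinstance(node_id, int) or not 0 <= node_id < 2**32:
            raise TypeError("node_id must fit in uint32_t")
        if not isinstance(http_port, int) or not 0 <= http_port < 2**16:
            raise TypeError("http_port must fit in uint16_t")
        parsed.append((node_id, str(host), http_port))
    return parsed


class RoutingState:
    """Toy stand-in for the wiring that enable_cluster_routing() manages."""

    def __init__(self):
        self.peers = []

    def rewire(self, peers):
        new_peers = parse_peers(peers)  # may raise; the old wiring is still intact
        self.peers = new_peers          # replace the wiring only after parsing succeeds


state = RoutingState()
state.rewire([(0, "storage-0", 8123), (1, "storage-1", 8123)])
try:
    state.rewire([(2, "storage-2")])    # wrong shape: only two elements
except ValueError:
    pass
print(len(state.peers))  # 2: the first wiring survives the failed second call
```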
zeptodb.sql.QueryExecutor

    from zeptodb.sql import QueryExecutor

    executor = QueryExecutor(pipeline)

    # Parallel execution
    executor.enable_parallel()                                    # auto thread count
    executor.enable_parallel(num_threads=8, row_threshold=100_000)
    executor.disable_parallel()

    # Execute SQL
    result = executor.execute("SELECT vwap(price, volume) FROM trades WHERE symbol = 1")

    # Result fields
    result.column_names       # list[str]
    result.rows               # list[list[int]], all values as int64
    result.execution_time_us  # float
    result.rows_scanned       # int
    result.error              # str, empty if ok
    result.ok()               # bool

    # Iterate results (assumes the query selected symbol and price columns)
    for row in result.rows:
        sym_id = row[result.column_names.index("symbol")]
        price  = row[result.column_names.index("price")]

zepto_py.connection — HTTP client
Connects to a running zepto_server on port 8123 (ClickHouse-compatible HTTP
API). The connection exposes SQL/dataframe helpers plus memory and cache
wrappers for the AI memory HTTP endpoints.

    import zepto_py as zepto
    from zepto_py import ZeptoConnection

    # Connect
    db = zepto.connect("localhost", 8123)
    # or equivalently:
    db = ZeptoConnection(host="localhost", port=8123)

    # Health check
    db.ping()  # → True if server is up

    # SQL → pandas DataFrame
    df = db.query_pandas("SELECT symbol, avg(price) FROM trades GROUP BY symbol")
    # Returns: pd.DataFrame

    # SQL → polars DataFrame
    df_polars = db.query_polars("SELECT symbol, avg(price) FROM trades GROUP BY symbol")
    # Returns: pl.DataFrame

    # SQL → numpy dict
    arrays = db.query_numpy("SELECT price, volume FROM trades WHERE symbol = 1")
    # Returns: dict[str, np.ndarray]

    # Ingest pandas → HTTP
    db.ingest_pandas(df, symbol_col="symbol", price_col="price", volume_col="volume")

    # Ingest pandas into a specific table (devlog 088)
    db.ingest_pandas(df, table_name="my_trades")  # INSERT INTO my_trades ...

    # Ingest polars → HTTP
    db.ingest_polars(df_polars)
    db.ingest_polars(df_polars, table_name="my_trades")

    # DDL convenience helpers (devlog 088): wrappers over execute() / query()
    db.create_table(
        "my_trades",
        [("symbol", "INT64"), ("price", "INT64"),
         ("volume", "INT64"), ("timestamp", "INT64")],
        if_not_exists=True,
    )
    db.list_tables()  # → ['my_trades', ...]
    db.drop_table("my_trades", if_exists=True)

Agent Memory and Cache
The high-level HTTP client mirrors the /api/ai/* endpoints. Embeddings are
client-supplied lists of floats; ZeptoDB stores and ranks them but does not create
embeddings or call LLM providers.
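
Because embeddings are produced on the client, any function that returns a list of floats can be used while prototyping. The sketch below is one deterministic toy option; `toy_embedding` is a hypothetical helper, and the token-hashing scheme is an assumption for illustration, not something ZeptoDB ships or requires.

```python
import hashlib
import math


def toy_embedding(text: str, dim: int = 8) -> list[float]:
    """Hash each lowercased token into one of `dim` buckets, then L2-normalize."""
    vec = [0.0] * dim
    for token in text.lower().split():
        digest = hashlib.sha256(token.encode()).digest()
        bucket = int.from_bytes(digest[:4], "big") % dim
        vec[bucket] += 1.0
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm else vec


embedding = toy_embedding("User prefers concise answers.")
print(len(embedding))  # 8
```

The resulting list could be passed as the `embedding=` argument of `db.memory.put(...)`; a production setup would substitute a real embedding model.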
db.memory.put(...)
Stores or updates a memory object and returns its memory_id.

    memory_id = db.memory.put(
        content,
        embedding=[1.0, 0.0],
        memory_id="",
        namespace="default",
        tenant_id="",
        user_id="",
        session_id="",
        agent_id="",
        type="memory",
        metadata_json="{}",
        token_count=0,
        importance=0.0,
        expires_at_ns=0,
        pinned=False,
    )

db.memory.search(...)

Returns the matches array from /api/ai/memories/search.

    matches = db.memory.search(
        query_embedding=[1.0, 0.0],
        namespace="default",
        tenant_id="",
        user_id="",
        session_id="",
        agent_id="",
        type="",
        limit=10,
    )

db.memory.get_context(...)

Returns ranked memories plus token_count under the requested budget.

    context = db.memory.get_context(
        query_embedding=[1.0, 0.0],
        token_budget=256,
        namespace="default",
        tenant_id="",
        user_id="",
        session_id="",
        agent_id="",
        type="",
        limit=10,
    )

db.cache.store(...)

Stores a prompt/response cache entry and returns its cache_id.

    cache_id = db.cache.store(
        prompt,
        response,
        embedding=[1.0, 0.0],
        cache_id="",
        namespace="default",
        tenant_id="",
        metadata_json="{}",
        token_count=0,
        expires_at_ns=0,
    )

db.cache.lookup(...)

Performs exact prompt lookup first, then semantic lookup using the supplied embedding and threshold.

    hit = db.cache.lookup(
        prompt,
        embedding=[1.0, 0.0],
        namespace="default",
        tenant_id="",
        semantic_threshold=0.92,
    )

Agent examples
Runnable examples live in examples/agent_memory/:

- provider_cache.py: application-level exact/semantic cache before a mock provider call.
- langgraph_memory.py: LangGraph-style retrieve/call/remember node functions without requiring LangGraph at runtime.
- production_agent_demo.py: production-shaped turn flow that retrieves context, checks the cache, calls a provider on miss, stores the result, and records AgentOps telemetry.
- agent_attached_timeseries_demo.py: five vertical scenarios that seed live time-series tables and related MemoryRecord context for finance, IoT, observability, robotics, and game/live-ops agents.
- agentops_schema.py: canonical agent_runs, retrieval_events, cache_events, llm_calls, and tool_calls table definitions.

The examples use deterministic client-side embeddings and mock model/provider functions, so they can run without API keys. Replace the embedding and provider functions with your production model stack.
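
As a rough picture of the exact-then-semantic cache pattern these examples build on, here is an in-memory sketch. It is not the code in examples/agent_memory/ or the server's implementation: `TinyCache`, cosine similarity as the metric, and case/whitespace normalization for exact matches are all assumptions.

```python
import math


def normalize_prompt(prompt: str) -> str:
    # Assumption: exact matching ignores case and extra whitespace.
    return " ".join(prompt.lower().split())


def cosine(a, b) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


class TinyCache:
    def __init__(self):
        self._entries = []  # (normalized_prompt, embedding, response)

    def store(self, prompt, response, embedding):
        self._entries.append((normalize_prompt(prompt), list(embedding), response))

    def lookup(self, prompt, embedding, semantic_threshold=0.92):
        key = normalize_prompt(prompt)
        # Stage 1: exact match on the normalized prompt.
        for stored_key, _, response in self._entries:
            if stored_key == key:
                return {"kind": "exact", "response": response}
        # Stage 2: best cosine match, accepted only at or above the threshold.
        best = max(self._entries, key=lambda e: cosine(e[1], embedding), default=None)
        if best is not None and cosine(best[1], embedding) >= semantic_threshold:
            return {"kind": "semantic", "response": best[2]}
        return None


cache = TinyCache()
cache.store("Summarize the latest task", "Short task summary", [0.9, 0.1])
exact = cache.lookup("  summarize THE latest task ", [0.88, 0.12])
semantic = cache.lookup("Recap the newest task", [0.88, 0.12])
miss = cache.lookup("Unrelated question", [0.0, 1.0])
print(exact["kind"], semantic["kind"], miss)  # exact semantic None
```

On a miss the application would call its provider and store the new prompt/response pair, which is the flow the production-shaped demo describes.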
Optional adapters in examples/agent_memory/adapters.py connect the same flow to
installed provider/framework SDKs without adding hard dependencies:

- OpenAIResponsesAdapter(model=...)
- AnthropicMessagesAdapter(model=...)
- build_langgraph_app(db, model, embed)

ZeptoConnection DDL helpers (devlog 088)
| Method | Signature | Purpose |
|---|---|---|
| create_table | create_table(name, columns, if_not_exists=False) | columns is a list[tuple[str, str]] of (col_name, sql_type); issues CREATE TABLE [IF NOT EXISTS] name (...) |
| drop_table | drop_table(name, if_exists=False) | Issues DROP TABLE [IF EXISTS] name |
| list_tables | list_tables() -> list[str] | Runs SHOW TABLES and returns the first column of every row |
| ingest_pandas | ingest_pandas(df, ..., table_name="ticks") | table_name kwarg (default "ticks" for backward compatibility) selects the destination table for the generated INSERT statements |
| ingest_polars | ingest_polars(df, ..., table_name="ticks") | Passes table_name through to ingest_pandas |
Identifier validation (devlog 089). All caller-supplied identifiers
are validated before interpolation into SQL — invalid names raise
ValueError and never hit the wire.
| Arg | Regex | Methods that validate it |
|---|---|---|
| table / column name | ^[A-Za-z_][A-Za-z0-9_]*$ | create_table, drop_table, ingest_pandas, ingest_polars |
| column type string | ^[A-Za-z0-9_]+$ | create_table |

    db.create_table("ticks; DROP TABLE x; --", [("a", "INT64")])
    # ValueError: Invalid SQL identifier: "ticks; DROP TABLE x; --"

    db.create_table("safe", [("col; DROP", "INT64")])
    # ValueError: Invalid SQL identifier: "col; DROP"

ingest_pandas also SQL-escapes single quotes inside string values
(standard '' doubling), so DataFrame rows containing a value such as it's are
inserted correctly.
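
The validation and escaping rules above can be approximated in a few lines of Python. This is a sketch, not the zepto_py source: `check_identifier`, `sql_quote`, and `build_create_table` are hypothetical names, and only the two regular expressions and the identifier error message come from this page. `fullmatch` is used because in Python a bare `$` also matches just before a trailing newline.

```python
import re

IDENT_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")  # table / column names
TYPE_RE = re.compile(r"[A-Za-z0-9_]+")            # column type strings


def check_identifier(name: str) -> str:
    """Return the name unchanged if it is a safe identifier, else raise."""
    if not IDENT_RE.fullmatch(name):
        raise ValueError(f'Invalid SQL identifier: "{name}"')
    return name


def sql_quote(value: str) -> str:
    """Wrap a string in single quotes, doubling any embedded single quote."""
    return "'" + value.replace("'", "''") + "'"


def build_create_table(name, columns, if_not_exists=False) -> str:
    """Validate every identifier and type, then assemble the statement."""
    cols = []
    for col_name, sql_type in columns:
        check_identifier(col_name)
        if not TYPE_RE.fullmatch(sql_type):
            raise ValueError(f'Invalid SQL type: "{sql_type}"')
        cols.append(f"{col_name} {sql_type}")
    clause = "IF NOT EXISTS " if if_not_exists else ""
    return f"CREATE TABLE {clause}{check_identifier(name)} ({', '.join(cols)})"


print(build_create_table("my_trades", [("symbol", "INT64")], if_not_exists=True))
# CREATE TABLE IF NOT EXISTS my_trades (symbol INT64)
print(sql_quote("it's"))  # 'it''s'
```

Validating before any string interpolation is what keeps a rejected name from ever reaching the server.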
zepto_py.dataframe — bulk ingest/export
Standalone converters. They require a live zeptodb.Pipeline (C++ binding) object and do not use HTTP.

    from zepto_py import (
        from_pandas, from_polars, from_arrow,
        to_pandas, to_polars,
        query_to_pandas, query_to_polars,
    )

Ingest

from_pandas

Vectorized: uses df[col].to_numpy(copy=False), then a single ingest_batch() call. No Python row loop.

    from_pandas(
        df,                          # pd.DataFrame
        pipeline,                    # zeptodb.Pipeline
        symbol_col="symbol",         # column name for symbol id
        price_col="price",           # column name for price
        volume_col="volume",         # column name for volume
        timestamp_col="timestamp",   # optional; omit to use now_ns()
        price_scale=1.0,             # float → int64: value * price_scale
    )

from_polars

Zero-copy via Arrow buffer: Series.to_numpy() returns the Arrow buffer directly for numeric columns without nulls.

    from_polars(
        df,                          # pl.DataFrame
        pipeline,
        symbol_col="symbol",
        price_col="price",
        volume_col="volume",
        price_scale=1.0,
    )

from_arrow

    from_arrow(
        table,                       # pa.Table
        pipeline,
        symbol_col="symbol",
        price_col="price",
        volume_col="volume",
    )

Export

    # All columns for a symbol → pandas
    df = to_pandas(pipeline, symbol=1)
    # Returns: pd.DataFrame with columns: timestamp, price, volume, ...

    # All columns for a symbol → polars
    df = to_polars(pipeline, symbol=1)

    # SQL → pandas
    df = query_to_pandas(pipeline, "SELECT avg(price), sum(volume) FROM trades WHERE symbol = 1")

    # SQL → polars
    df = query_to_polars(pipeline, "SELECT symbol, sum(volume) FROM trades GROUP BY symbol")

Performance (1M rows)
| Method | Throughput |
|---|---|
| from_polars() | ~3.3M rows/sec |
| from_pandas() | ~2M rows/sec |
| from_arrow() | ~3M rows/sec |
zepto_py.arrow — Arrow / DuckDB interop
Zero-copy Arrow table exchange and DuckDB registration.

    from zepto_py import ArrowSession
    import pyarrow as pa

    sess = ArrowSession(pipeline)

Ingest

    # Arrow Table → ZeptoDB
    sess.ingest_arrow(
        table,                       # pa.Table
        symbol_col="symbol",
        price_col="price",
        volume_col="volume",
    )

    # Arrow RecordBatch → ZeptoDB
    sess.ingest_record_batch(batch)  # pa.RecordBatch

    # Per-column Arrow arrays → ZeptoDB (raw columnar)
    sess.ingest_arrow_columnar(
        symbols=sym_array,           # pa.Array[int64]
        prices=px_array,             # pa.Array[int64]
        volumes=vol_array,           # pa.Array[int64]
    )

Export

    # ZeptoDB → Arrow Table (zero-copy)
    table = sess.to_arrow(symbol=1)
    # Returns: pa.Table with schema: timestamp: int64, price: int64, volume: int64

    # ZeptoDB → Arrow RecordBatchReader (streaming/lazy)
    reader = sess.to_record_batch_reader(symbol=1)
    for batch in reader:
        print(batch.num_rows)

    # ZeptoDB → polars (via Arrow buffer, true zero-copy)
    df = sess.to_polars_zero_copy(symbol=1)
    # Returns: pl.DataFrame sharing the Arrow buffer, no copy

    # ZeptoDB → DuckDB in-memory table
    conn = sess.to_duckdb(symbol=1, table_name="trades")
    result = conn.execute("SELECT avg(price) FROM trades").fetchdf()

Utilities

    # Schema
    arrow_schema = sess.get_schema()  # → pa.Schema

    from zepto_py.arrow import zeptodb_schema_to_arrow
    schema = zeptodb_schema_to_arrow(["timestamp", "price", "volume"])

zepto_py.streaming — high-throughput ingest
Batch ingest from pandas/polars/generators with progress and error handling.

    from zepto_py import StreamingSession

    sess = StreamingSession(
        pipeline,
        batch_size=50_000,    # rows per C++ ingest_batch() call
        error_mode="skip",    # see error modes below
    )

ingest_pandas / ingest_polars

    sess.ingest_pandas(
        df,
        show_progress=True,   # print throughput stats at end
        symbol_col="symbol",
        price_col="price",
        volume_col="volume",
    )
    # → Ingested 1,000,000 rows in 1.82s (549,451 rows/sec)

    sess.ingest_polars(
        df_polars,
        show_progress=True,
    )

ingest_iter (generator / iterator)

    def tick_generator():
        # external_feed is any iterable of objects with sym, px, vol attributes
        for row in external_feed:
            yield {"symbol": row.sym, "price": row.px, "volume": row.vol}

    sess.ingest_iter(
        tick_generator(),
        show_progress=True,
        total=1_000_000,      # optional: total for progress display
    )

Statistics

    stats = sess.stats()
    stats.rows_ingested   # int
    stats.rows_skipped    # int
    stats.batches         # int
    stats.elapsed_sec     # float
    stats.throughput      # float (rows/sec)

Error modes
| Mode | Behavior |
|---|---|
| "skip" | Skip bad rows, continue ingesting |
| "raise" | Raise exception on first error |
| "warn" | Print warning to stderr, continue |
Interoperability Matrix
| From \ To | pandas | polars | numpy | Arrow | DuckDB | HTTP |
|---|---|---|---|---|---|---|
| ZeptoDB (in-proc) | to_pandas() | to_polars() | get_column() | to_arrow() | to_duckdb() | — |
| ZeptoDB (HTTP) | query_pandas() | query_polars() | query_numpy() | — | — | POST / |
| pandas → ZeptoDB | from_pandas() | — | — | — | — | ingest_pandas() |
| polars → ZeptoDB | — | from_polars() | — | — | — | ingest_polars() |
| Arrow → ZeptoDB | — | — | — | ingest_arrow() | — | — |
See also: SQL Reference · C++ Reference · HTTP Reference