The streaming engine that starts embedded
Sub-microsecond SQL over event streams. Embed it in Rust or Python, or run the standalone server. No JVM.
let db = LaminarDB::open()?;
db.execute("CREATE SOURCE trades (
    symbol VARCHAR, price DOUBLE, volume BIGINT, ts BIGINT
)").await?;
db.execute("CREATE STREAM vwap AS
    SELECT symbol,
           SUM(price * CAST(volume AS DOUBLE))
           / SUM(CAST(volume AS DOUBLE)) AS vwap
    FROM trades
    GROUP BY symbol, tumble(ts, INTERVAL '1' MINUTE)
    EMIT ON WINDOW CLOSE
").await?;
db.start().await?;
What it is
LaminarDB runs SQL queries over event streams. Windows, joins, aggregations: the engine evaluates them continuously as events arrive. Built on Apache Arrow 57 and DataFusion 52.
Deployment: embed it in Rust (cargo add laminar-db) or Python (pip install laminardb), or run the standalone laminardb binary with Kafka, CDC, and Delta Lake connectors.
It is not a message broker and it is not a general-purpose database. It processes streams.
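As a rough illustration of what a one-minute tumbling-window VWAP computes, here is a minimal plain-Python sketch that does not use LaminarDB at all. The function name `tumbling_vwap` and the tuple layout are assumptions made for this example, and window closing is simplified: a window is treated as closed when the first event from a later window arrives, whereas a streaming engine would rely on watermarks.

```python
from collections import defaultdict

WINDOW_MS = 60_000  # one-minute tumbling windows, timestamps in epoch milliseconds


def tumbling_vwap(events):
    """Yield (window_start, symbol, vwap) each time a window closes.

    `events` is an iterable of (symbol, price, volume, ts) in timestamp order.
    Simplification: a window closes as soon as an event from a later window
    arrives; a real engine would use watermarks instead.
    """
    sums = defaultdict(lambda: [0.0, 0.0])  # symbol -> [sum(price*volume), sum(volume)]
    current = None  # start of the window currently being accumulated

    for symbol, price, volume, ts in events:
        start = ts - ts % WINDOW_MS
        if current is not None and start > current:
            # The previous window is complete: emit one row per symbol.
            for sym, (pv, v) in sorted(sums.items()):
                yield current, sym, pv / v
            sums.clear()
        current = start
        sums[symbol][0] += price * volume
        sums[symbol][1] += volume

    # Flush the final window at end of input (a live stream would keep waiting).
    if current is not None:
        for sym, (pv, v) in sorted(sums.items()):
            yield current, sym, pv / v


trades = [
    ("AAPL", 100.0, 10, 0),
    ("AAPL", 110.0, 30, 30_000),
    ("MSFT", 50.0, 5, 45_000),
    ("AAPL", 120.0, 10, 60_000),
]
results = list(tumbling_vwap(trades))
for row in results:
    print(row)
```

The first three trades fall into the window starting at 0 and are emitted only when the trade at 60,000 ms arrives, which mirrors the intent of EMIT ON WINDOW CLOSE.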
Quick start
Add the dependency and write your first streaming query.
// Cargo.toml: laminar-db = "0.23", tokio = { version = "1", features = ["full"] }
use laminar_db::LaminarDB;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let db = LaminarDB::open()?;
    db.execute("CREATE SOURCE events (
        key VARCHAR, value DOUBLE, ts BIGINT
    )").await?;
    db.execute("CREATE STREAM summary AS
        SELECT key, COUNT(*) AS total, AVG(value) AS avg
        FROM events
        GROUP BY key, tumble(ts, INTERVAL '1' MINUTE)
        EMIT ON WINDOW CLOSE
    ").await?;
    db.start().await?;
    // Push events into the source (typed via derive macro; the Event and
    // Summary structs are user-defined and their definitions are omitted here)
    let source = db.source::<Event>("events")?;
    source.push(Event { key: "sensor-1".into(), value: 42.0, ts: 1700000000000 });
    // Subscribe to streaming results
    let sub = db.subscribe::<Summary>("summary")?;
    while let Some(rows) = sub.poll() {
        for row in &rows {
            println!("{}: count={}, avg={:.2}", row.key, row.total, row.avg);
        }
    }
    Ok(())
}
# pip install laminardb
import laminardb
conn = laminardb.open(":memory:")
conn.execute("""
CREATE SOURCE events (key VARCHAR, value DOUBLE, ts TIMESTAMP)
""")
conn.execute("""
CREATE STREAM summary AS
SELECT key, COUNT(*) AS total, AVG(value) AS avg
FROM events
GROUP BY key, TUMBLE(ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE
""")
# Push events in, then query results back
conn.insert("events", [
{"key": "sensor-1", "value": 42.0, "ts": 1700000000000},
{"key": "sensor-1", "value": 44.0, "ts": 1700000001000},
])
conn.sql("SELECT * FROM summary").show()
conn.close()
-- Tumbling window: 1-minute OHLC bars
CREATE STREAM ohlc_1m AS
SELECT symbol,
first_value(price) AS open,
MAX(price) AS high, MIN(price) AS low,
last_value(price) AS close,
SUM(volume) AS volume
FROM trades
GROUP BY symbol, tumble(ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE;
-- Session window: detect activity bursts
CREATE STREAM sessions AS
SELECT user_id, COUNT(*) AS clicks,
MAX(ts) - MIN(ts) AS duration_ms
FROM clickstream
GROUP BY user_id, session(ts, INTERVAL '30' SECOND)
EMIT ON WINDOW CLOSE;
-- ASOF join: enrich trades with closest preceding quote
SELECT t.symbol, t.price, q.bid, q.ask,
t.price - q.bid AS spread
FROM trades t
ASOF JOIN quotes q
MATCH_CONDITION(t.ts >= q.ts)
ON t.symbol = q.symbol;
-- Interval join: match orders to payments within 1 hour
CREATE STREAM matched AS
SELECT o.order_id, o.amount, p.status
FROM orders o
INNER JOIN payments p ON o.order_id = p.order_id
AND p.ts BETWEEN o.ts AND o.ts + 3600000;
-- Temporal join: point-in-time lookup
SELECT o.*, r.rate
FROM orders o
JOIN rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
# Option 1: Download pre-compiled binary (latest release)
VERSION=$(curl -s https://api.github.com/repos/laminardb/laminardb/releases/latest | grep tag_name | cut -d '"' -f4)
# Linux x86_64
curl -LO "https://github.com/laminardb/laminardb/releases/download/${VERSION}/laminardb-server-x86_64-unknown-linux-gnu-${VERSION}.tar.gz"
tar xzf laminardb-server-*.tar.gz
# macOS Apple Silicon
curl -LO "https://github.com/laminardb/laminardb/releases/download/${VERSION}/laminardb-server-aarch64-apple-darwin-${VERSION}.tar.gz"
# All platforms: github.com/laminardb/laminardb/releases
# Option 2: Build from source
cargo install laminar-server
# Option 3: Docker (multi-arch, on Docker Hub and GHCR)
docker run -p 8080:8080 laminardb/laminardb-server:latest
# or: docker run -p 8080:8080 ghcr.io/laminardb/laminardb-server:latest
# full stack (server + Redpanda + Prometheus + Grafana): docker compose up
# Run the binary
./laminardb --config laminardb.toml
-- Kafka source with Avro and Schema Registry
CREATE SOURCE trades (
symbol VARCHAR, price DOUBLE, volume BIGINT, ts BIGINT
) FROM KAFKA (
brokers = 'localhost:9092',
topic = 'market-trades',
group_id = 'laminar-analytics',
format = 'avro',
schema.registry.url = 'http://schema-registry:8081'
);
-- Real-time aggregation
CREATE STREAM trade_summary AS
SELECT symbol, COUNT(*) AS trades,
AVG(price) AS avg_price,
SUM(volume) AS total_volume
FROM trades
GROUP BY symbol, tumble(ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE;
-- Kafka sink with exactly-once delivery
CREATE SINK output INTO KAFKA (
brokers = 'localhost:9092',
topic = 'trade-summaries',
format = 'json',
delivery.guarantee = 'exactly-once'
) AS SELECT * FROM trade_summary;
-- Delta Lake sink for historical analytics
CREATE SINK lake INTO DELTA_LAKE (
path = 's3://my-bucket/trade_summary',
write_mode = 'append',
delivery.guarantee = 'exactly-once'
) AS SELECT * FROM trade_summary;
More examples: examples/ · API reference · SQL reference
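For readers new to session windows, the session query above can be read as the following plain-Python grouping. This is a batch sketch for intuition only, not how the engine evaluates the query incrementally. The name `sessionize` and the choice that a gap of exactly 30 seconds does not split a session are assumptions of this example.

```python
GAP_MS = 30_000  # a session ends after more than 30 seconds of inactivity


def sessionize(events):
    """Group (user_id, ts) click events into per-user sessions.

    Returns a list of (user_id, clicks, duration_ms) tuples. Events may arrive
    in any order; they are sorted per user before being split into sessions.
    """
    by_user = {}
    for user_id, ts in events:
        by_user.setdefault(user_id, []).append(ts)

    sessions = []
    for user_id in sorted(by_user):
        stamps = sorted(by_user[user_id])
        first = last = stamps[0]
        clicks = 1
        for ts in stamps[1:]:
            if ts - last > GAP_MS:
                # Inactivity gap exceeded: close the current session.
                sessions.append((user_id, clicks, last - first))
                first, clicks = ts, 0
            last = ts
            clicks += 1
        sessions.append((user_id, clicks, last - first))
    return sessions


clicks = [("u1", 0), ("u1", 10_000), ("u1", 35_000), ("u1", 90_000), ("u2", 5_000)]
sessions = sessionize(clicks)
print(sessions)
```

User u1 produces two sessions because 55 seconds pass between the clicks at 35,000 ms and 90,000 ms.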
Connectors
All connectors are feature-gated. Compile only what you use.
Enable via cargo add laminar-db --features kafka,delta-lake.
| Connector | Direction | Status | Feature Flag | Notes |
|---|---|---|---|---|
| Kafka | Source + Sink | Implemented | kafka | Avro, JSON, CSV. Confluent Schema Registry. Exactly-once via 2PC. |
| PostgreSQL CDC | Source | Implemented | postgres-cdc | Logical replication via pgoutput. LSN-based checkpoint tracking. |
| PostgreSQL Sink | Sink | Implemented | postgres-sink | COPY BINARY (append) or INSERT ON CONFLICT (upsert). |
| MySQL CDC | Source | Implemented | mysql-cdc | Binlog replication. GTID or file+position modes. |
| MongoDB CDC | Source + Sink | Implemented | mongodb-cdc | Change streams, resume tokens. Sink: ordered/unordered writes, upsert, CDC replay. |
| Delta Lake | Source + Sink | Implemented | delta-lake | S3, Azure, GCS, Unity, Glue catalogs. Exactly-once epoch tracking. |
| Apache Iceberg | Source + Sink | Implemented | iceberg | REST catalog. Incremental reads via snapshot manifests. |
| OpenTelemetry | Source | Implemented | otel | OTLP/gRPC receiver. Traces, metrics, logs with schema auto-discovery. |
| WebSocket | Source + Sink | Implemented | websocket | Client and server modes. JSON/CSV serialization. |
| File Auto-Loader | Source + Sink | Implemented | files | CSV, JSON Lines, Parquet, plain text. Schema inference and evolution. |
| Parquet Lookup | Source | Implemented | parquet-lookup | Object-store-backed dimension tables for lookup joins. |
Custom connectors: implement the SourceConnector / SinkConnector traits.
AI Functions
Call AI models inline in streaming SQL. Each function names a model resolved from a registry; whether it runs against a remote LLM over HTTP or a local ONNX encoder in-process is hidden behind the function.
-- Score headlines as they arrive, in one continuous query
CREATE STREAM flagged AS
SELECT id, headline,
ai_sentiment(headline, model => 'finbert') AS sentiment
FROM news;
-- Zero/few-shot classification with explicit labels
SELECT id,
ai_classify(body, model => 'intent-mini',
labels => ARRAY['question', 'complaint', 'praise']) AS intent
FROM tickets;
-- Embedding vectors for downstream similarity
SELECT id, ai_embed(chunk, model => 'minilm') AS vector
FROM documents;
Functions
| Function | Task | Backend |
|---|---|---|
| ai_classify | Zero/few-shot classification (labels => ARRAY[…]) | Remote or local |
| ai_sentiment | Sentiment | Remote or local |
| ai_embed | Embedding vector | Remote or local |
| ai_complete | Text completion | Remote only |
| ai_summarize | Summarization | Remote only |
| ai_translate | Translation | Remote only |
| ai_gen | Open-ended generation | Remote only |
| ai_extract | Structured extraction | Remote only |
Local models are encoder-only (BERT / DistilBERT / MiniLM family) and cover classify, sentiment, and embed. Generative tasks require a remote provider.
Off the hot path
The streaming pipeline never blocks on a model call. The operator serves cache hits inline and hands misses to a background worker; enriched rows emit in a later cycle.
In-flight rows survive checkpoint and restore, and the watermark is held so async-enriched rows are not dropped as late.
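The hit-inline, miss-deferred pattern described above can be sketched in plain Python. This is an illustration of the idea under simplifying assumptions, not LaminarDB's operator: `AsyncEnricher`, `fake_model`, and `run_worker_once` are hypothetical names, the "background worker" is an explicit method call rather than a thread, and checkpointing and watermark holding are not modelled.

```python
from collections import deque


def fake_model(text):
    """Stand-in for a slow model call; returns a made-up sentiment label."""
    return "positive" if "up" in text else "negative"


class AsyncEnricher:
    def __init__(self, model):
        self.model = model
        self.cache = {}            # text -> result
        self.pending = deque()     # rows waiting for the worker
        self.completed = deque()   # rows the worker has finished

    def process(self, rows):
        """One pipeline cycle: never calls the model directly."""
        out = list(self.completed)  # rows enriched since the last cycle
        self.completed.clear()
        for row_id, text in rows:
            if text in self.cache:
                out.append((row_id, text, self.cache[text]))  # cache hit: inline
            else:
                self.pending.append((row_id, text))           # miss: defer
        return out

    def run_worker_once(self):
        """Stands in for the background worker draining the miss queue."""
        while self.pending:
            row_id, text = self.pending.popleft()
            result = self.cache.get(text)
            if result is None:
                result = self.model(text)
                self.cache[text] = result
            self.completed.append((row_id, text, result))


op = AsyncEnricher(fake_model)
cycle1 = op.process([(1, "stocks up"), (2, "stocks down")])  # both miss
op.run_worker_once()
cycle2 = op.process([(3, "stocks up")])  # hit, plus rows 1 and 2 from the worker
print(cycle1)
print(cycle2)
```

The first cycle emits nothing because both rows miss the cache. The second cycle emits the two deferred rows together with the new row, which is served from the cache without a model call.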
Cached results
Inference results are cached by (content, model, params), so repeated inputs are served without another model call.
The cache is byte-bounded.
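A byte-bounded cache keyed by (content, model, params) might look like the sketch below. The class name `ByteBoundedCache`, the SHA-256 key derivation, and least-recently-used eviction are all assumptions made for illustration; the source only states the key fields and that the cache is bounded by bytes. For simplicity the sketch counts only result bytes, not key overhead.

```python
import hashlib
import json
from collections import OrderedDict


class ByteBoundedCache:
    """Cache bounded by total stored bytes; evicts least recently used entries.

    LRU eviction is an assumption made for this sketch.
    """

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.used = 0
        self.entries = OrderedDict()  # key -> result bytes

    @staticmethod
    def make_key(content, model, params):
        # Canonical JSON so that parameter order does not change the key.
        blob = json.dumps([content, model, params], sort_keys=True).encode()
        return hashlib.sha256(blob).hexdigest()

    def get(self, content, model, params):
        key = self.make_key(content, model, params)
        if key not in self.entries:
            return None
        self.entries.move_to_end(key)  # mark as recently used
        return self.entries[key]

    def put(self, content, model, params, result):
        key = self.make_key(content, model, params)
        if key in self.entries:
            self.used -= len(self.entries.pop(key))
        if len(result) > self.max_bytes:
            return  # too large to cache at all
        self.entries[key] = result
        self.used += len(result)
        while self.used > self.max_bytes:
            _, evicted = self.entries.popitem(last=False)
            self.used -= len(evicted)


cache = ByteBoundedCache(max_bytes=10)
cache.put("good news", "finbert", {}, b"positive")  # 8 bytes
cache.put("bad news", "finbert", {}, b"negative")   # 8 bytes, evicts the first entry
hit = cache.get("bad news", "finbert", {})
miss = cache.get("good news", "finbert", {})
other_model = cache.get("bad news", "minilm", {})
```

Including the model and parameters in the key means the same text scored by a different model is a separate entry, as the lookup with 'minilm' shows.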
Dual backend, per call
The backend is chosen per call by model name from config: a remote LLM over HTTP (OpenAI-compatible, Anthropic) or a local ONNX encoder.
Local inference uses ONNX Runtime, loaded dynamically at runtime: install ONNX Runtime ≥ 1.24 and either set ORT_DYLIB_PATH or put the library on the search path.
Configuration
Models and providers are declared under [ai.providers.*], [models.*], and [ai.defaults]. API keys are referenced by environment-variable name and are never stored in config. The catalog views laminar.models and laminar.ai_calls can be queried with SELECT.
Deployment
Embedded
Rust: cargo add laminar-db.
Python: pip install laminardb.
The engine runs in your process. Push events, subscribe to results.
Suited to edge processing, embedded analytics, and latency-sensitive apps.
Standalone Server
Download pre-compiled binaries for Linux (x86_64, aarch64, musl), macOS (x86_64, Apple Silicon), and Windows, run cargo install laminar-server, or pull the Docker image (docker run -p 8080:8080 laminardb/laminardb-server). A Helm chart is available.
TOML configuration with ${ENV_VAR} substitution.
REST API, Prometheus metrics, ad-hoc SQL, and hot-reload of source, sink, and pipeline sections.
Typical uses: CDC pipelines, stream-to-lakehouse, event routing.
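To show what ${ENV_VAR} substitution in a config file amounts to, here is a minimal sketch in Python. It is not the server's implementation: the function name `substitute_env`, the config keys in the sample, and the decision to fail on unset variables are assumptions of this example.

```python
import os
import re

_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def substitute_env(text, env=None):
    """Replace every ${NAME} in `text` with the value of NAME from `env`.

    Raises KeyError for unset variables instead of silently leaving a gap.
    """
    env = os.environ if env is None else env

    def replace(match):
        name = match.group(1)
        if name not in env:
            raise KeyError(f"environment variable {name} is not set")
        return env[name]

    return _VAR.sub(replace, text)


# Hypothetical config fragment; the key names are made up for this example.
raw = 'brokers = "${KAFKA_BROKERS}"\ntopic = "trades"\n'
resolved = substitute_env(raw, env={"KAFKA_BROKERS": "localhost:9092"})
print(resolved)
```

Substituting before parsing keeps secrets and host names out of the file itself, so the same config can be reused across environments.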
Why LaminarDB
One engine for streaming SQL — from an embedded library to a standalone server.
Microsecond latency
A dedicated compute thread with zero allocations on the data path. State lookups land in 10–16ns, and a single core sustains over a million events per second.
Starts embedded
cargo add laminar-db or pip install laminardb and the engine runs in your process. No JVM, no broker, nothing to operate. Move to the standalone server when you need it.
Real streaming SQL
Tumbling, sliding, hopping, and session windows; ASOF, temporal, interval, and lookup joins; EMIT control and watermarks. Built on Apache Arrow and DataFusion.
Exactly-once
Barrier checkpoints with two-phase commit. State and source offsets persist to object storage, and recovery restores both together.
Connects to everything
Kafka, PostgreSQL / MySQL / MongoDB CDC, Delta Lake, Iceberg, OpenTelemetry, WebSocket, and files. Every connector is feature-gated, so you compile only what you use.
AI in the query
Call models inline in streaming SQL with ai_classify, ai_sentiment, and ai_embed, backed by a remote LLM or a local ONNX encoder, off the hot path.
Benchmarks
Criterion numbers from an AMD Ryzen AI 7 350 laptop. Not server-grade hardware:
dedicated boxes with isolated cores will do better. Reproduce with cargo bench.
| Metric | Target | Measured (2026-02-28 snapshot) | Benchmark File |
|---|---|---|---|
| State lookup (AHash get_ref) | <500ns | 10–16ns (zero-copy) | streaming_bench.rs |
| Throughput / core | 500K events/s | 1.1–1.46M events/s | streaming_bench.rs |
| Event latency (mean) | <10µs | 0.55–1.16µs | latency_bench.rs |
| Checkpoint recovery | <10s | 1.39ms | recovery_bench.rs |
Reproduce with cargo bench --bench streaming_bench or cargo bench --bench latency_bench. All benches are driven by Criterion.
What works today
Current version: 0.23.0 (pre-1.0). Releases
Engine & SQL
- Streaming engine (StreamingCoordinator, operator graph, state stores)
- Window functions (tumbling, sliding/hopping, session)
- Joins (interval, ASOF, temporal, temporal probe, lookup, semi/anti)
- Materialized view result storage and queryability
- AI functions in streaming SQL (remote LLM / local ONNX)
- Barrier-based checkpointing with 2PC exactly-once
Connectivity & operations
- Connectors: Kafka, PG/MySQL/MongoDB CDC, Delta Lake, Iceberg, OTel, WebSocket, Files
- Object store checkpoints (file, S3, Azure, GCS)
- Standalone server with REST API, Prometheus metrics, hot-reload
- SUBSCRIBE over the PostgreSQL wire protocol (psql, psycopg, JDBC)
- Push-based WebSocket stream subscriptions
- Docker (multi-arch) and Helm chart deployment
Frequently asked questions
What is LaminarDB?
LaminarDB is an open-source streaming SQL engine written in Rust. It runs continuous SQL queries over event streams using Apache Arrow and DataFusion. It can be embedded in Rust or Python applications or run as a standalone server.
How fast is LaminarDB?
LaminarDB's targets are state lookups under 500 nanoseconds and throughput above 500,000 events per second per core. Criterion benchmarks on an AMD Ryzen AI 7 350 laptop measured state lookups in 10 to 16 nanoseconds and throughput of 1.1 to 1.46 million events per second per core.
What connectors does LaminarDB support?
LaminarDB ships with sources and sinks for Apache Kafka, PostgreSQL CDC, MySQL CDC, MongoDB CDC, Delta Lake, Apache Iceberg, OpenTelemetry OTLP, WebSocket, and rolling file formats (CSV, JSON, Parquet). All connectors are feature-gated so you only compile what you use.
Is LaminarDB production-ready?
LaminarDB is pre-1.0. Single-node embedded and standalone server deployments are stable, including exactly-once sinks via barrier checkpoints.
Can I use LaminarDB from Python?
Yes. Install the Python package with pip install laminardb. It exposes the same streaming SQL engine as the Rust crate, with a Python-native API for pushing events and subscribing to results.
What SQL features does LaminarDB support?
LaminarDB supports standard SQL through DataFusion plus streaming extensions: tumbling, sliding, hopping, and session windows; ASOF, temporal, temporal probe, interval, and lookup joins; EMIT clauses (ON WATERMARK, ON WINDOW CLOSE, PERIODICALLY, CHANGES, FINAL); watermarks with ALLOW LATENESS; and late data routing via LATE DATA TO.
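The interaction between watermarks, allowed lateness, and late-data routing can be sketched in plain Python. This is a conceptual illustration, not the engine's logic: the bounded-delay watermark (largest timestamp seen minus a fixed delay), the constants, and the function name `route` are assumptions of this example.

```python
WINDOW_MS = 60_000
MAX_DELAY_MS = 5_000          # watermark trails the largest timestamp seen by this much
ALLOWED_LATENESS_MS = 10_000  # extra time a closed window still accepts events


def route(events):
    """Split event timestamps into (accepted, late) using a bounded-delay watermark."""
    watermark = float("-inf")
    accepted, late = [], []
    for ts in events:
        window_end = ts - ts % WINDOW_MS + WINDOW_MS
        if window_end + ALLOWED_LATENESS_MS <= watermark:
            late.append(ts)      # window already finalized: route to the late output
        else:
            accepted.append(ts)  # window still open, or within allowed lateness
        watermark = max(watermark, ts - MAX_DELAY_MS)
    return accepted, late


accepted, late = route([1_000, 61_000, 59_000, 80_000, 58_000])
print(accepted, late)
```

The event at 59,000 ms arrives out of order but is still accepted, because the watermark has only reached 56,000 ms. The event at 58,000 ms arrives after the watermark has reached 75,000 ms, past the window end plus lateness (70,000 ms), so it is routed as late data.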
How does LaminarDB handle exactly-once delivery?
LaminarDB uses Chandy-Lamport style barrier checkpoints with two-phase commit. Source offsets and sink commit status are stored in a checkpoint manifest. On recovery, the engine restores operator state and rewinds sources to the last committed checkpoint.
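The restore-and-rewind idea can be shown with a toy pipeline in Python. This is a heavily simplified sketch, not LaminarDB's checkpoint protocol: there is one operator and no barriers, the class `Pipeline` and its methods are hypothetical, and the snapshot and the sink commit happen in a single call, so a crash between the two phases is not modelled.

```python
class Pipeline:
    """Running sum over a replayable source, with a transactional sink."""

    def __init__(self, source, committed_sink, checkpoint=None):
        self.source = source          # list acting as a replayable log
        self.sink = committed_sink    # rows visible to downstream readers
        self.offset, self.total = checkpoint or (0, 0)
        self.staged = []              # pre-committed rows, not yet visible

    def step(self):
        value = self.source[self.offset]
        self.offset += 1
        self.total += value
        self.staged.append(self.total)

    def checkpoint(self):
        """Snapshot offset and state together, then commit staged output."""
        snapshot = (self.offset, self.total)
        self.sink.extend(self.staged)
        self.staged = []
        return snapshot


source = [1, 2, 3, 4, 5]
sink = []

p = Pipeline(source, sink)
p.step()
p.step()
saved = p.checkpoint()  # offset 2, total 3; sink now holds [1, 3]
p.step()                # stages 6, then the process "crashes" before committing

# Recovery: restore operator state and rewind the source to the saved offset.
p = Pipeline(source, sink, checkpoint=saved)
while p.offset < len(source):
    p.step()
p.checkpoint()
print(sink)
```

The row staged before the crash is discarded with the failed process and recomputed after the rewind, so the committed output contains each running total exactly once.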
What is an ASOF join in LaminarDB?
An ASOF join matches each row in the left stream to the single closest row in the right stream by timestamp within the same key partition. LaminarDB supports three directions: Backward (most recent prior event), Forward (next event), and Nearest (minimum absolute time difference).
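The three directions can be illustrated with binary search over the right side's sorted timestamps for one key. This is a sketch for intuition, not the engine's join operator: `asof_match` is a hypothetical helper, equal timestamps are treated as a match in both backward and forward directions, and ties in nearest mode resolve to the earlier row, all of which are assumptions of this example.

```python
from bisect import bisect_left, bisect_right


def asof_match(right_ts, t, direction="backward"):
    """Return the index in sorted `right_ts` matching left timestamp `t`, or None."""
    if direction == "backward":   # most recent right row with ts <= t
        i = bisect_right(right_ts, t) - 1
        return i if i >= 0 else None
    if direction == "forward":    # next right row with ts >= t
        i = bisect_left(right_ts, t)
        return i if i < len(right_ts) else None
    if direction == "nearest":    # minimum absolute time difference
        candidates = [
            i
            for i in (
                asof_match(right_ts, t, "backward"),
                asof_match(right_ts, t, "forward"),
            )
            if i is not None
        ]
        if not candidates:
            return None
        return min(candidates, key=lambda i: abs(right_ts[i] - t))
    raise ValueError(f"unknown direction: {direction}")


quote_ts = [100, 200, 300]  # quote timestamps for a single symbol, sorted
backward = asof_match(quote_ts, 260, "backward")
forward = asof_match(quote_ts, 260, "forward")
nearest = asof_match(quote_ts, 260, "nearest")
no_match = asof_match(quote_ts, 50, "backward")
print(backward, forward, nearest, no_match)
```

For a trade at time 260, backward picks the quote at 200, forward picks the quote at 300, and nearest also picks 300 because it is 40 ms away rather than 60 ms. A trade at time 50 has no preceding quote, so the backward match is empty.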
What is a temporal probe join?
A temporal probe join produces multiple output rows for each left event by probing the right stream at a list of fixed time offsets. Offsets are specified with RANGE FROM x TO y STEP z or LIST of explicit millisecond values. It is designed for impact-curve analysis and lagged feature lookups.
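One way to picture a temporal probe join is the sketch below, which expands a RANGE FROM x TO y STEP z specification into offsets and looks up the right side at each probe time. The helper names, the inclusive upper bound, and the choice to return the latest right row at or before each probe time are assumptions made for this illustration; the engine's exact matching rule may differ.

```python
from bisect import bisect_right


def probe_offsets(start_ms, stop_ms, step_ms):
    """Expand a RANGE FROM start TO stop STEP step spec (stop inclusive, an assumption)."""
    return list(range(start_ms, stop_ms + 1, step_ms))


def temporal_probe(event_ts, right_rows, offsets):
    """For each offset, find the latest right row at or before event_ts + offset.

    `right_rows` is a list of (ts, value) sorted by ts. Returns one
    (offset, value_or_None) pair per offset.
    """
    stamps = [ts for ts, _ in right_rows]
    out = []
    for off in offsets:
        i = bisect_right(stamps, event_ts + off) - 1
        out.append((off, right_rows[i][1] if i >= 0 else None))
    return out


prices = [(0, 10.0), (1_000, 10.5), (2_500, 11.0)]
offsets = probe_offsets(0, 3_000, 1_000)
curve = temporal_probe(500, prices, offsets)
print(curve)
```

A single left event at 500 ms yields four output rows, one per offset, which is the shape needed for an impact curve: the price at the event and at 1, 2, and 3 seconds afterwards.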
Does LaminarDB require a JVM?
No. LaminarDB is written in Rust and ships as a native binary. The engine has no JVM or other runtime dependency beyond the operating system; only local AI inference additionally needs ONNX Runtime installed.
Is LaminarDB open source?
Yes. LaminarDB is licensed under Apache 2.0 and the source is hosted at github.com/laminardb/laminardb.
Contributing
LaminarDB is Apache 2.0 licensed and contributions are welcome. You will need Rust 1.85 or newer. Read the contribution guide before starting: the data path has strict constraints on allocations, locks, and syscalls.