The streaming engine that starts embedded

Sub-microsecond SQL over event streams. Embed it in Rust or Python, or run the standalone server. No JVM.

$ cargo add laminar-db
$ pip install laminardb
main.rs
// Open a LaminarDB handle; `?` propagates any error from `open`.
// This fragment uses `?` and `.await`, so it must live inside an async
// function that returns a `Result`.
let db = LaminarDB::open()?;

// Declare the `trades` source and its four-column schema.
db.execute("CREATE SOURCE trades (
    symbol VARCHAR, price DOUBLE, volume BIGINT, ts BIGINT
)").await?;

// Define the `vwap` stream: for each symbol and 1-minute tumbling window,
// compute SUM(price * volume) / SUM(volume) with `volume` cast to DOUBLE.
// `EMIT ON WINDOW CLOSE` is requested as the emission policy.
db.execute("CREATE STREAM vwap AS
    SELECT symbol,
           SUM(price * CAST(volume AS DOUBLE))
             / SUM(CAST(volume AS DOUBLE)) AS vwap
    FROM trades
    GROUP BY symbol, tumble(ts, INTERVAL '1' MINUTE)
    EMIT ON WINDOW CLOSE
").await?;

// Called last, after the source and the stream have been created.
// NOTE(review): presumably this starts query processing; confirm against the API docs.
db.start().await?;
<500ns
state lookup target
1M+/s
events per core
0
hot-path allocations

What it is

LaminarDB runs SQL queries over event streams. Windows, joins, aggregations: the engine evaluates them continuously as events arrive. Built on Apache Arrow 57 and DataFusion 52.

Deployment. Embed it in Rust (cargo add laminar-db) or Python (pip install laminardb). Or run the standalone laminardb binary with Kafka, CDC, and Delta Lake connectors.

It is not a message broker and it is not a general-purpose database. It processes streams.

Compute Thread — dedicated tokio runtime · zero alloc on data path
  StreamingCoordinator · Operators · State Store · DataFusion Exec
— crossfire MPSC channels —
I/O Runtime — main tokio runtime · async
  Source Tasks · Sink Tasks · Checkpoint Coordinator · Recovery
— Bounded channels —
Control Plane — no latency requirements
  REST API · WebSocket · Metrics · Config

Quick start

Add the dependency and write your first streaming query.

// Cargo.toml: laminar-db = "0.23", tokio = { version = "1", features = ["full"] }, anyhow = "1"

use laminar_db::LaminarDB;

/// Minimal embedded example: creates an `events` source and a `summary`
/// stream, starts the database, pushes one event, and prints the rows
/// received from a subscription to `summary`.
///
/// NOTE(review): `Event` and `Summary` are not defined in this snippet. They
/// must be declared elsewhere with fields matching their use here
/// (`key`, `value`, `ts` for `Event`; `key`, `total`, `avg` for `Summary`).
///
/// # Errors
///
/// Returns early with the error from any `?`-propagated call (`open`,
/// `execute`, `start`, `source`, `subscribe`).
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let db = LaminarDB::open()?;

    // Register the input source with a three-column schema.
    db.execute("CREATE SOURCE events (
        key VARCHAR, value DOUBLE, ts BIGINT
    )").await?;

    // Per key and 1-minute tumbling window: row count and average of `value`.
    db.execute("CREATE STREAM summary AS
        SELECT key, COUNT(*) AS total, AVG(value) AS avg
        FROM events
        GROUP BY key, tumble(ts, INTERVAL '1' MINUTE)
        EMIT ON WINDOW CLOSE
    ").await?;

    // Called after the source and stream exist.
    db.start().await?;

    // Push events into the source (typed via derive macro)
    let source = db.source::<Event>("events")?;
    // The return value of `push` is not inspected here.
    // NOTE(review): `ts` looks like epoch milliseconds; confirm the expected unit.
    source.push(Event { key: "sensor-1".into(), value: 42.0, ts: 1700000000000 });

    // Subscribe to streaming results
    let sub = db.subscribe::<Summary>("summary")?;
    // The loop ends as soon as `poll()` returns `None`.
    // NOTE(review): whether `poll()` waits for results is not shown here; confirm.
    while let Some(rows) = sub.poll() {
        for row in &rows {
            println!("{}: count={}, avg={:.2}", row.key, row.total, row.avg);
        }
    }

    Ok(())
}
# pip install laminardb

import laminardb

# Open a connection.
# NOTE(review): ":memory:" presumably selects a non-persistent, in-memory database; confirm.
conn = laminardb.open(":memory:")

# Declare the input source.
# NOTE(review): `ts` is declared TIMESTAMP here, but the rows inserted below
# pass plain integers (they look like epoch milliseconds); confirm the accepted form.
conn.execute("""
    CREATE SOURCE events (key VARCHAR, value DOUBLE, ts TIMESTAMP)
""")

# Per key and 1-minute tumbling window: row count and average of `value`.
conn.execute("""
    CREATE STREAM summary AS
    SELECT key, COUNT(*) AS total, AVG(value) AS avg
    FROM events
    GROUP BY key, TUMBLE(ts, INTERVAL '1' MINUTE)
    EMIT ON WINDOW CLOSE
""")

# Push events in, then query results back
# Both rows share the key "sensor-1"; their `ts` values differ by 1000.
conn.insert("events", [
    {"key": "sensor-1", "value": 42.0, "ts": 1700000000000},
    {"key": "sensor-1", "value": 44.0, "ts": 1700000001000},
])

# Run an ad-hoc query against the `summary` stream and display the result.
conn.sql("SELECT * FROM summary").show()
# Close the connection when done.
conn.close()
-- Tumbling window: 1-minute OHLC bars
-- open/close come from first_value/last_value of price within each window;
-- high/low come from MAX/MIN, and volume is summed.
CREATE STREAM ohlc_1m AS
SELECT symbol,
       first_value(price) AS open,
       MAX(price) AS high, MIN(price) AS low,
       last_value(price) AS close,
       SUM(volume) AS volume
FROM trades
GROUP BY symbol, tumble(ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE;

-- Session window: detect activity bursts
-- Per user and session: click count and the span between the earliest and
-- latest ts (named duration_ms, so ts is presumably in milliseconds; confirm).
-- NOTE(review): the 30-second interval is presumably the inactivity gap that closes a session; confirm.
CREATE STREAM sessions AS
SELECT user_id, COUNT(*) AS clicks,
       MAX(ts) - MIN(ts) AS duration_ms
FROM clickstream
GROUP BY user_id, session(ts, INTERVAL '30' SECOND)
EMIT ON WINDOW CLOSE;

-- ASOF join: enrich trades with closest preceding quote
-- MATCH_CONDITION(t.ts >= q.ts) restricts matches to quotes at or before the
-- trade timestamp; the ON clause restricts them to the same symbol.
SELECT t.symbol, t.price, q.bid, q.ask,
       t.price - q.bid AS spread
FROM trades t
ASOF JOIN quotes q
    MATCH_CONDITION(t.ts >= q.ts)
    ON t.symbol = q.symbol;

-- Interval join: match orders to payments within 1 hour
-- 3600000 is one hour expressed in milliseconds, so ts is treated as a
-- millisecond timestamp; payments earlier than the order do not match.
CREATE STREAM matched AS
SELECT o.order_id, o.amount, p.status
FROM orders o
INNER JOIN payments p ON o.order_id = p.order_id
    AND p.ts BETWEEN o.ts AND o.ts + 3600000;

-- Temporal join: point-in-time lookup
-- Joins each order to the rates row for its currency as of o.order_time.
SELECT o.*, r.rate
FROM orders o
JOIN rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency;
# Option 1: Download pre-compiled binary (latest release)
# Resolve the latest release tag: take the line containing "tag_name" from the
# GitHub API response and extract the 4th double-quote-delimited field.
# NOTE: if the request fails, VERSION is empty and the download URLs below are invalid.
VERSION=$(curl -s https://api.github.com/repos/laminardb/laminardb/releases/latest | grep tag_name | cut -d '"' -f4)

# Linux x86_64
curl -LO "https://github.com/laminardb/laminardb/releases/download/${VERSION}/laminardb-server-x86_64-unknown-linux-gnu-${VERSION}.tar.gz"
# Extract the downloaded archive (the glob matches any laminardb-server tarball in the current directory).
tar xzf laminardb-server-*.tar.gz

# macOS Apple Silicon
# NOTE: only the download is shown for macOS; extract the archive with the same tar command as above.
curl -LO "https://github.com/laminardb/laminardb/releases/download/${VERSION}/laminardb-server-aarch64-apple-darwin-${VERSION}.tar.gz"

# All platforms: github.com/laminardb/laminardb/releases

# Option 2: Build from source
cargo install laminar-server

# Option 3: Docker (multi-arch, on Docker Hub and GHCR)
# Publishes container port 8080 on host port 8080.
docker run -p 8080:8080 laminardb/laminardb-server:latest
# or: docker run -p 8080:8080 ghcr.io/laminardb/laminardb-server:latest
# full stack (server + Redpanda + Prometheus + Grafana): docker compose up

# Run the binary
# NOTE(review): the archives and crate are named laminardb-server / laminar-server;
# confirm the installed binary is really called `laminardb` and adjust the path if not.
./laminardb --config laminardb.toml
-- Kafka source with Avro and Schema Registry
-- Reads topic 'market-trades' as consumer group 'laminar-analytics'.
CREATE SOURCE trades (
    symbol VARCHAR, price DOUBLE, volume BIGINT, ts BIGINT
) FROM KAFKA (
    brokers = 'localhost:9092',
    topic = 'market-trades',
    group_id = 'laminar-analytics',
    format = 'avro',
    schema.registry.url = 'http://schema-registry:8081'
);

-- Real-time aggregation
-- Per symbol and 1-minute tumbling window: trade count, average price, total volume.
CREATE STREAM trade_summary AS
SELECT symbol, COUNT(*) AS trades,
       AVG(price) AS avg_price,
       SUM(volume) AS total_volume
FROM trades
GROUP BY symbol, tumble(ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE;

-- Kafka sink with exactly-once delivery
-- Writes every row of trade_summary to topic 'trade-summaries' as JSON.
CREATE SINK output INTO KAFKA (
    brokers = 'localhost:9092',
    topic = 'trade-summaries',
    format = 'json',
    delivery.guarantee = 'exactly-once'
) AS SELECT * FROM trade_summary;

-- Delta Lake sink for historical analytics
-- Second sink fed by the same trade_summary stream; rows are appended to the table at the S3 path.
CREATE SINK lake INTO DELTA_LAKE (
    path = 's3://my-bucket/trade_summary',
    write_mode = 'append',
    delivery.guarantee = 'exactly-once'
) AS SELECT * FROM trade_summary;

More examples: examples/ · API reference · SQL reference

Connectors

All connectors are feature-gated. Compile only what you use. Enable via cargo add laminar-db --features kafka,delta-lake.

| Connector | Direction | Status | Feature Flag | Notes |
| --- | --- | --- | --- | --- |
| Kafka | Source + Sink | Implemented | kafka | Avro, JSON, CSV. Confluent Schema Registry. Exactly-once via 2PC. |
| PostgreSQL CDC | Source | Implemented | postgres-cdc | Logical replication via pgoutput. LSN-based checkpoint tracking. |
| PostgreSQL Sink | Sink | Implemented | postgres-sink | COPY BINARY (append) or INSERT ON CONFLICT (upsert). |
| MySQL CDC | Source | Implemented | mysql-cdc | Binlog replication. GTID or file+position modes. |
| MongoDB CDC | Source + Sink | Implemented | mongodb-cdc | Change streams, resume tokens. Sink: ordered/unordered writes, upsert, CDC replay. |
| Delta Lake | Source + Sink | Implemented | delta-lake | S3, Azure, GCS, Unity, Glue catalogs. Exactly-once epoch tracking. |
| Apache Iceberg | Source + Sink | Implemented | iceberg | REST catalog. Incremental reads via snapshot manifests. |
| OpenTelemetry | Source | Implemented | otel | OTLP/gRPC receiver. Traces, metrics, logs with schema auto-discovery. |
| WebSocket | Source + Sink | Implemented | websocket | Client and server modes. JSON/CSV serialization. |
| File Auto-Loader | Source + Sink | Implemented | files | CSV, JSON Lines, Parquet, plain text. Schema inference and evolution. |
| Parquet Lookup | Source | Implemented | parquet-lookup | Object-store-backed dimension tables for lookup joins. |

Custom connectors: implement the SourceConnector / SinkConnector traits.

AI Functions

Call AI models inline in streaming SQL. Each function names a model resolved from a registry; whether it runs against a remote LLM over HTTP or a local ONNX encoder in-process is hidden behind the function.

sentiment.sql
-- Score headlines as they arrive, in one continuous query
-- `model => 'finbert'` selects the model by name; the result is exposed as `sentiment`.
CREATE STREAM flagged AS
SELECT id, headline,
       ai_sentiment(headline, model => 'finbert') AS sentiment
FROM news;

-- Zero/few-shot classification with explicit labels
-- The candidate labels are passed as an array via `labels =>`.
SELECT id,
       ai_classify(body, model => 'intent-mini',
                   labels => ARRAY['question', 'complaint', 'praise']) AS intent
FROM tickets;

-- Embedding vectors for downstream similarity
-- One embedding is requested per `chunk` value and exposed as `vector`.
SELECT id, ai_embed(chunk, model => 'minilm') AS vector
FROM documents;

Functions

| Function | Task | Backend |
| --- | --- | --- |
| ai_classify | Zero/few-shot classification (labels => ARRAY[…]) | Remote or local |
| ai_sentiment | Sentiment | Remote or local |
| ai_embed | Embedding vector | Remote or local |
| ai_complete | Text completion | Remote only |
| ai_summarize | Summarization | Remote only |
| ai_translate | Translation | Remote only |
| ai_gen | Open-ended generation | Remote only |
| ai_extract | Structured extraction | Remote only |

Local models are encoder-only (BERT / DistilBERT / MiniLM family) and cover classify, sentiment, and embed. Generative tasks require a remote provider.

Off the hot path

The streaming pipeline never blocks on a model call. The operator serves cache hits inline and hands misses to a background worker; enriched rows emit in a later cycle.

In-flight rows survive checkpoint and restore, and the watermark is held so async-enriched rows are not dropped as late.

Cached results

Inference results are cached by (content, model, params), so repeated inputs are served without another model call.

The cache is byte-bounded.

Dual backend, per call

The backend is chosen per call by model name from config: a remote LLM over HTTP (OpenAI-compatible, Anthropic) or a local ONNX encoder.

Local inference uses ONNX Runtime, loaded dynamically at runtime — install ONNX Runtime ≥ 1.24 and set ORT_DYLIB_PATH or put it on the search path.

Configuration

Models and providers are declared under [ai.providers.*], [models.*], and [ai.defaults]. API keys are referenced by environment-variable name — never stored in config. The catalog views laminar.models and laminar.ai_calls are SELECT-able.

Deployment

Embedded

Rust: cargo add laminar-db. Python: pip install laminardb. The engine runs in your process. Push events, subscribe to results.

Suited to edge processing, embedded analytics, and latency-sensitive apps.

Standalone Server

Download pre-compiled binaries for Linux (x86_64, aarch64, musl), macOS (x86_64, Apple Silicon), and Windows. Or cargo install laminar-server, or pull the Docker image (docker run -p 8080:8080 laminardb/laminardb-server). Helm chart available.

TOML configuration with ${ENV_VAR} substitution. REST API, Prometheus metrics, ad-hoc SQL, and hot-reload of source, sink, and pipeline sections.

Typical uses: CDC pipelines, stream-to-lakehouse, event routing.

Why LaminarDB

One engine for streaming SQL — from an embedded library to a standalone server.

Microsecond latency

A dedicated compute thread with zero allocations on the data path. State lookups land in 10–16ns, and a single core sustains over a million events per second.

Starts embedded

cargo add laminar-db or pip install laminardb and the engine runs in your process. No JVM, no broker, nothing to operate. Move to the standalone server when you need it.

Real streaming SQL

Tumbling, sliding, hopping, and session windows; ASOF, temporal, interval, and lookup joins; EMIT control and watermarks — built on Apache Arrow and DataFusion.

Exactly-once

Barrier checkpoints with two-phase commit. State and source offsets persist to object storage, and recovery restores both together.

Connects to everything

Kafka, PostgreSQL / MySQL / MongoDB CDC, Delta Lake, Iceberg, OpenTelemetry, WebSocket, and files. Every connector is feature-gated, so you compile only what you use.

AI in the query

Call models inline in streaming SQL with ai_classify, ai_sentiment, and ai_embed — backed by a remote LLM or a local ONNX encoder, off the hot path.

Benchmarks

Criterion numbers from an AMD Ryzen AI 7 350 laptop. Not server-grade hardware: dedicated boxes with isolated cores will do better. Reproduce with cargo bench.

| Metric | Target | Measured (2026-02-28 snapshot) | Benchmark File |
| --- | --- | --- | --- |
| State lookup (AHash get_ref) | <500ns | 10–16ns (zero-copy) | streaming_bench.rs |
| Throughput / core | 500K events/s | 1.1–1.46M events/s | streaming_bench.rs |
| Event latency (mean) | <10µs | 0.55–1.16µs | latency_bench.rs |
| Checkpoint recovery | <10s | 1.39ms | recovery_bench.rs |

Reproduce with cargo bench --bench streaming_bench or cargo bench --bench latency_bench. All benches are driven by Criterion.

What works today

Current version: 0.23.0 (pre-1.0). Releases

Engine & SQL

  • Streaming engine (StreamingCoordinator, operator graph, state stores)
  • Window functions (tumbling, sliding/hopping, session)
  • Joins (interval, ASOF, temporal, temporal probe, lookup, semi/anti)
  • Materialized view result storage and queryability
  • AI functions in streaming SQL (remote LLM / local ONNX)
  • Barrier-based checkpointing with 2PC exactly-once

Connectivity & operations

  • Connectors: Kafka, PG/MySQL/MongoDB CDC, Delta Lake, Iceberg, OTel, WebSocket, Files
  • Object store checkpoints (file, S3, Azure, GCS)
  • Standalone server with REST API, Prometheus metrics, hot-reload
  • SUBSCRIBE over the PostgreSQL wire protocol (psql, psycopg, JDBC)
  • Push-based WebSocket stream subscriptions
  • Docker (multi-arch) and Helm chart deployment

Frequently asked questions

What is LaminarDB?

LaminarDB is an open-source streaming SQL engine written in Rust. It runs continuous SQL queries over event streams using Apache Arrow and DataFusion. It can be embedded in Rust or Python applications or run as a standalone server.

How fast is LaminarDB?

LaminarDB's design targets are state lookups under 500 nanoseconds and a throughput of 500,000 events per second per core. Criterion benchmarks on an AMD Ryzen AI 7 350 laptop exceeded both: state lookups measured 10 to 16 nanoseconds, and throughput measured 1.1 to 1.46 million events per second per core.

What connectors does LaminarDB support?

LaminarDB ships with connectors for Apache Kafka, PostgreSQL CDC, MySQL CDC, MongoDB CDC, Delta Lake, Apache Iceberg, OpenTelemetry OTLP, WebSocket, and rolling file formats (CSV, JSON, Parquet). Some are source-only (for example the PostgreSQL and MySQL CDC connectors and OpenTelemetry), while others provide both a source and a sink. All connectors are feature-gated so you only compile what you use.

Is LaminarDB production-ready?

LaminarDB is pre-1.0. Single-node embedded and standalone server deployments are stable, including exactly-once sinks via barrier checkpoints.

Can I use LaminarDB from Python?

Yes. Install the Python package with pip install laminardb. It exposes the same streaming SQL engine as the Rust crate, with a Python-native API for pushing events and subscribing to results.

What SQL features does LaminarDB support?

LaminarDB supports standard SQL through DataFusion plus streaming extensions: tumbling, sliding, hopping, and session windows; ASOF, temporal, temporal probe, interval, and lookup joins; EMIT clauses (ON WATERMARK, ON WINDOW CLOSE, PERIODICALLY, CHANGES, FINAL); watermarks with ALLOW LATENESS; and late data routing via LATE DATA TO.

How does LaminarDB handle exactly-once delivery?

LaminarDB uses Chandy–Lamport-style barrier checkpoints with two-phase commit. Source offsets and sink commit status are stored in a checkpoint manifest. On recovery, the engine restores operator state and rewinds sources to the last committed checkpoint.

What is an ASOF join in LaminarDB?

An ASOF join matches each row in the left stream to the single closest row in the right stream by timestamp within the same key partition. LaminarDB supports three directions: Backward (most recent prior event), Forward (next event), and Nearest (minimum absolute time difference).

What is a temporal probe join?

A temporal probe join produces multiple output rows for each left event by probing the right stream at a list of fixed time offsets. Offsets are specified with RANGE FROM x TO y STEP z or LIST of explicit millisecond values. It is designed for impact-curve analysis and lagged feature lookups.

Does LaminarDB require a JVM?

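No. LaminarDB is written in Rust and ships as a native binary. The engine has no JVM or other runtime dependencies beyond the operating system; the one optional exception is local AI inference, which loads ONNX Runtime dynamically at runtime.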

Is LaminarDB open source?

Yes. LaminarDB is licensed under Apache 2.0 and the source is hosted at github.com/laminardb/laminardb.

Contributing

LaminarDB is Apache 2.0 licensed and contributions are welcome. You will need Rust 1.85 or newer. Read the contribution guide before starting: the data path has strict constraints on allocations, locks, and syscalls.