Standalone Server & Configuration Reference

The laminardb binary reads a TOML configuration file, constructs streaming pipelines, and serves a REST API with Prometheus metrics. Every field on this page is verified against config.rs. Last updated: 25 May 2026.

Install

Install & run
# Pre-built binary (Linux x86_64) — resolve the latest release tag
VERSION=$(curl -s https://api.github.com/repos/laminardb/laminardb/releases/latest | grep tag_name | cut -d '"' -f4)
curl -LO "https://github.com/laminardb/laminardb/releases/download/${VERSION}/laminardb-server-x86_64-unknown-linux-gnu-${VERSION}.tar.gz"
tar xzf laminardb-server-*.tar.gz

# Or build from source
cargo install laminar-server

# Or Docker (multi-arch — Docker Hub or GHCR)
docker run -d -p 8080:8080 laminardb/laminardb-server:latest
# docker run -d -p 8080:8080 ghcr.io/laminardb/laminardb-server:latest

# Run the binary
./laminardb --config laminardb.toml

CLI Options

Flag | Default | Description
--config <FILE> | laminardb.toml | TOML configuration file path
--log-level <LEVEL> | info | Log level: trace, debug, info, warn, error
--admin-bind <ADDR> | - | Override the HTTP bind address from the config file
--validate-checkpoints | - | Validate all stored checkpoints and exit

[server]

The [server] section configures standalone server behavior, control-plane security, and pgwire networking.

Field | Type | Default | Description
mode | String | "embedded" | Run mode: "embedded" (standalone single node) or "cluster" (multi-node deployment).
bind | String | "127.0.0.1:8080" | Bind address for the HTTP API and Console UI control plane.
console_token | String | none | Bearer token required for control-plane HTTP API and WebSocket access. If omitted, the API is unauthenticated.
console_cors_allowed_origins | Array<String> | none | CORS origins allowed to access the control-plane REST/WS endpoints.
pgwire_bind | String | none | Bind address for the Postgres wire protocol listener. pgwire is disabled if omitted.
pgwire_allow_remote | bool | false | Must be true to bind pgwire on a non-loopback interface.
pgwire_users | Map | {} | Map of usernames to MD5 or plain-text passwords for pgwire client authentication.
pgwire_tls_cert | String | none | Path to the PEM certificate file that enables TLS for pgwire connections.
pgwire_tls_key | String | none | Path to the PEM private key (PKCS#8 or RSA) for pgwire TLS.
pgwire_tls_client_ca | String | none | Path to a PEM CA bundle used to enforce mutual TLS (mTLS) client verification.
pgwire_max_connections | usize | 256 | Maximum concurrent pgwire sessions; connections beyond this limit are closed.
pgwire_max_auth_failures_per_min | u32 | 10 | Failed authentications per IP per minute before a temporary ban. 0 disables the limit.
pgwire_tls_min_version | String | "1.2" | Minimum TLS protocol version ("1.2" or "1.3").

Top-Level Config Keys:

Field | Type | Default | Description
node_id | String | none | Unique node identity (required if mode = "cluster"; falls back to {hostname}-{port}).
laminardb.toml
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
console_token = "supersecret"
pgwire_bind = "0.0.0.0:5433"
pgwire_allow_remote = true
log_level = "info"

Cluster Config

When mode = "cluster", the server requires both the [discovery] and [coordination] sections to be defined.

[discovery]

Field | Type | Default | Description
strategy | String | required | Discovery mode: "gossip" (Chitchat peer-to-peer membership) or "static".
seeds | Array<String> | [] | Seed peer addresses used to join the cluster (e.g. ["10.0.0.1:7946", "10.0.0.2:7946"]). Required for the static strategy.
gossip_port | u16 | 7946 | Port for gossip traffic.
advertise_host | String | none | Host or IP address this node advertises to peers. Falls back to loopback if DNS resolution fails.

[coordination]

Field | Type | Default | Description
strategy | String | "raft" | Consensus strategy. Currently only "raft" is supported.
raft_port | u16 | 8888 | Port for Raft consensus traffic.
election_timeout | Duration | "3s" | Raft leader election timeout (humantime format).
heartbeat_interval | Duration | "500ms" | Raft leader heartbeat interval (humantime format).
laminardb.toml (Cluster sections)
node_id = "node-1"

[discovery]
strategy = "gossip"
gossip_port = 7946
advertise_host = "10.0.0.1"
seeds = ["10.0.0.1:7946", "10.0.0.2:7946"]

[coordination]
strategy = "raft"
raft_port = 8888
election_timeout = "3s"
heartbeat_interval = "500ms"
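Duration fields such as election_timeout, heartbeat_interval, and the checkpoint interval take humantime strings. The Python sketch below parses a small subset of that syntax (ms, s, m, h, d, optionally combined as in "1h 30m") so values can be compared; the real humantime crate accepts more unit spellings. The check_raft_timing helper and its min_ratio of 3 are hypothetical: they encode the general Raft rule of thumb that several heartbeats should fit into one election timeout, not a validation LaminarDB is documented to perform.

```python
import re

# Subset of humantime units (hypothetical helper, not LaminarDB code).
_UNITS_MS = {"ms": 1, "s": 1000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}
# "ms" is listed before "m" and "s" so "500ms" is not read as "500m" + "s".
_TOKEN = re.compile(r"(\d+)\s*(ms|s|m|h|d)")


def parse_duration_ms(text):
    """Parse strings like "500ms", "3s", "1m", or "1h 30m" into milliseconds."""
    text = text.strip()
    total, pos = 0, 0
    for match in _TOKEN.finditer(text):
        # Anything other than whitespace between tokens is an error.
        if text[pos:match.start()].strip():
            raise ValueError(f"unexpected text in duration: {text!r}")
        total += int(match.group(1)) * _UNITS_MS[match.group(2)]
        pos = match.end()
    if pos == 0 or text[pos:].strip():
        raise ValueError(f"invalid duration: {text!r}")
    return total


def check_raft_timing(election_timeout, heartbeat_interval, min_ratio=3):
    """Rule of thumb: the election timeout should span several heartbeats."""
    election = parse_duration_ms(election_timeout)
    heartbeat = parse_duration_ms(heartbeat_interval)
    return election >= min_ratio * heartbeat
```

With the defaults, check_raft_timing("3s", "500ms") is true: six heartbeats fit in one election timeout.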

[state]

The [state] section configures the engine's query state backend, which holds state for aggregates, joins, and windowing. It is a tagged union selected by the backend key.

In-Process State (Non-durable, in-memory)

Configure with backend = "in_process" (the default).

Field | Type | Default | Description
vnode_capacity | u32 | 256 | Number of virtual nodes (vnodes) the hash ring is divided into.

Local State (Durable, local filesystem)

Configure with backend = "local".

Field | Type | Default | Description
path | String | required | Local directory where state partitions are stored.
instance_id | String | "local" | Instance identity written to checkpoint metadata for auditing.
vnode_capacity | u32 | 256 | Number of virtual nodes.

Object Store State (Durable, shared storage)

Configure with backend = "object_store". Essential for cluster mode deployments.

Field | Type | Default | Description
url | String | required | Shared storage URL. Currently only "file://..." is supported; cloud schemes such as "s3://", "gs://", and "az://" are not yet implemented.
instance_id | String | required | This node's unique ID, used for writer fencing.
vnode_capacity | u32 | 256 | Total number of virtual nodes in the cluster layout.
vnodes | Array<u32> | none | Static list of vnodes assigned to this node (static/non-cluster layouts only).
merger_instance | String | none | Node that merges partial sink outputs (static layouts only).
discovery | String | "static" | Membership discovery mode: "static" or "dynamic" (gossip).
seed_peers | Array<String> | [] | Seed peer addresses for dynamic (Chitchat) discovery.
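The vnode fields describe a two-level mapping: keys hash to one of vnode_capacity virtual nodes, and vnodes (rather than individual keys) are assigned to cluster nodes, which typically lets rebalancing move whole vnodes. The sketch below illustrates only that idea. The SHA-256 hash and the round-robin placement are assumptions chosen for readability; this page does not document LaminarDB's actual hash function or vnode placement algorithm.

```python
import hashlib


def vnode_for_key(key, vnode_capacity=256):
    """Map a key to a vnode with a stable hash (hypothetical choice: SHA-256)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % vnode_capacity


def assign_vnodes(nodes, vnode_capacity=256):
    """Spread vnodes round-robin over sorted node ids (illustrative layout only)."""
    ordered = sorted(nodes)
    return {v: ordered[v % len(ordered)] for v in range(vnode_capacity)}
```

A key's owner is then assign_vnodes(...)[vnode_for_key(key)]; adding a node changes the vnode-to-node map, while each key's vnode stays fixed as long as vnode_capacity does.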

[checkpoint]

Field | Type | Default | Description
url | String | "file:///tmp/laminardb/checkpoints" | Storage URL: file://, s3://, or gs://.
interval | Duration | "10s" | Checkpoint interval in humantime format, e.g. "10s", "1m", "500ms".
storage | Map | {} | Cloud storage credentials, with keys such as aws_access_key_id and aws_region.
tiering | Section | none | Optional S3 storage class tiering. See S3 Storage Class Tiering.
S3 checkpoint example
[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"

[checkpoint.storage]
aws_region = "us-east-1"
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"

Console UI & Control API

The Console UI is a web client that lets you inspect your streaming SQL nodes, view lineage dependency graphs, and author queries. You can access the hosted console at laminardb.github.io/laminardb-console-ui.

The console connects to the control-plane APIs exposed by the server. All endpoints are protected by token authentication if server.console_token is configured.

Security & Gating

  • HTTP Authentication: Send the configured token in the header: Authorization: Bearer <token>.
  • WebSocket Authentication: Pass the token as a query parameter: /ws/{name}?token=<token>.
  • CORS Gating: Restrict client origins using the server.console_cors_allowed_origins array in TOML.
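A minimal Python sketch of how a client might attach the token on both transports. The helper names are hypothetical; only the Bearer header format and the ?token= query parameter come from the list above. URL-encoding the stream name and token is a precaution, not documented server behavior.

```python
from urllib.parse import quote


def http_headers(token):
    """Headers for control-plane HTTP calls; empty when no console_token is set."""
    return {"Authorization": f"Bearer {token}"} if token else {}


def ws_path(name, token=None):
    """Path for /ws/{name}, passing the token as a query parameter when present."""
    path = f"/ws/{quote(name, safe='')}"
    return f"{path}?token={quote(token, safe='')}" if token else path
```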

Lineage Dependency Graph

GET /api/v1/graph parses SQL statements and queries the engine schema to return a directed acyclic graph (DAG) of the data pipeline.

GET /api/v1/graph Response Format
{
  "nodes": [
    { "id": "trades", "type": "source", "label": "trades" },
    { "id": "vwap", "type": "stream", "label": "vwap" },
    { "id": "trade_archive", "type": "sink", "label": "trade_archive" }
  ],
  "edges": [
    { "source": "trades", "target": "vwap" },
    { "source": "vwap", "target": "trade_archive" }
  ]
}
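Because the response is a DAG, a client can order its nodes so that every upstream object precedes its dependents, for example to lay out a lineage view left to right. A minimal Python sketch using Kahn's algorithm over the nodes and edges arrays of the response format above; topo_order is a hypothetical client-side helper, not part of the API.

```python
from collections import deque


def topo_order(graph):
    """Return node ids so that every edge source precedes its target (Kahn's algorithm)."""
    ids = [n["id"] for n in graph["nodes"]]
    indegree = {i: 0 for i in ids}
    children = {i: [] for i in ids}
    for edge in graph["edges"]:
        children[edge["source"]].append(edge["target"])
        indegree[edge["target"]] += 1
    ready = deque(i for i in ids if indegree[i] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(ids):
        raise ValueError("graph contains a cycle")
    return order


# The example response shown above, as a parsed dict.
SAMPLE = {
    "nodes": [
        {"id": "trades", "type": "source", "label": "trades"},
        {"id": "vwap", "type": "stream", "label": "vwap"},
        {"id": "trade_archive", "type": "sink", "label": "trade_archive"},
    ],
    "edges": [
        {"source": "trades", "target": "vwap"},
        {"source": "vwap", "target": "trade_archive"},
    ],
}
```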

Ad-hoc Query & Ephemeral Stream Lifecycle

To run an ad-hoc query and tail it in the browser:

  1. Post the SQL query to POST /api/v1/queries:
    POST /api/v1/queries
    { "sql": "SELECT symbol, price FROM trades WHERE volume > 1000" }
  2. The server registers an ephemeral stream named __console_<uuid> and returns:
    { "stream_id": "__console_<uuid>", "ws_url": "/ws/__console_<uuid>" }
  3. The client connects to the WebSocket at ws_url to receive the data stream.
  4. When the WebSocket connection closes or times out, the server automatically drops the ephemeral stream and releases its resources.
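The lifecycle above can be sketched in Python with only the standard library. build_query_request prepares (without sending) the POST /api/v1/queries call, and websocket_url turns the relative ws_url from the response into an absolute URL. Both helper names are hypothetical, and mapping http to ws and https to wss is an assumption about how the server is exposed.

```python
import json
import urllib.request


def build_query_request(base_url, sql, token=None):
    """Build (but do not send) the POST /api/v1/queries request."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    req = urllib.request.Request(f"{base_url}/api/v1/queries", data=body, method="POST")
    req.add_header("Content-Type", "application/json")
    if token:
        req.add_header("Authorization", f"Bearer {token}")
    return req


def websocket_url(base_url, response_body):
    """Combine the server base URL with the relative ws_url from the response."""
    ws_path = json.loads(response_body)["ws_url"]
    scheme = "wss" if base_url.startswith("https://") else "ws"
    host = base_url.split("://", 1)[1]
    return f"{scheme}://{host}{ws_path}"
```

Send the request with urllib.request.urlopen(req) against a running server. If console_token is set, also append ?token=<token> to the WebSocket URL: browser WebSocket clients generally cannot set an Authorization header on the upgrade request.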

Realtime Telemetry & WebSocket Protocol

A WebSocket client connects to GET /ws/{name}. Messages are serialized in Arrow JSON format:

{
  "type": "data",
  "subscription_id": 1,
  "data": [ { "symbol": "AAPL", "price": 178.5 } ],
  "sequence": 42
}

The server sends a WebSocket ping frame every 15 seconds as a heartbeat; the client must answer with a pong, or the connection is terminated.
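A client consuming this stream typically collects the rows from "data" messages. The Python sketch below also flags gaps in sequence, assuming (this page does not say so) that sequence increases by one per message within a subscription_id. Ping/pong handling is left to the WebSocket library; the Python websockets package, for example, answers pings automatically. handle_messages is a hypothetical helper.

```python
import json


def handle_messages(raw_messages):
    """Collect rows from "data" messages and report sequence gaps per subscription."""
    rows, gaps = [], []
    last_seq = {}
    for raw in raw_messages:
        msg = json.loads(raw)
        if msg.get("type") != "data":
            continue  # other message types are ignored in this sketch
        sub, seq = msg["subscription_id"], msg["sequence"]
        prev = last_seq.get(sub)
        if prev is not None and seq != prev + 1:
            gaps.append((sub, prev, seq))  # (subscription, last seen, received)
        last_seq[sub] = seq
        rows.extend(msg["data"])
    return rows, gaps
```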

Cluster Management Endpoints

  • GET /api/v1/cluster/nodes: Returns the list of cluster members, with their node IDs, network addresses, and states (Active, Draining, Suspected, Left).
  • GET /api/v1/cluster/vnodes: Returns the current assignment of virtual nodes (vnodes; 256 by default, set by vnode_capacity) to active cluster nodes.
  • GET /api/v1/cluster/leader: Returns details of the node currently holding the Raft leader lease.
  • GET /api/v1/cluster/checkpoints: Returns the history and completion metadata of recent coordinator checkpoints.

REST API

The server exposes an HTTP API on the configured bind address. All responses are JSON except /metrics (Prometheus text format).

GET /health Liveness probe
GET /ready Readiness probe (pipelines started)
GET /metrics Prometheus text metrics
GET /api/v1/sources List configured sources
GET /api/v1/sinks List configured sinks
GET /api/v1/streams List running streams
GET /api/v1/streams/{name} Stream detail by name
GET /api/v1/mvs List materialized views
GET /api/v1/connectors List available connector types and config options
POST /api/v1/queries Create ephemeral query stream for ad-hoc tailing
POST /api/v1/checkpoint Trigger immediate coordinator checkpoint
POST /api/v1/sql Execute DDL/SQL query (returns JSON rows or metadata)
POST /api/v1/reload Hot-reload configuration and certificates
GET /api/v1/graph Generate pipeline dependency graph (nodes & edges)
GET /api/v1/cluster Retrieve basic engine cluster status
GET /api/v1/cluster/nodes List nodes and their membership state
GET /api/v1/cluster/vnodes Get current vnode instance assignments
GET /api/v1/cluster/leader Get current leader node details
GET /api/v1/cluster/checkpoints List completed checkpoint metadata
POST /api/v1/pipeline/start Resume execution of a specific pipeline
POST /api/v1/pipeline/stop Stop execution of a specific pipeline
GET /api/v1/pipeline/status Query individual pipeline state
GET /ws/{name} WebSocket query/stream realtime subscription channel

Hot Reload

Edit the TOML config file while the server is running. A file watcher detects changes (500 ms debounce, content-hash comparison), diffs the old and new configs, and applies incremental DDL. You can also trigger a reload manually with POST /api/v1/reload.

What reloads without restart: sources, pipelines, sinks, lookups (add, remove, or change).

What requires restart: [server], [state], [checkpoint] sections. A warning is returned if these sections change.

Reload order: remove changed/deleted sinks → pipelines → lookups → sources, then recreate sources → lookups → pipelines → sinks. Disable file watcher with LAMINAR_DISABLE_FILE_WATCH=1.
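The reload order can be expressed as a small planning function: tear down changed or deleted objects from the most dependent kind (sinks) to the least (sources), then recreate in the reverse order. This Python sketch is illustrative; plan_reload and the {kind: {name: definition}} input shape are hypothetical, and it does not model dependency cascades (for example, a sink attached to a pipeline that is being recreated).

```python
REMOVE_ORDER = ["sink", "pipeline", "lookup", "source"]
CREATE_ORDER = list(reversed(REMOVE_ORDER))  # source, lookup, pipeline, sink


def plan_reload(old, new):
    """old/new: {kind: {name: definition}}. Returns ordered (action, kind, name) steps.

    A changed definition is removed and then recreated; unchanged ones are untouched.
    """
    steps = []
    for kind in REMOVE_ORDER:
        before, after = old.get(kind, {}), new.get(kind, {})
        for name in sorted(before):
            if name not in after or after[name] != before[name]:
                steps.append(("remove", kind, name))
    for kind in CREATE_ORDER:
        before, after = old.get(kind, {}), new.get(kind, {})
        for name in sorted(after):
            if name not in before or before[name] != after[name]:
                steps.append(("create", kind, name))
    return steps
```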

Prometheus Metrics

GET /metrics returns Prometheus text format. Key metrics:

Metric | Type | Description
laminardb_events_ingested_total | counter | Total events ingested across all sources
laminardb_events_emitted_total | counter | Total events emitted to sinks
laminardb_events_dropped_total | counter | Total events dropped (late data, errors)
laminardb_source_events_total | counter | Events per source (label: source)
laminardb_source_count | gauge | Number of configured sources
laminardb_stream_count | gauge | Number of running streams
laminardb_sink_count | gauge | Number of configured sinks
laminardb_checkpoints_completed_total | counter | Successful checkpoints
laminardb_checkpoints_failed_total | counter | Failed checkpoints
laminardb_checkpoint_epoch | gauge | Current checkpoint epoch
laminardb_checkpoint_last_duration_ms | gauge | Duration of the last checkpoint (ms)
laminardb_cycle_duration_p50_ns | gauge | p50 processing-cycle latency (ns)
laminardb_cycle_duration_p95_ns | gauge | p95 processing-cycle latency (ns)
laminardb_cycle_duration_p99_ns | gauge | p99 processing-cycle latency (ns)
laminardb_uptime_seconds | gauge | Server uptime (seconds)
laminardb_pipeline_state_info | gauge | Pipeline state (label: state)
laminardb_reload_total | counter | Config reload count
laminardb_reload_last_timestamp | gauge | Unix timestamp of the last reload
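For ad-hoc checks without a Prometheus server, the text format can be parsed directly. This is a minimal Python sketch that handles the simple name{labels} value lines produced for metrics like those above; it skips HELP/TYPE comments, ignores optional timestamps, and does not unescape label values or handle histogram specifics. The sample text and its numbers are made up for illustration.

```python
import re

_LABEL = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')


def parse_prometheus(text):
    """Parse Prometheus text-format samples into {(name, labels): value}."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # blank line or HELP/TYPE comment
        if "{" in line:
            name, rest = line.split("{", 1)
            label_text, tail = rest.rsplit("}", 1)
            labels = tuple(sorted(_LABEL.findall(label_text)))
        else:
            name, tail = line.split(None, 1)
            labels = ()
        samples[(name.strip(), labels)] = float(tail.split()[0])
    return samples


SAMPLE = """\
# HELP laminardb_events_ingested_total Total events ingested across all sources
# TYPE laminardb_events_ingested_total counter
laminardb_events_ingested_total 1200
laminardb_source_events_total{source="trades"} 1200
laminardb_pipeline_state_info{state="running"} 1
"""
```

Counters such as laminardb_events_ingested_total only grow, so a throughput figure is the difference between two scrapes divided by the time between them.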

[[source]]

Field | Type | Default | Description
name | String | required | Unique source name, referenced in SQL and sinks.
connector | String | required | "kafka", "postgres_cdc", "mysql_cdc", "mongodb-cdc", "nats", "websocket", "files", "delta-lake", "iceberg", "otel", "parquet-lookup", "postgres-lookup"
format | String | "json" | "json", "avro", "protobuf", "csv"
properties | Table | {} | Connector-specific properties. See the connector sections below.
schema | Array | [] | Column definitions: {name, type, nullable}
watermark | Section | none | Event-time watermark: {column, max_out_of_orderness}
Source with schema and watermark
[[source]]
name = "trades"
connector = "kafka"
format = "json"

[source.properties]
bootstrap.servers = "localhost:9092"
topic = "market-trades"
group.id = "laminar-analytics"

[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false

[[source.schema]]
name = "price"
type = "DOUBLE"

[[source.schema]]
name = "ts"
type = "BIGINT"

[source.watermark]
column = "ts"
max_out_of_orderness = "5s"
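The watermark section configures bounded out-of-orderness. A common definition, assumed here rather than confirmed for LaminarDB, is: watermark = highest event time seen so far minus max_out_of_orderness, and an event whose timestamp is below the current watermark is late. A minimal Python sketch with ts as epoch milliseconds (the example declares it BIGINT), so "5s" becomes 5000; the class name is hypothetical.

```python
class BoundedOutOfOrdernessWatermark:
    """Watermark = highest event time seen minus the allowed out-of-orderness (ms)."""

    def __init__(self, max_out_of_orderness_ms):
        self.bound = max_out_of_orderness_ms
        self.max_ts = None

    def observe(self, ts):
        """Record an event time; only a new maximum moves the watermark forward."""
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts

    def current(self):
        return None if self.max_ts is None else self.max_ts - self.bound

    def is_late(self, ts):
        wm = self.current()
        return wm is not None and ts < wm
```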

[[pipeline]]

Field | Type | Default | Description
name | String | required | Unique pipeline name
sql | String | required | SQL query. Use TOML triple-quoted strings for multi-line queries.
parallelism | usize | none | Optional parallelism override (default: server worker count)

[[sink]]

Field | Type | Default | Description
name | String | required | Unique sink name
pipeline | String | required | Name of the pipeline this sink reads from. The pipeline must exist.
connector | String | required | "kafka", "postgres", "mongodb", "nats", "delta-lake", "iceberg", "websocket", "files", "stdout"
delivery | String | "at_least_once" | "at_least_once" or "exactly_once"
properties | Table | {} | Connector-specific properties

[[lookup]]

Field | Type | Default | Description
name | String | required | Unique lookup table name, used in SQL JOINs.
connector | String | required | "postgres", "parquet"
strategy | String | "poll" | "poll", "cdc", "manual"
pushdown | bool | true | Push predicates down to the source for filtered lookups
cache.size_bytes | u64 | 104857600 | Cache size in bytes (100 MiB)
cache.ttl | Duration | "300s" | Cache entry TTL
cache.hybrid | bool | false | Enable in-memory caching via quick_cache

Connectors

All connectors are feature-gated. Enable via cargo add laminar-db --features kafka,delta-lake.

Connector | Role | Feature flag
Kafka | Source + Sink | kafka
NATS | Source + Sink | nats
PostgreSQL CDC | Source | postgres-cdc
MySQL CDC | Source | mysql-cdc
MongoDB CDC | Source + Sink | mongodb-cdc
PostgreSQL Sink | Sink | postgres-sink
Delta Lake | Source + Sink | delta-lake
Apache Iceberg | Source + Sink | iceberg
OpenTelemetry | Source (OTLP/gRPC) | otel
WebSocket | Source + Sink | websocket
File Auto-Loader | Source + Sink | files

Kafka Source

Required

Property | Description
bootstrap.servers | Kafka broker addresses (comma-separated)
group.id | Consumer group ID
topic or topic.pattern | Topic names (comma-separated) or a regex pattern. The two are mutually exclusive.

Format & Schema

Property | Default | Description
format | json | json, avro, protobuf
schema.registry.url | - | Confluent Schema Registry URL
schema.registry.username | - | Schema Registry auth username
schema.registry.password | - | Schema Registry auth password

Security

Property | Default | Description
security.protocol | PLAINTEXT | PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
sasl.mechanism | - | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
sasl.username | - | SASL username
sasl.password | - | SASL password
ssl.ca.location | - | CA certificate path

Consumer Tuning

Property | Default | Description
startup.mode | earliest | earliest, latest, none
max.poll.records | 1000 | Records per poll batch
isolation.level | read_committed | read_uncommitted, read_committed
partition.assignment.strategy | range | range, roundrobin, sticky, cooperative-sticky
max.out.of.orderness.ms | 5000 | Watermark out-of-order tolerance (ms)
idle.timeout.ms | 30000 | Idle timeout (ms) for watermark advancement

Any property prefixed with kafka. is passed through to the underlying rdkafka consumer.

Kafka Sink

Property | Default | Description
bootstrap.servers | required | Kafka broker addresses
topic | required | Target topic
format | json | json, avro, protobuf
delivery.guarantee | at-least-once | at-least-once, exactly-once
key.column | - | Column used as the partition key
partitioner | key-hash | key-hash, round-robin, sticky
compression.type | none | none, gzip, snappy, lz4, zstd
acks | all | all, 1, 0
linger.ms | 5 | Batch linger time (ms)
batch.size | 16384 | Maximum batch size in bytes
transactional.id | - | Transactional ID for the exactly-once producer

NATS Source

Property | Default | Description
servers | required | NATS server URLs (comma-separated)
mode | jetstream | jetstream, core
format | json | json, avro, protobuf
auth.mode | none | none, user_pass, token, creds_file
user | - | Auth username (required for user_pass)
password | - | Auth password (required for user_pass)
token | - | Auth token (required for token)
creds.file | - | Path to the creds file (required for creds_file)
tls.enabled | false | Enable TLS
tls.ca.location | - | CA certificate path
tls.cert.location | - | Client certificate path
tls.key.location | - | Client key path
stream | - | JetStream stream name (required in jetstream mode)
subject | - | Subscription subject (required in core mode)
subject.filters | - | Comma-separated JetStream subject filters
consumer | - | Durable name of the pull consumer (required in jetstream mode)
deliver.policy | all | all, new, by_start_sequence, by_start_time
start.sequence | - | Start sequence (required if deliver.policy=by_start_sequence)
start.time | - | Start time as an RFC 3339 timestamp (required if deliver.policy=by_start_time)
ack.policy | explicit | explicit, none
ack.wait.ms | 60000 | Ack wait timeout (ms)
max.deliver | 5 | Maximum deliveries per message
max.ack.pending | 10000 | Maximum outstanding unacknowledged messages
fetch.batch | 500 | Batch size for fetch operations
fetch.max.wait.ms | 500 | Maximum wait for a fetch operation (ms)
queue.group | - | Queue group (core mode only)

NATS Sink

Source: nats/sink.rs
Property | Default | Description
servers | required | NATS server URLs (comma-separated)
mode | jetstream | jetstream, core
format | json | json, avro, protobuf
auth.mode | none | none, user_pass, token, creds_file
user | - | Auth username (required for user_pass)
password | - | Auth password (required for user_pass)
token | - | Auth token (required for token)
creds.file | - | Path to the creds file (required for creds_file)
tls.enabled | false | Enable TLS
tls.ca.location | - | CA certificate path
tls.cert.location | - | Client certificate path
tls.key.location | - | Client key path
stream | - | Target JetStream stream (required if delivery.guarantee=exactly-once)
subject | - | Literal publishing subject. Mutually exclusive with subject.column.
subject.column | - | Column holding a per-row subject. Mutually exclusive with subject.
expected.stream | - | Optional expected-stream assertion header
delivery.guarantee | at-least-once | at-least-once, exactly-once, at-most-once
dedup.id.column | - | Column used as the message deduplication ID (required for exactly-once)
min.duplicate.window.ms | 120000 | Minimum duplicate window (ms)
max.pending | 4096 | Maximum pending publish futures
ack.timeout.ms | 30000 | Ack timeout (ms)
header.columns | - | Comma-separated column names to map to NATS headers

PostgreSQL CDC Source

Property | Default | Description
host | required | PostgreSQL hostname
port | 5432 | Port
database | required | Database name
username | postgres | Auth username
password | - | Auth password
slot.name | required | Logical replication slot name
publication | required | Publication name
ssl.mode | prefer | disable, prefer, require, verify-ca, verify-full
snapshot.mode | initial | initial, never, always
table.include | - | Comma-separated list of tables to include
table.exclude | - | Comma-separated list of tables to exclude
max.poll.records | 1000 | Records per poll

MySQL CDC Source

Property | Default | Description
host | required | MySQL hostname
port | 3306 | Port
username | required | Auth username
password | - | Auth password
database | - | Database filter
server.id | 1001 | Unique replica server ID
use.gtid | true | Use GTID-based replication
gtid.set | - | Starting GTID set
snapshot.mode | initial | initial, never, always
table.include | - | Comma-separated tables in db.table format

PostgreSQL Sink

Property | Default | Description
hostname | required | PostgreSQL hostname
port | 5432 | Port
database | required | Database name
username | required | Auth username
password | - | Auth password
table.name | required | Target table
schema.name | public | PostgreSQL schema
write.mode | append | append (COPY BINARY) or upsert (INSERT ... ON CONFLICT)
primary.key | - | Primary-key columns for upsert (comma-separated). Required if write.mode=upsert.
batch.size | 4096 | Records per flush
pool.size | 4 | Connection pool size
delivery.guarantee | at_least_once | at_least_once, exactly_once (2PC)
auto.create.table | false | Create the table if it is missing
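To show what write.mode = upsert with primary.key means at the SQL level, this Python sketch builds an INSERT ... ON CONFLICT DO UPDATE statement of the kind that mode implies. It is an illustration only: the connector's actual SQL, batching, and parameter style are not documented here, and upsert_sql is a hypothetical helper. Identifiers are double-quoted with embedded quotes doubled, as PostgreSQL requires.

```python
def _quote(ident):
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + ident.replace('"', '""') + '"'


def upsert_sql(schema, table, columns, primary_key):
    """Build INSERT ... ON CONFLICT (pk) DO UPDATE with $1..$n placeholders."""
    cols = ", ".join(_quote(c) for c in columns)
    params = ", ".join(f"${i}" for i in range(1, len(columns) + 1))
    keys = ", ".join(_quote(k) for k in primary_key)
    updates = ", ".join(
        f"{_quote(c)} = EXCLUDED.{_quote(c)}" for c in columns if c not in primary_key
    )
    # If every column is part of the key there is nothing to update.
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return (f"INSERT INTO {_quote(schema)}.{_quote(table)} ({cols}) "
            f"VALUES ({params}) ON CONFLICT ({keys}) {action}")
```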

MongoDB CDC Source

Consumes MongoDB change streams. Resume tokens provide at-least-once delivery across restarts. Requires MongoDB 4.0+ (replica set) or 4.2+ (sharded cluster).

Property | Default | Description
connection.uri | mongodb://localhost:27017 | MongoDB connection string
database | required | Database name
collection | required | Collection name, or * for all collections in the database
full.document.mode | delta | delta, update_lookup, required, when_available
max.await.time.ms | 1000 | getMore await timeout (ms)
batch.size | 1000 | Cursor batch size hint
max.poll.records | 1000 | Maximum records per poll batch
max.buffered.events | 100000 | Maximum buffered events before backpressure applies

Full document modes: delta sends only the update description (default). update_lookup fetches the current document on updates. required returns the post-image and fails if it is unavailable; it needs changeStreamPreAndPostImages enabled on the collection. when_available returns the post-image when present, otherwise null.

CDC envelope schema: _namespace (Utf8), _op (Utf8: I/U/R/D/DROP), _document_key (Utf8), _cluster_time_s (UInt32), _cluster_time_i (UInt32), _wall_time_ms (Int64), _full_document (Utf8, nullable), _update_desc (Utf8, nullable), _resume_token (Utf8).

MongoDB Sink

Property | Default | Description
connection.uri | mongodb://localhost:27017 | MongoDB connection string
database | required | Target database name
collection | required | Target collection name
write.mode | insert | insert, upsert, replace, cdc_replay
batch.size | 500 | Maximum documents per bulk write
flush.interval.ms | 1000 | Maximum time between flushes (ms)
ordered | true | true = fail-fast ordered writes; false = unordered writes (higher throughput)
write.concern.w | majority | majority, or an integer node count
write.concern.journal | true | Wait for journal commit

Write modes: insert is append-only. upsert replaces documents by the fields listed in upsert.key.fields. replace rewrites the full document. cdc_replay routes by the _op column (I→insert, U→update, R→replace, D→delete). Time series collections only support insert.
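A minimal Python sketch of the cdc_replay routing, applied to one row of the CDC envelope schema described under MongoDB CDC Source. It assumes _document_key, _full_document, and _update_desc hold JSON text; the page only says they are Utf8, and MongoDB may emit Extended JSON. route_cdc_row is a hypothetical helper. Rows with other _op values, such as DROP, are rejected because the mapping above covers only I, U, R, and D.

```python
import json

OP_TO_ACTION = {"I": "insert", "U": "update", "R": "replace", "D": "delete"}


def route_cdc_row(row):
    """Return (action, filter_key, payload) for one CDC envelope row."""
    action = OP_TO_ACTION.get(row["_op"])
    if action is None:
        raise ValueError(f"unsupported _op for replay: {row['_op']!r}")
    key = json.loads(row["_document_key"])
    if action in ("insert", "replace"):
        payload = json.loads(row["_full_document"])  # whole document
    elif action == "update":
        payload = json.loads(row["_update_desc"])    # changed fields only
    else:
        payload = None                               # delete needs only the key
    return action, key, payload
```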

Delta Lake Source

Property | Default | Description
table.path | required | Delta table path (local, s3://, az://, gs://)
read.mode | incremental | snapshot (full read) or incremental (change feed)
starting.version | - | Table version to start from
poll.interval.ms | 1000 | Version poll interval (ms)
partition.filter | - | Partition filter (SQL expression)
catalog.type | none | none, glue, unity

Delta Lake Sink

Property | Default | Description
table.path | required | Delta table path
write.mode | append | append, overwrite, upsert
merge.key.columns | - | Key columns for upsert (comma-separated)
partition.columns | - | Partition columns (comma-separated)
target.file.size | 134217728 | Target Parquet file size in bytes (128 MiB)
max.buffer.records | 100000 | Buffered record count that triggers a flush
max.buffer.duration.ms | 60000 | Buffering time (ms) that triggers a flush
delivery.guarantee | at-least-once | at-least-once, exactly-once
writer.id | - | Writer ID (required for exactly-once)
schema.evolution | false | Automatically merge new columns into the table schema
compaction.enabled | false | Enable automatic compaction
vacuum.retention.hours | 168 | Vacuum retention in hours (7 days)
catalog.type | none | none, glue, unity

Cloud storage credentials: use storage.aws_access_key_id, storage.aws_secret_access_key, storage.aws_region, etc.
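max.buffer.records and max.buffer.duration.ms act as two flush triggers for the Delta Lake sink. This Python sketch assumes (not stated on this page) that a flush happens when either limit is reached first and that the timer starts with the first buffered record. FlushPolicy is a hypothetical name, and the injectable clock exists only to make the example testable.

```python
import time


class FlushPolicy:
    """Flush when buffered records or buffered time reaches its limit."""

    def __init__(self, max_records=100_000, max_duration_ms=60_000, clock=time.monotonic):
        self.max_records = max_records
        self.max_duration_s = max_duration_ms / 1000
        self.clock = clock
        self.buffered = 0
        self.opened_at = None

    def add(self, n):
        if self.buffered == 0:
            self.opened_at = self.clock()  # timer starts with the first record
        self.buffered += n

    def should_flush(self):
        if self.buffered == 0:
            return False
        return (self.buffered >= self.max_records
                or self.clock() - self.opened_at >= self.max_duration_s)

    def reset(self):
        """Call after a flush completes."""
        self.buffered = 0
        self.opened_at = None
```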

WebSocket Source

Note: WebSocket sources are not replayable, so delivery after recovery is at-most-once.

Property | Default | Description
mode | client | client (connect out) or server (accept connections)
url | - | WebSocket URL (client mode; comma-separated for multiple endpoints)
bind.address | - | Bind address (server mode)
format | json | json, jsonlines, binary, csv
subscribe.message | - | Message sent after the handshake (client mode)
reconnect.enabled | true | Auto-reconnect (client mode)
reconnect.max.delay.ms | 30000 | Maximum reconnect backoff delay (ms)
ping.interval.ms | 30000 | Keepalive ping interval (ms)
auth.type | - | bearer, basic, hmac
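Only the upper bound of the reconnect backoff (reconnect.max.delay.ms) is documented. This Python sketch shows a capped exponential backoff under assumed parameters: the 500 ms initial delay, the doubling factor, and the optional full jitter are hypothetical, not LaminarDB defaults.

```python
import random


def reconnect_delays_ms(attempts, initial_ms=500, max_delay_ms=30_000,
                        jitter=False, rng=random.random):
    """Delays for successive reconnect attempts: doubling, capped at max_delay_ms."""
    delays = []
    delay = initial_ms
    for _ in range(attempts):
        d = min(delay, max_delay_ms)
        if jitter:
            d = int(d * rng())  # "full jitter": uniform in [0, d)
        delays.append(d)
        delay *= 2
    return delays
```

Jitter spreads reconnects out when many clients lose the same server at once.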

WebSocket Sink

Property | Default | Description
mode | server | server (accept clients) or client (push to a remote endpoint)
bind.address | - | Bind address (server mode)
url | - | Remote URL (client mode)
format | json | json, jsonlines, arrow_ipc, binary
max.connections | 10000 | Maximum concurrent clients (server mode)
slow.client.policy | drop_oldest | drop_oldest, drop_newest, disconnect
replay.buffer.size | - | Number of recent messages replayed to newly connected clients
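One way to read slow.client.policy is as the overflow rule for a bounded per-client outbound queue. This Python sketch assumes that drop_oldest discards the oldest queued message, drop_newest discards the incoming one, and disconnect gives up on the client; these semantics and the per-client capacity parameter are assumptions, since this page lists only the policy names.

```python
from collections import deque


class ClientQueue:
    """Bounded outbound queue for one client, applying a slow.client.policy value."""

    def __init__(self, capacity, policy="drop_oldest"):
        if policy not in ("drop_oldest", "drop_newest", "disconnect"):
            raise ValueError(f"unknown policy: {policy}")
        self.capacity = capacity
        self.policy = policy
        self.items = deque()
        self.disconnected = False

    def push(self, msg):
        """Enqueue msg; returns False if the message was not queued."""
        if self.disconnected:
            return False
        if len(self.items) < self.capacity:
            self.items.append(msg)
            return True
        if self.policy == "drop_oldest":
            self.items.popleft()      # make room by discarding the oldest message
            self.items.append(msg)
            return True
        if self.policy == "drop_newest":
            return False              # discard the incoming message
        self.disconnected = True      # "disconnect": stop serving this client
        self.items.clear()
        return False
```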

File Source (Auto-Loader)

Property | Default | Description
path | required | Directory, glob, or cloud URL
format | - | csv, json, text, parquet (auto-detected if omitted)
poll_interval | 10000 | Directory poll interval (ms)
max_files_per_poll | 100 | Maximum files per poll cycle
csv.delimiter | - | CSV delimiter character
csv.has.header | true | Whether the CSV has a header row
include_metadata | false | Include the _metadata column

File Sink

Property | Default | Description
path | required | Output directory
format | required | csv, json, text, parquet
mode | rolling | append (single file) or rolling (rotate files)
prefix | part | File name prefix (rolling mode)
compression | snappy | Parquet codec: snappy, gzip, brotli, lz4, zstd, uncompressed

Join Semantics

LaminarDB supports five join types. All are stateful and operate on streaming data. Join types are detected from SQL syntax automatically.

Join type | Time bound | Match | Use case
Interval | Required | All pairs within the time bound | Stream-stream correlation
ASOF | Optional tolerance | One closest row per key (backward, forward, or nearest) | Trade/quote enrichment
Temporal Probe | Explicit offsets | N rows per left event, one per fixed offset | Impact curves, lagged feature lookups
Temporal | Implicit (version time) | Point-in-time lookup | Versioned dimension tables
Lookup | None | Key equality | Static reference enrichment

Interval Join (Stream-Stream)

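Matches all pairs of rows from two streams whose timestamps fall within a time bound of each other. The bound comes from the BETWEEN clause: p.ts BETWEEN o.ts AND o.ts + 3600000 matches payments up to one hour after the order, while a symmetric bound corresponds to |left_ts - right_ts| <= time_bound. Both sides are buffered in state, and expired rows are evicted when the watermark advances past the bound.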

Supported types: INNER, LEFT, RIGHT, FULL, LEFT SEMI, LEFT ANTI, RIGHT SEMI, RIGHT ANTI

Interval join — match orders to payments within 1 hour
CREATE STREAM matched AS
SELECT o.order_id, o.amount, p.status
FROM orders o
INNER JOIN payments p
    ON o.order_id = p.order_id
    AND p.ts BETWEEN o.ts AND o.ts + 3600000;
Semi join — orders that have at least one payment
SELECT o.order_id, o.amount
FROM orders o
LEFT SEMI JOIN payments p
    ON o.order_id = p.order_id
    AND p.ts BETWEEN o.ts AND o.ts + 3600000;

State: Both sides buffered as Arrow batches with per-key BTreeMap<timestamp> indices. Time columns are extracted automatically from the BETWEEN clause. New left rows probe all right state; new right rows probe only old left state (avoids double-emit). Checkpointed via Arrow IPC serialization.
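The buffering, probing, and eviction described above can be sketched row by row in Python. This is a simplified INNER interval join, not LaminarDB's Arrow-batch implementation: state is plain per-key lists, the bound is lower_ms <= right.ts - left.ts <= upper_ms (so p.ts BETWEEN o.ts AND o.ts + 3600000 gives lower_ms = 0, upper_ms = 3600000), and the class name is hypothetical. Processing one row at a time means each pair is emitted exactly once, when its second row arrives.

```python
from collections import defaultdict


class IntervalJoin:
    """INNER interval join: emit (left, right) when keys match and
    left.ts + lower_ms <= right.ts <= left.ts + upper_ms (times in ms)."""

    def __init__(self, lower_ms, upper_ms):
        self.lower, self.upper = lower_ms, upper_ms
        self.left = defaultdict(list)   # key -> [(ts, row)]
        self.right = defaultdict(list)

    def on_left(self, key, ts, row):
        self.left[key].append((ts, row))
        return [(row, r) for rts, r in self.right[key]
                if ts + self.lower <= rts <= ts + self.upper]

    def on_right(self, key, ts, row):
        self.right[key].append((ts, row))
        return [(l, row) for lts, l in self.left[key]
                if lts + self.lower <= ts <= lts + self.upper]

    def on_watermark(self, wm):
        """Evict rows that no future (non-late) row can match."""
        for key in list(self.left):
            # A left row can still match right rows up to lts + upper.
            self.left[key] = [(t, r) for t, r in self.left[key] if t + self.upper >= wm]
        for key in list(self.right):
            # A right row can still match left rows with lts up to rts - lower.
            self.right[key] = [(t, r) for t, r in self.right[key] if t - self.lower >= wm]
```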

ASOF Join

For each left row, finds the one closest right row by timestamp within the same key partition. Uses Snowflake-compatible MATCH_CONDITION syntax.

Supported types: INNER, LEFT

Three directions

Direction | Syntax | Behavior
Backward | t.ts >= q.ts | Most recent right event at or before the left event
Forward | t.ts <= q.ts | Next right event at or after the left event
Nearest | NEAREST(t.ts, q.ts) | Closest right event by absolute time difference
ASOF backward — enrich trades with most recent quote
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
    MATCH_CONDITION(t.ts >= q.ts)
    ON t.symbol = q.symbol;
ASOF with tolerance — max 5 second gap
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
    MATCH_CONDITION(t.ts >= q.ts AND t.ts - q.ts <= INTERVAL '5' SECOND)
    ON t.symbol = q.symbol;
ASOF nearest — closest event by absolute time
SELECT s.sensor_id, s.reading, c.calibration
FROM sensor_data s
ASOF JOIN calibrations c
    MATCH_CONDITION(NEAREST(s.ts, c.ts))
    ON s.sensor_id = c.sensor_id;

State: Right-side events stored in per-key BTreeMap<timestamp, Vec<row_index>> for O(log n) temporal lookups. Multiple rows at the same timestamp are handled correctly. Key columns support VARCHAR and BIGINT types.
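The per-key sorted index makes each ASOF probe a binary search. This Python sketch shows the three directions plus an optional tolerance over one key's sorted right-side timestamps, using the standard bisect module; it illustrates the matching rules rather than LaminarDB's code. asof_match is a hypothetical helper, and breaking nearest-direction ties toward the earlier row is an assumption.

```python
import bisect


def asof_match(right_ts, left_ts, direction="backward", tolerance=None):
    """Index of the ASOF-matched right row, or None.

    right_ts: ascending timestamps for one key partition.
    tolerance: maximum allowed |right - left| gap (same units), or None.
    """
    if direction == "backward":      # latest right_ts <= left_ts
        i = bisect.bisect_right(right_ts, left_ts) - 1
        cand = i if i >= 0 else None
    elif direction == "forward":     # earliest right_ts >= left_ts
        i = bisect.bisect_left(right_ts, left_ts)
        cand = i if i < len(right_ts) else None
    elif direction == "nearest":     # smallest |right_ts - left_ts|; ties pick the earlier row
        i = bisect.bisect_left(right_ts, left_ts)
        options = [j for j in (i - 1, i) if 0 <= j < len(right_ts)]
        cand = min(options, key=lambda j: abs(right_ts[j] - left_ts)) if options else None
    else:
        raise ValueError(f"unknown direction: {direction}")
    if cand is not None and tolerance is not None and abs(right_ts[cand] - left_ts) > tolerance:
        return None
    return cand
```

In the tolerance example above, INTERVAL '5' SECOND corresponds to tolerance=5000 when timestamps are in milliseconds.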

Temporal Probe Join

For each left event, produces N output rows by probing the right stream at a fixed list of time offsets. Each probe returns the ASOF-matched right value at left.event_time + offset. Useful for lagged feature lookups and impact curves.

Offset specification

Syntax | Description
RANGE FROM <start> TO <end> STEP <step> | Uniform range of offsets (all values in milliseconds)
LIST (<ms>, <ms>, ...) | Explicit list of offsets in milliseconds

Directions: Backward, Forward, and Nearest. Each probed offset is resolved using the same semantics as a standalone ASOF join in that direction.
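A temporal probe is a fan-out of ASOF lookups, one per offset. This Python sketch expands an offset specification and probes the right side at left_ts + offset with a backward match. The tuple form of the specification is a hypothetical stand-in for the SQL clauses, and treating RANGE as inclusive of <end> is an assumption this page does not confirm.

```python
import bisect


def probe_offsets(spec):
    """Expand ("range", start, end, step) or ("list", offsets) into offsets in ms.

    Assumption: a RANGE includes end when it lies on the step grid.
    """
    if spec[0] == "list":
        return list(spec[1])
    _, start, end, step = spec
    if step <= 0:
        raise ValueError("STEP must be positive")
    return list(range(start, end + 1, step))


def backward_match(right_ts, t):
    """Index of the latest right timestamp <= t, or None."""
    i = bisect.bisect_right(right_ts, t) - 1
    return i if i >= 0 else None


def temporal_probe(left_ts, right_ts, offsets, match=backward_match):
    """One output per offset: (offset, index of the matched right row or None)."""
    return [(off, match(right_ts, left_ts + off)) for off in offsets]
```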

Temporal Join (FOR SYSTEM_TIME AS OF)

Point-in-time lookup against a versioned table. For each stream event, looks up the table row valid at the event's timestamp. Uses SQL:2011 FOR SYSTEM_TIME AS OF syntax.

Supported types: INNER, LEFT

Temporal join — currency rate valid at order time
SELECT o.order_id, o.amount, r.rate,
       o.amount * r.rate AS converted
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency;

Versioned tables: The right-side table must have a timestamp column marking version validity. Event-time semantics are deterministic; process-time (PROCTIME()) is non-deterministic.
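Point-in-time semantics can be pictured as a per-key version history: each row is valid from its version timestamp until the next version of the same key. This Python sketch (hypothetical class name, times in milliseconds, deletions not modeled) returns the row valid at a given event time. A None result is where an INNER join would drop the event and a LEFT join would pad with NULLs.

```python
import bisect


class VersionedTable:
    """Per-key versions; a row is valid from valid_from until the key's next version."""

    def __init__(self):
        self.versions = {}  # key -> (sorted valid_from timestamps, rows)

    def upsert(self, key, valid_from, row):
        ts_list, rows = self.versions.setdefault(key, ([], []))
        i = bisect.bisect_right(ts_list, valid_from)
        ts_list.insert(i, valid_from)
        rows.insert(i, row)

    def as_of(self, key, event_time):
        """Row valid at event_time, or None if no version existed yet."""
        ts_list, rows = self.versions.get(key, ([], []))
        i = bisect.bisect_right(ts_list, event_time) - 1
        return rows[i] if i >= 0 else None
```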

Lookup Join

Enriches stream events against a static or slowly-changing reference table. Matches by key equality with no time bound. Results are cached with a configurable TTL.

Supported types: INNER, LEFT

Lookup join — enrich orders with product info
SELECT o.order_id, o.amount, p.name, p.category
FROM orders o
LEFT JOIN products p
    ON o.product_id = p.id;

Cache: quick_cache-backed in-memory cache. Configure via [[lookup]] section: cache.size_bytes, cache.ttl, cache.hybrid. Supports predicate pushdown to the lookup connector.
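The two cache limits, cache.size_bytes and cache.ttl, can be illustrated with a small size-bounded TTL cache. This Python sketch uses LRU eviction for simplicity; quick_cache uses its own eviction policy, so this shows only how the two limits interact. The class name and the caller-supplied entry size are hypothetical.

```python
import time
from collections import OrderedDict


class TtlCache:
    """Size-bounded cache with per-entry TTL (LRU eviction in this sketch)."""

    def __init__(self, max_bytes, ttl_s, clock=time.monotonic):
        self.max_bytes, self.ttl_s, self.clock = max_bytes, ttl_s, clock
        self.entries = OrderedDict()  # key -> (expires_at, size, value)
        self.used = 0

    def get(self, key):
        item = self.entries.get(key)
        if item is None:
            return None
        expires_at, size, value = item
        if self.clock() >= expires_at:  # expired: drop and report a miss
            del self.entries[key]
            self.used -= size
            return None
        self.entries.move_to_end(key)   # mark as recently used
        return value

    def put(self, key, value, size):
        if key in self.entries:
            self.used -= self.entries.pop(key)[1]
        self.entries[key] = (self.clock() + self.ttl_s, size, value)
        self.used += size
        while self.used > self.max_bytes and self.entries:
            _, (_, old_size, _) = self.entries.popitem(last=False)  # evict LRU entry
            self.used -= old_size
```

With the defaults, this would be TtlCache(max_bytes=104857600, ttl_s=300).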

Environment Variables

All string values in the TOML configuration file support ${VAR} substitution. Use ${VAR:-default} for optional variables with fallbacks. Missing variables without defaults cause a startup error.

SQL Queries: Because substitution is performed on the raw configuration file contents before parsing, environment variables can be embedded directly within SQL statements defined in the TOML file (such as in [[pipeline]] SQL queries or the top-level sql string).

Dynamic Queries: Environment variables are not expanded in queries submitted dynamically after startup (e.g., via the pgwire protocol or the POST /api/v1/sql HTTP endpoint).

Environment variable examples
[source.properties]
bootstrap.servers = "${KAFKA_BROKERS:-localhost:9092}"
sasl.username = "${KAFKA_USER}"
sasl.password = "${KAFKA_PASSWORD}"

[checkpoint]
url = "s3://${S3_BUCKET}/checkpoints"

[checkpoint.storage]
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
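The substitution rules above can be sketched as a regular-expression pass over the raw file text before TOML parsing. substitute_env is a hypothetical helper, not LaminarDB's implementation. It treats a variable that is set but empty as set; whether LaminarDB falls back to the default in that case is not documented here.

```python
import os
import re

# ${NAME} or ${NAME:-default}; NAME follows the usual shell identifier rules.
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def substitute_env(raw, env=None):
    """Expand ${VAR} and ${VAR:-default}; raise if a VAR without default is unset."""
    env = os.environ if env is None else env
    missing = []

    def repl(match):
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        missing.append(name)
        return match.group(0)

    out = _VAR.sub(repl, raw)
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return out
```

Because expansion happens on raw text, a value containing a double quote or newline would end up inside the TOML string unescaped and could break parsing.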

S3 Storage Class Tiering

Active checkpoints use the hot tier. Older checkpoints automatically tier down via S3 Lifecycle rules.

Field | Default | Description
hot_class | "STANDARD" | Storage class for active checkpoints (e.g. "EXPRESS_ONE_ZONE")
warm_class | "STANDARD" | Storage class for older checkpoints
cold_class | "" | Archive class (e.g. "GLACIER_IR"). Empty means no cold tier.
hot_retention | "24h" | Time before a checkpoint moves from hot to warm
warm_retention | "7d" | Time before a checkpoint moves from warm to cold
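This Python sketch shows which class a checkpoint object would end up in under the defaults above. It assumes both retention values are measured from checkpoint creation, as S3 Lifecycle transitions are based on object age; the table does not say whether warm_retention instead counts from the hot-to-warm transition. storage_class is a hypothetical helper.

```python
def storage_class(age_hours, hot_class="STANDARD", warm_class="STANDARD",
                  cold_class="", hot_retention_h=24, warm_retention_h=7 * 24):
    """Storage class for a checkpoint of the given age (assumption: ages from creation)."""
    if age_hours < hot_retention_h:
        return hot_class
    if not cold_class or age_hours < warm_retention_h:
        return warm_class  # empty cold_class means no cold tier
    return cold_class
```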

Delivery Guarantees

Guarantee | How it works | Requirements
at_least_once | Default. On recovery, sources replay from the last checkpointed offset, so some records may be reprocessed. | Checkpointing enabled. Works with all connectors.
exactly_once | Barrier-aligned checkpoints with two-phase commit to sinks. No duplicates in output. | All sources must support replay, and sinks must support two-phase commit. Currently: Kafka sink and PostgreSQL sink. The Delta Lake and NATS sink tables above also list an exactly-once delivery.guarantee.

WebSocket sources are non-replayable. Under exactly-once mode, they degrade to at-most-once delivery with a warning.
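The exactly_once row can be illustrated with a toy model: a sink that keeps each epoch's writes in an open transaction and makes them visible only when that epoch's checkpoint completes, plus a source loop that replays from the last checkpointed offset after a crash. This is a simplified Python sketch, not LaminarDB's implementation: it folds the pre-commit phase into buffering, commits right after each checkpoint, and its class and function names are hypothetical.

```python
class TwoPhaseSink:
    """Writes stay in a per-epoch transaction until that epoch's checkpoint completes."""

    def __init__(self):
        self.committed = []   # output visible to readers
        self.pending = {}     # epoch -> records in an open transaction

    def write(self, epoch, record):
        self.pending.setdefault(epoch, []).append(record)

    def commit(self, epoch):
        """Phase 2: make the epoch visible once its checkpoint is durable."""
        self.committed.extend(self.pending.pop(epoch, []))

    def abort_uncommitted(self):
        """On failure: roll back every open transaction."""
        self.pending.clear()


def run(records, sink, start_offset, epoch, crash_at=None, checkpoint_every=2):
    """Process records from start_offset, checkpointing every N records.

    Returns the offset stored by the last completed checkpoint.
    """
    last_checkpoint = start_offset
    for offset in range(start_offset, len(records)):
        if offset == crash_at:
            sink.abort_uncommitted()
            return last_checkpoint        # recovery resumes from here
        sink.write(epoch, records[offset])
        if (offset + 1) % checkpoint_every == 0:
            sink.commit(epoch)            # checkpoint completed: commit the epoch
            last_checkpoint = offset + 1
            epoch += 1
    sink.commit(epoch)                    # final checkpoint at end of input
    return len(records)
```

In the test scenario, the run crashes at offset 3 after record c was written but not committed; the rollback discards c, and the replay from offset 2 writes it again exactly once.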

Full Config Example

laminardb.toml
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
workers = 4
log_level = "info"

[state]
backend = "in_process"

[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"

[checkpoint.storage]
aws_region = "us-east-1"

# --- Sources ---

[[source]]
name = "trades"
connector = "kafka"
format = "avro"

[source.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "market-trades"
group.id = "laminar-vwap"
schema.registry.url = "${SCHEMA_REGISTRY_URL}"
startup.mode = "earliest"

[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false

[[source.schema]]
name = "price"
type = "DOUBLE"

[[source.schema]]
name = "volume"
type = "BIGINT"

[[source.schema]]
name = "ts"
type = "BIGINT"

[source.watermark]
column = "ts"
max_out_of_orderness = "5s"

# --- Lookups ---

[[lookup]]
name = "instruments"
connector = "postgres"
strategy = "poll"
pushdown = true

[lookup.properties]
hostname = "db.internal"
database = "reference_data"
username = "reader"
table.name = "instruments"

[lookup.cache]
size_bytes = 104857600
ttl = "5m"

# --- Pipelines ---

[[pipeline]]
name = "vwap"
sql = """
SELECT t.symbol,
       SUM(t.price * CAST(t.volume AS DOUBLE))
         / SUM(CAST(t.volume AS DOUBLE)) AS vwap,
       i.exchange
FROM trades t
JOIN instruments i ON t.symbol = i.symbol
GROUP BY t.symbol, i.exchange, tumble(t.ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE
"""

# --- Sinks ---

[[sink]]
name = "kafka_output"
pipeline = "vwap"
connector = "kafka"
delivery = "exactly_once"

[sink.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "vwap-1m"
format = "json"
key.column = "symbol"
compression.type = "lz4"

[[sink]]
name = "lake"
pipeline = "vwap"
connector = "delta-lake"
delivery = "exactly_once"

[sink.properties]
table.path = "s3://my-bucket/delta/vwap"
partition.columns = "exchange"
writer.id = "vwap-writer"