Standalone Server & Configuration Reference
The laminardb binary reads a TOML configuration file, constructs streaming pipelines, and serves a REST API with Prometheus metrics.
Every field on this page is verified against config.rs.
Last updated: 25 May 2026.
Install
# Pre-built binary (Linux x86_64) — resolve the latest release tag
VERSION=$(curl -s https://api.github.com/repos/laminardb/laminardb/releases/latest | grep tag_name | cut -d '"' -f4)
curl -LO "https://github.com/laminardb/laminardb/releases/download/${VERSION}/laminardb-server-x86_64-unknown-linux-gnu-${VERSION}.tar.gz"
tar xzf laminardb-server-*.tar.gz
# Or build from source
cargo install laminar-server
# Or Docker (multi-arch — Docker Hub or GHCR)
docker run -d -p 8080:8080 laminardb/laminardb-server:latest
# docker run -d -p 8080:8080 ghcr.io/laminardb/laminardb-server:latest
# Run the binary
./laminardb --config laminardb.toml
CLI Options
| Flag | Default | Description |
|---|---|---|
| --config <FILE> | laminardb.toml | TOML configuration file path |
| --log-level <LEVEL> | info | trace, debug, info, warn, error |
| --admin-bind <ADDR> |  | Override the HTTP bind address from the config file |
| --validate-checkpoints |  | Validate all stored checkpoints and exit |
[server]
The [server] section configures standalone server behavior, control-plane security, and pgwire (Postgres wire protocol) network settings.
| Field | Type | Default | Description |
|---|---|---|---|
| mode | String | "embedded" | Run mode. "embedded" (standalone single-node) or "cluster" (multi-node deployment). |
| bind | String | "127.0.0.1:8080" | HTTP API and Console UI control-plane bind address. |
| console_token | String | none | Bearer token that gates access to the HTTP control-plane API and WebSocket endpoints. If omitted, the API is unauthenticated. |
| console_cors_allowed_origins | Array<String> | none | CORS origins allowed to access control-plane REST/WS endpoints. |
| pgwire_bind | String | none | Postgres wire protocol listener bind address. Disabled if omitted. |
| pgwire_allow_remote | bool | false | Must be set to true to allow binding pgwire on non-loopback interfaces. |
| pgwire_users | Map | {} | A map of usernames to MD5 or plain-text passwords for pgwire client authentication. |
| pgwire_tls_cert | String | none | Path to PEM certificate file for pgwire TLS connection support. |
| pgwire_tls_key | String | none | Path to PEM private key (PKCS#8/RSA) for pgwire TLS. |
| pgwire_tls_client_ca | String | none | Path to PEM CA bundle for enforcing mutual TLS (mTLS) verification. |
| pgwire_max_connections | usize | 256 | Maximum concurrent pgwire sessions; connections beyond this limit are closed. |
| pgwire_max_auth_failures_per_min | u32 | 10 | Failed authentications per IP per minute before temporary ban. 0 disables. |
| pgwire_tls_min_version | String | "1.2" | Minimum TLS protocol version ("1.2" or "1.3"). |
Top-Level Config Keys:
| Field | Type | Default | Description |
|---|---|---|---|
| node_id | String | none | Unique node identity (required if mode = "cluster"; falls back to {hostname}-{port}). |
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
console_token = "supersecret"
pgwire_bind = "0.0.0.0:5433"
pgwire_allow_remote = true
log_level = "info"
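The pgwire_allow_remote rule above can be checked before the server starts. The sketch below is a hypothetical pre-flight helper, not part of LaminarDB: it assumes pgwire_bind is a host:port string and treats only "localhost" and loopback IP literals (127.0.0.0/8, ::1) as local.

```python
import ipaddress


def is_loopback(host: str) -> bool:
    """True for 'localhost' or a loopback IP literal (IPv6 may be bracketed)."""
    if host == "localhost":
        return True
    try:
        return ipaddress.ip_address(host.strip("[]")).is_loopback
    except ValueError:
        # Any other hostname is treated as non-loopback in this sketch.
        return False


def check_pgwire_bind(server: dict) -> None:
    """Raise if pgwire would listen on a non-loopback address without opt-in."""
    bind = server.get("pgwire_bind")
    if bind is None:
        return  # pgwire listener disabled
    host, _, port = bind.rpartition(":")
    if not port.isdigit():
        raise ValueError(f"pgwire_bind must be host:port, got {bind!r}")
    if not is_loopback(host) and not server.get("pgwire_allow_remote", False):
        raise ValueError(
            f"pgwire_bind={bind!r} is not loopback; set pgwire_allow_remote = true"
        )
```

Note that 0.0.0.0 is the unspecified address, not a loopback address, so the example above needs pgwire_allow_remote = true.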
Cluster Config
When mode = "cluster", the server requires both the [discovery] and [coordination] sections to be defined.
[discovery]
| Field | Type | Default | Description |
|---|---|---|---|
| strategy | String | required | Discovery mode. Either "gossip" (Chitchat p2p membership) or "static". |
| seeds | Array<String> | [] | Gossip/Static seed peer addresses to join the cluster (e.g. ["10.0.0.1:7946", "10.0.0.2:7946"]). Required for static strategy. |
| gossip_port | u16 | 7946 | Gossip communications port. |
| advertise_host | String | none | The host/IP address this node advertises to peers. Falls back to loopback if DNS resolution fails. |
[coordination]
| Field | Type | Default | Description |
|---|---|---|---|
| strategy | String | "raft" | Consensus coordinator strategy. Currently supports "raft". |
| raft_port | u16 | 8888 | The port used for Raft consensus group communications. |
| election_timeout | Duration | "3s" | Raft leader election timeout (supports humantime format). |
| heartbeat_interval | Duration | "500ms" | Raft leader heartbeat interval. |
node_id = "node-1"
[discovery]
strategy = "gossip"
gossip_port = 7946
advertise_host = "10.0.0.1"
seeds = ["10.0.0.1:7946", "10.0.0.2:7946"]
[coordination]
strategy = "raft"
raft_port = 8888
election_timeout = "3s"
heartbeat_interval = "500ms"
[state]
The [state] section configures the engine's query state backend (used for aggregates, joins, and windowing). It is a tagged union using the backend key.
In-Process State (Non-durable, in-memory)
Configure with backend = "in_process" (the default).
| Field | Type | Default | Description |
|---|---|---|---|
| vnode_capacity | u32 | 256 | Number of virtual nodes to divide the hash ring into. |
Local State (Durable, local filesystem)
Configure with backend = "local".
| Field | Type | Default | Description |
|---|---|---|---|
| path | String | required | Local directory path where state partitions are stored. |
| instance_id | String | "local" | Auditable instance identity written to checkpoint metadata. |
| vnode_capacity | u32 | 256 | Number of virtual nodes. |
Object Store State (Durable, shared storage)
Configure with backend = "object_store". Essential for cluster mode deployments.
| Field | Type | Default | Description |
|---|---|---|---|
| url | String | required | Shared storage URL (currently "file://..."; cloud schemes such as "s3://"/"gs://"/"az://" are not yet implemented). |
| instance_id | String | required | This node's unique ID, used for writer fencing. |
| vnode_capacity | u32 | 256 | Total virtual nodes for the cluster layout. |
| vnodes | Array<u32> | none | Static list of vnodes assigned to this node (only used for static/non-cluster layouts). |
| merger_instance | String | none | The node that compiles partial sink outputs (only used in static modes). |
| discovery | String | "static" | Membership discovery mode: "static" or "dynamic" (gossip). |
| seed_peers | Array<String> | [] | Seed peer addresses for dynamic chitchat discovery. |
[checkpoint]
| Field | Type | Default | Description |
|---|---|---|---|
| url | String | "file:///tmp/laminardb/checkpoints" | Storage URL. file:///, s3://, gs:// |
| interval | Duration | "10s" | Checkpoint interval. Supports humantime: "10s", "1m", "500ms" |
| storage | Map | {} | Cloud storage credentials. Keys: aws_access_key_id, aws_region, etc. |
| tiering | Section | none | Optional S3 storage class tiering. See S3 Storage Class Tiering. |
[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"
[checkpoint.storage]
aws_region = "us-east-1"
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
Console UI & Control API
The Console UI is a web client that lets you inspect your streaming SQL nodes, view lineage dependency graphs, and author queries. You can access the hosted console at laminardb.github.io/laminardb-console-ui.
The console connects to the control-plane APIs exposed by the server. All endpoints are protected by token authentication if server.console_token is configured.
Security & Gating
- HTTP Authentication: Send the configured token in the header Authorization: Bearer <token>.
- WebSocket Authentication: Pass the token as a query parameter: /ws/{name}?token=<token>.
- CORS Gating: Restrict client origins using the server.console_cors_allowed_origins array in TOML.
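A minimal Python sketch of both authentication styles using only the standard library. BASE_URL and TOKEN are placeholders for your [server] bind and console_token; the helpers only build the request and the URL and do not send anything.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Placeholders: substitute your server address and console_token.
BASE_URL = "http://127.0.0.1:8080"
TOKEN = "supersecret"


def api_request(path: str, token: str = TOKEN) -> Request:
    """Build an authenticated control-plane HTTP request (not sent here)."""
    return Request(f"{BASE_URL}{path}", headers={"Authorization": f"Bearer {token}"})


def ws_url(name: str, token: str = TOKEN) -> str:
    """Build a WebSocket URL that carries the token as a query parameter."""
    ws_base = BASE_URL.replace("http://", "ws://", 1).replace("https://", "wss://", 1)
    return f"{ws_base}/ws/{name}?{urlencode({'token': token})}"
```

To send a request, pass it to urllib.request.urlopen. urlencode percent-encodes tokens that contain reserved characters.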
Lineage Dependency Graph
GET /api/v1/graph parses SQL statements and queries the engine schema to return a directed acyclic graph (DAG) of the data pipeline.
{
"nodes": [
{ "id": "trades", "type": "source", "label": "trades" },
{ "id": "vwap", "type": "stream", "label": "vwap" },
{ "id": "trade_archive", "type": "sink", "label": "trade_archive" }
],
"edges": [
{ "source": "trades", "target": "vwap" },
{ "source": "vwap", "target": "trade_archive" }
]
}
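Because the response is a DAG, a client can order nodes from sources to sinks with a topological sort. Below is a Python sketch using the standard graphlib module on the example response above; upstream_sources is a hypothetical helper that answers "which sources feed this node?".

```python
import json
from graphlib import TopologicalSorter

# The example response from GET /api/v1/graph shown above.
GRAPH_JSON = """
{
  "nodes": [
    {"id": "trades", "type": "source", "label": "trades"},
    {"id": "vwap", "type": "stream", "label": "vwap"},
    {"id": "trade_archive", "type": "sink", "label": "trade_archive"}
  ],
  "edges": [
    {"source": "trades", "target": "vwap"},
    {"source": "vwap", "target": "trade_archive"}
  ]
}
"""


def pipeline_order(graph: dict) -> list:
    """Node ids ordered so each node comes after all of its upstream nodes."""
    sorter = TopologicalSorter({node["id"]: set() for node in graph["nodes"]})
    for edge in graph["edges"]:
        sorter.add(edge["target"], edge["source"])  # target depends on source
    return list(sorter.static_order())  # raises graphlib.CycleError on a cycle


def upstream_sources(graph: dict, node_id: str) -> set:
    """All nodes of type "source" that feed node_id, directly or transitively."""
    kinds = {node["id"]: node["type"] for node in graph["nodes"]}
    parents = {}
    for edge in graph["edges"]:
        parents.setdefault(edge["target"], []).append(edge["source"])
    seen, stack = set(), [node_id]
    while stack:
        for parent in parents.get(stack.pop(), []):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return {n for n in seen if kinds.get(n) == "source"}
```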
Ad-hoc Query & Ephemeral Stream Lifecycle
To run an ad-hoc query and tail it in the browser:
- Send the SQL query to POST /api/v1/queries:
  POST /api/v1/queries
  { "sql": "SELECT symbol, price FROM trades WHERE volume > 1000" }
- The server registers an ephemeral stream named __console_<uuid> and returns:
  { "stream_id": "__console_<uuid>", "ws_url": "/ws/__console_<uuid>" }
- The client connects to the WebSocket at ws_url to receive the data stream.
- When the WebSocket connection closes or times out, the server automatically DROPs the ephemeral stream and releases its resources.
Realtime Telemetry & WebSocket Protocol
A WebSocket client connects to GET /ws/{name}. Messages are serialized in Arrow JSON format:
{
"type": "data",
"subscription_id": 1,
"data": [ { "symbol": "AAPL", "price": 178.5 } ],
"sequence": 42
}
The server sends a heartbeat ping frame every 15 seconds; the client must respond with a pong, or the connection is terminated.
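A Python sketch of client-side handling for the data frames shown above. It assumes (this page does not say) that sequence increases by one per data message within a subscription, so a jump indicates missed messages; non-data message types are ignored. Most WebSocket client libraries answer server pings with pongs automatically, which satisfies the heartbeat requirement.

```python
import json


class StreamReader:
    """Collects rows from /ws/{name} data frames and records sequence gaps."""

    def __init__(self) -> None:
        self.last_seq = {}  # subscription_id -> last sequence seen
        self.rows = []      # decoded row objects, in arrival order
        self.gaps = []      # (subscription_id, previous sequence, received sequence)

    def on_message(self, raw: str) -> None:
        msg = json.loads(raw)
        if msg.get("type") != "data":
            return  # other message types are not handled in this sketch
        sub, seq = msg["subscription_id"], msg["sequence"]
        prev = self.last_seq.get(sub)
        if prev is not None and seq != prev + 1:
            self.gaps.append((sub, prev, seq))  # assumed contiguous numbering
        self.last_seq[sub] = seq
        self.rows.extend(msg["data"])
```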
Cluster Management Endpoints
- GET /api/v1/cluster/nodes: Returns the list of cluster members, including their node IDs, network addresses, and states (Active, Draining, Suspected, Left).
- GET /api/v1/cluster/vnodes: Returns the current map assigning virtual nodes (vnodes; 256 with the default vnode_capacity) to active cluster nodes.
- GET /api/v1/cluster/leader: Returns node details for the current Raft leader lease holder.
- GET /api/v1/cluster/checkpoints: Returns history and completion metadata for recent coordinator checkpoints.
REST API
The server exposes an HTTP API on the configured bind address. All responses are JSON except /metrics (Prometheus text format).
Hot Reload
Edit the TOML config file while the server is running. A file watcher detects changes
(500ms debounce, content-hash comparison), diffs the old and new configs, and applies
incremental DDL. Alternatively, trigger a reload manually via POST /api/v1/reload.
What reloads without restart: sources, pipelines, sinks, lookups (add, remove, or change).
What requires restart: [server], [state], [checkpoint] sections. A warning is returned if these sections change.
Reload order: remove changed/deleted sinks → pipelines → lookups → sources,
then recreate sources → lookups → pipelines → sinks.
Disable the file watcher by setting LAMINAR_DISABLE_FILE_WATCH=1.
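The documented reload order can be expressed as a small planning function. This is a Python sketch under stated assumptions: each [[source]], [[lookup]], [[pipeline]], and [[sink]] entry is identified by its name, any field change counts as "changed", and cascading (for example, recreating a sink because its pipeline changed) is not modelled.

```python
KINDS = ("source", "lookup", "pipeline", "sink")  # upstream -> downstream
RESTART_SECTIONS = ("server", "state", "checkpoint")


def _by_name(cfg: dict, kind: str) -> dict:
    """Map name -> definition for one [[kind]] array of tables."""
    return {item["name"]: item for item in cfg.get(kind, [])}


def reload_plan(old: dict, new: dict):
    """Return (steps, warnings) for moving from config old to config new."""
    drops, creates = [], []
    for kind in KINDS:
        before, after = _by_name(old, kind), _by_name(new, kind)
        drops += [(kind, n) for n, spec in before.items() if after.get(n) != spec]
        creates += [(kind, n) for n, spec in after.items() if before.get(n) != spec]
    rank = {kind: i for i, kind in enumerate(KINDS)}
    drops.sort(key=lambda step: -rank[step[0]])   # sinks first, sources last
    creates.sort(key=lambda step: rank[step[0]])  # sources first, sinks last
    steps = [("drop", *s) for s in drops] + [("create", *s) for s in creates]
    warnings = [f"[{s}] changed: restart required"
                for s in RESTART_SECTIONS if old.get(s) != new.get(s)]
    return steps, warnings
```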
Prometheus Metrics
GET /metrics returns Prometheus text format. Key metrics:
| Metric | Type | Description |
|---|---|---|
| laminardb_events_ingested_total | counter | Total events ingested across all sources |
| laminardb_events_emitted_total | counter | Total events emitted to sinks |
| laminardb_events_dropped_total | counter | Total events dropped (late data, errors) |
| laminardb_source_events_total | counter | Events per source (label: source) |
| laminardb_source_count | gauge | Number of configured sources |
| laminardb_stream_count | gauge | Number of running streams |
| laminardb_sink_count | gauge | Number of configured sinks |
| laminardb_checkpoints_completed_total | counter | Successful checkpoints |
| laminardb_checkpoints_failed_total | counter | Failed checkpoints |
| laminardb_checkpoint_epoch | gauge | Current checkpoint epoch |
| laminardb_checkpoint_last_duration_ms | gauge | Last checkpoint duration (ms) |
| laminardb_cycle_duration_p50_ns | gauge | p50 processing cycle latency (ns) |
| laminardb_cycle_duration_p95_ns | gauge | p95 processing cycle latency (ns) |
| laminardb_cycle_duration_p99_ns | gauge | p99 processing cycle latency (ns) |
| laminardb_uptime_seconds | gauge | Server uptime |
| laminardb_pipeline_state_info | gauge | Pipeline state (label: state) |
| laminardb_reload_total | counter | Config reload count |
| laminardb_reload_last_timestamp | gauge | Unix timestamp of last reload |
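A simplified Python parser for the text format, useful for quick checks in scripts. It handles only name{labels} value lines, ignores HELP/TYPE comments and timestamps, and does not unescape label values; the sample values are made up. For production use, the prometheus_client package provides prometheus_client.parser.text_string_to_metric_families.

```python
import re

# name, optional {labels}, value (a trailing timestamp, if present, is ignored).
SAMPLE_LINE = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{([^}]*)\})?\s+(\S+)")
LABEL = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')

# Made-up scrape output, for illustration only.
SAMPLE = """\
# TYPE laminardb_events_ingested_total counter
laminardb_events_ingested_total 1500
laminardb_source_events_total{source="trades"} 1200
laminardb_source_events_total{source="quotes"} 300
laminardb_checkpoint_epoch 17
"""


def parse_metrics(text: str) -> dict:
    """Map (metric name, sorted label pairs) -> float value."""
    samples = {}
    for line in text.splitlines():
        if not line or line.startswith("#"):
            continue  # blank line or HELP/TYPE comment
        m = SAMPLE_LINE.match(line)
        if m:
            name, raw_labels, value = m.groups()
            labels = tuple(sorted(LABEL.findall(raw_labels or "")))
            samples[(name, labels)] = float(value)
    return samples
```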
[[source]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique source name. Referenced in SQL and sinks. |
| connector | String | required | "kafka", "postgres_cdc", "mysql_cdc", "mongodb-cdc", "nats", "websocket", "files", "delta-lake", "iceberg", "otel", "parquet-lookup", "postgres-lookup" |
| format | String | "json" | "json", "avro", "protobuf", "csv" |
| properties | Table | {} | Connector-specific properties. See connector sections below. |
| schema | Array | [] | Column definitions: {name, type, nullable} |
| watermark | Section | none | Event-time watermark: {column, max_out_of_orderness} |
[[source]]
name = "trades"
connector = "kafka"
format = "json"
[source.properties]
bootstrap.servers = "localhost:9092"
topic = "market-trades"
group.id = "laminar-analytics"
[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false
[[source.schema]]
name = "price"
type = "DOUBLE"
[[source.schema]]
name = "ts"
type = "BIGINT"
[source.watermark]
column = "ts"
max_out_of_orderness = "5s"
[[pipeline]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique pipeline name |
| sql | String | required | SQL query. Multi-line supported with triple-quoted strings. |
| parallelism | usize | none | Optional parallelism override (default: server worker count) |
[[sink]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique sink name |
| pipeline | String | required | Pipeline name this sink reads from. Must exist. |
| connector | String | required | "kafka", "postgres", "mongodb", "nats", "delta-lake", "iceberg", "websocket", "files", "stdout" |
| delivery | String | "at_least_once" | "at_least_once" or "exactly_once" |
| properties | Table | {} | Connector-specific properties |
[[lookup]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique lookup table name. Used in SQL JOINs. |
| connector | String | required | "postgres", "parquet" |
| strategy | String | "poll" | "poll", "cdc", "manual" |
| pushdown | bool | true | Push predicates to source for filtered lookups |
| cache.size_bytes | u64 | 104857600 | Cache size in bytes (100 MB) |
| cache.ttl | Duration | "300s" | Cache entry TTL |
| cache.hybrid | bool | false | Enable in-memory caching via quick_cache |
Connectors
All connectors are feature-gated. Enable via cargo add laminar-db --features kafka,delta-lake.
Kafka Source
Required
| Property | Description |
|---|---|
| bootstrap.servers | Kafka broker addresses (comma-separated) |
| group.id | Consumer group ID |
| topic or topic.pattern | Topic names (comma-separated) or regex pattern. Mutually exclusive. |
Format & Schema
| Property | Default | Description |
|---|---|---|
| format | json | json, avro, protobuf |
| schema.registry.url |  | Confluent Schema Registry URL |
| schema.registry.username |  | Schema Registry auth |
| schema.registry.password |  | Schema Registry auth |
Security
| Property | Default | Description |
|---|---|---|
| security.protocol | PLAINTEXT | PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL |
| sasl.mechanism |  | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 |
| sasl.username |  | SASL username |
| sasl.password |  | SASL password |
| ssl.ca.location |  | CA certificate path |
Consumer Tuning
| Property | Default | Description |
|---|---|---|
| startup.mode | earliest | earliest, latest, none |
| max.poll.records | 1000 | Records per poll batch |
| isolation.level | read_committed | read_uncommitted, read_committed |
| partition.assignment.strategy | range | range, roundrobin, sticky, cooperative-sticky |
| max.out.of.orderness.ms | 5000 | Watermark out-of-order tolerance |
| idle.timeout.ms | 30000 | Idle watermark advancement timeout |
Any property prefixed with kafka. is passed through to the underlying rdkafka consumer.
Kafka Sink
| Property | Default | Description |
|---|---|---|
| bootstrap.servers | required | Kafka broker addresses |
| topic | required | Target topic |
| format | json | json, avro, protobuf |
| delivery.guarantee | at-least-once | at-least-once, exactly-once |
| key.column |  | Column for partition key |
| partitioner | key-hash | key-hash, round-robin, sticky |
| compression.type | none | none, gzip, snappy, lz4, zstd |
| acks | all | all, 1, 0 |
| linger.ms | 5 | Batch linger time |
| batch.size | 16384 | Max batch bytes |
| transactional.id |  | For exactly-once producer |
NATS Source
| Property | Default | Description |
|---|---|---|
| servers | required | NATS server URLs (comma-separated) |
| mode | jetstream | jetstream, core |
| format | json | json, avro, protobuf |
| auth.mode | none | none, user_pass, token, creds_file |
| user |  | Auth username (required for user_pass) |
| password |  | Auth password (required for user_pass) |
| token |  | Auth token (required for token) |
| creds.file |  | Path to creds file (required for creds_file) |
| tls.enabled | false | Enable TLS |
| tls.ca.location |  | CA cert path |
| tls.cert.location |  | Client certificate path |
| tls.key.location |  | Client key path |
| stream |  | NATS JetStream stream name (required in jetstream mode) |
| subject |  | Subscription subject (required in core mode) |
| subject.filters |  | Comma-separated JetStream subject filters |
| consumer |  | Pull consumer durable name (required in jetstream mode) |
| deliver.policy | all | all, new, by_start_sequence, by_start_time |
| start.sequence |  | Start sequence (required if deliver.policy=by_start_sequence) |
| start.time |  | Start RFC3339 timestamp (required if deliver.policy=by_start_time) |
| ack.policy | explicit | explicit, none |
| ack.wait.ms | 60000 | Ack wait timeout in milliseconds |
| max.deliver | 5 | Max deliveries |
| max.ack.pending | 10000 | Max outstanding unacknowledged messages |
| fetch.batch | 500 | Batch size for fetch operations |
| fetch.max.wait.ms | 500 | Max wait for fetch operations in milliseconds |
| queue.group |  | Queue group (core mode only) |
NATS Sink
| Property | Default | Description |
|---|---|---|
| servers | required | NATS server URLs (comma-separated) |
| mode | jetstream | jetstream, core |
| format | json | json, avro, protobuf |
| auth.mode | none | none, user_pass, token, creds_file |
| user |  | Auth username (required for user_pass) |
| password |  | Auth password (required for user_pass) |
| token |  | Auth token (required for token) |
| creds.file |  | Path to creds file (required for creds_file) |
| tls.enabled | false | Enable TLS |
| tls.ca.location |  | CA cert path |
| tls.cert.location |  | Client certificate path |
| tls.key.location |  | Client key path |
| stream |  | Target JetStream stream (required if delivery.guarantee=exactly-once) |
| subject |  | Literal publishing subject. Mutually exclusive with subject.column. |
| subject.column |  | Per-row subject column name. Mutually exclusive with subject. |
| expected.stream |  | Optional stream name assertion header |
| delivery.guarantee | at-least-once | at-least-once, exactly-once, at-most-once |
| dedup.id.column |  | Column to use for the message deduplication ID (required if delivery.guarantee=exactly-once) |
| min.duplicate.window.ms | 120000 | Minimum duplicate window in milliseconds |
| max.pending | 4096 | Maximum pending publish futures |
| ack.timeout.ms | 30000 | Ack timeout in milliseconds |
| header.columns |  | Comma-separated column names to map to NATS headers |
PostgreSQL CDC Source
| Property | Default | Description |
|---|---|---|
| host | required | PostgreSQL hostname |
| port | 5432 | Port |
| database | required | Database name |
| username | postgres | Auth username |
| password |  | Auth password |
| slot.name | required | Logical replication slot |
| publication | required | Publication name |
| ssl.mode | prefer | disable, prefer, require, verify-ca, verify-full |
| snapshot.mode | initial | initial, never, always |
| table.include |  | Comma-separated table filter |
| table.exclude |  | Comma-separated table exclusions |
| max.poll.records | 1000 | Records per poll |
MySQL CDC Source
| Property | Default | Description |
|---|---|---|
| host | required | MySQL hostname |
| port | 3306 | Port |
| username | required | Auth username |
| password |  | Auth password |
| database |  | Database filter |
| server.id | 1001 | Unique replica server ID |
| use.gtid | true | GTID-based replication |
| gtid.set |  | Starting GTID set |
| snapshot.mode | initial | initial, never, always |
| table.include |  | Comma-separated tables (db.table format) |
PostgreSQL Sink
| Property | Default | Description |
|---|---|---|
| hostname | required | PostgreSQL hostname |
| port | 5432 | Port |
| database | required | Database name |
| username | required | Auth username |
| password |  | Auth password |
| table.name | required | Target table |
| schema.name | public | PostgreSQL schema |
| write.mode | append | append (COPY BINARY) or upsert (INSERT ON CONFLICT) |
| primary.key |  | PK columns for upsert (comma-separated). Required if write.mode=upsert. |
| batch.size | 4096 | Records per flush |
| pool.size | 4 | Connection pool size |
| delivery.guarantee | at_least_once | at_least_once, exactly_once (2PC) |
| auto.create.table | false | Create table if missing |
MongoDB CDC Source
Consumes MongoDB change streams. Resume tokens provide at-least-once delivery across restarts. Requires MongoDB 4.0+ (replica set) or 4.2+ (sharded cluster).
| Property | Default | Description |
|---|---|---|
| connection.uri | mongodb://localhost:27017 | MongoDB connection string |
| database | required | Database name |
| collection | required | Collection name, or * for all collections in database |
| full.document.mode | delta | delta, update_lookup, required, when_available |
| max.await.time.ms | 1000 | getMore await timeout (milliseconds) |
| batch.size | 1000 | Cursor batch size hint |
| max.poll.records | 1000 | Max records per poll batch |
| max.buffered.events | 100000 | Max buffered events before backpressure |
Full document modes:
- delta: sends the update description only (default).
- update_lookup: fetches the current document on updates.
- required: needs changeStreamPreAndPostImages enabled on the collection.
- when_available: returns the post-image when present, otherwise null.
CDC envelope schema:
_namespace (Utf8), _op (Utf8: I/U/R/D/DROP),
_document_key (Utf8), _cluster_time_s (UInt32),
_cluster_time_i (UInt32), _wall_time_ms (Int64),
_full_document (Utf8, nullable), _update_desc (Utf8, nullable),
_resume_token (Utf8).
MongoDB Sink
| Property | Default | Description |
|---|---|---|
| connection.uri | mongodb://localhost:27017 | MongoDB connection string |
| database | required | Target database name |
| collection | required | Target collection name |
| write.mode | insert | insert, upsert, replace, cdc_replay |
| batch.size | 500 | Max documents per bulk write |
| flush.interval.ms | 1000 | Max time between flushes (ms) |
| ordered | true | true = fail-fast ordered writes; false = unordered (higher throughput) |
| write.concern.w | majority | majority or integer node count |
| write.concern.journal | true | Wait for journal commit |
Write modes:
- insert: append-only.
- upsert: replaces documents matched on the fields listed in upsert.key.fields.
- replace: rewrites the full document.
- cdc_replay: routes by the _op column (I→insert, U→update, R→replace, D→delete).
Time series collections only support insert.
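The cdc_replay mapping is a simple dispatch on _op. The Python sketch below turns one CDC envelope row into an (operation, filter, document) triple. It assumes _document_key and _full_document hold JSON text (their Utf8 type suggests serialized documents, but the exact encoding is not documented here), and it does not show how an update is applied from _update_desc. Rows whose _op has no mapping, such as DROP, are rejected.

```python
import json

# cdc_replay mapping from the _op column to a write operation (from the list above).
OP_TO_WRITE = {"I": "insert", "U": "update", "R": "replace", "D": "delete"}


def route_cdc_row(row: dict) -> tuple:
    """Return (operation, document key filter, full document or None) for one row."""
    op = row["_op"]
    if op not in OP_TO_WRITE:
        raise ValueError(f"cdc_replay has no mapping for _op={op!r}")
    key = json.loads(row["_document_key"])  # assumed: JSON text of the document key
    full = row.get("_full_document")        # nullable column
    return OP_TO_WRITE[op], key, json.loads(full) if full else None
```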
Delta Lake Source
| Property | Default | Description |
|---|---|---|
| table.path | required | Delta table path (local, s3://, az://, gs://) |
| read.mode | incremental | snapshot (full read) or incremental (change feed) |
| starting.version |  | Version to start from |
| poll.interval.ms | 1000 | Version poll interval |
| partition.filter |  | Partition filter (SQL expression) |
| catalog.type | none | none, glue, unity |
Delta Lake Sink
| Property | Default | Description |
|---|---|---|
| table.path | required | Delta table path |
| write.mode | append | append, overwrite, upsert |
| merge.key.columns |  | Key columns for upsert (comma-separated) |
| partition.columns |  | Partition columns (comma-separated) |
| target.file.size | 134217728 | Target Parquet file size (128 MB) |
| max.buffer.records | 100000 | Records before flush |
| max.buffer.duration.ms | 60000 | Time before flush |
| delivery.guarantee | at-least-once | at-least-once, exactly-once |
| writer.id |  | Writer ID (required for exactly-once) |
| schema.evolution | false | Auto-merge new columns |
| compaction.enabled | false | Enable automatic compaction |
| vacuum.retention.hours | 168 | Retention hours (7 days) |
| catalog.type | none | none, glue, unity |
Cloud storage credentials: use storage.aws_access_key_id, storage.aws_secret_access_key, storage.aws_region, etc.
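The max.buffer.records and max.buffer.duration.ms properties describe a flush-on-whichever-comes-first policy. The class below is an illustrative Python model of that policy, not the sink's code; the defaults mirror the table, and the clock parameter exists only so the behavior can be tested.

```python
import time
from typing import Callable, List, Optional


class FlushBuffer:
    """Buffer rows; flush when a record-count or age threshold is reached."""

    def __init__(self, max_records: int = 100_000, max_duration_ms: int = 60_000,
                 clock: Callable[[], float] = time.monotonic):
        self.max_records = max_records
        self.max_duration_s = max_duration_ms / 1000
        self.clock = clock
        self.rows: List = []
        self.started: Optional[float] = None  # arrival time of the oldest buffered row

    def add(self, row) -> Optional[List]:
        """Buffer one row; return a batch if a threshold was reached."""
        if not self.rows:
            self.started = self.clock()
        self.rows.append(row)
        return self.drain() if self.should_flush() else None

    def should_flush(self) -> bool:
        """Also call this from a timer so an idle buffer still flushes on time."""
        if not self.rows:
            return False
        return (len(self.rows) >= self.max_records
                or self.clock() - self.started >= self.max_duration_s)

    def drain(self) -> List:
        """Hand the buffered rows to the writer and reset the buffer."""
        batch, self.rows, self.started = self.rows, [], None
        return batch
```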
WebSocket Source
Note: WebSocket is non-replayable. At-most-once delivery on recovery.
| Property | Default | Description |
|---|---|---|
| mode | client | client (connect out) or server (accept connections) |
| url |  | WebSocket URL (client mode, comma-separated for multiple URLs) |
| bind.address |  | Bind address (server mode) |
| format | json | json, jsonlines, binary, csv |
| subscribe.message |  | Message sent after handshake (client mode) |
| reconnect.enabled | true | Auto-reconnect (client mode) |
| reconnect.max.delay.ms | 30000 | Max backoff delay |
| ping.interval.ms | 30000 | Keepalive ping interval |
| auth.type |  | bearer, basic, hmac |
WebSocket Sink
| Property | Default | Description |
|---|---|---|
| mode | server | server (accept clients) or client (push to remote) |
| bind.address |  | Bind address (server mode) |
| url |  | Remote URL (client mode) |
| format | json | json, jsonlines, arrow_ipc, binary |
| max.connections | 10000 | Max concurrent clients (server mode) |
| slow.client.policy | drop_oldest | drop_oldest, drop_newest, disconnect |
| replay.buffer.size |  | Messages replayed to new clients |
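A hypothetical Python model of the three slow.client.policy values as a bounded per-client queue. The capacity is an assumption (this page does not document the server's per-client buffer size), and drop_newest is read as discarding the incoming message.

```python
from collections import deque


class ClientQueue:
    """Per-client send queue that applies a slow.client.policy value when full."""

    def __init__(self, policy: str = "drop_oldest", capacity: int = 1024):
        if policy not in ("drop_oldest", "drop_newest", "disconnect"):
            raise ValueError(f"unknown policy {policy!r}")
        self.policy, self.capacity = policy, capacity
        self.queue = deque()
        self.connected = True

    def offer(self, msg) -> None:
        if not self.connected:
            return
        if len(self.queue) < self.capacity:
            self.queue.append(msg)
        elif self.policy == "drop_oldest":
            self.queue.popleft()  # discard the oldest pending message
            self.queue.append(msg)
        elif self.policy == "drop_newest":
            pass  # discard the incoming message
        else:  # "disconnect": give up on this client
            self.connected = False
            self.queue.clear()
```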
File Source (Auto-Loader)
| Property | Default | Description |
|---|---|---|
| path | required | Directory, glob, or cloud URL |
| format |  | csv, json, text, parquet (auto-detected if omitted) |
| poll_interval | 10000 | Directory poll interval (ms) |
| max_files_per_poll | 100 | Files per poll cycle |
| csv.delimiter |  | CSV delimiter character |
| csv.has.header | true | CSV has header row |
| include_metadata | false | Include _metadata column |
File Sink
| Property | Default | Description |
|---|---|---|
| path | required | Output directory |
| format | required | csv, json, text, parquet |
| mode | rolling | append (single file) or rolling (rotate) |
| prefix | part | File name prefix (rolling mode) |
| compression | snappy | Parquet: snappy, gzip, brotli, lz4, zstd, uncompressed |
Join Semantics
LaminarDB supports five join types. All are stateful and operate on streaming data. Join types are detected from SQL syntax automatically.
| Join Type | Time Bound | Match | Use Case |
|---|---|---|---|
| Interval | Required | All pairs within window | Stream-stream correlation |
| ASOF | Optional tolerance | One closest per key (backward / forward / nearest) | Trade/quote enrichment |
| Temporal Probe | Explicit offsets | N rows per left event at fixed offsets | Impact curves, lagged feature lookups |
| Temporal | Implicit (version time) | Point-in-time lookup | Versioned dimension tables |
| Lookup | None | Key equality | Static reference enrichment |
Interval Join (Stream-Stream)
Matches all pairs of rows from two streams where timestamps fall within a
time bound: |left_ts - right_ts| <= time_bound.
Both sides are buffered in state. Expired rows are evicted when the
watermark advances past the bound.
Supported types: INNER, LEFT, RIGHT, FULL, LEFT SEMI, LEFT ANTI, RIGHT SEMI, RIGHT ANTI
CREATE STREAM matched AS
SELECT o.order_id, o.amount, p.status
FROM orders o
INNER JOIN payments p
ON o.order_id = p.order_id
AND p.ts BETWEEN o.ts AND o.ts + 3600000;
SELECT o.order_id, o.amount
FROM orders o
LEFT SEMI JOIN payments p
ON o.order_id = p.order_id
AND p.ts BETWEEN o.ts AND o.ts + 3600000;
State: Both sides buffered as Arrow batches with per-key
BTreeMap<timestamp> indices. Time columns are extracted
automatically from the BETWEEN clause. New left rows probe all
right state; new right rows probe only old left state (avoids double-emit).
Checkpointed via Arrow IPC serialization.
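The probing rule above is what prevents a pair from being emitted twice when both sides receive rows in the same processing cycle. Below is a toy Python model of an INNER interval join with the same asymmetric BETWEEN bound as the examples (left.ts <= right.ts <= left.ts + bound). Python lists stand in for Arrow batches and BTreeMap indices, and evict shows one way the watermark makes rows unmatchable, assuming no row arrives behind the watermark.

```python
from collections import defaultdict


class IntervalJoin:
    """Toy INNER interval join: left.ts <= right.ts <= left.ts + bound_ms, per key."""

    def __init__(self, bound_ms: int):
        self.bound_ms = bound_ms
        self.left = defaultdict(list)   # key -> [(ts, row)] buffered left rows
        self.right = defaultdict(list)  # key -> [(ts, row)] buffered right rows

    def _matches(self, left_ts: int, right_ts: int) -> bool:
        return left_ts <= right_ts <= left_ts + self.bound_ms

    def process(self, new_left, new_right):
        """Process one cycle of (key, ts, row) tuples; return (left, right) pairs."""
        out = []
        # New right rows probe only left rows buffered in earlier cycles.
        for key, ts, rrow in new_right:
            out += [(lrow, rrow) for lts, lrow in self.left[key] if self._matches(lts, ts)]
            self.right[key].append((ts, rrow))
        # New left rows probe all right state, including this cycle's rows.
        for key, ts, lrow in new_left:
            out += [(lrow, rrow) for rts, rrow in self.right[key] if self._matches(ts, rts)]
            self.left[key].append((ts, lrow))
        return out

    def evict(self, watermark: int) -> None:
        """Drop rows that no future event (ts >= watermark) could match."""
        for key in self.left:
            self.left[key] = [(ts, r) for ts, r in self.left[key]
                              if ts + self.bound_ms >= watermark]
        for key in self.right:
            self.right[key] = [(ts, r) for ts, r in self.right[key] if ts >= watermark]
```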
ASOF Join
For each left row, finds the one closest right row by timestamp
within the same key partition. Uses Snowflake-compatible MATCH_CONDITION syntax.
Supported types: INNER, LEFT
Three directions
| Direction | Syntax | Behavior |
|---|---|---|
| Backward | t.ts >= q.ts | Most recent prior right event |
| Forward | t.ts <= q.ts | Next future right event |
| Nearest | NEAREST(t.ts, q.ts) | Closest by absolute time difference |
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
MATCH_CONDITION(t.ts >= q.ts)
ON t.symbol = q.symbol;
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
MATCH_CONDITION(t.ts >= q.ts AND t.ts - q.ts <= INTERVAL '5' SECOND)
ON t.symbol = q.symbol;
SELECT s.sensor_id, s.reading, c.calibration
FROM sensor_data s
ASOF JOIN calibrations c
MATCH_CONDITION(NEAREST(s.ts, c.ts))
ON s.sensor_id = c.sensor_id;
State: Right-side events stored in per-key
BTreeMap<timestamp, Vec<row_index>> for O(log n) temporal lookups.
Multiple rows at the same timestamp are handled correctly.
Key columns support VARCHAR and BIGINT types.
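A Python sketch of the three ASOF directions over a per-key sorted index, using the standard bisect module in place of a BTreeMap. Tie-breaking for nearest (the earlier timestamp wins here) and returning every row stored at the matched timestamp are choices of this sketch, not documented LaminarDB behavior.

```python
from bisect import bisect_left, bisect_right, insort
from collections import defaultdict


class AsofIndex:
    """Right-side ASOF state: per-key sorted timestamps plus rows per timestamp."""

    def __init__(self):
        self.times = defaultdict(list)  # key -> sorted list of timestamps
        self.rows = defaultdict(dict)   # key -> {timestamp: [rows]}

    def add(self, key, ts, row):
        bucket = self.rows[key]
        if ts not in bucket:
            insort(self.times[key], ts)  # keep timestamps ordered (O(n) insert)
            bucket[ts] = []
        bucket[ts].append(row)  # several rows may share one timestamp

    def lookup(self, key, ts, direction="backward", tolerance=None):
        """Rows at the matched right timestamp, or [] if none (a LEFT join emits NULLs)."""
        if direction not in ("backward", "forward", "nearest"):
            raise ValueError(f"unknown direction {direction!r}")
        times = self.times.get(key, [])
        candidates = []
        if direction in ("backward", "nearest"):
            i = bisect_right(times, ts)  # last right ts <= left ts
            if i > 0:
                candidates.append(times[i - 1])
        if direction in ("forward", "nearest"):
            i = bisect_left(times, ts)  # first right ts >= left ts
            if i < len(times):
                candidates.append(times[i])
        if not candidates:
            return []
        best = min(candidates, key=lambda t: abs(ts - t))  # ties: earlier ts wins
        if tolerance is not None and abs(ts - best) > tolerance:
            return []
        return self.rows[key][best]
```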
Temporal Probe Join
For each left event, produces N output rows by probing the right stream at
a fixed list of time offsets. Each probe returns the ASOF-matched right
value at left.event_time + offset. Useful for lagged feature
lookups and impact curves.
Offset specification
| Syntax | Description |
|---|---|
| RANGE FROM <start> TO <end> STEP <step> | Uniform range (all in milliseconds) |
| LIST (<ms>, <ms>, ...) | Explicit list of offsets in milliseconds |
Directions: Backward, Forward,
and Nearest. Each probed offset is resolved using the same
semantics as a standalone ASOF join in that direction.
Temporal Join (FOR SYSTEM_TIME AS OF)
Point-in-time lookup against a versioned table. For each stream event,
looks up the table row valid at the event's timestamp. Uses SQL:2011
FOR SYSTEM_TIME AS OF syntax.
Supported types: INNER, LEFT
SELECT o.order_id, o.amount, r.rate,
o.amount * r.rate AS converted
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
Versioned tables: The right-side table must have a timestamp
column marking version validity. Event-time semantics are deterministic;
process-time (PROCTIME()) is non-deterministic.
Lookup Join
Enriches stream events against a static or slowly-changing reference table. Matches by key equality with no time bound. Results are cached with a configurable TTL.
Supported types: INNER, LEFT
SELECT o.order_id, o.amount, p.name, p.category
FROM orders o
LEFT JOIN products p
ON o.product_id = p.id;
Cache: quick_cache-backed in-memory cache. Configure via
[[lookup]] section: cache.size_bytes,
cache.ttl, cache.hybrid.
Supports predicate pushdown to the lookup connector.
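The lookup cache behavior (bounded size, per-entry TTL, load on miss) can be modelled in a few lines. This is a simplified Python stand-in for the quick_cache-backed cache: it bounds the number of entries instead of cache.size_bytes, and its LRU eviction may differ from quick_cache's own policy.

```python
import time
from collections import OrderedDict


class TtlLruCache:
    """Entry-bounded cache with per-entry TTL and least-recently-used eviction."""

    def __init__(self, max_entries: int, ttl_s: float, clock=time.monotonic):
        self.max_entries = max_entries
        self.ttl_s = ttl_s
        self.clock = clock
        self.data = OrderedDict()  # key -> (expires_at, value), least recent first

    def get(self, key, load):
        """Return the value for key, calling load(key) on a miss or an expired entry."""
        now = self.clock()
        entry = self.data.get(key)
        if entry is not None and entry[0] > now:
            self.data.move_to_end(key)  # mark as most recently used
            return entry[1]
        value = load(key)  # e.g. a point query against the lookup connector
        self.data[key] = (now + self.ttl_s, value)
        self.data.move_to_end(key)
        while len(self.data) > self.max_entries:
            self.data.popitem(last=False)  # evict the least recently used entry
        return value
```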
Environment Variables
All string values in the TOML configuration file support ${VAR} substitution.
Use ${VAR:-default} for optional variables with fallbacks.
Missing variables without defaults cause a startup error.
SQL Queries: Because substitution is performed on the raw configuration file contents before parsing, environment variables can be embedded directly within SQL statements defined in the TOML file (such as in [[pipeline]] SQL queries or the top-level sql string).
Dynamic Queries: Environment variables are not expanded in queries submitted dynamically after startup (e.g., via the pgwire protocol or the POST /api/v1/sql HTTP endpoint).
[source.properties]
bootstrap.servers = "${KAFKA_BROKERS:-localhost:9092}"
sasl.username = "${KAFKA_USER}"
sasl.password = "${KAFKA_PASSWORD}"
[checkpoint]
url = "s3://${S3_BUCKET}/checkpoints"
[checkpoint.storage]
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
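A minimal Python sketch of the substitution rules, applied to the raw file text before parsing as described above. Two details are assumptions: a variable that is set but empty is used as-is (a POSIX shell's ${VAR:-default} would fall back to the default instead), and names follow the usual [A-Za-z_][A-Za-z0-9_]* pattern.

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}; the default may be empty but not contain "}".
PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def substitute(text: str, env=os.environ) -> str:
    """Expand variables in raw config text; raise KeyError if one is unset without a default."""

    def replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]  # set (even if empty): use as-is
        if default is not None:
            return default
        raise KeyError(f"{name} is not set and has no default")

    return PATTERN.sub(replace, text)
```

For example, tomllib.loads(substitute(path.read_text())). Values are inserted verbatim, so a secret containing a double quote would break the surrounding TOML string.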
S3 Storage Class Tiering
Active checkpoints use the hot tier. Older checkpoints automatically tier down via S3 Lifecycle rules.
| Field | Default | Description |
|---|---|---|
| hot_class | "STANDARD" | Storage class for active checkpoints (e.g. "EXPRESS_ONE_ZONE") |
| warm_class | "STANDARD" | Storage class for older checkpoints |
| cold_class | "" | Archive class (e.g. "GLACIER_IR"). Empty = no cold tier. |
| hot_retention | "24h" | Time before hot → warm |
| warm_retention | "7d" | Time before warm → cold |
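A Python sketch of which storage class a checkpoint would occupy at a given age under these settings. It assumes age is measured from checkpoint creation and that warm_retention counts from the hot → warm transition (so the cold tier starts at hot_retention + warm_retention), and it parses only single-unit durations such as "24h" or "7d".

```python
import re

SECONDS = {"ms": 0.001, "s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_duration(text: str) -> float:
    """Parse a single-unit duration like "500ms", "24h", or "7d" into seconds."""
    m = re.fullmatch(r"(\d+)(ms|s|m|h|d)", text.strip())
    if m is None:
        raise ValueError(f"unsupported duration {text!r}")
    return int(m.group(1)) * SECONDS[m.group(2)]


def storage_class(age_s: float, tiering: dict) -> str:
    """Storage class for a checkpoint of the given age, using the defaults above."""
    hot_for = parse_duration(tiering.get("hot_retention", "24h"))
    warm_for = parse_duration(tiering.get("warm_retention", "7d"))
    if age_s < hot_for:
        return tiering.get("hot_class", "STANDARD")
    cold_class = tiering.get("cold_class", "")
    if not cold_class or age_s < hot_for + warm_for:
        return tiering.get("warm_class", "STANDARD")  # empty cold_class = no cold tier
    return cold_class
```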
Delivery Guarantees
| Guarantee | How it works | Requirements |
|---|---|---|
| at_least_once | Default. On recovery, sources replay from last checkpoint offset. Some records may be reprocessed. | Checkpointing enabled. Works with all connectors. |
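| exactly_once | Barrier-aligned checkpoints with two-phase commit to sinks. No duplicates in output. | All sources must support replay. Sinks must support exactly-once commit: the Kafka and PostgreSQL sinks use 2PC; the Delta Lake sink (with writer.id) and the NATS sink (with stream and dedup.id.column) also document an exactly-once delivery.guarantee. |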
WebSocket sources are non-replayable. Under exactly-once mode, they degrade to at-most-once delivery with a warning.
Full Config Example
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
workers = 4
log_level = "info"
[state]
backend = "in_process"
[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"
[checkpoint.storage]
aws_region = "us-east-1"
# --- Sources ---
[[source]]
name = "trades"
connector = "kafka"
format = "avro"
[source.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "market-trades"
group.id = "laminar-vwap"
schema.registry.url = "${SCHEMA_REGISTRY_URL}"
startup.mode = "earliest"
[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false
[[source.schema]]
name = "price"
type = "DOUBLE"
[[source.schema]]
name = "volume"
type = "BIGINT"
[[source.schema]]
name = "ts"
type = "BIGINT"
[source.watermark]
column = "ts"
max_out_of_orderness = "5s"
# --- Lookups ---
[[lookup]]
name = "instruments"
connector = "postgres"
strategy = "poll"
pushdown = true
[lookup.properties]
hostname = "db.internal"
database = "reference_data"
username = "reader"
table.name = "instruments"
[lookup.cache]
size_bytes = 104857600
ttl = "5m"
# --- Pipelines ---
[[pipeline]]
name = "vwap"
sql = """
SELECT t.symbol,
SUM(t.price * CAST(t.volume AS DOUBLE))
/ SUM(CAST(t.volume AS DOUBLE)) AS vwap,
i.exchange
FROM trades t
JOIN instruments i ON t.symbol = i.symbol
GROUP BY t.symbol, i.exchange, tumble(t.ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE
"""
# --- Sinks ---
[[sink]]
name = "kafka_output"
pipeline = "vwap"
connector = "kafka"
delivery = "exactly_once"
[sink.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "vwap-1m"
format = "json"
key.column = "symbol"
compression.type = "lz4"
[[sink]]
name = "lake"
pipeline = "vwap"
connector = "delta-lake"
delivery = "exactly_once"
[sink.properties]
table.path = "s3://my-bucket/delta/vwap"
partition.columns = "exchange"
writer.id = "vwap-writer"