Configuration Reference

Every field on this page is verified against config.rs. TOML examples use exact field names from the Rust structs.

[server]

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| mode | String | "embedded" | "embedded" (single-node) or "delta" (multi-node, coming soon) |
| bind | String | "127.0.0.1:8080" | HTTP API bind address |
| workers | usize | 0 | Worker threads. 0 = auto-detect CPU count. |
| log_level | String | "info" | trace, debug, info, warn, error |
laminardb.toml:

```toml
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
workers = 4
log_level = "info"
```

[state]

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| backend | String | "memory" | "memory", "mmap", or "disaggregated" |
| path | String | "./data/state" | Persistent state directory (mmap backend) |

[checkpoint]

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| url | String | "file:///tmp/laminardb/checkpoints" | Storage URL. file:///, s3://, gs:// |
| interval | Duration | "10s" | Checkpoint interval. Supports humantime: "10s", "1m", "500ms" |
| storage | Map | {} | Cloud storage credentials. Keys: aws_access_key_id, aws_region, etc. |
| tiering | Section | none | Optional S3 storage class tiering. See Tiering. |
S3 checkpoint example:

```toml
[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"

[checkpoint.storage]
aws_region = "us-east-1"
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
```
REST API

The server exposes an HTTP API on the configured bind address. All responses are JSON except /metrics (Prometheus text format).

| Method | Path | Description |
| --- | --- | --- |
| GET | /health | Liveness probe |
| GET | /ready | Readiness probe (pipelines started) |
| GET | /metrics | Prometheus text metrics |
| GET | /api/v1/sources | List configured sources |
| GET | /api/v1/sinks | List configured sinks |
| GET | /api/v1/streams | List running streams |
| GET | /api/v1/streams/{name} | Stream detail by name |
| POST | /api/v1/checkpoint | Trigger an immediate checkpoint |
| POST | /api/v1/sql | Execute an ad-hoc SQL query |
| POST | /api/v1/reload | Hot-reload configuration |
| GET | /api/v1/cluster | Cluster status (delta mode only) |

[[source]]

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | String | required | Unique source name. Referenced in SQL and sinks. |
| connector | String | required | "kafka", "postgres_cdc", "mysql_cdc", "websocket", "files", "delta-lake", "generator" |
| format | String | "json" | "json", "avro", "protobuf", "csv" |
| properties | Table | {} | Connector-specific properties. See connector sections below. |
| schema | Array | [] | Column definitions: {name, type, nullable} |
| watermark | Section | none | Event-time watermark: {column, max_out_of_orderness} |
Source with schema and watermark:

```toml
[[source]]
name = "trades"
connector = "kafka"
format = "json"

[source.properties]
bootstrap.servers = "localhost:9092"
topic = "market-trades"
group.id = "laminar-analytics"

[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false

[[source.schema]]
name = "price"
type = "DOUBLE"

[[source.schema]]
name = "ts"
type = "BIGINT"

[source.watermark]
column = "ts"
max_out_of_orderness = "5s"
```

[[pipeline]]

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | String | required | Unique pipeline name |
| sql | String | required | SQL query. Multi-line supported with triple-quoted strings. |
| parallelism | usize | none | Optional parallelism override (default: server worker count) |
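A minimal pipeline sketch using the fields above. The pipeline name, query, and the trades source it reads from are illustrative, not part of any shipped example:

```toml
[[pipeline]]
name = "minute_totals"      # illustrative name
sql = """
SELECT symbol, SUM(price) AS total
FROM trades
GROUP BY symbol, tumble(ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE
"""
parallelism = 2             # optional; omit to inherit server workers
```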

[[sink]]

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | String | required | Unique sink name |
| pipeline | String | required | Pipeline name this sink reads from. Must exist. |
| connector | String | required | "kafka", "postgres", "delta-lake", "websocket", "files", "stdout" |
| delivery | String | "at_least_once" | "at_least_once" or "exactly_once" |
| properties | Table | {} | Connector-specific properties |

[[lookup]]

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | String | required | Unique lookup table name. Used in SQL JOINs. |
| connector | String | required | "postgres", "mysql", "redis", "csv" |
| strategy | String | "poll" | "poll", "cdc", "manual" |
| pushdown | bool | true | Push predicates to source for filtered lookups |
| cache.size_bytes | u64 | 104857600 | Cache size in bytes (100 MB) |
| cache.ttl | Duration | "300s" | Cache entry TTL |
| cache.hybrid | bool | false | Enable memory + disk caching via foyer |

Connectors

All connectors are feature-gated. Enable via `cargo add laminar-db --features kafka,delta-lake`.

| Connector | Roles | Feature flag |
| --- | --- | --- |
| Kafka | Source + Sink | kafka |
| PostgreSQL CDC | Source | postgres-cdc |
| MySQL CDC | Source | mysql-cdc |
| PostgreSQL Sink | Sink | postgres-sink |
| Delta Lake | Source + Sink | delta-lake |
| WebSocket | Source + Sink | websocket |
| File Auto-Loader | Source + Sink | files |

Kafka Source

Required

| Property | Description |
| --- | --- |
| bootstrap.servers | Kafka broker addresses (comma-separated) |
| group.id | Consumer group ID |
| topic or topic.pattern | Topic names (comma-separated) or regex pattern. Mutually exclusive. |

Format & Schema

| Property | Default | Description |
| --- | --- | --- |
| format | json | json, avro, protobuf |
| schema.registry.url | | Confluent Schema Registry URL |
| schema.registry.username | | Schema Registry auth |
| schema.registry.password | | Schema Registry auth |

Security

| Property | Default | Description |
| --- | --- | --- |
| security.protocol | PLAINTEXT | PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL |
| sasl.mechanism | | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 |
| sasl.username | | SASL username |
| sasl.password | | SASL password |
| ssl.ca.location | | CA certificate path |

Consumer Tuning

| Property | Default | Description |
| --- | --- | --- |
| startup.mode | earliest | earliest, latest, none |
| max.poll.records | 1000 | Records per poll batch |
| isolation.level | read_committed | read_uncommitted, read_committed |
| partition.assignment.strategy | range | range, roundrobin, sticky, cooperative-sticky |
| max.out.of.orderness.ms | 5000 | Watermark out-of-order tolerance |
| idle.timeout.ms | 30000 | Idle watermark advancement timeout |

Any property prefixed with `kafka.` is passed through to the underlying rdkafka consumer.

Kafka Sink

| Property | Default | Description |
| --- | --- | --- |
| bootstrap.servers | required | Kafka broker addresses |
| topic | required | Target topic |
| format | json | json, avro, protobuf |
| delivery.guarantee | at-least-once | at-least-once, exactly-once |
| key.column | | Column for partition key |
| partitioner | key-hash | key-hash, round-robin, sticky |
| compression.type | none | none, gzip, snappy, lz4, zstd |
| acks | all | all, 1, 0 |
| linger.ms | 5 | Batch linger time |
| batch.size | 16384 | Max batch bytes |
| transactional.id | | For exactly-once producer |
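A Kafka sink sketch combining the [[sink]] fields with the properties above. The sink name, pipeline name, and topic are illustrative:

```toml
[[sink]]
name = "enriched_out"        # illustrative name
pipeline = "vwap"            # must reference an existing [[pipeline]]
connector = "kafka"
delivery = "exactly_once"

[sink.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "enriched-trades"
format = "json"
key.column = "symbol"
compression.type = "zstd"
transactional.id = "laminar-enriched"   # required for exactly-once
```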

PostgreSQL CDC Source

| Property | Default | Description |
| --- | --- | --- |
| host | required | PostgreSQL hostname |
| port | 5432 | Port |
| database | required | Database name |
| username | postgres | Auth username |
| password | | Auth password |
| slot.name | required | Logical replication slot |
| publication | required | Publication name |
| ssl.mode | prefer | disable, allow, prefer, require, verify-ca, verify-full |
| snapshot.mode | initial | initial, never, always |
| table.include | | Comma-separated table filter |
| table.exclude | | Comma-separated table exclusions |
| max.poll.records | 1000 | Records per poll |
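A CDC source sketch using these properties. Hostnames, slot, publication, and table names are illustrative:

```toml
[[source]]
name = "orders_cdc"          # illustrative name
connector = "postgres_cdc"

[source.properties]
host = "db.internal"
port = 5432
database = "shop"
username = "replicator"
password = "${PG_PASSWORD}"
slot.name = "laminar_slot"
publication = "laminar_pub"
snapshot.mode = "initial"
table.include = "public.orders,public.payments"
```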

MySQL CDC Source

| Property | Default | Description |
| --- | --- | --- |
| host | required | MySQL hostname |
| port | 3306 | Port |
| username | required | Auth username |
| password | | Auth password |
| database | | Database filter |
| server.id | 1001 | Unique replica server ID |
| use.gtid | true | GTID-based replication |
| gtid.set | | Starting GTID set |
| snapshot.mode | initial | initial, never, always |
| table.include | | Comma-separated tables (db.table format) |

PostgreSQL Sink

| Property | Default | Description |
| --- | --- | --- |
| hostname | required | PostgreSQL hostname |
| port | 5432 | Port |
| database | required | Database name |
| username | required | Auth username |
| password | | Auth password |
| table.name | required | Target table |
| schema.name | public | PostgreSQL schema |
| write.mode | append | append (COPY BINARY) or upsert (INSERT ON CONFLICT) |
| primary.key | | PK columns for upsert (comma-separated). Required if write.mode=upsert. |
| batch.size | 4096 | Records per flush |
| pool.size | 4 | Connection pool size |
| delivery.guarantee | at_least_once | at_least_once, exactly_once (2PC) |
| auto.create.table | false | Create table if missing |
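A PostgreSQL sink sketch in upsert mode. The sink name, pipeline, database, and table are illustrative:

```toml
[[sink]]
name = "pg_out"              # illustrative name
pipeline = "vwap"
connector = "postgres"
delivery = "exactly_once"

[sink.properties]
hostname = "db.internal"
database = "analytics"
username = "writer"
password = "${PG_PASSWORD}"
table.name = "vwap_1m"
write.mode = "upsert"
primary.key = "symbol"       # required when write.mode = "upsert"
batch.size = 4096
```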

Delta Lake Source

| Property | Default | Description |
| --- | --- | --- |
| table.path | required | Delta table path (local, s3://, az://, gs://) |
| read.mode | incremental | snapshot (full read) or incremental (change feed) |
| starting.version | | Version to start from |
| poll.interval.ms | 1000 | Version poll interval |
| partition.filter | | Partition filter (SQL expression) |
| catalog.type | none | none, glue, unity |

Delta Lake Sink

| Property | Default | Description |
| --- | --- | --- |
| table.path | required | Delta table path |
| write.mode | append | append, overwrite, upsert |
| merge.key.columns | | Key columns for upsert (comma-separated) |
| partition.columns | | Partition columns (comma-separated) |
| target.file.size | 134217728 | Target Parquet file size (128 MB) |
| max.buffer.records | 100000 | Records before flush |
| max.buffer.duration.ms | 60000 | Time before flush |
| delivery.guarantee | at-least-once | at-least-once, exactly-once |
| writer.id | | Writer ID (required for exactly-once) |
| schema.evolution | false | Auto-merge new columns |
| compaction.enabled | false | Enable automatic compaction |
| vacuum.retention.hours | 168 | Retention hours (7 days) |
| catalog.type | none | none, glue, unity |

Cloud storage credentials: use storage.aws_access_key_id, storage.aws_secret_access_key, storage.aws_region, etc.
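A Delta Lake sink sketch, including the storage.-prefixed credential keys. The sink name, pipeline, bucket path, and the dotted-key placement of the storage credentials are illustrative assumptions:

```toml
[[sink]]
name = "lake_out"            # illustrative name
pipeline = "vwap"
connector = "delta-lake"
delivery = "exactly_once"

[sink.properties]
table.path = "s3://my-bucket/delta/vwap"
write.mode = "append"
partition.columns = "exchange"
writer.id = "vwap-writer"    # required for exactly-once
storage.aws_region = "us-east-1"
storage.aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
storage.aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
```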

WebSocket Source

Note: WebSocket is non-replayable. At-most-once delivery on recovery.

| Property | Default | Description |
| --- | --- | --- |
| mode | client | client (connect out) or server (accept connections) |
| url | | WebSocket URL (client mode, comma-separated for multi) |
| bind.address | | Bind address (server mode) |
| format | json | json, jsonlines, binary, csv |
| subscribe.message | | Message sent after handshake (client mode) |
| reconnect.enabled | true | Auto-reconnect (client mode) |
| reconnect.max.delay.ms | 30000 | Max backoff delay |
| ping.interval.ms | 30000 | Keepalive ping interval |
| auth.type | | bearer, basic, hmac |
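A client-mode WebSocket source sketch. The source name, feed URL, and subscribe payload are illustrative:

```toml
[[source]]
name = "ticker_feed"         # illustrative name
connector = "websocket"
format = "json"

[source.properties]
mode = "client"
url = "wss://feed.example.com/ws"
subscribe.message = '{"type": "subscribe", "channels": ["ticker"]}'
reconnect.enabled = true
ping.interval.ms = 15000
```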

WebSocket Sink

| Property | Default | Description |
| --- | --- | --- |
| mode | server | server (accept clients) or client (push to remote) |
| bind.address | | Bind address (server mode) |
| url | | Remote URL (client mode) |
| format | json | json, jsonlines, arrow_ipc, binary |
| max.connections | 10000 | Max concurrent clients (server mode) |
| slow.client.policy | drop_oldest | drop_oldest, drop_newest, disconnect |
| replay.buffer.size | | Messages replayed to new clients |

File Source (Auto-Loader)

| Property | Default | Description |
| --- | --- | --- |
| path | required | Directory, glob, or cloud URL |
| format | | csv, json, text, parquet (auto-detected if omitted) |
| poll_interval | 10000 | Directory poll interval (ms) |
| max_files_per_poll | 100 | Files per poll cycle |
| csv.delimiter | | CSV delimiter character |
| csv.has.header | true | CSV has header row |
| include_metadata | false | Include _metadata column |
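A file auto-loader sketch watching a cloud prefix for CSVs. The source name and bucket path are illustrative:

```toml
[[source]]
name = "landing_csv"         # illustrative name
connector = "files"

[source.properties]
path = "s3://my-bucket/landing/*.csv"
format = "csv"
csv.has.header = true
poll_interval = 5000
include_metadata = true      # adds a _metadata column per record
```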

File Sink

| Property | Default | Description |
| --- | --- | --- |
| path | required | Output directory |
| format | required | csv, json, text, parquet |
| mode | rolling | append (single file) or rolling (rotate) |
| prefix | part | File name prefix (rolling mode) |
| compression | snappy | Parquet: snappy, gzip, brotli, lz4, zstd, uncompressed |

Join Semantics

LaminarDB supports four join types. All are stateful and operate on streaming data. Join types are detected from SQL syntax automatically.

| Join Type | Time Bound | Match | Use Case |
| --- | --- | --- | --- |
| Interval | Required | All pairs within window | Stream-stream correlation |
| ASOF | Optional tolerance | One closest per key | Trade/quote enrichment |
| Temporal | Implicit (version time) | Point-in-time lookup | Versioned dimension tables |
| Lookup | None | Key equality | Static reference enrichment |

Interval Join (Stream-Stream)

Matches all pairs of rows from two streams where timestamps fall within a time bound: |left_ts - right_ts| <= time_bound. Both sides are buffered in state. Expired rows are evicted when the watermark advances past the bound.

Supported types: INNER, LEFT, RIGHT, FULL, LEFT SEMI, LEFT ANTI, RIGHT SEMI, RIGHT ANTI

Interval join — match orders to payments within 1 hour:

```sql
CREATE STREAM matched AS
SELECT o.order_id, o.amount, p.status
FROM orders o
INNER JOIN payments p
    ON o.order_id = p.order_id
    AND p.ts BETWEEN o.ts AND o.ts + 3600000;
```

Semi join — orders that have at least one payment:

```sql
SELECT o.order_id, o.amount
FROM orders o
LEFT SEMI JOIN payments p
    ON o.order_id = p.order_id
    AND p.ts BETWEEN o.ts AND o.ts + 3600000;
```

State: Both sides buffered as Arrow batches with per-key BTreeMap<timestamp> indices. Time columns are extracted automatically from the BETWEEN clause. New left rows probe all right state; new right rows probe only old left state (avoids double-emit). Checkpointed via Arrow IPC serialization.
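The buffer/probe/evict mechanics described above can be sketched in a few lines. This is illustrative Python, not LaminarDB's Rust implementation; it assumes integer-millisecond timestamps and a symmetric bound:

```python
from collections import defaultdict

class IntervalJoin:
    """Toy stream-stream interval join: buffer both sides per key,
    probe the opposite side on arrival, evict on watermark advance."""

    def __init__(self, bound_ms):
        self.bound = bound_ms
        self.left = defaultdict(list)   # key -> [(ts, row)]
        self.right = defaultdict(list)

    def on_left(self, key, ts, row):
        self.left[key].append((ts, row))
        # New left rows probe all buffered right state.
        return [(row, r) for (rts, r) in self.right[key]
                if abs(ts - rts) <= self.bound]

    def on_right(self, key, ts, row):
        self.right[key].append((ts, row))
        # New right rows probe only previously buffered left state,
        # which avoids emitting the same matched pair twice.
        return [(l, row) for (lts, l) in self.left[key]
                if abs(ts - lts) <= self.bound]

    def on_watermark(self, wm):
        # Rows older than (watermark - bound) can never match again.
        for side in (self.left, self.right):
            for key in side:
                side[key] = [(ts, r) for (ts, r) in side[key]
                             if ts >= wm - self.bound]
```

The asymmetric probe (new right rows only scan old left state) is what prevents a pair from being emitted once from each side.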

ASOF Join

For each left row, finds the one closest right row by timestamp within the same key partition. Uses Snowflake-compatible MATCH_CONDITION syntax.

Supported types: INNER, LEFT

Three directions

| Direction | Syntax | Behavior |
| --- | --- | --- |
| Backward | t.ts >= q.ts | Most recent prior right event |
| Forward | t.ts <= q.ts | Next future right event |
| Nearest | NEAREST(t.ts, q.ts) | Closest by absolute time difference |
ASOF backward — enrich trades with most recent quote:

```sql
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
    MATCH_CONDITION(t.ts >= q.ts)
    ON t.symbol = q.symbol;
```

ASOF with tolerance — max 5 second gap:

```sql
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
    MATCH_CONDITION(t.ts >= q.ts AND t.ts - q.ts <= INTERVAL '5' SECOND)
    ON t.symbol = q.symbol;
```

ASOF nearest — closest event by absolute time:

```sql
SELECT s.sensor_id, s.reading, c.calibration
FROM sensor_data s
ASOF JOIN calibrations c
    MATCH_CONDITION(NEAREST(s.ts, c.ts))
    ON s.sensor_id = c.sensor_id;
```

State: Right-side events stored in per-key BTreeMap<timestamp, Vec<row_index>> for O(log n) temporal lookups. Multiple rows at the same timestamp are handled correctly. Key columns support VARCHAR and BIGINT types.
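The backward-direction lookup against that sorted index can be sketched with a binary search. This is illustrative Python (bisect standing in for the BTreeMap), not the actual engine code:

```python
import bisect
from collections import defaultdict

class AsofBackward:
    """Toy ASOF backward join: for each left event, return the most
    recent right event with right_ts <= left_ts for the same key."""

    def __init__(self):
        self.ts = defaultdict(list)    # key -> sorted timestamps
        self.rows = defaultdict(list)  # key -> rows, parallel to ts

    def on_right(self, key, ts, row):
        # Keep timestamps sorted so lookups stay O(log n).
        i = bisect.bisect_right(self.ts[key], ts)
        self.ts[key].insert(i, ts)
        self.rows[key].insert(i, row)

    def lookup(self, key, ts, tolerance=None):
        i = bisect.bisect_right(self.ts[key], ts) - 1
        if i < 0:
            return None  # no prior right event for this key
        if tolerance is not None and ts - self.ts[key][i] > tolerance:
            return None  # closest match falls outside the tolerance
        return self.rows[key][i]
```

The forward and nearest directions differ only in which neighbor of the bisect position is selected.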

Temporal Join (FOR SYSTEM_TIME AS OF)

Point-in-time lookup against a versioned table. For each stream event, looks up the table row valid at the event's timestamp. Uses SQL:2011 FOR SYSTEM_TIME AS OF syntax.

Supported types: INNER, LEFT

Temporal join — currency rate valid at order time:

```sql
SELECT o.order_id, o.amount, r.rate,
       o.amount * r.rate AS converted
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency;
```

Versioned tables: The right-side table must have a timestamp column marking version validity. Event-time semantics are deterministic; process-time (PROCTIME()) is non-deterministic.

Lookup Join

Enriches stream events against a static or slowly-changing reference table. No time bound — matches by key equality only. Results are cached with configurable TTL.

Supported types: INNER, LEFT

Lookup join — enrich orders with product info:

```sql
SELECT o.order_id, o.amount, p.name, p.category
FROM orders o
LEFT JOIN products p
    ON o.product_id = p.id;

Cache: Foyer-backed memory/hybrid cache. Configure via [[lookup]] section: cache.size_bytes, cache.ttl, cache.hybrid. Supports predicate pushdown to the lookup connector.
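The cache.ttl behavior can be sketched as a get-or-load wrapper. This is an illustrative Python model of the TTL semantics, not the foyer implementation:

```python
import time

class TtlCache:
    """Toy lookup cache with per-entry TTL: serve fresh hits from
    memory, reload from the source on a miss or an expired entry."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.entries = {}  # key -> (expires_at, value)

    def get_or_load(self, key, loader):
        now = self.clock()
        hit = self.entries.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]              # fresh cache hit
        value = loader(key)            # miss or expired: hit the source
        self.entries[key] = (now + self.ttl, value)
        return value
```

With cache.ttl = "300s", a key re-queried within five minutes never touches the lookup connector.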

Environment Variables

All string values in TOML support ${VAR} substitution. Use ${VAR:-default} for optional variables with fallbacks. Missing variables without defaults cause a startup error.
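The substitution rules above can be modeled in a few lines. This is an illustrative Python sketch of the semantics (expand, fall back, or fail at startup), not the actual config loader:

```python
import os
import re

# Matches ${VAR} and ${VAR:-default}.
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")

def substitute(value, env=os.environ):
    """Expand ${VAR} / ${VAR:-default}; raise if a variable with no
    default is missing, mirroring the startup-error behavior."""
    def repl(m):
        name, default = m.group(1), m.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        raise KeyError(f"missing environment variable: {name}")
    return _VAR.sub(repl, value)
```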

Environment variable examples:

```toml
[source.properties]
bootstrap.servers = "${KAFKA_BROKERS:-localhost:9092}"
sasl.username = "${KAFKA_USER}"
sasl.password = "${KAFKA_PASSWORD}"

[checkpoint]
url = "s3://${S3_BUCKET}/checkpoints"

[checkpoint.storage]
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
```

S3 Storage Class Tiering

Active checkpoints use the hot tier. Older checkpoints automatically tier down via S3 Lifecycle rules.

| Field | Default | Description |
| --- | --- | --- |
| hot_class | "STANDARD" | Storage class for active checkpoints (e.g. "EXPRESS_ONE_ZONE") |
| warm_class | "STANDARD" | Storage class for older checkpoints |
| cold_class | "" | Archive class (e.g. "GLACIER_IR"). Empty = no cold tier. |
| hot_retention | "24h" | Time before hot → warm |
| warm_retention | "7d" | Time before warm → cold |
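A tiering sketch using the fields above. The [checkpoint.tiering] section name follows the tiering field listed under [checkpoint]; the class choices are illustrative:

```toml
[checkpoint.tiering]
hot_class = "EXPRESS_ONE_ZONE"   # low-latency class for active checkpoints
warm_class = "STANDARD"
cold_class = "GLACIER_IR"        # omit or leave "" for no cold tier
hot_retention = "24h"
warm_retention = "7d"
```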

Delivery Guarantees

| Guarantee | How it works | Requirements |
| --- | --- | --- |
| at_least_once | Default. On recovery, sources replay from the last checkpoint offset; some records may be reprocessed. | Checkpointing enabled. Works with all connectors. |
| exactly_once | Barrier-aligned checkpoints with two-phase commit to sinks. No duplicates in output. | All sources must support replay. Sinks must support 2PC. Currently: Kafka sink and PostgreSQL sink. |

WebSocket sources are non-replayable. Under exactly-once mode, they degrade to at-most-once delivery with a warning.

Full Config Example

laminardb.toml:

```toml
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
workers = 4
log_level = "info"

[state]
backend = "memory"

[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"

[checkpoint.storage]
aws_region = "us-east-1"

# --- Sources ---

[[source]]
name = "trades"
connector = "kafka"
format = "avro"

[source.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "market-trades"
group.id = "laminar-vwap"
schema.registry.url = "${SCHEMA_REGISTRY_URL}"
startup.mode = "earliest"

[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false

[[source.schema]]
name = "price"
type = "DOUBLE"

[[source.schema]]
name = "volume"
type = "BIGINT"

[[source.schema]]
name = "ts"
type = "BIGINT"

[source.watermark]
column = "ts"
max_out_of_orderness = "5s"

# --- Lookups ---

[[lookup]]
name = "instruments"
connector = "postgres"
strategy = "poll"
pushdown = true

[lookup.properties]
hostname = "db.internal"
database = "reference_data"
username = "reader"
table.name = "instruments"

[lookup.cache]
size_bytes = 104857600
ttl = "5m"

# --- Pipelines ---

[[pipeline]]
name = "vwap"
sql = """
SELECT t.symbol,
       SUM(t.price * CAST(t.volume AS DOUBLE))
         / SUM(CAST(t.volume AS DOUBLE)) AS vwap,
       i.exchange
FROM trades t
JOIN instruments i ON t.symbol = i.symbol
GROUP BY t.symbol, i.exchange, tumble(t.ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE
"""

# --- Sinks ---

[[sink]]
name = "kafka_output"
pipeline = "vwap"
connector = "kafka"
delivery = "exactly_once"

[sink.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "vwap-1m"
format = "json"
key.column = "symbol"
compression.type = "lz4"

[[sink]]
name = "lake"
pipeline = "vwap"
connector = "delta-lake"
delivery = "exactly_once"

[sink.properties]
table.path = "s3://my-bucket/delta/vwap"
partition.columns = "exchange"
writer.id = "vwap-writer"
```