Configuration Reference
Every field on this page is verified against config.rs. The TOML examples use the exact field names from the Rust structs.
[server]
| Field | Type | Default | Description |
|---|---|---|---|
| mode | String | "embedded" | "embedded" (single-node) or "delta" (multi-node, coming soon) |
| bind | String | "127.0.0.1:8080" | HTTP API bind address |
| workers | usize | 0 | Worker threads. 0 = auto-detect CPU count. |
| log_level | String | "info" | trace, debug, info, warn, error |
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
workers = 4
log_level = "info"
[state]
| Field | Type | Default | Description |
|---|---|---|---|
| backend | String | "memory" | "memory", "mmap", or "disaggregated" |
| path | String | "./data/state" | Persistent state directory (mmap backend) |
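For example, a persistent mmap-backed state store (a sketch using the fields above; the directory value is illustrative):

```toml
[state]
backend = "mmap"
path = "./data/state"
```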
[checkpoint]
| Field | Type | Default | Description |
|---|---|---|---|
| url | String | "file:///tmp/laminardb/checkpoints" | Storage URL. file:///, s3://, gs:// |
| interval | Duration | "10s" | Checkpoint interval. Supports humantime: "10s", "1m", "500ms" |
| storage | Map | {} | Cloud storage credentials. Keys: aws_access_key_id, aws_region, etc. |
| tiering | Section | none | Optional S3 storage class tiering. See Tiering. |
[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"
[checkpoint.storage]
aws_region = "us-east-1"
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
REST API
The server exposes an HTTP API on the configured bind address. All responses are JSON except /metrics (Prometheus text format).
| Method | Path | Description |
|---|---|---|
| GET | /health | Liveness probe |
| GET | /ready | Readiness probe (pipelines started) |
| GET | /metrics | Prometheus text metrics |
| GET | /api/v1/sources | List configured sources |
| GET | /api/v1/sinks | List configured sinks |
| GET | /api/v1/streams | List running streams |
| GET | /api/v1/streams/{name} | Stream detail by name |
| POST | /api/v1/checkpoint | Trigger an immediate checkpoint |
| POST | /api/v1/sql | Execute ad-hoc SQL query |
| POST | /api/v1/reload | Hot-reload configuration |
| GET | /api/v1/cluster | Cluster status (delta mode only) |
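A quick smoke test against a locally running server. This sketch assumes the default bind address; the request body shape for /api/v1/sql is an assumption, not verified against the server code:

```shell
# Liveness and readiness probes
curl -s http://127.0.0.1:8080/health
curl -s http://127.0.0.1:8080/ready

# Trigger an immediate checkpoint
curl -s -X POST http://127.0.0.1:8080/api/v1/checkpoint

# Run an ad-hoc SQL query (body shape is illustrative)
curl -s -X POST http://127.0.0.1:8080/api/v1/sql \
  -H 'Content-Type: application/json' \
  -d '{"query": "SELECT * FROM trades LIMIT 10"}'
```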
[[source]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique source name. Referenced in SQL and sinks. |
| connector | String | required | "kafka", "postgres_cdc", "mysql_cdc", "websocket", "files", "delta-lake", "generator" |
| format | String | "json" | "json", "avro", "protobuf", "csv" |
| properties | Table | {} | Connector-specific properties. See connector sections below. |
| schema | Array | [] | Column definitions: {name, type, nullable} |
| watermark | Section | none | Event-time watermark: {column, max_out_of_orderness} |
[[source]]
name = "trades"
connector = "kafka"
format = "json"
[source.properties]
bootstrap.servers = "localhost:9092"
topic = "market-trades"
group.id = "laminar-analytics"
[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false
[[source.schema]]
name = "price"
type = "DOUBLE"
[[source.schema]]
name = "ts"
type = "BIGINT"
[source.watermark]
column = "ts"
max_out_of_orderness = "5s"
[[pipeline]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique pipeline name |
| sql | String | required | SQL query. Multi-line supported with triple-quoted strings. |
| parallelism | usize | none | Optional parallelism override (default: server worker count) |
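A minimal pipeline using a triple-quoted SQL string. The pipeline name and query are illustrative; the query assumes a source named "trades" as in the earlier example:

```toml
[[pipeline]]
name = "high_value"
parallelism = 2
sql = """
SELECT symbol, price
FROM trades
WHERE price > 1000.0
"""
```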
[[sink]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique sink name |
| pipeline | String | required | Pipeline name this sink reads from. Must exist. |
| connector | String | required | "kafka", "postgres", "delta-lake", "websocket", "files", "stdout" |
| delivery | String | "at_least_once" | "at_least_once" or "exactly_once" |
| properties | Table | {} | Connector-specific properties |
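A minimal sink wiring a pipeline to stdout for debugging. The sink name is illustrative, and the stdout connector is assumed to need no extra properties:

```toml
[[sink]]
name = "debug_out"
pipeline = "vwap"       # must match an existing [[pipeline]] name
connector = "stdout"
delivery = "at_least_once"
```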
[[lookup]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique lookup table name. Used in SQL JOINs. |
| connector | String | required | "postgres", "mysql", "redis", "csv" |
| strategy | String | "poll" | "poll", "cdc", "manual" |
| pushdown | bool | true | Push predicates to source for filtered lookups |
| cache.size_bytes | u64 | 104857600 | Cache size in bytes (100 MB) |
| cache.ttl | Duration | "300s" | Cache entry TTL |
| cache.hybrid | bool | false | Enable memory + disk caching via foyer |
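A lookup table with hybrid caching. The names and connection values are illustrative; the property names follow the Full Config Example at the end of this page:

```toml
[[lookup]]
name = "products"
connector = "postgres"
strategy = "poll"
pushdown = true

[lookup.properties]
hostname = "db.internal"
database = "reference_data"
username = "reader"
table.name = "products"

[lookup.cache]
size_bytes = 52428800   # 50 MB
ttl = "10m"
hybrid = true           # spill to disk via foyer
```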
Connectors
All connectors are feature-gated. Enable via cargo add laminar-db --features kafka,delta-lake.
Kafka Source
Required
| Property | Description |
|---|---|
| bootstrap.servers | Kafka broker addresses (comma-separated) |
| group.id | Consumer group ID |
| topic or topic.pattern | Topic names (comma-separated) or regex pattern. Mutually exclusive. |
Format & Schema
| Property | Default | Description |
|---|---|---|
| format | json | json, avro, protobuf |
| schema.registry.url | | Confluent Schema Registry URL |
| schema.registry.username | | Schema Registry auth |
| schema.registry.password | | Schema Registry auth |
Security
| Property | Default | Description |
|---|---|---|
| security.protocol | PLAINTEXT | PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL |
| sasl.mechanism | | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 |
| sasl.username | | SASL username |
| sasl.password | | SASL password |
| ssl.ca.location | | CA certificate path |
Consumer Tuning
| Property | Default | Description |
|---|---|---|
| startup.mode | earliest | earliest, latest, none |
| max.poll.records | 1000 | Records per poll batch |
| isolation.level | read_committed | read_uncommitted, read_committed |
| partition.assignment.strategy | range | range, roundrobin, sticky, cooperative-sticky |
| max.out.of.orderness.ms | 5000 | Watermark out-of-order tolerance |
| idle.timeout.ms | 30000 | Idle watermark advancement timeout |
Any property prefixed with kafka. is passed through to the underlying rdkafka consumer.
Kafka Sink
| Property | Default | Description |
|---|---|---|
| bootstrap.servers | required | Kafka broker addresses |
| topic | required | Target topic |
| format | json | json, avro, protobuf |
| delivery.guarantee | at-least-once | at-least-once, exactly-once |
| key.column | | Column for partition key |
| partitioner | key-hash | key-hash, round-robin, sticky |
| compression.type | none | none, gzip, snappy, lz4, zstd |
| acks | all | all, 1, 0 |
| linger.ms | 5 | Batch linger time |
| batch.size | 16384 | Max batch bytes |
| transactional.id | | For exactly-once producer |
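An exactly-once Kafka sink sketch using the properties above. The sink name, topic, and transactional ID are illustrative:

```toml
[[sink]]
name = "alerts_out"
pipeline = "vwap"       # must match an existing [[pipeline]] name
connector = "kafka"
delivery = "exactly_once"

[sink.properties]
bootstrap.servers = "${KAFKA_BROKERS:-localhost:9092}"
topic = "alerts"
format = "json"
compression.type = "zstd"
transactional.id = "laminar-alerts-tx"   # required for exactly-once
```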
PostgreSQL CDC Source
| Property | Default | Description |
|---|---|---|
| host | required | PostgreSQL hostname |
| port | 5432 | Port |
| database | required | Database name |
| username | postgres | Auth username |
| password | | Auth password |
| slot.name | required | Logical replication slot |
| publication | required | Publication name |
| ssl.mode | prefer | disable, allow, prefer, require, verify-ca, verify-full |
| snapshot.mode | initial | initial, never, always |
| table.include | | Comma-separated table filter |
| table.exclude | | Comma-separated table exclusions |
| max.poll.records | 1000 | Records per poll |
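A CDC source sketch using the properties above. The host, slot, and publication values are illustrative; the replication slot and publication must already exist (or be provisioned) in PostgreSQL:

```toml
[[source]]
name = "orders_cdc"
connector = "postgres_cdc"

[source.properties]
host = "db.internal"
database = "shop"
username = "replicator"
password = "${PG_PASSWORD}"
slot.name = "laminar_slot"
publication = "laminar_pub"
table.include = "public.orders"
```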
MySQL CDC Source
| Property | Default | Description |
|---|---|---|
| host | required | MySQL hostname |
| port | 3306 | Port |
| username | required | Auth username |
| password | | Auth password |
| database | | Database filter |
| server.id | 1001 | Unique replica server ID |
| use.gtid | true | GTID-based replication |
| gtid.set | | Starting GTID set |
| snapshot.mode | initial | initial, never, always |
| table.include | | Comma-separated tables (db.table format) |
PostgreSQL Sink
| Property | Default | Description |
|---|---|---|
| hostname | required | PostgreSQL hostname |
| port | 5432 | Port |
| database | required | Database name |
| username | required | Auth username |
| password | | Auth password |
| table.name | required | Target table |
| schema.name | public | PostgreSQL schema |
| write.mode | append | append (COPY BINARY) or upsert (INSERT ON CONFLICT) |
| primary.key | | PK columns for upsert (comma-separated). Required if write.mode=upsert. |
| batch.size | 4096 | Records per flush |
| pool.size | 4 | Connection pool size |
| delivery.guarantee | at_least_once | at_least_once, exactly_once (2PC) |
| auto.create.table | false | Create table if missing |
Delta Lake Source
| Property | Default | Description |
|---|---|---|
| table.path | required | Delta table path (local, s3://, az://, gs://) |
| read.mode | incremental | snapshot (full read) or incremental (change feed) |
| starting.version | | Version to start from |
| poll.interval.ms | 1000 | Version poll interval |
| partition.filter | | Partition filter (SQL expression) |
| catalog.type | none | none, glue, unity |
Delta Lake Sink
| Property | Default | Description |
|---|---|---|
| table.path | required | Delta table path |
| write.mode | append | append, overwrite, upsert |
| merge.key.columns | | Key columns for upsert (comma-separated) |
| partition.columns | | Partition columns (comma-separated) |
| target.file.size | 134217728 | Target Parquet file size (128 MB) |
| max.buffer.records | 100000 | Records before flush |
| max.buffer.duration.ms | 60000 | Time before flush |
| delivery.guarantee | at-least-once | at-least-once, exactly-once |
| writer.id | | Writer ID (required for exactly-once) |
| schema.evolution | false | Auto-merge new columns |
| compaction.enabled | false | Enable automatic compaction |
| vacuum.retention.hours | 168 | Retention hours (7 days) |
| catalog.type | none | none, glue, unity |
Cloud storage credentials: use storage.aws_access_key_id, storage.aws_secret_access_key, storage.aws_region, etc.
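An exactly-once Delta Lake sink sketch. The bucket path, writer ID, and partition column are illustrative:

```toml
[[sink]]
name = "orders_lake"
pipeline = "vwap"       # must match an existing [[pipeline]] name
connector = "delta-lake"
delivery = "exactly_once"

[sink.properties]
table.path = "s3://my-bucket/delta/orders"
write.mode = "append"
partition.columns = "event_date"
writer.id = "orders-writer-1"   # required for exactly-once
storage.aws_region = "${AWS_REGION:-us-east-1}"
```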
WebSocket Source
Note: WebSocket is non-replayable; delivery degrades to at-most-once on recovery.
| Property | Default | Description |
|---|---|---|
| mode | client | client (connect out) or server (accept connections) |
| url | | WebSocket URL (client mode, comma-separated for multi) |
| bind.address | | Bind address (server mode) |
| format | json | json, jsonlines, binary, csv |
| subscribe.message | | Message sent after handshake (client mode) |
| reconnect.enabled | true | Auto-reconnect (client mode) |
| reconnect.max.delay.ms | 30000 | Max backoff delay |
| ping.interval.ms | 30000 | Keepalive ping interval |
| auth.type | | bearer, basic, hmac |
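A client-mode WebSocket source sketch. The URL and subscribe message are illustrative:

```toml
[[source]]
name = "ticker"
connector = "websocket"
format = "json"

[source.properties]
mode = "client"
url = "wss://stream.example.com/ws"
subscribe.message = '{"op": "subscribe", "channel": "trades"}'
```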
WebSocket Sink
| Property | Default | Description |
|---|---|---|
| mode | server | server (accept clients) or client (push to remote) |
| bind.address | | Bind address (server mode) |
| url | | Remote URL (client mode) |
| format | json | json, jsonlines, arrow_ipc, binary |
| max.connections | 10000 | Max concurrent clients (server mode) |
| slow.client.policy | drop_oldest | drop_oldest, drop_newest, disconnect |
| replay.buffer.size | | Messages replayed to new clients |
File Source (Auto-Loader)
| Property | Default | Description |
|---|---|---|
| path | required | Directory, glob, or cloud URL |
| format | | csv, json, text, parquet (auto-detected if omitted) |
| poll_interval | 10000 | Directory poll interval (ms) |
| max_files_per_poll | 100 | Files per poll cycle |
| csv.delimiter | | CSV delimiter character |
| csv.has.header | true | CSV has header row |
| include_metadata | false | Include _metadata column |
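An auto-loader sketch watching a directory for CSV drops. The source name and path are illustrative:

```toml
[[source]]
name = "uploads"
connector = "files"
format = "csv"

[source.properties]
path = "./inbox/*.csv"
```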
File Sink
| Property | Default | Description |
|---|---|---|
| path | required | Output directory |
| format | required | csv, json, text, parquet |
| mode | rolling | append (single file) or rolling (rotate) |
| prefix | part | File name prefix (rolling mode) |
| compression | snappy | Parquet: snappy, gzip, brotli, lz4, zstd, uncompressed |
Join Semantics
LaminarDB supports four join types. All are stateful and operate on
streaming data. Join types are detected from SQL syntax automatically.
| Join Type | Time Bound | Match | Use Case |
|---|---|---|---|
| Interval | Required | All pairs within window | Stream-stream correlation |
| ASOF | Optional tolerance | One closest per key | Trade/quote enrichment |
| Temporal | Implicit (version time) | Point-in-time lookup | Versioned dimension tables |
| Lookup | None | Key equality | Static reference enrichment |
Interval Join (Stream-Stream)
Matches all pairs of rows from two streams where timestamps fall within a
time bound: |left_ts - right_ts| <= time_bound.
Both sides are buffered in state. Expired rows are evicted when the
watermark advances past the bound.
Supported types: INNER, LEFT, RIGHT, FULL, LEFT SEMI, LEFT ANTI, RIGHT SEMI, RIGHT ANTI
CREATE STREAM matched AS
SELECT o.order_id, o.amount, p.status
FROM orders o
INNER JOIN payments p
ON o.order_id = p.order_id
AND p.ts BETWEEN o.ts AND o.ts + 3600000;
SELECT o.order_id, o.amount
FROM orders o
LEFT SEMI JOIN payments p
ON o.order_id = p.order_id
AND p.ts BETWEEN o.ts AND o.ts + 3600000;
State: Both sides buffered as Arrow batches with per-key
BTreeMap<timestamp> indices. Time columns are extracted
automatically from the BETWEEN clause. New left rows probe all
right state; new right rows probe only old left state (avoids double-emit).
Checkpointed via Arrow IPC serialization.
ASOF Join
For each left row, finds the one closest right row by timestamp
within the same key partition. Uses Snowflake-compatible MATCH_CONDITION syntax.
Supported types: INNER, LEFT
Three directions
| Direction | Syntax | Behavior |
|---|---|---|
| Backward | t.ts >= q.ts | Most recent prior right event |
| Forward | t.ts <= q.ts | Next future right event |
| Nearest | NEAREST(t.ts, q.ts) | Closest by absolute time difference |
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
MATCH_CONDITION(t.ts >= q.ts)
ON t.symbol = q.symbol;
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
MATCH_CONDITION(t.ts >= q.ts AND t.ts - q.ts <= INTERVAL '5' SECOND)
ON t.symbol = q.symbol;
SELECT s.sensor_id, s.reading, c.calibration
FROM sensor_data s
ASOF JOIN calibrations c
MATCH_CONDITION(NEAREST(s.ts, c.ts))
ON s.sensor_id = c.sensor_id;
State: Right-side events stored in per-key
BTreeMap<timestamp, Vec<row_index>> for O(log n) temporal lookups.
Multiple rows at the same timestamp are handled correctly.
Key columns support VARCHAR and BIGINT types.
Temporal Join (FOR SYSTEM_TIME AS OF)
Point-in-time lookup against a versioned table. For each stream event,
looks up the table row valid at the event's timestamp. Uses SQL:2011
FOR SYSTEM_TIME AS OF syntax.
Supported types: INNER, LEFT
SELECT o.order_id, o.amount, r.rate,
o.amount * r.rate AS converted
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
Versioned tables: The right-side table must have a timestamp
column marking version validity. Event-time semantics are deterministic;
process-time (PROCTIME()) is non-deterministic.
Lookup Join
Enriches stream events against a static or slowly-changing reference table.
No time bound — matches by key equality only. Results are cached with
configurable TTL.
Supported types: INNER, LEFT
SELECT o.order_id, o.amount, p.name, p.category
FROM orders o
LEFT JOIN products p
ON o.product_id = p.id;
Cache: Foyer-backed memory/hybrid cache. Configure via
[[lookup]] section: cache.size_bytes,
cache.ttl, cache.hybrid.
Supports predicate pushdown to the lookup connector.
Environment Variables
All string values in TOML support ${VAR} substitution.
Use ${VAR:-default} for optional variables with fallbacks.
Missing variables without defaults cause a startup error.
[source.properties]
bootstrap.servers = "${KAFKA_BROKERS:-localhost:9092}"
sasl.username = "${KAFKA_USER}"
sasl.password = "${KAFKA_PASSWORD}"
[checkpoint]
url = "s3://${S3_BUCKET}/checkpoints"
[checkpoint.storage]
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
S3 Storage Class Tiering
Active checkpoints use the hot tier. Older checkpoints automatically tier down via S3 Lifecycle rules.
| Field | Default | Description |
|---|---|---|
| hot_class | "STANDARD" | Storage class for active checkpoints (e.g. "EXPRESS_ONE_ZONE") |
| warm_class | "STANDARD" | Storage class for older checkpoints |
| cold_class | "" | Archive class (e.g. "GLACIER_IR"). Empty = no cold tier. |
| hot_retention | "24h" | Time before hot → warm |
| warm_retention | "7d" | Time before warm → cold |
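A tiering sketch. The [checkpoint.tiering] table name is an assumption based on the tiering field in the [checkpoint] section; the class and retention values are illustrative:

```toml
[checkpoint.tiering]
hot_class = "EXPRESS_ONE_ZONE"   # active checkpoints
warm_class = "STANDARD"          # after hot_retention
cold_class = "GLACIER_IR"        # after warm_retention; empty string disables
hot_retention = "24h"
warm_retention = "7d"
```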
Delivery Guarantees
| Guarantee | How it works | Requirements |
|---|---|---|
| at_least_once | Default. On recovery, sources replay from the last checkpoint offset; some records may be reprocessed. | Checkpointing enabled. Works with all connectors. |
| exactly_once | Barrier-aligned checkpoints with two-phase commit to sinks. No duplicates in output. | All sources must support replay; sinks must support 2PC. Currently: Kafka sink and PostgreSQL sink. |
WebSocket sources are non-replayable. Under exactly-once mode,
they degrade to at-most-once delivery with a warning.
Full Config Example
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
workers = 4
log_level = "info"
[state]
backend = "memory"
[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"
[checkpoint.storage]
aws_region = "us-east-1"
# --- Sources ---
[[source]]
name = "trades"
connector = "kafka"
format = "avro"
[source.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "market-trades"
group.id = "laminar-vwap"
schema.registry.url = "${SCHEMA_REGISTRY_URL}"
startup.mode = "earliest"
[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false
[[source.schema]]
name = "price"
type = "DOUBLE"
[[source.schema]]
name = "volume"
type = "BIGINT"
[[source.schema]]
name = "ts"
type = "BIGINT"
[source.watermark]
column = "ts"
max_out_of_orderness = "5s"
# --- Lookups ---
[[lookup]]
name = "instruments"
connector = "postgres"
strategy = "poll"
pushdown = true
[lookup.properties]
hostname = "db.internal"
database = "reference_data"
username = "reader"
table.name = "instruments"
[lookup.cache]
size_bytes = 104857600
ttl = "5m"
# --- Pipelines ---
[[pipeline]]
name = "vwap"
sql = """
SELECT t.symbol,
SUM(t.price * CAST(t.volume AS DOUBLE))
/ SUM(CAST(t.volume AS DOUBLE)) AS vwap,
i.exchange
FROM trades t
JOIN instruments i ON t.symbol = i.symbol
GROUP BY t.symbol, i.exchange, tumble(t.ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE
"""
# --- Sinks ---
[[sink]]
name = "kafka_output"
pipeline = "vwap"
connector = "kafka"
delivery = "exactly_once"
[sink.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "vwap-1m"
format = "json"
key.column = "symbol"
compression.type = "lz4"
[[sink]]
name = "lake"
pipeline = "vwap"
connector = "delta-lake"
delivery = "exactly_once"
[sink.properties]
table.path = "s3://my-bucket/delta/vwap"
partition.columns = "exchange"
writer.id = "vwap-writer"