Standalone Server & Configuration Reference
The laminardb binary reads a TOML configuration file, constructs streaming pipelines, and serves a REST API with Prometheus metrics.
Every field on this page is verified against
config.rs.
Last updated: 9 April 2026.
Install
# Pre-built binary (Linux x86_64)
curl -LO "https://github.com/laminardb/laminardb/releases/latest/download/laminardb-server-x86_64-unknown-linux-gnu-v0.20.0.tar.gz"
tar xzf laminardb-server-*.tar.gz
# Or build from source
cargo install laminar-server
# Or Docker
docker run -d -p 8080:8080 ghcr.io/laminardb/laminardb-server:latest
# Run
./laminardb --config laminardb.toml
CLI Options
| Flag | Default | Description |
|---|---|---|
| --config <FILE> | laminardb.toml | TOML configuration file path |
| --log-level <LEVEL> | info | trace, debug, info, warn, error |
| --admin-bind <ADDR> | | Override the HTTP bind address from the config file |
| --validate-checkpoints | | Validate all stored checkpoints and exit |
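For example, starting against a production config with verbose logging and an overridden bind address (file name illustrative):
./laminardb --config prod.toml --log-level debug --admin-bind 0.0.0.0:9090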
[server]
| Field | Type | Default | Description |
|---|---|---|---|
| mode | String | "embedded" | "embedded" (single-node) or "delta" (multi-node, coming soon) |
| bind | String | "127.0.0.1:8080" | HTTP API bind address |
| workers | usize | 0 | Worker threads. 0 = auto-detect CPU count. |
| log_level | String | "info" | trace, debug, info, warn, error |
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
workers = 4
log_level = "info"
[state]
| Field | Type | Default | Description |
|---|---|---|---|
| backend | String | "memory" | "memory", "mmap", or "disaggregated" |
| path | String | "./data/state" | Persistent state directory (mmap backend) |
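A minimal persistent-state sketch using the mmap backend (the directory is illustrative):
[state]
backend = "mmap"
path = "/var/lib/laminardb/state"  # only read by the mmap backend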
[checkpoint]
| Field | Type | Default | Description |
|---|---|---|---|
| url | String | "file:///tmp/laminardb/checkpoints" | Storage URL. file:///, s3://, gs:// |
| interval | Duration | "10s" | Checkpoint interval. Supports humantime: "10s", "1m", "500ms" |
| storage | Map | {} | Cloud storage credentials. Keys: aws_access_key_id, aws_region, etc. |
| tiering | Section | none | Optional S3 storage class tiering. See Tiering. |
[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"
[checkpoint.storage]
aws_region = "us-east-1"
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
REST API
The server exposes an HTTP API on the configured bind address. All responses are JSON except /metrics (Prometheus text format).
Hot Reload
Edit the TOML config file while the server is running. A file watcher detects changes
(500ms debounce, content-hash comparison), diffs the old and new configs, and applies
incremental DDL. Or trigger manually via POST /api/v1/reload.
What reloads without restart: sources, pipelines, sinks, lookups (add, remove, or change).
What requires restart: [server], [state], [checkpoint] sections. A warning is returned if these sections change.
Reload order: remove changed/deleted sinks → pipelines → lookups → sources,
then recreate sources → lookups → pipelines → sinks.
Disable file watcher with LAMINAR_DISABLE_FILE_WATCH=1.
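For example, to force a reload by hand (assumes the default bind address):
curl -X POST http://localhost:8080/api/v1/reload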
Prometheus Metrics
GET /metrics returns Prometheus text format. Key metrics:
| Metric | Type | Description |
|---|---|---|
| laminardb_events_ingested_total | counter | Total events ingested across all sources |
| laminardb_events_emitted_total | counter | Total events emitted to sinks |
| laminardb_events_dropped_total | counter | Total events dropped (late data, errors) |
| laminardb_source_events_total | counter | Events per source (label: source) |
| laminardb_source_count | gauge | Number of configured sources |
| laminardb_stream_count | gauge | Number of running streams |
| laminardb_sink_count | gauge | Number of configured sinks |
| laminardb_checkpoints_completed_total | counter | Successful checkpoints |
| laminardb_checkpoints_failed_total | counter | Failed checkpoints |
| laminardb_checkpoint_epoch | gauge | Current checkpoint epoch |
| laminardb_checkpoint_last_duration_ms | gauge | Last checkpoint duration (ms) |
| laminardb_cycle_duration_p50_ns | gauge | p50 processing cycle latency (ns) |
| laminardb_cycle_duration_p95_ns | gauge | p95 processing cycle latency (ns) |
| laminardb_cycle_duration_p99_ns | gauge | p99 processing cycle latency (ns) |
| laminardb_uptime_seconds | gauge | Server uptime |
| laminardb_pipeline_state_info | gauge | Pipeline state (label: state) |
| laminardb_reload_total | counter | Config reload count |
| laminardb_reload_last_timestamp | gauge | Unix timestamp of last reload |
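To spot-check checkpoint health from a shell (assumes the default bind address):
curl -s http://localhost:8080/metrics | grep -E 'laminardb_checkpoints_(completed|failed)_total'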
[[source]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique source name. Referenced in SQL and sinks. |
| connector | String | required | "kafka", "postgres_cdc", "mysql_cdc", "mongodb-cdc", "websocket", "files", "delta-lake", "iceberg", "otel", "parquet-lookup", "postgres-lookup" |
| format | String | "json" | "json", "avro", "protobuf", "csv" |
| properties | Table | {} | Connector-specific properties. See connector sections below. |
| schema | Array | [] | Column definitions: {name, type, nullable} |
| watermark | Section | none | Event-time watermark: {column, max_out_of_orderness} |
[[source]]
name = "trades"
connector = "kafka"
format = "json"
[source.properties]
bootstrap.servers = "localhost:9092"
topic = "market-trades"
group.id = "laminar-analytics"
[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false
[[source.schema]]
name = "price"
type = "DOUBLE"
[[source.schema]]
name = "ts"
type = "BIGINT"
[source.watermark]
column = "ts"
max_out_of_orderness = "5s"
[[pipeline]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique pipeline name |
| sql | String | required | SQL query. Multi-line queries are supported via TOML triple-quoted strings. |
| parallelism | usize | none | Optional parallelism override (default: server worker count) |
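A minimal pipeline sketch reading from the trades source defined above (the filter threshold is illustrative):
[[pipeline]]
name = "large_trades"
sql = """
SELECT symbol, price, ts
FROM trades
WHERE price > 1000.0
"""
parallelism = 2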
[[sink]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique sink name |
| pipeline | String | required | Pipeline name this sink reads from. Must exist. |
| connector | String | required | "kafka", "postgres", "mongodb", "delta-lake", "iceberg", "websocket", "files", "stdout" |
| delivery | String | "at_least_once" | "at_least_once" or "exactly_once" |
| properties | Table | {} | Connector-specific properties |
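A minimal sink sketch that prints the pipeline sketch above to stdout:
[[sink]]
name = "debug_out"
pipeline = "large_trades"
connector = "stdout"
delivery = "at_least_once"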
[[lookup]]
| Field | Type | Default | Description |
|---|---|---|---|
| name | String | required | Unique lookup table name. Used in SQL JOINs. |
| connector | String | required | "postgres", "parquet" |
| strategy | String | "poll" | "poll", "cdc", "manual" |
| pushdown | bool | true | Push predicates to source for filtered lookups |
| cache.size_bytes | u64 | 104857600 | Cache size in bytes (100 MB) |
| cache.ttl | Duration | "300s" | Cache entry TTL |
| cache.hybrid | bool | false | Enable memory + disk caching via foyer |
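A lookup sketch over a Parquet reference file; the path key under [lookup.properties] is an assumed property name, and the cache keys come from the table above:
[[lookup]]
name = "products"
connector = "parquet"
strategy = "manual"
[lookup.properties]
path = "./reference/products.parquet"  # assumed property name
[lookup.cache]
size_bytes = 52428800  # 50 MB
ttl = "10m"
hybrid = true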
Connectors
All connectors are feature-gated. Enable via cargo add laminar-db --features kafka,delta-lake.
Kafka Source
Required
| Property | Description |
|---|---|
| bootstrap.servers | Kafka broker addresses (comma-separated) |
| group.id | Consumer group ID |
| topic or topic.pattern | Topic names (comma-separated) or regex pattern. Mutually exclusive. |
Format & Schema
| Property | Default | Description |
|---|---|---|
| format | json | json, avro, protobuf |
| schema.registry.url | | Confluent Schema Registry URL |
| schema.registry.username | | Schema Registry auth username |
| schema.registry.password | | Schema Registry auth password |
Security
| Property | Default | Description |
|---|---|---|
| security.protocol | PLAINTEXT | PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL |
| sasl.mechanism | | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 |
| sasl.username | | SASL username |
| sasl.password | | SASL password |
| ssl.ca.location | | CA certificate path |
Consumer Tuning
| Property | Default | Description |
|---|---|---|
| startup.mode | earliest | earliest, latest, none |
| max.poll.records | 1000 | Records per poll batch |
| isolation.level | read_committed | read_uncommitted, read_committed |
| partition.assignment.strategy | range | range, roundrobin, sticky, cooperative-sticky |
| max.out.of.orderness.ms | 5000 | Watermark out-of-order tolerance |
| idle.timeout.ms | 30000 | Idle watermark advancement timeout |
Any property prefixed with kafka. is passed through to the underlying rdkafka consumer.
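For example, to forward a fetch-size tuning knob straight to rdkafka (topic and group names illustrative):
[source.properties]
bootstrap.servers = "localhost:9092"
topic = "events"
group.id = "laminar"
kafka.fetch.min.bytes = "1048576"  # forwarded verbatim to the rdkafka consumer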
Kafka Sink
| Property | Default | Description |
|---|---|---|
| bootstrap.servers | required | Kafka broker addresses |
| topic | required | Target topic |
| format | json | json, avro, protobuf |
| delivery.guarantee | at-least-once | at-least-once, exactly-once |
| key.column | | Column for partition key |
| partitioner | key-hash | key-hash, round-robin, sticky |
| compression.type | none | none, gzip, snappy, lz4, zstd |
| acks | all | all, 1, 0 |
| linger.ms | 5 | Batch linger time |
| batch.size | 16384 | Max batch bytes |
| transactional.id | | Transactional ID for exactly-once producer |
PostgreSQL CDC Source
| Property | Default | Description |
|---|---|---|
| host | required | PostgreSQL hostname |
| port | 5432 | Port |
| database | required | Database name |
| username | postgres | Auth username |
| password | | Auth password |
| slot.name | required | Logical replication slot |
| publication | required | Publication name |
| ssl.mode | prefer | disable, prefer, require, verify-ca, verify-full |
| snapshot.mode | initial | initial, never, always |
| table.include | | Comma-separated table filter |
| table.exclude | | Comma-separated table exclusions |
| max.poll.records | 1000 | Records per poll |
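A minimal CDC source sketch; host, slot, and publication names are illustrative, and the publication must already exist in PostgreSQL:
[[source]]
name = "orders_cdc"
connector = "postgres_cdc"
[source.properties]
host = "db.internal"
database = "shop"
username = "postgres"
password = "${PG_PASSWORD}"
slot.name = "laminar_slot"
publication = "laminar_pub"
table.include = "public.orders"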
MySQL CDC Source
| Property | Default | Description |
|---|---|---|
| host | required | MySQL hostname |
| port | 3306 | Port |
| username | required | Auth username |
| password | | Auth password |
| database | | Database filter |
| server.id | 1001 | Unique replica server ID |
| use.gtid | true | GTID-based replication |
| gtid.set | | Starting GTID set |
| snapshot.mode | initial | initial, never, always |
| table.include | | Comma-separated tables (db.table format) |
PostgreSQL Sink
| Property | Default | Description |
|---|---|---|
| hostname | required | PostgreSQL hostname |
| port | 5432 | Port |
| database | required | Database name |
| username | required | Auth username |
| password | | Auth password |
| table.name | required | Target table |
| schema.name | public | PostgreSQL schema |
| write.mode | append | append (COPY BINARY) or upsert (INSERT ON CONFLICT) |
| primary.key | | PK columns for upsert (comma-separated). Required if write.mode=upsert. |
| batch.size | 4096 | Records per flush |
| pool.size | 4 | Connection pool size |
| delivery.guarantee | at_least_once | at_least_once, exactly_once (2PC) |
| auto.create.table | false | Create table if missing |
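An upsert sink sketch; primary.key is mandatory because write.mode is upsert (names illustrative):
[[sink]]
name = "pg_out"
pipeline = "large_trades"
connector = "postgres"
[sink.properties]
hostname = "db.internal"
database = "analytics"
username = "writer"
password = "${PG_PASSWORD}"
table.name = "trade_summary"
write.mode = "upsert"
primary.key = "symbol"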
MongoDB CDC Source
Consumes MongoDB change streams. Resume tokens provide at-least-once delivery across restarts. Requires MongoDB 4.0+ (replica set) or 4.2+ (sharded cluster).
| Property | Default | Description |
|---|---|---|
| connection.uri | mongodb://localhost:27017 | MongoDB connection string |
| database | required | Database name |
| collection | required | Collection name, or * for all collections in database |
| full.document.mode | delta | delta, update_lookup, required, when_available |
| max.await.time.ms | 1000 | getMore await timeout (milliseconds) |
| batch.size | 1000 | Cursor batch size hint |
| max.poll.records | 1000 | Max records per poll batch |
| max.buffered.events | 100000 | Max buffered events before backpressure |
Full document modes:
delta sends the update description only (default).
update_lookup fetches the current document on updates.
required needs changeStreamPreAndPostImages enabled on the collection.
when_available returns the post-image when present, otherwise null.
CDC envelope schema:
_namespace (Utf8), _op (Utf8: I/U/R/D/DROP),
_document_key (Utf8), _cluster_time_s (UInt32),
_cluster_time_i (UInt32), _wall_time_ms (Int64),
_full_document (Utf8, nullable), _update_desc (Utf8, nullable),
_resume_token (Utf8).
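Because the envelope fields arrive as ordinary columns, a pipeline can filter on them directly; assuming a source named mongo_events:
SELECT _namespace, _document_key, _full_document
FROM mongo_events
WHERE _op IN ('I', 'U');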
MongoDB Sink
| Property | Default | Description |
|---|---|---|
| connection.uri | mongodb://localhost:27017 | MongoDB connection string |
| database | required | Target database name |
| collection | required | Target collection name |
| write.mode | insert | insert, upsert, replace, cdc_replay |
| batch.size | 500 | Max documents per bulk write |
| flush.interval.ms | 1000 | Max time between flushes (ms) |
| ordered | true | true = fail-fast ordered writes; false = unordered (higher throughput) |
| write.concern.w | majority | majority or integer node count |
| write.concern.journal | true | Wait for journal commit |
Write modes:
insert is append-only.
upsert replaces documents by the fields listed in upsert.key.fields.
replace rewrites the full document.
cdc_replay routes by the _op column (I→insert, U→update, R→replace, D→delete).
Time series collections only support insert.
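An upsert sink sketch keyed on symbol; upsert.key.fields comes from the write-mode notes above, and the other names are illustrative:
[[sink]]
name = "mongo_out"
pipeline = "large_trades"
connector = "mongodb"
[sink.properties]
connection.uri = "${MONGO_URI}"
database = "analytics"
collection = "summaries"
write.mode = "upsert"
upsert.key.fields = "symbol"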
Delta Lake Source
| Property | Default | Description |
|---|---|---|
| table.path | required | Delta table path (local, s3://, az://, gs://) |
| read.mode | incremental | snapshot (full read) or incremental (change feed) |
| starting.version | | Version to start from |
| poll.interval.ms | 1000 | Version poll interval |
| partition.filter | | Partition filter (SQL expression) |
| catalog.type | none | none, glue, unity |
Delta Lake Sink
| Property | Default | Description |
|---|---|---|
| table.path | required | Delta table path |
| write.mode | append | append, overwrite, upsert |
| merge.key.columns | | Key columns for upsert (comma-separated) |
| partition.columns | | Partition columns (comma-separated) |
| target.file.size | 134217728 | Target Parquet file size (128 MB) |
| max.buffer.records | 100000 | Records before flush |
| max.buffer.duration.ms | 60000 | Time before flush |
| delivery.guarantee | at-least-once | at-least-once, exactly-once |
| writer.id | | Writer ID (required for exactly-once) |
| schema.evolution | false | Auto-merge new columns |
| compaction.enabled | false | Enable automatic compaction |
| vacuum.retention.hours | 168 | Retention hours (7 days) |
| catalog.type | none | none, glue, unity |
Cloud storage credentials: use storage.aws_access_key_id, storage.aws_secret_access_key, storage.aws_region, etc.
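For example, an S3-backed sink with explicit credentials (bucket name illustrative):
[sink.properties]
table.path = "s3://my-bucket/delta/trades"
storage.aws_region = "us-east-1"
storage.aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
storage.aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"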
WebSocket Source
Note: WebSocket sources are non-replayable; delivery degrades to at-most-once after recovery.
| Property | Default | Description |
|---|---|---|
| mode | client | client (connect out) or server (accept connections) |
| url | | WebSocket URL (client mode; comma-separated for multiple endpoints) |
| bind.address | | Bind address (server mode) |
| format | json | json, jsonlines, binary, csv |
| subscribe.message | | Message sent after handshake (client mode) |
| reconnect.enabled | true | Auto-reconnect (client mode) |
| reconnect.max.delay.ms | 30000 | Max backoff delay |
| ping.interval.ms | 30000 | Keepalive ping interval |
| auth.type | | bearer, basic, hmac |
WebSocket Sink
| Property | Default | Description |
|---|---|---|
| mode | server | server (accept clients) or client (push to remote) |
| bind.address | | Bind address (server mode) |
| url | | Remote URL (client mode) |
| format | json | json, jsonlines, arrow_ipc, binary |
| max.connections | 10000 | Max concurrent clients (server mode) |
| slow.client.policy | drop_oldest | drop_oldest, drop_newest, disconnect |
| replay.buffer.size | | Number of messages replayed to new clients |
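A server-mode sink sketch that fans results out to WebSocket clients (addresses illustrative):
[[sink]]
name = "live_feed"
pipeline = "large_trades"
connector = "websocket"
[sink.properties]
mode = "server"
bind.address = "0.0.0.0:9001"
format = "jsonlines"
slow.client.policy = "disconnect"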
File Source (Auto-Loader)
| Property | Default | Description |
|---|---|---|
| path | required | Directory, glob, or cloud URL |
| format | | csv, json, text, parquet (auto-detected if omitted) |
| poll_interval | 10000 | Directory poll interval (ms) |
| max_files_per_poll | 100 | Files per poll cycle |
| csv.delimiter | | CSV delimiter character |
| csv.has.header | true | CSV has header row |
| include_metadata | false | Include _metadata column |
File Sink
| Property | Default | Description |
|---|---|---|
| path | required | Output directory |
| format | required | csv, json, text, parquet |
| mode | rolling | append (single file) or rolling (rotate) |
| prefix | part | File name prefix (rolling mode) |
| compression | snappy | Parquet: snappy, gzip, brotli, lz4, zstd, uncompressed |
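A rolling Parquet sink sketch (output path illustrative):
[[sink]]
name = "archive"
pipeline = "large_trades"
connector = "files"
[sink.properties]
path = "./out/archive"
format = "parquet"
mode = "rolling"
prefix = "part"
compression = "zstd"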
Join Semantics
LaminarDB supports five join types. All are stateful and operate on streaming data. Join types are detected from SQL syntax automatically.
| Join Type | Time Bound | Match | Use Case |
|---|---|---|---|
| Interval | Required | All pairs within window | Stream-stream correlation |
| ASOF | Optional tolerance | One closest per key (backward / forward / nearest) | Trade/quote enrichment |
| Temporal Probe | Explicit offsets | N rows per left event at fixed offsets | Impact curves, lagged feature lookups |
| Temporal | Implicit (version time) | Point-in-time lookup | Versioned dimension tables |
| Lookup | None | Key equality | Static reference enrichment |
Interval Join (Stream-Stream)
Matches all pairs of rows from two streams where timestamps fall within a
time bound: |left_ts - right_ts| <= time_bound.
Both sides are buffered in state. Expired rows are evicted when the
watermark advances past the bound.
Supported types: INNER, LEFT, RIGHT, FULL, LEFT SEMI, LEFT ANTI, RIGHT SEMI, RIGHT ANTI
INNER join matching payments within one hour after each order:
CREATE STREAM matched AS
SELECT o.order_id, o.amount, p.status
FROM orders o
INNER JOIN payments p
ON o.order_id = p.order_id
AND p.ts BETWEEN o.ts AND o.ts + 3600000;
A LEFT SEMI join emits each order at most once if any matching payment exists:
SELECT o.order_id, o.amount
FROM orders o
LEFT SEMI JOIN payments p
ON o.order_id = p.order_id
AND p.ts BETWEEN o.ts AND o.ts + 3600000;
State: Both sides buffered as Arrow batches with per-key
BTreeMap<timestamp> indices. Time columns are extracted
automatically from the BETWEEN clause. New left rows probe all
right state; new right rows probe only old left state (avoids double-emit).
Checkpointed via Arrow IPC serialization.
ASOF Join
For each left row, finds the one closest right row by timestamp
within the same key partition. Uses Snowflake-compatible MATCH_CONDITION syntax.
Supported types: INNER, LEFT
Three directions
| Direction | Syntax | Behavior |
|---|---|---|
| Backward | t.ts >= q.ts | Most recent prior right event |
| Forward | t.ts <= q.ts | Next future right event |
| Nearest | NEAREST(t.ts, q.ts) | Closest by absolute time difference |
Backward match (most recent prior quote per symbol):
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
MATCH_CONDITION(t.ts >= q.ts)
ON t.symbol = q.symbol;
Backward match with a five-second staleness bound:
SELECT t.symbol, t.price, q.bid, q.ask
FROM trades t
ASOF JOIN quotes q
MATCH_CONDITION(t.ts >= q.ts AND t.ts - q.ts <= INTERVAL '5' SECOND)
ON t.symbol = q.symbol;
Nearest match by absolute time difference:
SELECT s.sensor_id, s.reading, c.calibration
FROM sensor_data s
ASOF JOIN calibrations c
MATCH_CONDITION(NEAREST(s.ts, c.ts))
ON s.sensor_id = c.sensor_id;
State: Right-side events stored in per-key
BTreeMap<timestamp, Vec<row_index>> for O(log n) temporal lookups.
Multiple rows at the same timestamp are handled correctly.
Key columns support VARCHAR and BIGINT types.
Temporal Probe Join
For each left event, produces N output rows by probing the right stream at
a fixed list of time offsets. Each probe returns the ASOF-matched right
value at left.event_time + offset. Useful for lagged feature
lookups and impact curves.
Offset specification
| Syntax | Description |
|---|---|
| RANGE FROM <start> TO <end> STEP <step> | Uniform range (all in milliseconds) |
| LIST (<ms>, <ms>, ...) | Explicit list of offsets in milliseconds |
Directions: Backward, Forward,
and Nearest. Each probed offset is resolved using the same
semantics as a standalone ASOF join in that direction.
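This reference does not reproduce the full probe-query grammar, so the sketch below is illustrative only: the TEMPORAL PROBE JOIN and OFFSETS spellings are assumed for exposition, while MATCH_CONDITION and the LIST offset form follow the ASOF and offset tables above. It probes quotes 1s, 5s, and 30s before each trade:
-- Hypothetical syntax sketch; keyword spellings are assumed, not confirmed.
SELECT t.symbol, t.price, q.mid
FROM trades t
TEMPORAL PROBE JOIN quotes q
  OFFSETS LIST (-1000, -5000, -30000)
  MATCH_CONDITION(t.ts >= q.ts)
ON t.symbol = q.symbol;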
Temporal Join (FOR SYSTEM_TIME AS OF)
Point-in-time lookup against a versioned table. For each stream event,
looks up the table row valid at the event's timestamp. Uses SQL:2011
FOR SYSTEM_TIME AS OF syntax.
Supported types: INNER, LEFT
SELECT o.order_id, o.amount, r.rate,
o.amount * r.rate AS converted
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
Versioned tables: The right-side table must have a timestamp
column marking version validity. Event-time semantics are deterministic;
process-time (PROCTIME()) is non-deterministic.
Lookup Join
Enriches stream events against a static or slowly-changing reference table. Matches by key equality with no time bound. Results are cached with a configurable TTL.
Supported types: INNER, LEFT
SELECT o.order_id, o.amount, p.name, p.category
FROM orders o
LEFT JOIN products p
ON o.product_id = p.id;
Cache: Foyer-backed memory/hybrid cache. Configure via
[[lookup]] section: cache.size_bytes,
cache.ttl, cache.hybrid.
Supports predicate pushdown to the lookup connector.
Environment Variables
All string values in TOML support ${VAR} substitution.
Use ${VAR:-default} for optional variables with fallbacks.
Missing variables without defaults cause a startup error.
[source.properties]
bootstrap.servers = "${KAFKA_BROKERS:-localhost:9092}"
sasl.username = "${KAFKA_USER}"
sasl.password = "${KAFKA_PASSWORD}"
[checkpoint]
url = "s3://${S3_BUCKET}/checkpoints"
[checkpoint.storage]
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
S3 Storage Class Tiering
Active checkpoints use the hot tier. Older checkpoints automatically tier down via S3 Lifecycle rules.
| Field | Default | Description |
|---|---|---|
| hot_class | "STANDARD" | Storage class for active checkpoints (e.g. "EXPRESS_ONE_ZONE") |
| warm_class | "STANDARD" | Storage class for older checkpoints |
| cold_class | "" | Archive class (e.g. "GLACIER_IR"). Empty = no cold tier. |
| hot_retention | "24h" | Time before hot → warm |
| warm_retention | "7d" | Time before warm → cold |
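A tiering sketch using the fields above; it assumes the section lives at [checkpoint.tiering], per the tiering field documented under [checkpoint], and the class names are illustrative:
[checkpoint.tiering]
hot_class = "EXPRESS_ONE_ZONE"
warm_class = "STANDARD"
cold_class = "GLACIER_IR"
hot_retention = "24h"
warm_retention = "7d"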
Delivery Guarantees
| Guarantee | How it works | Requirements |
|---|---|---|
| at_least_once | Default. On recovery, sources replay from last checkpoint offset. Some records may be reprocessed. | Checkpointing enabled. Works with all connectors. |
| exactly_once | Barrier-aligned checkpoints with two-phase commit to sinks. No duplicates in output. | All sources must support replay. Sinks must support 2PC. Currently: Kafka sink and PostgreSQL sink. |
WebSocket sources are non-replayable. Under exactly-once mode, they degrade to at-most-once delivery with a warning.
Full Config Example
[server]
mode = "embedded"
bind = "0.0.0.0:8080"
workers = 4
log_level = "info"
[state]
backend = "memory"
[checkpoint]
url = "s3://my-bucket/laminardb/checkpoints"
interval = "30s"
[checkpoint.storage]
aws_region = "us-east-1"
# --- Sources ---
[[source]]
name = "trades"
connector = "kafka"
format = "avro"
[source.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "market-trades"
group.id = "laminar-vwap"
schema.registry.url = "${SCHEMA_REGISTRY_URL}"
startup.mode = "earliest"
[[source.schema]]
name = "symbol"
type = "VARCHAR"
nullable = false
[[source.schema]]
name = "price"
type = "DOUBLE"
[[source.schema]]
name = "volume"
type = "BIGINT"
[[source.schema]]
name = "ts"
type = "BIGINT"
[source.watermark]
column = "ts"
max_out_of_orderness = "5s"
# --- Lookups ---
[[lookup]]
name = "instruments"
connector = "postgres"
strategy = "poll"
pushdown = true
[lookup.properties]
hostname = "db.internal"
database = "reference_data"
username = "reader"
table.name = "instruments"
[lookup.cache]
size_bytes = 104857600
ttl = "5m"
# --- Pipelines ---
[[pipeline]]
name = "vwap"
sql = """
SELECT t.symbol,
SUM(t.price * CAST(t.volume AS DOUBLE))
/ SUM(CAST(t.volume AS DOUBLE)) AS vwap,
i.exchange
FROM trades t
JOIN instruments i ON t.symbol = i.symbol
GROUP BY t.symbol, i.exchange, tumble(t.ts, INTERVAL '1' MINUTE)
EMIT ON WINDOW CLOSE
"""
# --- Sinks ---
[[sink]]
name = "kafka_output"
pipeline = "vwap"
connector = "kafka"
delivery = "exactly_once"
[sink.properties]
bootstrap.servers = "${KAFKA_BROKERS}"
topic = "vwap-1m"
format = "json"
key.column = "symbol"
compression.type = "lz4"
[[sink]]
name = "lake"
pipeline = "vwap"
connector = "delta-lake"
delivery = "exactly_once"
[sink.properties]
table.path = "s3://my-bucket/delta/vwap"
partition.columns = "exchange"
writer.id = "vwap-writer"