PostgreSQL sink connector.
Writes Arrow RecordBatch to PostgreSQL tables using two strategies:
- Append mode: COPY BINARY for maximum throughput (>500K rows/sec)
- Upsert mode: INSERT ... ON CONFLICT DO UPDATE with UNNEST arrays
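To make the upsert strategy concrete, here is a minimal sketch of how such a statement could be assembled, with one array parameter per column passed through UNNEST. The function name `build_unnest_upsert` and the `(name, PostgreSQL type)` column description are hypothetical and are not taken from the connector; identifiers are not quoted or validated here, which real code would need to do.

```rust
/// Builds an `INSERT ... ON CONFLICT DO UPDATE` statement that takes one
/// array parameter per column and expands them row-wise with UNNEST.
/// `columns` holds (column name, PostgreSQL element type) pairs.
/// Hypothetical helper for illustration only.
fn build_unnest_upsert(table: &str, columns: &[(&str, &str)], key_columns: &[&str]) -> String {
    let names: Vec<&str> = columns.iter().map(|(name, _)| *name).collect();

    // One typed array placeholder per column: $1::int8[], $2::text[], ...
    let params: Vec<String> = columns
        .iter()
        .enumerate()
        .map(|(i, (_, ty))| format!("${}::{}[]", i + 1, ty))
        .collect();

    // Non-key columns are overwritten with the incoming (EXCLUDED) values.
    let updates: Vec<String> = names
        .iter()
        .filter(|n| !key_columns.contains(*n))
        .map(|n| format!("{n} = EXCLUDED.{n}"))
        .collect();

    // If every column is part of the key there is nothing to update.
    let conflict_action = if updates.is_empty() {
        "DO NOTHING".to_string()
    } else {
        format!("DO UPDATE SET {}", updates.join(", "))
    };

    format!(
        "INSERT INTO {} ({}) SELECT * FROM UNNEST({}) ON CONFLICT ({}) {}",
        table,
        names.join(", "),
        params.join(", "),
        key_columns.join(", "),
        conflict_action
    )
}
```

Each Arrow column would then be bound as a single PostgreSQL array, so a whole batch travels in one statement regardless of its row count.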
Exactly-once semantics use co-transactional offset storage: data and
epoch markers are committed in the same PostgreSQL transaction.
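The co-transactional idea can be sketched as a statement plan plus a recovery check. This is an illustration under assumptions, not the connector's code: the marker table `sink_epochs`, its columns, and both function names are hypothetical.

```rust
/// Statements a single epoch commit might issue, in order. The data write and
/// the epoch marker sit between the same BEGIN/COMMIT, so either both become
/// visible or neither does. Table and column names are hypothetical.
fn epoch_commit_plan(table: &str) -> Vec<String> {
    vec![
        "BEGIN".to_string(),
        // Data for the epoch (append mode shown; upsert mode would use INSERT).
        format!("COPY {table} FROM STDIN (FORMAT binary)"),
        // Epoch marker, bound as parameters $1 = sink id, $2 = epoch number.
        "INSERT INTO sink_epochs (sink_id, epoch) VALUES ($1, $2) \
         ON CONFLICT (sink_id) DO UPDATE SET epoch = EXCLUDED.epoch"
            .to_string(),
        "COMMIT".to_string(),
    ]
}

/// Recovery check: an epoch at or below the last committed marker was already
/// written together with its data, so replaying it would duplicate rows.
fn should_replay(last_committed: Option<u64>, epoch: u64) -> bool {
    match last_committed {
        Some(done) => epoch > done,
        None => true, // no marker yet: nothing has been committed
    }
}
```

Because the marker can only exist if the data in the same transaction was committed, a restart can read the marker and skip any epoch it already covers.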
§Architecture
Ring 0 (Hot Path): SPSC push only (~5ns, zero sink code)
Ring 1 (Background): Batch buffering → COPY/INSERT → transaction mgmt
Ring 2 (Control): Connection pool, table creation, epoch recovery
§Module Structure
- sink_config - Configuration and enums
- sink - PostgresSink implementing SinkConnector
- sink_metrics - Lock-free atomic metrics
- types - Arrow → PostgreSQL type mapping
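As an illustration of what "lock-free atomic metrics" can look like, the following sketch keeps counters in `AtomicU64` fields so that writers never take a lock. The struct `SinkMetricsSketch`, its fields, and its methods are assumptions made for this example and do not describe the actual `PostgresSinkMetrics` API.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter set; not the real `PostgresSinkMetrics` layout.
#[derive(Default)]
struct SinkMetricsSketch {
    rows_written: AtomicU64,
    batches_flushed: AtomicU64,
    errors: AtomicU64,
}

impl SinkMetricsSketch {
    /// Records one flushed batch. Relaxed ordering is enough here because the
    /// counters are independent statistics and do not guard other memory.
    fn record_flush(&self, rows: u64) {
        self.rows_written.fetch_add(rows, Ordering::Relaxed);
        self.batches_flushed.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one failed write attempt.
    fn record_error(&self) {
        self.errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns (rows_written, batches_flushed, errors). Each value is read
    /// atomically, but the three reads are not one consistent snapshot.
    fn snapshot(&self) -> (u64, u64, u64) {
        (
            self.rows_written.load(Ordering::Relaxed),
            self.batches_flushed.load(Ordering::Relaxed),
            self.errors.load(Ordering::Relaxed),
        )
    }
}
```

All methods take `&self`, so a single instance can be shared between the writer task and a metrics reporter without a mutex.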
§Usage
use laminar_connectors::postgres::{PostgresSink, PostgresSinkConfig, WriteMode};

let config = PostgresSinkConfig {
    hostname: "localhost".to_string(),
    database: "mydb".to_string(),
    table_name: "events".to_string(),
    write_mode: WriteMode::Upsert,
    primary_key_columns: vec!["id".to_string()],
    ..Default::default()
};
let sink = PostgresSink::new(schema, config);
Re-exports§
pub use sink::PostgresSink;
pub use sink_config::DeliveryGuarantee;
pub use sink_config::PostgresSinkConfig;
pub use sink_config::SslMode;
pub use sink_config::WriteMode;
pub use sink_metrics::PostgresSinkMetrics;
Modules§
- sink - PostgreSQL sink connector implementation.
- sink_config - PostgreSQL sink connector configuration.
- sink_metrics - PostgreSQL sink connector metrics.
- types - Arrow to PostgreSQL type mapping for sink operations.
Functions§
- register_postgres_sink - Registers the PostgreSQL sink connector with the given registry.