Module postgres

PostgreSQL sink connector.

Writes Arrow RecordBatch data to PostgreSQL tables using one of two strategies:

  • Append mode: COPY BINARY for maximum throughput (>500K rows/sec)
  • Upsert mode: INSERT ... ON CONFLICT DO UPDATE with UNNEST arrays
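
As a rough illustration of the upsert strategy, the sketch below builds an INSERT ... ON CONFLICT DO UPDATE statement that takes one array parameter per column and expands them with UNNEST. The function name `build_upsert_sql` and its signature are hypothetical, and the SQL the connector actually generates may differ. Identifiers are not quoted here, so this assumes plain lowercase table and column names.

```rust
/// Hypothetical helper: builds an upsert statement fed by UNNEST arrays.
/// `columns` pairs each column name with its PostgreSQL element type.
fn build_upsert_sql(table: &str, columns: &[(&str, &str)], key_columns: &[&str]) -> String {
    let names: Vec<&str> = columns.iter().map(|(name, _)| *name).collect();

    // One array parameter per column: $1::int8[], $2::text[], ...
    let params: Vec<String> = columns
        .iter()
        .enumerate()
        .map(|(i, (_, pg_type))| format!("${}::{}[]", i + 1, pg_type))
        .collect();

    // Non-key columns are overwritten with the incoming (EXCLUDED) values.
    let updates: Vec<String> = names
        .iter()
        .filter(|name| !key_columns.contains(*name))
        .map(|name| format!("{0} = EXCLUDED.{0}", name))
        .collect();

    // With no non-key columns there is nothing to update.
    let conflict_action = if updates.is_empty() {
        "DO NOTHING".to_string()
    } else {
        format!("DO UPDATE SET {}", updates.join(", "))
    };

    format!(
        "INSERT INTO {} ({}) SELECT * FROM UNNEST({}) ON CONFLICT ({}) {}",
        table,
        names.join(", "),
        params.join(", "),
        key_columns.join(", "),
        conflict_action
    )
}
```

Passing whole columns as arrays means a batch of any size is written with a single statement and a fixed number of parameters, rather than one parameter per cell.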

Exactly-once semantics use co-transactional offset storage: data and epoch markers are committed in the same PostgreSQL transaction.
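
To show why committing the epoch marker together with the data gives exactly-once behaviour, here is a minimal in-memory model. `MockDatabase` and `commit_epoch` are hypothetical names used only for illustration; the sketch assumes epochs increase monotonically and does not reflect the connector's real schema or recovery code.

```rust
/// In-memory stand-in for a target table plus its epoch marker.
#[derive(Debug, Default)]
struct MockDatabase {
    rows: Vec<String>,
    last_committed_epoch: Option<u64>,
}

impl MockDatabase {
    /// Applies a batch and its epoch marker as one atomic step.
    /// Returns false, changing nothing, if the epoch was already committed.
    fn commit_epoch(&mut self, epoch: u64, batch: &[&str]) -> bool {
        if self.last_committed_epoch.map_or(false, |last| epoch <= last) {
            // Replay after recovery: the marker shows the data is already there.
            return false;
        }
        // Both updates happen together, mirroring a single transaction.
        self.rows.extend(batch.iter().map(|row| row.to_string()));
        self.last_committed_epoch = Some(epoch);
        true
    }
}
```

Because the marker can never be committed without its data (or the reverse), a writer that restarts and replays an epoch can detect the duplicate and skip it.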

§Architecture

Ring 0 (Hot Path):  SPSC push only (~5ns, zero sink code)
Ring 1 (Background): Batch buffering → COPY/INSERT → transaction mgmt
Ring 2 (Control):    Connection pool, table creation, epoch recovery
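
The split between Ring 0 and Ring 1 can be sketched with a bounded queue and a background thread. This is only an approximation under stated assumptions: it uses `std::sync::mpsc::sync_channel` as a stand-in for the SPSC queue (it is not a lock-free ring buffer and will not reach the latency quoted above), `run_pipeline` is a hypothetical name, and "flushing" a batch just collects it instead of issuing COPY or INSERT.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Pushes `events` through a bounded queue to a background batcher and
/// returns the batches it would have written, in order.
fn run_pipeline(events: Vec<u64>, batch_size: usize) -> Vec<Vec<u64>> {
    // Bounded queue between the hot path and the background writer.
    let (tx, rx) = sync_channel::<u64>(1024);

    // Background side (Ring 1): buffer events and flush full batches.
    let writer = thread::spawn(move || {
        let mut flushed = Vec::new();
        let mut buffer = Vec::with_capacity(batch_size);
        for event in rx {
            buffer.push(event);
            if buffer.len() >= batch_size {
                flushed.push(std::mem::take(&mut buffer));
            }
        }
        // The channel closed: flush whatever is left.
        if !buffer.is_empty() {
            flushed.push(buffer);
        }
        flushed
    });

    // Hot path (Ring 0): only pushes into the queue, no sink logic.
    for event in events {
        tx.send(event).expect("writer thread exited early");
    }
    drop(tx); // signal end of input
    writer.join().expect("writer thread panicked")
}
```

Keeping all buffering and I/O on the background side means the producer's cost is limited to the queue push.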

§Module Structure

  • sink_config - Configuration and enums
  • sink - PostgresSink implementing SinkConnector
  • sink_metrics - Lock-free atomic metrics
  • types - Arrow → PostgreSQL type mapping
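
For orientation, an Arrow → PostgreSQL type mapping might look like the sketch below. `ColumnType` is a hypothetical local enum standing in for Arrow's `DataType` so that the example is self-contained, and the pairs shown are conventional choices rather than the mapping the `types` module is confirmed to use.

```rust
/// Hypothetical subset of Arrow data types, defined locally for illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColumnType {
    Boolean,
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
    Utf8,
    Binary,
}

/// Returns a PostgreSQL type name that can hold values of the given type.
fn pg_type_name(ty: ColumnType) -> &'static str {
    match ty {
        ColumnType::Boolean => "boolean",
        ColumnType::Int16 => "smallint",
        ColumnType::Int32 => "integer",
        ColumnType::Int64 => "bigint",
        ColumnType::Float32 => "real",
        ColumnType::Float64 => "double precision",
        ColumnType::Utf8 => "text",
        ColumnType::Binary => "bytea",
    }
}
```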

§Usage

use laminar_connectors::postgres::{PostgresSink, PostgresSinkConfig, WriteMode};

let config = PostgresSinkConfig {
    hostname: "localhost".to_string(),
    database: "mydb".to_string(),
    table_name: "events".to_string(),
    write_mode: WriteMode::Upsert,
    primary_key_columns: vec!["id".to_string()],
    ..Default::default()
};

// `schema` is the Arrow schema of the batches to write, constructed elsewhere.
let sink = PostgresSink::new(schema, config);

Re-exports§

pub use sink::PostgresSink;
pub use sink_config::DeliveryGuarantee;
pub use sink_config::PostgresSinkConfig;
pub use sink_config::SslMode;
pub use sink_config::WriteMode;
pub use sink_metrics::PostgresSinkMetrics;

Modules§

sink
PostgreSQL sink connector implementation.
sink_config
PostgreSQL sink connector configuration.
sink_metrics
PostgreSQL sink connector metrics.
types
Arrow to PostgreSQL type mapping for sink operations.

Functions§

register_postgres_sink
Registers the PostgreSQL sink connector with the given registry.