Skip to main content

laminar_connectors/postgres/
sink.rs

1//! `PostgreSQL` sink connector implementation.
2//!
3//! [`PostgresSink`] implements [`SinkConnector`], writing Arrow `RecordBatch`
4//! data to `PostgreSQL` tables via two strategies:
5//!
6//! - **Append mode**: COPY BINARY for maximum throughput (>500K rows/sec)
7//! - **Upsert mode**: `INSERT ... ON CONFLICT DO UPDATE` with UNNEST arrays
8//!
9//! Exactly-once semantics use co-transactional offset storage: data and epoch
10//! markers are committed in the same `PostgreSQL` transaction.
11//!
12//! # Ring Architecture
13//!
14//! - **Ring 0**: No sink code. Data arrives via SPSC channel (~5ns push).
15//! - **Ring 1**: Batch buffering, COPY/INSERT writes, transaction management.
16//! - **Ring 2**: Connection pool, table creation, epoch recovery.
17
18use std::sync::Arc;
19use std::time::Instant;
20
21use arrow_array::{Array, RecordBatch};
22use arrow_schema::{DataType, Field, Schema, SchemaRef};
23use async_trait::async_trait;
24use tracing::{debug, info, warn};
25
26use crate::config::{ConnectorConfig, ConnectorState};
27use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
28use crate::error::ConnectorError;
29use crate::health::HealthStatus;
30use crate::metrics::ConnectorMetrics;
31
32use super::sink_config::{DeliveryGuarantee, PostgresSinkConfig, WriteMode};
33use super::sink_metrics::PostgresSinkMetrics;
34use super::types::{arrow_to_pg_ddl_type, arrow_type_to_pg_array_cast, arrow_type_to_pg_sql};
35
36#[cfg(feature = "postgres-sink")]
37use super::types::arrow_column_to_pg_array;
38#[cfg(feature = "postgres-sink")]
39use bytes::BytesMut;
40#[cfg(feature = "postgres-sink")]
41use deadpool_postgres::Pool;
42
/// `PostgreSQL` sink connector.
///
/// Writes Arrow `RecordBatch` to `PostgreSQL` tables using COPY BINARY
/// (append) or UNNEST-based upsert, with optional exactly-once semantics
/// via co-transactional epoch storage.
///
/// Constructed via `new()`; no connection exists until `open()` builds the
/// pool.
pub struct PostgresSink {
    /// Sink configuration.
    config: PostgresSinkConfig,
    /// Arrow schema for input batches (may include `_`-prefixed metadata columns).
    schema: SchemaRef,
    /// User-visible schema (metadata columns stripped).
    user_schema: SchemaRef,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Current epoch (for exactly-once).
    current_epoch: u64,
    /// Last committed epoch (restored from the offset table on open).
    last_committed_epoch: u64,
    /// Buffered records awaiting flush.
    buffer: Vec<RecordBatch>,
    /// Total rows in buffer.
    buffered_rows: usize,
    /// Last flush time (drives the flush-interval threshold).
    last_flush: Instant,
    /// Sink metrics.
    metrics: PostgresSinkMetrics,
    /// Cached upsert SQL statement (for upsert mode).
    upsert_sql: Option<String>,
    /// Cached COPY SQL statement (for append mode).
    copy_sql: Option<String>,
    /// Cached CREATE TABLE SQL (for auto-create).
    create_table_sql: Option<String>,
    /// Cached DELETE SQL (for changelog mode).
    delete_sql: Option<String>,
    /// Connection pool (`None` until `open()` is called).
    #[cfg(feature = "postgres-sink")]
    pool: Option<Pool>,
    /// Pre-allocated encode buffer for pgpq COPY BINARY output.
    #[cfg(feature = "postgres-sink")]
    encode_buf: BytesMut,
}
84
85impl PostgresSink {
86    /// Creates a new `PostgreSQL` sink connector.
87    #[must_use]
88    pub fn new(schema: SchemaRef, config: PostgresSinkConfig) -> Self {
89        let user_schema = build_user_schema(&schema);
90        // Pre-allocate buffer to avoid reallocation during first epoch.
91        let buf_capacity = (config.batch_size / 1024).max(4);
92        Self {
93            config,
94            schema,
95            user_schema,
96            state: ConnectorState::Created,
97            current_epoch: 0,
98            last_committed_epoch: 0,
99            buffer: Vec::with_capacity(buf_capacity),
100            buffered_rows: 0,
101            last_flush: Instant::now(),
102            metrics: PostgresSinkMetrics::new(),
103            upsert_sql: None,
104            copy_sql: None,
105            create_table_sql: None,
106            delete_sql: None,
107            #[cfg(feature = "postgres-sink")]
108            pool: None,
109            #[cfg(feature = "postgres-sink")]
110            encode_buf: BytesMut::with_capacity(64 * 1024),
111        }
112    }
113
    /// Returns the current connector state.
    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    /// Returns the current epoch.
    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Returns the last committed epoch.
    ///
    /// After `open()` in exactly-once mode this reflects the epoch recovered
    /// from the offset table.
    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    /// Returns the number of buffered rows pending flush.
    #[must_use]
    pub fn buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Returns a reference to the sink metrics.
    #[must_use]
    pub fn sink_metrics(&self) -> &PostgresSinkMetrics {
        &self.metrics
    }
143
144    // ── SQL Generation ──────────────────────────────────────────────
145
146    /// Builds the COPY BINARY SQL statement.
147    ///
148    /// ```sql
149    /// COPY public.events (id, value, ts) FROM STDIN BINARY
150    /// ```
151    #[must_use]
152    pub fn build_copy_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
153        let columns = user_columns(schema);
154        let col_list = columns.join(", ");
155        format!(
156            "COPY {} ({}) FROM STDIN BINARY",
157            config.qualified_table_name(),
158            col_list,
159        )
160    }
161
162    /// Builds the UNNEST-based upsert SQL statement.
163    ///
164    /// ```sql
165    /// INSERT INTO public.target (id, value, updated_at)
166    /// SELECT * FROM UNNEST($1::int8[], $2::text[], $3::timestamptz[])
167    /// ON CONFLICT (id) DO UPDATE SET
168    ///     value = EXCLUDED.value,
169    ///     updated_at = EXCLUDED.updated_at
170    /// ```
171    #[must_use]
172    pub fn build_upsert_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
173        let fields = user_fields(schema);
174
175        let columns: Vec<&str> = fields.iter().map(|f| f.name().as_str()).collect();
176
177        let unnest_params: Vec<String> = fields
178            .iter()
179            .enumerate()
180            .map(|(i, f)| arrow_type_to_pg_array_cast(f.data_type(), i + 1))
181            .collect();
182
183        let non_key_columns: Vec<&str> = columns
184            .iter()
185            .copied()
186            .filter(|c| {
187                !config
188                    .primary_key_columns
189                    .iter()
190                    .any(|pk| pk.as_str() == *c)
191            })
192            .collect();
193
194        let update_clause: Vec<String> = non_key_columns
195            .iter()
196            .map(|c| format!("{c} = EXCLUDED.{c}"))
197            .collect();
198
199        let pk_list = config.primary_key_columns.join(", ");
200
201        if update_clause.is_empty() {
202            // Key-only table: use DO NOTHING
203            format!(
204                "INSERT INTO {} ({}) \
205                 SELECT * FROM UNNEST({}) \
206                 ON CONFLICT ({}) DO NOTHING",
207                config.qualified_table_name(),
208                columns.join(", "),
209                unnest_params.join(", "),
210                pk_list,
211            )
212        } else {
213            format!(
214                "INSERT INTO {} ({}) \
215                 SELECT * FROM UNNEST({}) \
216                 ON CONFLICT ({}) DO UPDATE SET {}",
217                config.qualified_table_name(),
218                columns.join(", "),
219                unnest_params.join(", "),
220                pk_list,
221                update_clause.join(", "),
222            )
223        }
224    }
225
226    /// Builds the DELETE SQL for changelog deletes.
227    ///
228    /// ```sql
229    /// DELETE FROM public.events WHERE id = ANY($1::int8[])
230    /// ```
231    #[must_use]
232    pub fn build_delete_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
233        let pk_conditions: Vec<String> = config
234            .primary_key_columns
235            .iter()
236            .enumerate()
237            .map(|(i, col)| {
238                let dt = schema
239                    .field_with_name(col)
240                    .map(|f| f.data_type().clone())
241                    .unwrap_or(DataType::Utf8);
242                let pg_type = arrow_type_to_pg_sql(&dt);
243                format!("{col} = ANY(${}::{}[])", i + 1, pg_type)
244            })
245            .collect();
246
247        format!(
248            "DELETE FROM {} WHERE {}",
249            config.qualified_table_name(),
250            pk_conditions.join(" AND "),
251        )
252    }
253
254    /// Builds CREATE TABLE DDL from the Arrow schema.
255    ///
256    /// ```sql
257    /// CREATE TABLE IF NOT EXISTS public.events (
258    ///     id BIGINT NOT NULL,
259    ///     value TEXT,
260    ///     ts TIMESTAMPTZ,
261    ///     PRIMARY KEY (id)
262    /// )
263    /// ```
264    #[must_use]
265    pub fn build_create_table_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
266        let fields = user_fields(schema);
267
268        let column_defs: Vec<String> = fields
269            .iter()
270            .map(|f| {
271                let pg_type = arrow_to_pg_ddl_type(f.data_type());
272                let nullable = if f.is_nullable() { "" } else { " NOT NULL" };
273                format!("    {} {}{}", f.name(), pg_type, nullable)
274            })
275            .collect();
276
277        let mut ddl = format!(
278            "CREATE TABLE IF NOT EXISTS {} (\n{}\n",
279            config.qualified_table_name(),
280            column_defs.join(",\n"),
281        );
282
283        if !config.primary_key_columns.is_empty() {
284            use std::fmt::Write;
285            let _ = write!(
286                ddl,
287                ",\n    PRIMARY KEY ({})\n",
288                config.primary_key_columns.join(", ")
289            );
290        }
291
292        ddl.push(')');
293        ddl
294    }
295
296    /// Builds CREATE TABLE DDL for the offset tracking table.
297    #[must_use]
298    pub fn build_offset_table_sql() -> &'static str {
299        "CREATE TABLE IF NOT EXISTS _laminardb_sink_offsets (\
300         \n    sink_id TEXT PRIMARY KEY,\
301         \n    epoch BIGINT NOT NULL,\
302         \n    source_offsets JSONB,\
303         \n    watermark BIGINT,\
304         \n    updated_at TIMESTAMPTZ DEFAULT NOW()\
305         \n)"
306    }
307
308    /// Builds the epoch commit SQL.
309    #[must_use]
310    pub fn build_epoch_commit_sql() -> &'static str {
311        "INSERT INTO _laminardb_sink_offsets (sink_id, epoch, updated_at) \
312         VALUES ($1, $2, NOW()) \
313         ON CONFLICT (sink_id) DO UPDATE SET epoch = $2, updated_at = NOW()"
314    }
315
316    /// Builds the epoch recovery SQL.
317    #[must_use]
318    pub fn build_epoch_recover_sql() -> &'static str {
319        "SELECT epoch FROM _laminardb_sink_offsets WHERE sink_id = $1"
320    }
321
322    // ── Changelog/Retraction ────────────────────────────────────────
323
324    /// Splits a changelog `RecordBatch` into insert and delete batches.
325    ///
326    /// Uses the `_op` metadata column:
327    /// - `"I"` (insert), `"U"` (update-after), `"r"` (snapshot read) → insert batch
328    /// - `"D"` (delete) → delete batch
329    ///
330    /// The returned batches exclude metadata columns (those starting with `_`).
331    ///
332    /// # Errors
333    ///
334    /// Returns `ConnectorError::ConfigurationError` if the `_op` column is
335    /// missing or not a string type.
336    pub fn split_changelog_batch(
337        batch: &RecordBatch,
338    ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
339        let op_idx = batch.schema().index_of("_op").map_err(|_| {
340            ConnectorError::ConfigurationError(
341                "changelog mode requires '_op' column in input schema".into(),
342            )
343        })?;
344
345        let op_array = batch
346            .column(op_idx)
347            .as_any()
348            .downcast_ref::<arrow_array::StringArray>()
349            .ok_or_else(|| {
350                ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
351            })?;
352
353        let mut insert_indices = Vec::new();
354        let mut delete_indices = Vec::new();
355
356        for i in 0..op_array.len() {
357            if op_array.is_null(i) {
358                continue;
359            }
360            match op_array.value(i) {
361                "I" | "U" | "r" => {
362                    insert_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
363                }
364                "D" => {
365                    delete_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
366                }
367                _ => {} // Skip unknown ops
368            }
369        }
370
371        let insert_batch = filter_batch_by_indices(batch, &insert_indices)?;
372        let delete_batch = filter_batch_by_indices(batch, &delete_indices)?;
373
374        Ok((insert_batch, delete_batch))
375    }
376
377    // ── Internal helpers ────────────────────────────────────────────
378
379    /// Prepares cached SQL statements based on schema and config.
380    fn prepare_statements(&mut self) {
381        self.copy_sql = Some(Self::build_copy_sql(&self.schema, &self.config));
382
383        if self.config.write_mode == WriteMode::Upsert {
384            self.upsert_sql = Some(Self::build_upsert_sql(&self.schema, &self.config));
385        }
386
387        if self.config.auto_create_table {
388            self.create_table_sql = Some(Self::build_create_table_sql(&self.schema, &self.config));
389        }
390
391        if self.config.changelog_mode {
392            self.delete_sql = Some(Self::build_delete_sql(&self.schema, &self.config));
393        }
394    }
395
396    /// Returns a reference to the connection pool, or an error if not initialized.
397    #[cfg(feature = "postgres-sink")]
398    fn pool(&self) -> Result<&Pool, ConnectorError> {
399        self.pool.as_ref().ok_or(ConnectorError::InvalidState {
400            expected: "pool initialized (call open() first)".into(),
401            actual: "pool not initialized".into(),
402        })
403    }
404
405    /// Concatenates all buffered batches and strips metadata columns.
406    #[cfg(feature = "postgres-sink")]
407    fn concat_buffer(&self) -> Result<RecordBatch, ConnectorError> {
408        if self.buffer.is_empty() {
409            return Ok(RecordBatch::new_empty(self.user_schema.clone()));
410        }
411
412        let stripped: Result<Vec<RecordBatch>, ConnectorError> =
413            self.buffer.iter().map(strip_metadata_columns).collect();
414        let stripped = stripped?;
415
416        arrow_select::concat::concat_batches(&self.user_schema, &stripped)
417            .map_err(|e| ConnectorError::Internal(format!("batch concat failed: {e}")))
418    }
419
    /// Flushes buffered data to `PostgreSQL` using the COPY BINARY protocol.
    ///
    /// Concatenates the buffer (metadata columns stripped), encodes it with
    /// pgpq into the reusable `encode_buf`, and streams the bytes through
    /// `COPY ... FROM STDIN BINARY`.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::Internal` for encoding failures and
    /// `ConnectorError::WriteError` for COPY protocol failures.
    #[cfg(feature = "postgres-sink")]
    async fn flush_append(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        let user_batch = self.concat_buffer()?;
        if user_batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let mut encoder = pgpq::ArrowToPostgresBinaryEncoder::try_new(self.user_schema.as_ref())
            .map_err(|e| ConnectorError::Internal(format!("pgpq encoder init: {e}")))?;

        // Build the full COPY BINARY stream (header + data + footer) in the
        // pre-allocated buffer before sending anything over the wire.
        self.encode_buf.clear();
        encoder.write_header(&mut self.encode_buf);
        encoder
            .write_batch(&user_batch, &mut self.encode_buf)
            .map_err(|e| ConnectorError::Internal(format!("pgpq encode: {e}")))?;
        encoder
            .write_footer(&mut self.encode_buf)
            .map_err(|e| ConnectorError::Internal(format!("pgpq footer: {e}")))?;

        let encoded_bytes = self.encode_buf.len();
        // `split().freeze()` hands the encoded bytes off without copying and
        // leaves `encode_buf` empty for the next flush.
        let bytes_to_send = self.encode_buf.split().freeze();

        let copy_sql = self
            .copy_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("COPY SQL not prepared".into()))?;

        let sink = client
            .copy_in(copy_sql)
            .await
            .map_err(|e| ConnectorError::WriteError(format!("COPY start: {e}")))?;

        {
            use futures_util::SinkExt;
            // The copy sink must be pinned before use; closing it completes
            // the COPY operation.
            futures_util::pin_mut!(sink);
            sink.send(bytes_to_send)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("COPY send: {e}")))?;
            sink.close()
                .await
                .map_err(|e| ConnectorError::WriteError(format!("COPY finish: {e}")))?;
        }

        let rows = user_batch.num_rows();
        self.metrics.record_write(rows as u64, encoded_bytes as u64);
        self.metrics.record_flush();
        self.metrics.record_copy();

        Ok(WriteResult::new(rows, encoded_bytes as u64))
    }
474
    /// Flushes buffered data to `PostgreSQL` using UNNEST-based upsert.
    ///
    /// In changelog mode, delegates to `flush_changelog` so deletes are
    /// applied; otherwise executes the cached upsert SQL over the
    /// concatenated buffer.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::Internal` if the upsert SQL was not prepared
    /// or the buffer cannot be concatenated, and write errors from statement
    /// execution.
    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn flush_upsert(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        if self.config.changelog_mode {
            return self.flush_changelog(client).await;
        }

        let user_batch = self.concat_buffer()?;
        if user_batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let upsert_sql = self
            .upsert_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;

        let rows = execute_unnest(client, upsert_sql, &user_batch).await?;

        // Upserts have no exact wire size; use an estimate from the batch.
        let byte_estimate = estimate_batch_bytes(&user_batch);
        self.metrics.record_write(rows, byte_estimate);
        self.metrics.record_flush();
        self.metrics.record_upsert();

        Ok(WriteResult::new(rows as usize, byte_estimate))
    }
505
    /// Flushes changelog batches: upserts first, then deletes.
    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn flush_changelog(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        let mut total_rows: u64 = 0;
        let mut total_bytes: u64 = 0;

        // Split each buffered batch into inserts/deletes.
        let mut all_inserts = Vec::new();
        let mut all_deletes = Vec::new();
        for batch in &self.buffer {
            let (ins, del) = Self::split_changelog_batch(batch)?;
            if ins.num_rows() > 0 {
                all_inserts.push(ins);
            }
            if del.num_rows() > 0 {
                all_deletes.push(del);
            }
        }

        // Process inserts/updates first, then deletes. This handles the
        // common CDC pattern where a key is inserted and later deleted
        // within the same epoch: the delete runs last and wins.
        // NOTE(review): a key that is deleted and then re-inserted within
        // the same epoch will end up deleted, because all deletes execute
        // after all upserts — confirm upstream ordering guarantees make
        // this case impossible or acceptable.
        if !all_inserts.is_empty() {
            let insert_batch =
                arrow_select::concat::concat_batches(&self.user_schema, &all_inserts)
                    .map_err(|e| ConnectorError::Internal(format!("concat inserts: {e}")))?;

            let upsert_sql = self
                .upsert_sql
                .as_deref()
                .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;

            let rows = execute_unnest(client, upsert_sql, &insert_batch).await?;
            let bytes = estimate_batch_bytes(&insert_batch);
            self.metrics.record_write(rows, bytes);
            self.metrics.record_upsert();
            total_rows += rows;
            total_bytes += bytes;
        }

        if !all_deletes.is_empty() {
            let delete_batch =
                arrow_select::concat::concat_batches(&self.user_schema, &all_deletes)
                    .map_err(|e| ConnectorError::Internal(format!("concat deletes: {e}")))?;
            let deleted = self.execute_deletes(client, &delete_batch).await?;
            self.metrics.record_deletes(deleted as u64);
            total_rows += deleted as u64;
        }

        self.metrics.record_flush();
        Ok(WriteResult::new(total_rows as usize, total_bytes))
    }
564
    /// Executes batched DELETE for changelog delete records.
    ///
    /// Builds one `PostgreSQL` array parameter per primary-key column from
    /// the delete batch and runs the cached `DELETE ... WHERE pk = ANY($n)`
    /// statement. Returns the number of rows the server reports deleted.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` if a primary-key column
    /// is missing from the batch, `ConnectorError::Internal` if the DELETE
    /// SQL was not prepared, and `ConnectorError::WriteError` on execution
    /// failure.
    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn execute_deletes(
        &self,
        client: &tokio_postgres::Client,
        delete_batch: &RecordBatch,
    ) -> Result<usize, ConnectorError> {
        if delete_batch.num_rows() == 0 {
            return Ok(0);
        }

        let delete_sql = self
            .delete_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("DELETE SQL not prepared".into()))?;

        // Convert each primary-key column into a boxed PG array parameter,
        // in the same order the DELETE SQL numbers its placeholders.
        let pk_params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> = self
            .config
            .primary_key_columns
            .iter()
            .map(|col| {
                let idx = delete_batch.schema().index_of(col).map_err(|_| {
                    ConnectorError::ConfigurationError(format!(
                        "primary key column '{col}' not in delete batch"
                    ))
                })?;
                arrow_column_to_pg_array(delete_batch.column(idx))
            })
            .collect::<Result<_, _>>()?;

        // Re-borrow as the trait-object slice `execute` expects.
        let pk_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = pk_params
            .iter()
            .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
            .collect();

        let rows = client
            .execute(delete_sql, &pk_refs)
            .await
            .map_err(|e| ConnectorError::WriteError(format!("DELETE: {e}")))?;

        Ok(rows as usize)
    }
608
    /// Dispatches flush to the appropriate write mode.
    ///
    /// Buffer clearing and `last_flush` bookkeeping are the caller's
    /// responsibility.
    #[cfg(feature = "postgres-sink")]
    async fn flush_to_client(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        match self.config.write_mode {
            WriteMode::Append => self.flush_append(client).await,
            WriteMode::Upsert => self.flush_upsert(client).await,
        }
    }
620
621    /// Checks whether the given epoch has already been committed to `PostgreSQL`.
622    #[cfg(feature = "postgres-sink")]
623    #[allow(clippy::cast_sign_loss)]
624    async fn is_epoch_committed(
625        &self,
626        client: &tokio_postgres::Client,
627        epoch: u64,
628    ) -> Result<bool, ConnectorError> {
629        let sink_id = self.config.effective_sink_id();
630        let row = client
631            .query_opt(Self::build_epoch_recover_sql(), &[&sink_id])
632            .await
633            .map_err(|e| ConnectorError::Internal(format!("epoch recovery query: {e}")))?;
634
635        if let Some(row) = row {
636            let committed: i64 = row.get(0);
637            Ok(committed as u64 >= epoch)
638        } else {
639            Ok(false)
640        }
641    }
642}
643
644// ── SinkConnector implementation ────────────────────────────────────
645
646// When the postgres-sink feature is enabled, provide the real implementation.
647#[cfg(feature = "postgres-sink")]
648#[async_trait]
649impl SinkConnector for PostgresSink {
    /// Opens the sink: parses config overrides, prepares SQL, builds the
    /// connection pool, optionally creates target/offset tables, and
    /// recovers the last committed epoch in exactly-once mode.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // Generic connector properties, when present, replace the typed config.
        if !config.properties().is_empty() {
            self.config = PostgresSinkConfig::from_config(config)?;
        }

        // Validate changelog requires upsert.
        if self.config.changelog_mode && self.config.write_mode != WriteMode::Upsert {
            return Err(ConnectorError::ConfigurationError(
                "changelog mode requires write.mode = 'upsert'".into(),
            ));
        }

        info!(
            table = %self.config.qualified_table_name(),
            mode = %self.config.write_mode,
            guarantee = %self.config.delivery_guarantee,
            "opening PostgreSQL sink connector"
        );

        self.prepare_statements();

        // Pin the derived sink id so later epoch lookups use a stable key.
        if self.config.sink_id.is_empty() {
            self.config.sink_id = self.config.effective_sink_id();
        }

        // Build connection pool.
        let mut pool_cfg = deadpool_postgres::Config::new();
        pool_cfg.host = Some(self.config.hostname.clone());
        pool_cfg.port = Some(self.config.port);
        pool_cfg.dbname = Some(self.config.database.clone());
        pool_cfg.user = Some(self.config.username.clone());
        pool_cfg.password = Some(self.config.password.clone());
        pool_cfg.pool = Some(deadpool_postgres::PoolConfig::new(self.config.pool_size));

        let pool = pool_cfg
            .create_pool(
                Some(deadpool_postgres::Runtime::Tokio1),
                tokio_postgres::NoTls,
            )
            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool creation failed: {e}")))?;

        // Validate connectivity early by checking out one connection.
        let client = pool.get().await.map_err(|e| {
            ConnectorError::ConnectionFailed(format!("initial connection failed: {e}"))
        })?;

        // Auto-create target table.
        if self.config.auto_create_table {
            if let Some(ddl) = &self.create_table_sql {
                client.batch_execute(ddl.as_str()).await.map_err(|e| {
                    ConnectorError::Internal(format!("auto-create table failed: {e}"))
                })?;
                debug!(table = %self.config.qualified_table_name(), "target table ensured");
            }
        }

        // Create offset tracking table for exactly-once.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            client
                .batch_execute(Self::build_offset_table_sql())
                .await
                .map_err(|e| {
                    ConnectorError::Internal(format!("create offset table failed: {e}"))
                })?;

            // Recover last committed epoch.
            {
                let recover_sink_id = self.config.effective_sink_id();
                let row = client
                    .query_opt(Self::build_epoch_recover_sql(), &[&recover_sink_id])
                    .await
                    .map_err(|e| ConnectorError::Internal(format!("epoch recovery: {e}")))?;
                if let Some(row) = row {
                    let epoch: i64 = row.get(0);
                    // Markers are written from u64 epochs (see pre_commit),
                    // so the sign-loss cast back is benign.
                    #[allow(clippy::cast_sign_loss)]
                    {
                        self.last_committed_epoch = epoch as u64;
                    }
                    info!(epoch, "recovered last committed epoch");
                }
            }
        }

        self.pool = Some(pool);
        self.state = ConnectorState::Running;

        info!(
            table = %self.config.qualified_table_name(),
            pool_size = self.config.pool_size,
            "PostgreSQL sink connector opened"
        );

        Ok(())
    }
746
    /// Buffers the batch; in at-least-once mode flushes when the row-count
    /// or flush-interval threshold is reached. In exactly-once mode all
    /// flushing is deferred to `pre_commit`.
    #[allow(clippy::cast_possible_truncation)]
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        self.buffer.push(batch.clone());
        self.buffered_rows += batch.num_rows();

        // In exactly-once mode, defer all flushing to pre_commit.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            return Ok(WriteResult::new(0, 0));
        }

        // At-least-once: auto-flush when thresholds are reached.
        let should_flush = self.buffered_rows >= self.config.batch_size
            || self.last_flush.elapsed() >= self.config.flush_interval;

        if should_flush {
            let client = self
                .pool()?
                .get()
                .await
                .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
            // On flush error the buffer is retained (cleared only on success),
            // so a later write can retry the same rows.
            let result = self.flush_to_client(&client).await?;
            self.buffer.clear();
            self.buffered_rows = 0;
            self.last_flush = Instant::now();
            return Ok(result);
        }

        Ok(WriteResult::new(0, 0))
    }
787
788    fn schema(&self) -> SchemaRef {
789        self.schema.clone()
790    }
791
    /// Records the epoch number; no database work is performed here.
    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;
        debug!(epoch, "PostgreSQL sink epoch started");
        Ok(())
    }
797
798    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
799        if epoch != self.current_epoch {
800            return Err(ConnectorError::TransactionError(format!(
801                "epoch mismatch in pre_commit: expected {}, got {epoch}",
802                self.current_epoch
803            )));
804        }
805
806        if self.buffer.is_empty() {
807            return Ok(());
808        }
809
810        let client = self
811            .pool()?
812            .get()
813            .await
814            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
815
816        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
817            // Check for already-committed epoch (recovery replay).
818            if self.is_epoch_committed(&client, epoch).await? {
819                info!(epoch, "epoch already committed in PostgreSQL, skipping");
820                self.buffer.clear();
821                self.buffered_rows = 0;
822                return Ok(());
823            }
824
825            // Co-transactional: data + epoch marker in one PG transaction.
826            client
827                .batch_execute("BEGIN")
828                .await
829                .map_err(|e| ConnectorError::WriteError(format!("BEGIN: {e}")))?;
830
831            match self.flush_to_client(&client).await {
832                Ok(_result) => {
833                    let sink_id = self.config.effective_sink_id();
834                    #[allow(clippy::cast_possible_wrap)]
835                    let epoch_i64 = epoch as i64;
836                    let params: Vec<&(dyn postgres_types::ToSql + Sync)> =
837                        vec![&sink_id, &epoch_i64];
838                    client
839                        .execute(Self::build_epoch_commit_sql(), &params)
840                        .await
841                        .map_err(|e| {
842                            ConnectorError::WriteError(format!("epoch marker write: {e}"))
843                        })?;
844
845                    client
846                        .batch_execute("COMMIT")
847                        .await
848                        .map_err(|e| ConnectorError::WriteError(format!("COMMIT: {e}")))?;
849                }
850                Err(e) => {
851                    let _ = client.batch_execute("ROLLBACK").await;
852                    return Err(e);
853                }
854            }
855        } else {
856            // At-least-once: just flush remaining buffer.
857            self.flush_to_client(&client).await?;
858        }
859
860        self.buffer.clear();
861        self.buffered_rows = 0;
862        self.last_flush = Instant::now();
863
864        debug!(epoch, "PostgreSQL sink pre-committed");
865        Ok(())
866    }
867
868    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
869        if epoch != self.current_epoch {
870            return Err(ConnectorError::TransactionError(format!(
871                "epoch mismatch: expected {}, got {epoch}",
872                self.current_epoch
873            )));
874        }
875
876        self.last_committed_epoch = epoch;
877        self.metrics.record_commit();
878
879        debug!(epoch, "PostgreSQL sink epoch committed");
880        Ok(())
881    }
882
883    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
884        self.buffer.clear();
885        self.buffered_rows = 0;
886
887        self.metrics.record_rollback();
888        warn!(epoch, "PostgreSQL sink epoch rolled back");
889        Ok(())
890    }
891
892    fn health_check(&self) -> HealthStatus {
893        match self.state {
894            ConnectorState::Running => {
895                if let Some(pool) = &self.pool {
896                    let status = pool.status();
897                    if status.available > 0 {
898                        HealthStatus::Healthy
899                    } else {
900                        HealthStatus::Degraded(format!(
901                            "no available connections ({} in use)",
902                            status.size
903                        ))
904                    }
905                } else {
906                    HealthStatus::Unhealthy("pool not initialized".into())
907                }
908            }
909            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
910            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
911            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
912            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
913            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
914        }
915    }
916
917    fn metrics(&self) -> ConnectorMetrics {
918        self.metrics.to_connector_metrics()
919    }
920
921    fn capabilities(&self) -> SinkConnectorCapabilities {
922        let mut caps = SinkConnectorCapabilities::default().with_idempotent();
923
924        if self.config.write_mode == WriteMode::Upsert {
925            caps = caps.with_upsert();
926        }
927        if self.config.changelog_mode {
928            caps = caps.with_changelog();
929        }
930        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
931            caps = caps.with_exactly_once().with_two_phase_commit();
932        }
933
934        caps
935    }
936
937    async fn flush(&mut self) -> Result<(), ConnectorError> {
938        if self.buffer.is_empty() {
939            return Ok(());
940        }
941        let client = self
942            .pool()?
943            .get()
944            .await
945            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
946        self.flush_to_client(&client).await?;
947        self.buffer.clear();
948        self.buffered_rows = 0;
949        self.last_flush = Instant::now();
950        Ok(())
951    }
952
953    async fn close(&mut self) -> Result<(), ConnectorError> {
954        info!("closing PostgreSQL sink connector");
955
956        if !self.buffer.is_empty() {
957            if let Some(pool) = &self.pool {
958                if let Ok(client) = pool.get().await {
959                    let _ = self.flush_to_client(&client).await;
960                }
961            }
962        }
963
964        self.buffer.clear();
965        self.buffered_rows = 0;
966        self.pool = None;
967        self.state = ConnectorState::Closed;
968
969        info!(
970            table = %self.config.qualified_table_name(),
971            records = self.metrics.records_written.load(
972                std::sync::atomic::Ordering::Relaxed
973            ),
974            epochs = self.metrics.epochs_committed.load(
975                std::sync::atomic::Ordering::Relaxed
976            ),
977            "PostgreSQL sink connector closed"
978        );
979
980        Ok(())
981    }
982}
983
984// When postgres-sink feature is NOT enabled, provide a stub that returns UnsupportedOperation.
985#[cfg(not(feature = "postgres-sink"))]
986#[async_trait]
987impl SinkConnector for PostgresSink {
988    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
989        Err(ConnectorError::UnsupportedOperation(
990            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
991        ))
992    }
993
994    async fn write_batch(&mut self, _batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
995        Err(ConnectorError::UnsupportedOperation(
996            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
997        ))
998    }
999
1000    fn schema(&self) -> SchemaRef {
1001        self.schema.clone()
1002    }
1003
1004    fn health_check(&self) -> HealthStatus {
1005        match self.state {
1006            ConnectorState::Running => HealthStatus::Healthy,
1007            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
1008            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
1009            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
1010            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
1011            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
1012        }
1013    }
1014
1015    fn metrics(&self) -> ConnectorMetrics {
1016        self.metrics.to_connector_metrics()
1017    }
1018
1019    fn capabilities(&self) -> SinkConnectorCapabilities {
1020        let mut caps = SinkConnectorCapabilities::default().with_idempotent();
1021        if self.config.write_mode == WriteMode::Upsert {
1022            caps = caps.with_upsert();
1023        }
1024        if self.config.changelog_mode {
1025            caps = caps.with_changelog();
1026        }
1027        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1028            caps = caps.with_exactly_once().with_two_phase_commit();
1029        }
1030        caps
1031    }
1032
1033    async fn close(&mut self) -> Result<(), ConnectorError> {
1034        self.state = ConnectorState::Closed;
1035        Ok(())
1036    }
1037}
1038
1039impl std::fmt::Debug for PostgresSink {
1040    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1041        f.debug_struct("PostgresSink")
1042            .field("state", &self.state)
1043            .field("table", &self.config.qualified_table_name())
1044            .field("mode", &self.config.write_mode)
1045            .field("guarantee", &self.config.delivery_guarantee)
1046            .field("current_epoch", &self.current_epoch)
1047            .field("last_committed_epoch", &self.last_committed_epoch)
1048            .field("buffered_rows", &self.buffered_rows)
1049            .finish_non_exhaustive()
1050    }
1051}
1052
1053// ── Helper functions ────────────────────────────────────────────────
1054
1055/// Returns user-visible column names (excluding metadata columns starting with `_`).
1056fn user_columns(schema: &SchemaRef) -> Vec<String> {
1057    schema
1058        .fields()
1059        .iter()
1060        .filter(|f| !f.name().starts_with('_'))
1061        .map(|f| f.name().clone())
1062        .collect()
1063}
1064
1065/// Returns user-visible fields (excluding metadata columns starting with `_`).
1066fn user_fields(schema: &SchemaRef) -> Vec<&Arc<Field>> {
1067    schema
1068        .fields()
1069        .iter()
1070        .filter(|f| !f.name().starts_with('_'))
1071        .collect()
1072}
1073
1074/// Builds a schema containing only user-visible columns.
1075fn build_user_schema(schema: &SchemaRef) -> SchemaRef {
1076    Arc::new(Schema::new(
1077        schema
1078            .fields()
1079            .iter()
1080            .filter(|f| !f.name().starts_with('_'))
1081            .cloned()
1082            .collect::<Vec<_>>(),
1083    ))
1084}
1085
1086/// Filters a `RecordBatch` to include only rows at the given indices.
1087///
1088/// Also strips metadata columns (those starting with `_`) from the output.
1089fn filter_batch_by_indices(
1090    batch: &RecordBatch,
1091    indices: &[u32],
1092) -> Result<RecordBatch, ConnectorError> {
1093    if indices.is_empty() {
1094        let user_schema = Arc::new(Schema::new(
1095            batch
1096                .schema()
1097                .fields()
1098                .iter()
1099                .filter(|f| !f.name().starts_with('_'))
1100                .cloned()
1101                .collect::<Vec<_>>(),
1102        ));
1103        return Ok(RecordBatch::new_empty(user_schema));
1104    }
1105
1106    let indices_array = arrow_array::UInt32Array::from(indices.to_vec());
1107
1108    let user_schema = Arc::new(Schema::new(
1109        batch
1110            .schema()
1111            .fields()
1112            .iter()
1113            .filter(|f| !f.name().starts_with('_'))
1114            .cloned()
1115            .collect::<Vec<_>>(),
1116    ));
1117
1118    let filtered_columns: Vec<Arc<dyn arrow_array::Array>> = batch
1119        .schema()
1120        .fields()
1121        .iter()
1122        .enumerate()
1123        .filter(|(_, f)| !f.name().starts_with('_'))
1124        .map(|(i, _)| {
1125            arrow_select::take::take(batch.column(i), &indices_array, None)
1126                .map_err(|e| ConnectorError::Internal(format!("arrow take failed: {e}")))
1127        })
1128        .collect::<Result<Vec<_>, _>>()?;
1129
1130    RecordBatch::try_new(user_schema, filtered_columns)
1131        .map_err(|e| ConnectorError::Internal(format!("batch construction failed: {e}")))
1132}
1133
/// Strips metadata columns (starting with `_`) from a `RecordBatch`.
#[cfg(feature = "postgres-sink")]
fn strip_metadata_columns(batch: &RecordBatch) -> Result<RecordBatch, ConnectorError> {
    let schema = batch.schema();

    // Indices of the columns we keep (everything not prefixed with `_`).
    let keep: Vec<usize> = (0..schema.fields().len())
        .filter(|&i| !schema.field(i).name().starts_with('_'))
        .collect();

    // Fast path: no metadata columns present, reuse the batch as-is.
    if keep.len() == schema.fields().len() {
        return Ok(batch.clone());
    }

    let fields: Vec<Field> = keep.iter().map(|&i| schema.field(i).clone()).collect();
    let columns: Vec<Arc<dyn Array>> = keep
        .iter()
        .map(|&i| Arc::clone(batch.column(i)))
        .collect();

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
        .map_err(|e| ConnectorError::Internal(format!("strip metadata: {e}")))
}
1164
/// Executes an UNNEST-based INSERT/UPSERT using Arrow column arrays as parameters.
#[cfg(feature = "postgres-sink")]
async fn execute_unnest(
    client: &tokio_postgres::Client,
    sql: &str,
    batch: &RecordBatch,
) -> Result<u64, ConnectorError> {
    // Convert every Arrow column into a boxed PG array parameter.
    let mut params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> =
        Vec::with_capacity(batch.num_columns());
    for column in batch.columns() {
        params.push(arrow_column_to_pg_array(column)?);
    }

    // tokio-postgres wants trait-object references, so reborrow the boxes.
    let param_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = params
        .iter()
        .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
        .collect();

    client
        .execute(sql, &param_refs)
        .await
        .map_err(|e| ConnectorError::WriteError(format!("UNNEST execute: {e}")))
}
1186
/// Rough byte-size estimate for metrics (sum of column buffer sizes).
#[cfg(feature = "postgres-sink")]
fn estimate_batch_bytes(batch: &RecordBatch) -> u64 {
    batch
        .columns()
        .iter()
        .map(|col| col.get_buffer_memory_size() as u64)
        .sum()
}
1194
#[cfg(test)]
mod tests {
    //! Unit tests for [`PostgresSink`]: SQL generation, changelog splitting,
    //! buffering, the epoch lifecycle, capabilities, and health reporting.
    //! No live `PostgreSQL` instance is required — paths that would need a
    //! connection are exercised only up to the buffering layer.

    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Three-column schema (`id`, `name`, `value`) used by most tests.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Minimal append-mode config targeting `public.events`.
    fn test_config() -> PostgresSinkConfig {
        PostgresSinkConfig::new("localhost", "mydb", "events")
    }

    /// Upsert-mode variant of [`test_config`] keyed on `id`.
    fn upsert_config() -> PostgresSinkConfig {
        let mut cfg = test_config();
        cfg.write_mode = WriteMode::Upsert;
        cfg.primary_key_columns = vec!["id".to_string()];
        cfg
    }

    /// Builds an `n`-row batch matching [`test_schema`].
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(arrow_array::Float64Array::from(values)),
            ],
        )
        .expect("test batch creation")
    }

    // ── Constructor tests ──

    #[test]
    fn test_new_defaults() {
        let sink = PostgresSink::new(test_schema(), test_config());
        assert_eq!(sink.state(), ConnectorState::Created);
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert_eq!(sink.buffered_rows(), 0);
        assert!(sink.upsert_sql.is_none());
        assert!(sink.copy_sql.is_none());
    }

    #[test]
    fn test_user_schema_strips_metadata() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let sink = PostgresSink::new(schema, test_config());
        assert_eq!(sink.user_schema.fields().len(), 2);
        assert_eq!(sink.user_schema.field(0).name(), "id");
        assert_eq!(sink.user_schema.field(1).name(), "value");
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let sink = PostgresSink::new(schema.clone(), test_config());
        assert_eq!(sink.schema(), schema);
    }

    // ── SQL generation tests ──

    #[test]
    fn test_build_copy_sql() {
        let schema = test_schema();
        let config = test_config();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert_eq!(
            sql,
            "COPY public.events (id, name, value) FROM STDIN BINARY"
        );
    }

    #[test]
    fn test_build_copy_sql_custom_schema() {
        let schema = test_schema();
        let mut config = test_config();
        config.schema_name = "analytics".to_string();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert!(sql.starts_with("COPY analytics.events"));
    }

    #[test]
    fn test_build_copy_sql_excludes_metadata_columns() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let config = test_config();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert_eq!(sql, "COPY public.events (id, value) FROM STDIN BINARY");
    }

    #[test]
    fn test_build_upsert_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_upsert_sql(&schema, &config);

        assert!(sql.starts_with("INSERT INTO public.events"));
        assert!(sql.contains("SELECT * FROM UNNEST"));
        assert!(sql.contains("$1::int8[]"));
        assert!(sql.contains("$2::text[]"));
        assert!(sql.contains("$3::float8[]"));
        assert!(sql.contains("ON CONFLICT (id)"));
        assert!(sql.contains("DO UPDATE SET"));
        assert!(sql.contains("name = EXCLUDED.name"));
        assert!(sql.contains("value = EXCLUDED.value"));
        assert!(!sql.contains("id = EXCLUDED.id"));
    }

    #[test]
    fn test_build_upsert_sql_composite_key() {
        let schema = test_schema();
        let mut config = upsert_config();
        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
        let sql = PostgresSink::build_upsert_sql(&schema, &config);

        assert!(sql.contains("ON CONFLICT (id, name)"));
        assert!(sql.contains("value = EXCLUDED.value"));
        assert!(!sql.contains("id = EXCLUDED.id"));
        assert!(!sql.contains("name = EXCLUDED.name"));
    }

    #[test]
    fn test_build_upsert_sql_key_only_table() {
        // Every column is part of the key, so there is nothing to update.
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let mut config = test_config();
        config.write_mode = WriteMode::Upsert;
        config.primary_key_columns = vec!["id".to_string()];

        let sql = PostgresSink::build_upsert_sql(&schema, &config);
        assert!(sql.contains("DO NOTHING"), "sql: {sql}");
    }

    #[test]
    fn test_build_delete_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_delete_sql(&schema, &config);

        assert_eq!(sql, "DELETE FROM public.events WHERE id = ANY($1::int8[])");
    }

    #[test]
    fn test_build_delete_sql_composite_key() {
        let schema = test_schema();
        let mut config = upsert_config();
        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
        let sql = PostgresSink::build_delete_sql(&schema, &config);

        assert!(sql.contains("id = ANY($1::int8[])"));
        assert!(sql.contains("name = ANY($2::text[])"));
        assert!(sql.contains(" AND "));
    }

    #[test]
    fn test_build_create_table_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_create_table_sql(&schema, &config);

        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS public.events"));
        assert!(sql.contains("id BIGINT NOT NULL"));
        assert!(sql.contains("name TEXT"));
        assert!(sql.contains("value DOUBLE PRECISION"));
        assert!(sql.contains("PRIMARY KEY (id)"));
    }

    #[test]
    fn test_build_create_table_sql_no_pk() {
        let schema = test_schema();
        let config = test_config();
        let sql = PostgresSink::build_create_table_sql(&schema, &config);

        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS"));
        assert!(!sql.contains("PRIMARY KEY"));
    }

    #[test]
    fn test_offset_table_sql() {
        let sql = PostgresSink::build_offset_table_sql();
        assert!(sql.contains("_laminardb_sink_offsets"));
        assert!(sql.contains("sink_id TEXT PRIMARY KEY"));
        assert!(sql.contains("epoch BIGINT NOT NULL"));
    }

    #[test]
    fn test_epoch_commit_sql() {
        let sql = PostgresSink::build_epoch_commit_sql();
        assert!(sql.contains("INSERT INTO _laminardb_sink_offsets"));
        assert!(sql.contains("ON CONFLICT (sink_id) DO UPDATE"));
    }

    #[test]
    fn test_epoch_recover_sql() {
        let sql = PostgresSink::build_epoch_recover_sql();
        assert!(sql.contains("SELECT epoch FROM _laminardb_sink_offsets"));
    }

    // ── Changelog splitting tests ──

    /// [`test_schema`]-like schema extended with `_op`/`_ts_ms` metadata columns.
    fn changelog_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
        ]))
    }

    /// Five-row changelog batch mixing inserts ("I"), updates ("U"), and
    /// deletes ("D").
    fn changelog_batch() -> RecordBatch {
        RecordBatch::try_new(
            changelog_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
            ],
        )
        .expect("changelog batch creation")
    }

    #[test]
    fn test_split_changelog_batch() {
        let batch = changelog_batch();
        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");

        assert_eq!(inserts.num_rows(), 3);
        assert_eq!(deletes.num_rows(), 2);
        assert_eq!(inserts.num_columns(), 2);
        assert_eq!(deletes.num_columns(), 2);

        let insert_ids = inserts
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array");
        assert_eq!(insert_ids.value(0), 1);
        assert_eq!(insert_ids.value(1), 2);
        assert_eq!(insert_ids.value(2), 4);

        let delete_ids = deletes
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array");
        assert_eq!(delete_ids.value(0), 3);
        assert_eq!(delete_ids.value(1), 5);
    }

    #[test]
    fn test_split_changelog_all_inserts() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["I", "I"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 2);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_split_changelog_all_deletes() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["D", "D"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 0);
        assert_eq!(deletes.num_rows(), 2);
    }

    #[test]
    fn test_split_changelog_missing_op_column() {
        // A batch without an `_op` column cannot be split.
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).expect("batch");

        let result = PostgresSink::split_changelog_batch(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_split_changelog_snapshot_read() {
        // Snapshot-read rows ("r") are treated as inserts.
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(StringArray::from(vec!["a"])),
                Arc::new(StringArray::from(vec!["r"])),
                Arc::new(Int64Array::from(vec![100])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 1);
        assert_eq!(deletes.num_rows(), 0);
    }

    // ── Buffering tests ──

    #[tokio::test]
    async fn test_write_batch_buffering() {
        let mut config = test_config();
        config.batch_size = 100;
        let mut sink = PostgresSink::new(test_schema(), config);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }

    #[tokio::test]
    async fn test_write_batch_empty() {
        let mut sink = PostgresSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Running;

        let batch = test_batch(0);
        let result = sink.write_batch(&batch).await.expect("write");
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[tokio::test]
    async fn test_write_batch_not_running() {
        let mut sink = PostgresSink::new(test_schema(), test_config());

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_exactly_once_defers_flush() {
        let mut config = test_config();
        config.batch_size = 5; // Would normally trigger flush at 10 rows
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let mut sink = PostgresSink::new(test_schema(), config);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        // Exactly-once: no auto-flush, all buffered.
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }

    // ── Health check tests ──

    #[test]
    fn test_health_check_created() {
        let sink = PostgresSink::new(test_schema(), test_config());
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    // ── Capabilities tests ──

    #[test]
    fn test_capabilities_append_at_least_once() {
        let sink = PostgresSink::new(test_schema(), test_config());
        let caps = sink.capabilities();
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.exactly_once);
    }

    #[test]
    fn test_capabilities_upsert() {
        let sink = PostgresSink::new(test_schema(), upsert_config());
        let caps = sink.capabilities();
        assert!(caps.upsert);
        assert!(caps.idempotent);
    }

    #[test]
    fn test_capabilities_changelog() {
        let mut config = test_config();
        config.changelog_mode = true;
        let sink = PostgresSink::new(test_schema(), config);
        let caps = sink.capabilities();
        assert!(caps.changelog);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let mut config = upsert_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = PostgresSink::new(test_schema(), config);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
    }

    // ── Metrics tests ──

    #[test]
    fn test_metrics_initial() {
        let sink = PostgresSink::new(test_schema(), test_config());
        let m = sink.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    // ── Epoch lifecycle tests ──

    #[tokio::test]
    async fn test_epoch_lifecycle_state() {
        let mut sink = PostgresSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        assert_eq!(sink.current_epoch(), 1);

        // commit_epoch updates last_committed_epoch and records metric.
        sink.commit_epoch(1).await.expect("commit");
        assert_eq!(sink.last_committed_epoch(), 1);

        let m = sink.metrics();
        let committed = m.custom.iter().find(|(k, _)| k == "pg.epochs_committed");
        assert_eq!(committed.expect("metric").1, 1.0);
    }

    #[tokio::test]
    async fn test_epoch_mismatch_rejected() {
        let mut sink = PostgresSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        let result = sink.commit_epoch(2).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_rollback_clears_buffer() {
        let mut config = test_config();
        config.batch_size = 1000;
        let mut sink = PostgresSink::new(test_schema(), config);
        sink.state = ConnectorState::Running;

        let batch = test_batch(50);
        sink.write_batch(&batch).await.expect("write");
        assert_eq!(sink.buffered_rows(), 50);

        sink.rollback_epoch(0).await.expect("rollback");
        assert_eq!(sink.buffered_rows(), 0);
    }

    // ── Debug output test ──

    #[test]
    fn test_debug_output() {
        let sink = PostgresSink::new(test_schema(), test_config());
        let debug = format!("{sink:?}");
        assert!(debug.contains("PostgresSink"));
        assert!(debug.contains("public.events"));
    }

    // ── Helper function tests ──

    #[test]
    fn test_build_user_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
            Field::new("_ts_ms", DataType::Int64, false),
        ]));
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 2);
        assert_eq!(user.field(0).name(), "id");
        assert_eq!(user.field(1).name(), "value");
    }

    #[test]
    fn test_build_user_schema_no_metadata() {
        let schema = test_schema();
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 3);
    }
}