laminar_connectors/postgres/sink.rs

//! `PostgreSQL` sink connector implementation.
//!
//! [`PostgresSink`] implements [`SinkConnector`], writing Arrow `RecordBatch`
//! data to `PostgreSQL` tables via two strategies:
//!
//! - **Append mode**: COPY BINARY for maximum throughput (>500K rows/sec)
//! - **Upsert mode**: `INSERT ... ON CONFLICT DO UPDATE` with UNNEST arrays
//!
//! Exactly-once semantics use co-transactional offset storage: data and epoch
//! markers are committed in the same `PostgreSQL` transaction.
//!
//! # Ring Architecture
//!
//! - **Ring 0**: No sink code. Data arrives via SPSC channel (~5ns push).
//! - **Ring 1**: Batch buffering, COPY/INSERT writes, transaction management.
//! - **Ring 2**: Connection pool, table creation, epoch recovery.
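//!
//! # Example
//!
//! A minimal sketch of constructing and driving the sink by hand. The
//! `ConnectorConfig` passed to `open()` and the epoch/batch plumbing are
//! normally supplied by the host runtime, so those calls are shown as
//! comments only:
//!
//! ```rust,ignore
//! use std::sync::Arc;
//! use arrow_schema::{DataType, Field, Schema};
//!
//! let schema = Arc::new(Schema::new(vec![
//!     Field::new("id", DataType::Int64, false),
//!     Field::new("value", DataType::Utf8, true),
//! ]));
//! let config = PostgresSinkConfig::new("localhost", "mydb", "events");
//! let mut sink = PostgresSink::new(schema, config, None);
//!
//! // sink.open(&connector_config).await?; // builds the pool, prepares SQL
//! // sink.begin_epoch(1).await?;
//! // sink.write_batch(&batch).await?;     // buffered; flushed on thresholds
//! // sink.pre_commit(1).await?;           // data + epoch marker in one txn
//! // sink.commit_epoch(1).await?;
//! ```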

use std::sync::Arc;
use std::time::{Duration, Instant};

use arrow_array::{Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use tracing::{debug, info, warn};

use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;

use super::sink_config::{PostgresSinkConfig, WriteMode};
use super::sink_metrics::PostgresSinkMetrics;
use super::types::{arrow_to_pg_ddl_type, arrow_type_to_pg_array_cast, arrow_type_to_pg_sql};
use crate::connector::DeliveryGuarantee;

#[cfg(feature = "postgres-sink")]
use super::types::arrow_column_to_pg_array;
#[cfg(feature = "postgres-sink")]
use bytes::BytesMut;
#[cfg(feature = "postgres-sink")]
use deadpool_postgres::Pool;

/// `PostgreSQL` sink connector.
///
/// Writes Arrow `RecordBatch` to `PostgreSQL` tables using COPY BINARY
/// (append) or UNNEST-based upsert, with optional exactly-once semantics
/// via co-transactional epoch storage.
pub struct PostgresSink {
    /// Sink configuration.
    config: PostgresSinkConfig,
    /// Arrow schema for input batches.
    schema: SchemaRef,
    /// User-visible schema (metadata columns stripped).
    user_schema: SchemaRef,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Current epoch (for exactly-once).
    current_epoch: u64,
    /// Last committed epoch.
    last_committed_epoch: u64,
    /// Buffered records awaiting flush.
    buffer: Vec<RecordBatch>,
    /// Total rows in buffer.
    buffered_rows: usize,
    /// Last flush time.
    last_flush: Instant,
    /// Sink metrics.
    metrics: PostgresSinkMetrics,
    /// Cached upsert SQL statement (for upsert mode).
    upsert_sql: Option<String>,
    /// Cached COPY SQL statement (for append mode).
    copy_sql: Option<String>,
    /// Cached CREATE TABLE SQL (for auto-create).
    create_table_sql: Option<String>,
    /// Cached DELETE SQL (for changelog mode).
    delete_sql: Option<String>,
    /// Connection pool (`None` until `open()` is called).
    #[cfg(feature = "postgres-sink")]
    pool: Option<Pool>,
    /// Pre-allocated encode buffer for pgpq COPY BINARY output.
    #[cfg(feature = "postgres-sink")]
    encode_buf: BytesMut,
}

impl PostgresSink {
    /// Creates a new `PostgreSQL` sink connector.
    #[must_use]
    pub fn new(
        schema: SchemaRef,
        config: PostgresSinkConfig,
        registry: Option<&prometheus::Registry>,
    ) -> Self {
        let user_schema = build_user_schema(&schema);
        // Pre-allocate buffer to avoid reallocation during first epoch.
        let buf_capacity = (config.batch_size / 1024).max(4);
        Self {
            config,
            schema,
            user_schema,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            buffer: Vec::with_capacity(buf_capacity),
            buffered_rows: 0,
            last_flush: Instant::now(),
            metrics: PostgresSinkMetrics::new(registry),
            upsert_sql: None,
            copy_sql: None,
            create_table_sql: None,
            delete_sql: None,
            #[cfg(feature = "postgres-sink")]
            pool: None,
            #[cfg(feature = "postgres-sink")]
            encode_buf: BytesMut::with_capacity(64 * 1024),
        }
    }

    /// Returns the current connector state.
    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    /// Returns the current epoch.
    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Returns the last committed epoch.
    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    /// Returns the number of buffered rows pending flush.
    #[must_use]
    pub fn buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Returns a reference to the sink metrics.
    #[must_use]
    pub fn sink_metrics(&self) -> &PostgresSinkMetrics {
        &self.metrics
    }

    // ── SQL Generation ──────────────────────────────────────────────

    /// Builds the COPY BINARY SQL statement.
    ///
    /// ```sql
    /// COPY public.events (id, value, ts) FROM STDIN BINARY
    /// ```
    #[must_use]
    pub fn build_copy_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
        let columns = user_columns(schema);
        let col_list = columns.join(", ");
        format!(
            "COPY {} ({}) FROM STDIN BINARY",
            config.qualified_table_name(),
            col_list,
        )
    }

    /// Builds the UNNEST-based upsert SQL statement.
    ///
    /// ```sql
    /// INSERT INTO public.target (id, value, updated_at)
    /// SELECT * FROM UNNEST($1::int8[], $2::text[], $3::timestamptz[])
    /// ON CONFLICT (id) DO UPDATE SET
    ///     value = EXCLUDED.value,
    ///     updated_at = EXCLUDED.updated_at
    /// ```
    #[must_use]
    pub fn build_upsert_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
        let fields = user_fields(schema);

        let columns: Vec<&str> = fields.iter().map(|f| f.name().as_str()).collect();

        let unnest_params: Vec<String> = fields
            .iter()
            .enumerate()
            .map(|(i, f)| arrow_type_to_pg_array_cast(f.data_type(), i + 1))
            .collect();

        let non_key_columns: Vec<&str> = columns
            .iter()
            .copied()
            .filter(|c| {
                !config
                    .primary_key_columns
                    .iter()
                    .any(|pk| pk.as_str() == *c)
            })
            .collect();

        let update_clause: Vec<String> = non_key_columns
            .iter()
            .map(|c| format!("{c} = EXCLUDED.{c}"))
            .collect();

        let pk_list = config.primary_key_columns.join(", ");

        if update_clause.is_empty() {
            // Key-only table: use DO NOTHING
            format!(
                "INSERT INTO {} ({}) \
                 SELECT * FROM UNNEST({}) \
                 ON CONFLICT ({}) DO NOTHING",
                config.qualified_table_name(),
                columns.join(", "),
                unnest_params.join(", "),
                pk_list,
            )
        } else {
            format!(
                "INSERT INTO {} ({}) \
                 SELECT * FROM UNNEST({}) \
                 ON CONFLICT ({}) DO UPDATE SET {}",
                config.qualified_table_name(),
                columns.join(", "),
                unnest_params.join(", "),
                pk_list,
                update_clause.join(", "),
            )
        }
    }

    /// Builds the DELETE SQL for changelog deletes.
    ///
    /// ```sql
    /// DELETE FROM public.events WHERE id = ANY($1::int8[])
    /// ```
    #[must_use]
    pub fn build_delete_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
        let pk_conditions: Vec<String> = config
            .primary_key_columns
            .iter()
            .enumerate()
            .map(|(i, col)| {
                let dt = schema
                    .field_with_name(col)
                    .map_or(DataType::Utf8, |f| f.data_type().clone());
                let pg_type = arrow_type_to_pg_sql(&dt);
                format!("{col} = ANY(${}::{}[])", i + 1, pg_type)
            })
            .collect();

        format!(
            "DELETE FROM {} WHERE {}",
            config.qualified_table_name(),
            pk_conditions.join(" AND "),
        )
    }

    /// Builds CREATE TABLE DDL from the Arrow schema.
    ///
    /// ```sql
    /// CREATE TABLE IF NOT EXISTS public.events (
    ///     id BIGINT NOT NULL,
    ///     value TEXT,
    ///     ts TIMESTAMPTZ,
    ///     PRIMARY KEY (id)
    /// )
    /// ```
    #[must_use]
    pub fn build_create_table_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
        let fields = user_fields(schema);

        let column_defs: Vec<String> = fields
            .iter()
            .map(|f| {
                let pg_type = arrow_to_pg_ddl_type(f.data_type());
                let nullable = if f.is_nullable() { "" } else { " NOT NULL" };
                format!("    {} {}{}", f.name(), pg_type, nullable)
            })
            .collect();

        let mut ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (\n{}\n",
            config.qualified_table_name(),
            column_defs.join(",\n"),
        );

        if !config.primary_key_columns.is_empty() {
            use std::fmt::Write;
            let _ = write!(
                ddl,
                ",\n    PRIMARY KEY ({})\n",
                config.primary_key_columns.join(", ")
            );
        }

        ddl.push(')');
        ddl
    }

    /// Builds CREATE TABLE DDL for the offset tracking table.
    #[must_use]
    pub fn build_offset_table_sql() -> &'static str {
        "CREATE TABLE IF NOT EXISTS _laminardb_sink_offsets (\
         \n    sink_id TEXT PRIMARY KEY,\
         \n    epoch BIGINT NOT NULL,\
         \n    source_offsets JSONB,\
         \n    watermark BIGINT,\
         \n    updated_at TIMESTAMPTZ DEFAULT NOW()\
         \n)"
    }

    /// Builds the epoch commit SQL.
    #[must_use]
    pub fn build_epoch_commit_sql() -> &'static str {
        "INSERT INTO _laminardb_sink_offsets (sink_id, epoch, updated_at) \
         VALUES ($1, $2, NOW()) \
         ON CONFLICT (sink_id) DO UPDATE SET epoch = $2, updated_at = NOW()"
    }

    /// Builds the epoch recovery SQL.
    #[must_use]
    pub fn build_epoch_recover_sql() -> &'static str {
        "SELECT epoch FROM _laminardb_sink_offsets WHERE sink_id = $1"
    }

    // ── Changelog/Retraction ────────────────────────────────────────

    /// Splits a changelog `RecordBatch` into insert and delete batches.
    ///
    /// Uses the `_op` metadata column:
    /// - `"I"` (insert), `"U"` (update-after), `"r"` (snapshot read) → insert batch
    /// - `"D"` (delete) → delete batch
    ///
    /// The returned batches exclude metadata columns (those starting with `_`).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` if the `_op` column is
    /// missing or not a string type.
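    ///
    /// # Example
    ///
    /// A sketch, assuming `batch` carries an `_op` column with the values
    /// `["I", "D"]` alongside the user columns:
    ///
    /// ```rust,ignore
    /// let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch)?;
    /// assert_eq!(inserts.num_rows(), 1); // the "I" row
    /// assert_eq!(deletes.num_rows(), 1); // the "D" row
    /// ```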
    pub fn split_changelog_batch(
        batch: &RecordBatch,
    ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
        let op_idx = batch.schema().index_of("_op").map_err(|_| {
            ConnectorError::ConfigurationError(
                "changelog mode requires '_op' column in input schema".into(),
            )
        })?;

        let op_array = batch
            .column(op_idx)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .ok_or_else(|| {
                ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
            })?;

        let mut insert_indices = Vec::new();
        let mut delete_indices = Vec::new();

        for i in 0..op_array.len() {
            if op_array.is_null(i) {
                continue;
            }
            match op_array.value(i) {
                "I" | "U" | "r" => {
                    insert_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
                }
                "D" => {
                    delete_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
                }
                _ => {} // Skip unknown ops
            }
        }

        let insert_batch = filter_batch_by_indices(batch, &insert_indices)?;
        let delete_batch = filter_batch_by_indices(batch, &delete_indices)?;

        Ok((insert_batch, delete_batch))
    }

    // ── Internal helpers ────────────────────────────────────────────

    /// Prepares cached SQL statements based on schema and config.
    fn prepare_statements(&mut self) {
        self.copy_sql = Some(Self::build_copy_sql(&self.schema, &self.config));

        if self.config.write_mode == WriteMode::Upsert {
            self.upsert_sql = Some(Self::build_upsert_sql(&self.schema, &self.config));
        }

        if self.config.auto_create_table {
            self.create_table_sql = Some(Self::build_create_table_sql(&self.schema, &self.config));
        }

        if self.config.changelog_mode {
            self.delete_sql = Some(Self::build_delete_sql(&self.schema, &self.config));
        }
    }

    /// Returns a reference to the connection pool, or an error if not initialized.
    #[cfg(feature = "postgres-sink")]
    fn pool(&self) -> Result<&Pool, ConnectorError> {
        self.pool.as_ref().ok_or(ConnectorError::InvalidState {
            expected: "pool initialized (call open() first)".into(),
            actual: "pool not initialized".into(),
        })
    }

    /// Concatenates all buffered batches and strips metadata columns.
    #[cfg(feature = "postgres-sink")]
    fn concat_buffer(&self) -> Result<RecordBatch, ConnectorError> {
        if self.buffer.is_empty() {
            return Ok(RecordBatch::new_empty(self.user_schema.clone()));
        }

        let stripped: Result<Vec<RecordBatch>, ConnectorError> =
            self.buffer.iter().map(strip_metadata_columns).collect();
        let stripped = stripped?;

        arrow_select::concat::concat_batches(&self.user_schema, &stripped)
            .map_err(|e| ConnectorError::Internal(format!("batch concat failed: {e}")))
    }

    /// Flushes buffered data to `PostgreSQL` using the COPY BINARY protocol.
    #[cfg(feature = "postgres-sink")]
    async fn flush_append(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        let user_batch = self.concat_buffer()?;
        if user_batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let mut encoder = pgpq::ArrowToPostgresBinaryEncoder::try_new(self.user_schema.as_ref())
            .map_err(|e| ConnectorError::Internal(format!("pgpq encoder init: {e}")))?;

        self.encode_buf.clear();
        encoder.write_header(&mut self.encode_buf);
        encoder
            .write_batch(&user_batch, &mut self.encode_buf)
            .map_err(|e| ConnectorError::Internal(format!("pgpq encode: {e}")))?;
        encoder
            .write_footer(&mut self.encode_buf)
            .map_err(|e| ConnectorError::Internal(format!("pgpq footer: {e}")))?;

        let encoded_bytes = self.encode_buf.len();
        let bytes_to_send = self.encode_buf.split().freeze();

        let copy_sql = self
            .copy_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("COPY SQL not prepared".into()))?;

        let sink = client
            .copy_in(copy_sql)
            .await
            .map_err(|e| ConnectorError::WriteError(format!("COPY start: {e}")))?;

        {
            use futures_util::SinkExt;
            futures_util::pin_mut!(sink);
            sink.send(bytes_to_send)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("COPY send: {e}")))?;
            sink.close()
                .await
                .map_err(|e| ConnectorError::WriteError(format!("COPY finish: {e}")))?;
        }

        let rows = user_batch.num_rows();
        self.metrics.record_write(rows as u64, encoded_bytes as u64);
        self.metrics.record_flush();
        self.metrics.record_copy();

        Ok(WriteResult::new(rows, encoded_bytes as u64))
    }

    /// Flushes buffered data to `PostgreSQL` using UNNEST-based upsert.
    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn flush_upsert(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        if self.config.changelog_mode {
            return self.flush_changelog(client).await;
        }

        let user_batch = self.concat_buffer()?;
        if user_batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let upsert_sql = self
            .upsert_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;

        let rows = execute_unnest(client, upsert_sql, &user_batch).await?;

        let byte_estimate = estimate_batch_bytes(&user_batch);
        self.metrics.record_write(rows, byte_estimate);
        self.metrics.record_flush();
        self.metrics.record_upsert();

        Ok(WriteResult::new(rows as usize, byte_estimate))
    }

    /// Flushes changelog batches: upserts first, then deletes.
    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn flush_changelog(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        let mut total_rows: u64 = 0;
        let mut total_bytes: u64 = 0;

        // Split each buffered batch into inserts/deletes.
        let mut all_inserts = Vec::new();
        let mut all_deletes = Vec::new();
        for batch in &self.buffer {
            let (ins, del) = Self::split_changelog_batch(batch)?;
            if ins.num_rows() > 0 {
                all_inserts.push(ins);
            }
            if del.num_rows() > 0 {
                all_deletes.push(del);
            }
        }

        // Process inserts/updates first, then deletes. This handles the
        // common CDC pattern where a key is inserted and then deleted within
        // the same epoch. Note that the rarer delete-then-re-insert sequence
        // within one epoch is NOT preserved by this ordering: the row ends
        // up deleted, unless upstream changelog reduction collapses the pair.
        if !all_inserts.is_empty() {
            let insert_batch =
                arrow_select::concat::concat_batches(&self.user_schema, &all_inserts)
                    .map_err(|e| ConnectorError::Internal(format!("concat inserts: {e}")))?;

            let upsert_sql = self
                .upsert_sql
                .as_deref()
                .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;

            let rows = execute_unnest(client, upsert_sql, &insert_batch).await?;
            let bytes = estimate_batch_bytes(&insert_batch);
            self.metrics.record_write(rows, bytes);
            self.metrics.record_upsert();
            total_rows += rows;
            total_bytes += bytes;
        }

        if !all_deletes.is_empty() {
            let delete_batch =
                arrow_select::concat::concat_batches(&self.user_schema, &all_deletes)
                    .map_err(|e| ConnectorError::Internal(format!("concat deletes: {e}")))?;
            let deleted = self.execute_deletes(client, &delete_batch).await?;
            self.metrics.record_deletes(deleted as u64);
            total_rows += deleted as u64;
        }

        self.metrics.record_flush();
        Ok(WriteResult::new(total_rows as usize, total_bytes))
    }

    /// Executes batched DELETE for changelog delete records.
    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn execute_deletes(
        &self,
        client: &tokio_postgres::Client,
        delete_batch: &RecordBatch,
    ) -> Result<usize, ConnectorError> {
        if delete_batch.num_rows() == 0 {
            return Ok(0);
        }

        let delete_sql = self
            .delete_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("DELETE SQL not prepared".into()))?;

        let pk_params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> = self
            .config
            .primary_key_columns
            .iter()
            .map(|col| {
                let idx = delete_batch.schema().index_of(col).map_err(|_| {
                    ConnectorError::ConfigurationError(format!(
                        "primary key column '{col}' not in delete batch"
                    ))
                })?;
                arrow_column_to_pg_array(delete_batch.column(idx))
            })
            .collect::<Result<_, _>>()?;

        let pk_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = pk_params
            .iter()
            .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
            .collect();

        let rows = client
            .execute(delete_sql, &pk_refs)
            .await
            .map_err(|e| ConnectorError::WriteError(format!("DELETE: {e}")))?;

        Ok(rows as usize)
    }

    /// Dispatches flush to the appropriate write mode.
    #[cfg(feature = "postgres-sink")]
    async fn flush_to_client(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        client
            .execute(
                &format!(
                    "SET statement_timeout = '{}'",
                    self.config.statement_timeout.as_millis()
                ),
                &[],
            )
            .await
            .ok(); // non-fatal if unsupported
        match self.config.write_mode {
            WriteMode::Append => self.flush_append(client).await,
            WriteMode::Upsert => self.flush_upsert(client).await,
        }
    }

    /// Checks whether the given epoch has already been committed to `PostgreSQL`.
    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_sign_loss)]
    async fn is_epoch_committed(
        &self,
        client: &tokio_postgres::Client,
        epoch: u64,
    ) -> Result<bool, ConnectorError> {
        let sink_id = self.config.effective_sink_id();
        let row = client
            .query_opt(Self::build_epoch_recover_sql(), &[&sink_id])
            .await
            .map_err(|e| ConnectorError::Internal(format!("epoch recovery query: {e}")))?;

        if let Some(row) = row {
            let committed: i64 = row.get(0);
            Ok(committed as u64 >= epoch)
        } else {
            Ok(false)
        }
    }
}

// ── SinkConnector implementation ────────────────────────────────────

// When the postgres-sink feature is enabled, provide the real implementation.
#[cfg(feature = "postgres-sink")]
#[async_trait]
impl SinkConnector for PostgresSink {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        if !config.properties().is_empty() {
            self.config = PostgresSinkConfig::from_config(config)?;
        }

        // Validate changelog requires upsert.
        if self.config.changelog_mode && self.config.write_mode != WriteMode::Upsert {
            return Err(ConnectorError::ConfigurationError(
                "changelog mode requires write.mode = 'upsert'".into(),
            ));
        }

        info!(
            table = %self.config.qualified_table_name(),
            mode = %self.config.write_mode,
            guarantee = %self.config.delivery_guarantee,
            "opening PostgreSQL sink connector"
        );

        self.prepare_statements();

        if self.config.sink_id.is_empty() {
            self.config.sink_id = self.config.effective_sink_id();
        }

        // Build connection pool.
        let mut pool_cfg = deadpool_postgres::Config::new();
        pool_cfg.host = Some(self.config.hostname.clone());
        pool_cfg.port = Some(self.config.port);
        pool_cfg.dbname = Some(self.config.database.clone());
        pool_cfg.user = Some(self.config.username.clone());
        pool_cfg.password = Some(self.config.password.clone());
        let mut deadpool_cfg = deadpool_postgres::PoolConfig::new(self.config.pool_size);
        deadpool_cfg.timeouts.wait = Some(self.config.connect_timeout);
        deadpool_cfg.timeouts.create = Some(self.config.connect_timeout);
        pool_cfg.pool = Some(deadpool_cfg);

        let pool = pool_cfg
            .create_pool(
                Some(deadpool_postgres::Runtime::Tokio1),
                tokio_postgres::NoTls,
            )
            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool creation failed: {e}")))?;

        // Validate connectivity.
        let client = pool.get().await.map_err(|e| {
            ConnectorError::ConnectionFailed(format!("initial connection failed: {e}"))
        })?;

        client
            .execute(
                &format!(
                    "SET statement_timeout = '{}'",
                    self.config.statement_timeout.as_millis()
                ),
                &[],
            )
            .await
            .ok(); // non-fatal if unsupported

        // Auto-create target table.
        if self.config.auto_create_table {
            if let Some(ddl) = &self.create_table_sql {
                client.batch_execute(ddl.as_str()).await.map_err(|e| {
                    ConnectorError::Internal(format!("auto-create table failed: {e}"))
                })?;
                debug!(table = %self.config.qualified_table_name(), "target table ensured");
            }
        }

        // Create offset tracking table for exactly-once.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            client
                .batch_execute(Self::build_offset_table_sql())
                .await
                .map_err(|e| {
                    ConnectorError::Internal(format!("create offset table failed: {e}"))
                })?;

            // Recover last committed epoch.
            {
                let recover_sink_id = self.config.effective_sink_id();
                let row = client
                    .query_opt(Self::build_epoch_recover_sql(), &[&recover_sink_id])
                    .await
                    .map_err(|e| ConnectorError::Internal(format!("epoch recovery: {e}")))?;
                if let Some(row) = row {
                    let epoch: i64 = row.get(0);
                    #[allow(clippy::cast_sign_loss)]
                    {
                        self.last_committed_epoch = epoch as u64;
                    }
                    info!(epoch, "recovered last committed epoch");
                }
            }
        }

        self.pool = Some(pool);
        self.state = ConnectorState::Running;

        info!(
            table = %self.config.qualified_table_name(),
            pool_size = self.config.pool_size,
            "PostgreSQL sink connector opened"
        );

        Ok(())
    }

    #[allow(clippy::cast_possible_truncation)]
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        self.buffer.push(batch.clone());
        self.buffered_rows += batch.num_rows();

        // In exactly-once mode, defer all flushing to pre_commit.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            return Ok(WriteResult::new(0, 0));
        }

        // At-least-once: auto-flush when thresholds are reached.
        let should_flush = self.buffered_rows >= self.config.batch_size
            || self.last_flush.elapsed() >= self.config.flush_interval;

        if should_flush {
            let client = self
                .pool()?
                .get()
                .await
                .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
            let result = self.flush_to_client(&client).await?;
            self.buffer.clear();
            self.buffered_rows = 0;
            self.last_flush = Instant::now();
            return Ok(result);
        }

        Ok(WriteResult::new(0, 0))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;
        debug!(epoch, "PostgreSQL sink epoch started");
        Ok(())
    }

    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch in pre_commit: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        if self.buffer.is_empty() {
            return Ok(());
        }

        let client = self
            .pool()?
            .get()
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            // Check for already-committed epoch (recovery replay).
            if self.is_epoch_committed(&client, epoch).await? {
                info!(epoch, "epoch already committed in PostgreSQL, skipping");
                self.buffer.clear();
                self.buffered_rows = 0;
                return Ok(());
            }

            // Co-transactional: data + epoch marker in one PG transaction.
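            // If the process crashes after COMMIT but before the coordinator
            // finishes the epoch, recovery replays pre_commit for the same
            // epoch; the is_epoch_committed() check above then skips the
            // duplicate write, which is what makes the sink exactly-once.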
            client
                .batch_execute("BEGIN")
                .await
                .map_err(|e| ConnectorError::WriteError(format!("BEGIN: {e}")))?;

            match self.flush_to_client(&client).await {
                Ok(_result) => {
                    let sink_id = self.config.effective_sink_id();
                    #[allow(clippy::cast_possible_wrap)]
                    let epoch_i64 = epoch as i64;
                    let params: Vec<&(dyn postgres_types::ToSql + Sync)> =
                        vec![&sink_id, &epoch_i64];
                    client
                        .execute(Self::build_epoch_commit_sql(), &params)
                        .await
                        .map_err(|e| {
                            ConnectorError::WriteError(format!("epoch marker write: {e}"))
                        })?;

                    client
                        .batch_execute("COMMIT")
                        .await
                        .map_err(|e| ConnectorError::WriteError(format!("COMMIT: {e}")))?;
                }
                Err(e) => {
                    let _ = client.batch_execute("ROLLBACK").await;
                    return Err(e);
                }
            }
        } else {
            // At-least-once: just flush remaining buffer.
            self.flush_to_client(&client).await?;
        }

        self.buffer.clear();
        self.buffered_rows = 0;
        self.last_flush = Instant::now();

        debug!(epoch, "PostgreSQL sink pre-committed");
        Ok(())
    }

    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        self.last_committed_epoch = epoch;
        self.metrics.record_commit();

        debug!(epoch, "PostgreSQL sink epoch committed");
        Ok(())
    }

    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.buffer.clear();
        self.buffered_rows = 0;

        self.metrics.record_rollback();
        warn!(epoch, "PostgreSQL sink epoch rolled back");
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => {
                if let Some(pool) = &self.pool {
                    let status = pool.status();
                    if status.available > 0 {
                        HealthStatus::Healthy
                    } else {
                        HealthStatus::Degraded(format!(
                            "no available connections ({} in use)",
                            status.size
                        ))
                    }
                } else {
                    HealthStatus::Unhealthy("pool not initialized".into())
                }
            }
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    fn capabilities(&self) -> SinkConnectorCapabilities {
        // statement_timeout + small margin for pool checkout / setup.
        let write_timeout = self.config.statement_timeout + Duration::from_secs(5);
        let mut caps = SinkConnectorCapabilities::new(write_timeout).with_idempotent();

        if self.config.write_mode == WriteMode::Upsert {
            caps = caps.with_upsert();
        }
        if self.config.changelog_mode {
            caps = caps.with_changelog();
        }
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }

        caps
    }

    async fn flush(&mut self) -> Result<(), ConnectorError> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        let client = self
            .pool()?
            .get()
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
        self.flush_to_client(&client).await?;
        self.buffer.clear();
        self.buffered_rows = 0;
        self.last_flush = Instant::now();
        Ok(())
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing PostgreSQL sink connector");

        if !self.buffer.is_empty() {
            if let Some(pool) = &self.pool {
                if let Ok(client) = pool.get().await {
                    let _ = self.flush_to_client(&client).await;
                }
            }
        }

        self.buffer.clear();
        self.buffered_rows = 0;
        self.pool = None;
        self.state = ConnectorState::Closed;

        info!(
            table = %self.config.qualified_table_name(),
            records = self.metrics.records_written.get(),
            epochs = self.metrics.epochs_committed.get(),
            "PostgreSQL sink connector closed"
        );

        Ok(())
    }
}

// When the postgres-sink feature is NOT enabled, provide a stub whose
// operations fail with a configuration error.
1011#[cfg(not(feature = "postgres-sink"))]
1012#[async_trait]
1013impl SinkConnector for PostgresSink {
1014    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
1015        Err(ConnectorError::ConfigurationError(
1016            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
1017        ))
1018    }
1019
1020    async fn write_batch(&mut self, _batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
1021        Err(ConnectorError::ConfigurationError(
1022            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
1023        ))
1024    }
1025
1026    fn schema(&self) -> SchemaRef {
1027        self.schema.clone()
1028    }
1029
1030    fn health_check(&self) -> HealthStatus {
1031        match self.state {
1032            ConnectorState::Running => HealthStatus::Healthy,
1033            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
1034            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
1035            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
1036            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
1037            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
1038        }
1039    }
1040
1041    fn metrics(&self) -> ConnectorMetrics {
1042        self.metrics.to_connector_metrics()
1043    }
1044
1045    fn capabilities(&self) -> SinkConnectorCapabilities {
1046        let write_timeout = self.config.statement_timeout + Duration::from_secs(5);
1047        let mut caps = SinkConnectorCapabilities::new(write_timeout).with_idempotent();
1048        if self.config.write_mode == WriteMode::Upsert {
1049            caps = caps.with_upsert();
1050        }
1051        if self.config.changelog_mode {
1052            caps = caps.with_changelog();
1053        }
1054        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1055            caps = caps.with_exactly_once().with_two_phase_commit();
1056        }
1057        caps
1058    }
1059
1060    async fn close(&mut self) -> Result<(), ConnectorError> {
1061        self.state = ConnectorState::Closed;
1062        Ok(())
1063    }
1064}
1065
1066impl std::fmt::Debug for PostgresSink {
1067    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1068        f.debug_struct("PostgresSink")
1069            .field("state", &self.state)
1070            .field("table", &self.config.qualified_table_name())
1071            .field("mode", &self.config.write_mode)
1072            .field("guarantee", &self.config.delivery_guarantee)
1073            .field("current_epoch", &self.current_epoch)
1074            .field("last_committed_epoch", &self.last_committed_epoch)
1075            .field("buffered_rows", &self.buffered_rows)
1076            .finish_non_exhaustive()
1077    }
1078}
1079
1080// ── Helper functions ────────────────────────────────────────────────
1081
1082/// Returns user-visible column names (excluding metadata columns starting with `_`).
1083fn user_columns(schema: &SchemaRef) -> Vec<String> {
1084    schema
1085        .fields()
1086        .iter()
1087        .filter(|f| !f.name().starts_with('_'))
1088        .map(|f| f.name().clone())
1089        .collect()
1090}
1091
1092/// Returns user-visible fields (excluding metadata columns starting with `_`).
1093fn user_fields(schema: &SchemaRef) -> Vec<&Arc<Field>> {
1094    schema
1095        .fields()
1096        .iter()
1097        .filter(|f| !f.name().starts_with('_'))
1098        .collect()
1099}
1100
1101/// Builds a schema containing only user-visible columns.
1102fn build_user_schema(schema: &SchemaRef) -> SchemaRef {
1103    Arc::new(Schema::new(
1104        schema
1105            .fields()
1106            .iter()
1107            .filter(|f| !f.name().starts_with('_'))
1108            .cloned()
1109            .collect::<Vec<_>>(),
1110    ))
1111}
1112
1113/// Filters a `RecordBatch` to include only rows at the given indices.
1114///
1115/// Also strips metadata columns (those starting with `_`) from the output.
1116fn filter_batch_by_indices(
1117    batch: &RecordBatch,
1118    indices: &[u32],
1119) -> Result<RecordBatch, ConnectorError> {
1120    if indices.is_empty() {
1121        let user_schema = Arc::new(Schema::new(
1122            batch
1123                .schema()
1124                .fields()
1125                .iter()
1126                .filter(|f| !f.name().starts_with('_'))
1127                .cloned()
1128                .collect::<Vec<_>>(),
1129        ));
1130        return Ok(RecordBatch::new_empty(user_schema));
1131    }
1132
1133    let indices_array = arrow_array::UInt32Array::from(indices.to_vec());
1134
1135    let user_schema = Arc::new(Schema::new(
1136        batch
1137            .schema()
1138            .fields()
1139            .iter()
1140            .filter(|f| !f.name().starts_with('_'))
1141            .cloned()
1142            .collect::<Vec<_>>(),
1143    ));
1144
1145    let filtered_columns: Vec<Arc<dyn arrow_array::Array>> = batch
1146        .schema()
1147        .fields()
1148        .iter()
1149        .enumerate()
1150        .filter(|(_, f)| !f.name().starts_with('_'))
1151        .map(|(i, _)| {
1152            arrow_select::take::take(batch.column(i), &indices_array, None)
1153                .map_err(|e| ConnectorError::Internal(format!("arrow take failed: {e}")))
1154        })
1155        .collect::<Result<Vec<_>, _>>()?;
1156
1157    RecordBatch::try_new(user_schema, filtered_columns)
1158        .map_err(|e| ConnectorError::Internal(format!("batch construction failed: {e}")))
1159}
1160
1161/// Strips metadata columns (starting with `_`) from a `RecordBatch`.
1162#[cfg(feature = "postgres-sink")]
1163fn strip_metadata_columns(batch: &RecordBatch) -> Result<RecordBatch, ConnectorError> {
1164    let schema = batch.schema();
1165    let user_indices: Vec<usize> = schema
1166        .fields()
1167        .iter()
1168        .enumerate()
1169        .filter(|(_, f)| !f.name().starts_with('_'))
1170        .map(|(i, _)| i)
1171        .collect();
1172
1173    if user_indices.len() == schema.fields().len() {
1174        return Ok(batch.clone());
1175    }
1176
1177    let user_schema = Arc::new(Schema::new(
1178        user_indices
1179            .iter()
1180            .map(|&i| schema.field(i).clone())
1181            .collect::<Vec<_>>(),
1182    ));
1183    let columns: Vec<Arc<dyn Array>> = user_indices
1184        .iter()
1185        .map(|&i| batch.column(i).clone())
1186        .collect();
1187
1188    RecordBatch::try_new(user_schema, columns)
1189        .map_err(|e| ConnectorError::Internal(format!("strip metadata: {e}")))
1190}
1191
1192/// Executes an UNNEST-based INSERT/UPSERT using Arrow column arrays as parameters.
1193#[cfg(feature = "postgres-sink")]
1194async fn execute_unnest(
1195    client: &tokio_postgres::Client,
1196    sql: &str,
1197    batch: &RecordBatch,
1198) -> Result<u64, ConnectorError> {
1199    let params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> = (0..batch.num_columns())
1200        .map(|i| arrow_column_to_pg_array(batch.column(i)))
1201        .collect::<Result<_, _>>()?;
1202
1203    let param_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = params
1204        .iter()
1205        .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
1206        .collect();
1207
1208    client
1209        .execute(sql, &param_refs)
1210        .await
1211        .map_err(|e| ConnectorError::WriteError(format!("UNNEST execute: {e}")))
1212}
1213
1214/// Rough byte-size estimate for metrics (sum of column buffer sizes).
1215#[cfg(feature = "postgres-sink")]
1216fn estimate_batch_bytes(batch: &RecordBatch) -> u64 {
1217    (0..batch.num_columns())
1218        .map(|i| batch.column(i).get_buffer_memory_size() as u64)
1219        .sum()
1220}
1221
1222#[cfg(test)]
1223mod tests {
1224    use super::*;
1225    use arrow_array::{Int64Array, StringArray};
1226    use arrow_schema::{DataType, Field, Schema};
1227
1228    fn test_schema() -> SchemaRef {
1229        Arc::new(Schema::new(vec![
1230            Field::new("id", DataType::Int64, false),
1231            Field::new("name", DataType::Utf8, true),
1232            Field::new("value", DataType::Float64, true),
1233        ]))
1234    }
1235
1236    fn test_config() -> PostgresSinkConfig {
1237        PostgresSinkConfig::new("localhost", "mydb", "events")
1238    }
1239
1240    fn upsert_config() -> PostgresSinkConfig {
1241        let mut cfg = test_config();
1242        cfg.write_mode = WriteMode::Upsert;
1243        cfg.primary_key_columns = vec!["id".to_string()];
1244        cfg
1245    }
1246
1247    fn test_batch(n: usize) -> RecordBatch {
1248        let ids: Vec<i64> = (0..n as i64).collect();
1249        let names: Vec<&str> = (0..n).map(|_| "test").collect();
1250        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
1251
1252        RecordBatch::try_new(
1253            test_schema(),
1254            vec![
1255                Arc::new(Int64Array::from(ids)),
1256                Arc::new(StringArray::from(names)),
1257                Arc::new(arrow_array::Float64Array::from(values)),
1258            ],
1259        )
1260        .expect("test batch creation")
1261    }
1262
1263    // ── Constructor tests ──
1264
1265    #[test]
1266    fn test_new_defaults() {
1267        let sink = PostgresSink::new(test_schema(), test_config(), None);
1268        assert_eq!(sink.state(), ConnectorState::Created);
1269        assert_eq!(sink.current_epoch(), 0);
1270        assert_eq!(sink.last_committed_epoch(), 0);
1271        assert_eq!(sink.buffered_rows(), 0);
1272        assert!(sink.upsert_sql.is_none());
1273        assert!(sink.copy_sql.is_none());
1274    }
1275
1276    #[test]
1277    fn test_user_schema_strips_metadata() {
1278        let schema = Arc::new(Schema::new(vec![
1279            Field::new("id", DataType::Int64, false),
1280            Field::new("_op", DataType::Utf8, false),
1281            Field::new("value", DataType::Utf8, true),
1282        ]));
1283        let sink = PostgresSink::new(schema, test_config(), None);
1284        assert_eq!(sink.user_schema.fields().len(), 2);
1285        assert_eq!(sink.user_schema.field(0).name(), "id");
1286        assert_eq!(sink.user_schema.field(1).name(), "value");
1287    }
1288
1289    #[test]
1290    fn test_schema_returned() {
1291        let schema = test_schema();
1292        let sink = PostgresSink::new(schema.clone(), test_config(), None);
1293        assert_eq!(sink.schema(), schema);
1294    }
1295
1296    // ── SQL generation tests ──
1297
1298    #[test]
1299    fn test_build_copy_sql() {
1300        let schema = test_schema();
1301        let config = test_config();
1302        let sql = PostgresSink::build_copy_sql(&schema, &config);
1303        assert_eq!(
1304            sql,
1305            "COPY public.events (id, name, value) FROM STDIN BINARY"
1306        );
1307    }
1308
1309    #[test]
1310    fn test_build_copy_sql_custom_schema() {
1311        let schema = test_schema();
1312        let mut config = test_config();
1313        config.schema_name = "analytics".to_string();
1314        let sql = PostgresSink::build_copy_sql(&schema, &config);
1315        assert!(sql.starts_with("COPY analytics.events"));
1316    }
1317
1318    #[test]
1319    fn test_build_copy_sql_excludes_metadata_columns() {
1320        let schema = Arc::new(Schema::new(vec![
1321            Field::new("id", DataType::Int64, false),
1322            Field::new("_op", DataType::Utf8, false),
1323            Field::new("_ts_ms", DataType::Int64, false),
1324            Field::new("value", DataType::Utf8, true),
1325        ]));
1326        let config = test_config();
1327        let sql = PostgresSink::build_copy_sql(&schema, &config);
1328        assert_eq!(sql, "COPY public.events (id, value) FROM STDIN BINARY");
1329    }
1330
1331    #[test]
1332    fn test_build_upsert_sql() {
1333        let schema = test_schema();
1334        let config = upsert_config();
1335        let sql = PostgresSink::build_upsert_sql(&schema, &config);
1336
1337        assert!(sql.starts_with("INSERT INTO public.events"));
1338        assert!(sql.contains("SELECT * FROM UNNEST"));
1339        assert!(sql.contains("$1::int8[]"));
1340        assert!(sql.contains("$2::text[]"));
1341        assert!(sql.contains("$3::float8[]"));
1342        assert!(sql.contains("ON CONFLICT (id)"));
1343        assert!(sql.contains("DO UPDATE SET"));
1344        assert!(sql.contains("name = EXCLUDED.name"));
1345        assert!(sql.contains("value = EXCLUDED.value"));
1346        assert!(!sql.contains("id = EXCLUDED.id"));
1347    }
1348
1349    #[test]
1350    fn test_build_upsert_sql_composite_key() {
1351        let schema = test_schema();
1352        let mut config = upsert_config();
1353        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
1354        let sql = PostgresSink::build_upsert_sql(&schema, &config);
1355
1356        assert!(sql.contains("ON CONFLICT (id, name)"));
1357        assert!(sql.contains("value = EXCLUDED.value"));
1358        assert!(!sql.contains("id = EXCLUDED.id"));
1359        assert!(!sql.contains("name = EXCLUDED.name"));
1360    }
1361
1362    #[test]
1363    fn test_build_upsert_sql_key_only_table() {
1364        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1365        let mut config = test_config();
1366        config.write_mode = WriteMode::Upsert;
1367        config.primary_key_columns = vec!["id".to_string()];
1368
1369        let sql = PostgresSink::build_upsert_sql(&schema, &config);
1370        assert!(sql.contains("DO NOTHING"), "sql: {sql}");
1371    }
1372
1373    #[test]
1374    fn test_build_delete_sql() {
1375        let schema = test_schema();
1376        let config = upsert_config();
1377        let sql = PostgresSink::build_delete_sql(&schema, &config);
1378
1379        assert_eq!(sql, "DELETE FROM public.events WHERE id = ANY($1::int8[])");
1380    }
1381
1382    #[test]
1383    fn test_build_delete_sql_composite_key() {
1384        let schema = test_schema();
1385        let mut config = upsert_config();
1386        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
1387        let sql = PostgresSink::build_delete_sql(&schema, &config);
1388
1389        assert!(sql.contains("id = ANY($1::int8[])"));
1390        assert!(sql.contains("name = ANY($2::text[])"));
1391        assert!(sql.contains(" AND "));
1392    }
1393
1394    #[test]
1395    fn test_build_create_table_sql() {
1396        let schema = test_schema();
1397        let config = upsert_config();
1398        let sql = PostgresSink::build_create_table_sql(&schema, &config);
1399
1400        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS public.events"));
1401        assert!(sql.contains("id BIGINT NOT NULL"));
1402        assert!(sql.contains("name TEXT"));
1403        assert!(sql.contains("value DOUBLE PRECISION"));
1404        assert!(sql.contains("PRIMARY KEY (id)"));
1405    }
1406
1407    #[test]
1408    fn test_build_create_table_sql_no_pk() {
1409        let schema = test_schema();
1410        let config = test_config();
1411        let sql = PostgresSink::build_create_table_sql(&schema, &config);
1412
1413        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS"));
1414        assert!(!sql.contains("PRIMARY KEY"));
1415    }
1416
1417    #[test]
1418    fn test_offset_table_sql() {
1419        let sql = PostgresSink::build_offset_table_sql();
1420        assert!(sql.contains("_laminardb_sink_offsets"));
1421        assert!(sql.contains("sink_id TEXT PRIMARY KEY"));
1422        assert!(sql.contains("epoch BIGINT NOT NULL"));
1423    }
1424
1425    #[test]
1426    fn test_epoch_commit_sql() {
1427        let sql = PostgresSink::build_epoch_commit_sql();
1428        assert!(sql.contains("INSERT INTO _laminardb_sink_offsets"));
1429        assert!(sql.contains("ON CONFLICT (sink_id) DO UPDATE"));
1430    }
1431
1432    #[test]
1433    fn test_epoch_recover_sql() {
1434        let sql = PostgresSink::build_epoch_recover_sql();
1435        assert!(sql.contains("SELECT epoch FROM _laminardb_sink_offsets"));
1436    }
1437
1438    // ── Changelog splitting tests ──
1439
1440    fn changelog_schema() -> SchemaRef {
1441        Arc::new(Schema::new(vec![
1442            Field::new("id", DataType::Int64, false),
1443            Field::new("name", DataType::Utf8, true),
1444            Field::new("_op", DataType::Utf8, false),
1445            Field::new("_ts_ms", DataType::Int64, false),
1446        ]))
1447    }
1448
1449    fn changelog_batch() -> RecordBatch {
1450        RecordBatch::try_new(
1451            changelog_schema(),
1452            vec![
1453                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
1454                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
1455                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
1456                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
1457            ],
1458        )
1459        .expect("changelog batch creation")
1460    }
1461
1462    #[test]
1463    fn test_split_changelog_batch() {
1464        let batch = changelog_batch();
1465        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
1466
1467        assert_eq!(inserts.num_rows(), 3);
1468        assert_eq!(deletes.num_rows(), 2);
1469        assert_eq!(inserts.num_columns(), 2);
1470        assert_eq!(deletes.num_columns(), 2);
1471
1472        let insert_ids = inserts
1473            .column(0)
1474            .as_any()
1475            .downcast_ref::<Int64Array>()
1476            .expect("i64 array");
1477        assert_eq!(insert_ids.value(0), 1);
1478        assert_eq!(insert_ids.value(1), 2);
1479        assert_eq!(insert_ids.value(2), 4);
1480
1481        let delete_ids = deletes
1482            .column(0)
1483            .as_any()
1484            .downcast_ref::<Int64Array>()
1485            .expect("i64 array");
1486        assert_eq!(delete_ids.value(0), 3);
1487        assert_eq!(delete_ids.value(1), 5);
1488    }

    #[test]
    fn test_split_changelog_all_inserts() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["I", "I"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 2);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_split_changelog_all_deletes() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["D", "D"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 0);
        assert_eq!(deletes.num_rows(), 2);
    }

    #[test]
    fn test_split_changelog_missing_op_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).expect("batch");

        let result = PostgresSink::split_changelog_batch(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_split_changelog_snapshot_read() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(StringArray::from(vec!["a"])),
                Arc::new(StringArray::from(vec!["r"])),
                Arc::new(Int64Array::from(vec![100])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 1);
        assert_eq!(deletes.num_rows(), 0);
    }

    // ── Buffering tests ──

    #[tokio::test]
    async fn test_write_batch_buffering() {
        let mut config = test_config();
        config.batch_size = 100;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }

    #[tokio::test]
    async fn test_write_batch_empty() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(0);
        let result = sink.write_batch(&batch).await.expect("write");
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[tokio::test]
    async fn test_write_batch_not_running() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_exactly_once_defers_flush() {
        let mut config = test_config();
        config.batch_size = 5; // A 10-row write would normally trigger an auto-flush.
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        // Exactly-once: no auto-flush; all rows remain buffered.
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }
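
    // A hedged follow-on sketch, assuming exactly-once mode keeps deferring
    // the flush on every write (the test above shows a single deferred
    // write): repeated writes should simply accumulate in the buffer.
    #[tokio::test]
    async fn test_exactly_once_accumulates_across_writes() {
        let mut config = test_config();
        config.batch_size = 5;
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        sink.write_batch(&test_batch(10)).await.expect("write 1");
        sink.write_batch(&test_batch(10)).await.expect("write 2");

        assert_eq!(sink.buffered_rows(), 20);
    }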

    // ── Health check tests ──

    #[test]
    fn test_health_check_created() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    // ── Capabilities tests ──

    #[test]
    fn test_capabilities_append_at_least_once() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        let caps = sink.capabilities();
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.exactly_once);
    }

    #[test]
    fn test_capabilities_upsert() {
        let sink = PostgresSink::new(test_schema(), upsert_config(), None);
        let caps = sink.capabilities();
        assert!(caps.upsert);
        assert!(caps.idempotent);
    }

    #[test]
    fn test_capabilities_changelog() {
        let mut config = test_config();
        config.changelog_mode = true;
        let sink = PostgresSink::new(test_schema(), config, None);
        let caps = sink.capabilities();
        assert!(caps.changelog);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let mut config = upsert_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = PostgresSink::new(test_schema(), config, None);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
    }
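
    // A hedged negative check, assuming the `exactly_once` capability is
    // driven by the configured delivery guarantee rather than the write mode
    // (the positive case above has to flip the guarantee on an upsert
    // config explicitly):
    #[test]
    fn test_capabilities_upsert_not_exactly_once_by_default() {
        let sink = PostgresSink::new(test_schema(), upsert_config(), None);
        assert!(!sink.capabilities().exactly_once);
    }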

    // ── Metrics tests ──

    #[test]
    fn test_metrics_initial() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        let m = sink.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    // ── Epoch lifecycle tests ──

    #[tokio::test]
    async fn test_epoch_lifecycle_state() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        assert_eq!(sink.current_epoch(), 1);

        // `commit_epoch` updates `last_committed_epoch` and records a metric.
        sink.commit_epoch(1).await.expect("commit");
        assert_eq!(sink.last_committed_epoch(), 1);

        let m = sink.metrics();
        let committed = m.custom.iter().find(|(k, _)| k == "pg.epochs_committed");
        assert_eq!(committed.expect("metric").1, 1.0);
    }

    #[tokio::test]
    async fn test_epoch_mismatch_rejected() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        let result = sink.commit_epoch(2).await;
        assert!(result.is_err());
    }
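
    // A hedged extension of the lifecycle test above, assuming epochs may
    // advance monotonically (begin/commit 1, then begin/commit 2); whether a
    // new epoch may begin after a commit is an assumption here:
    #[tokio::test]
    async fn test_epoch_lifecycle_advances() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin 1");
        sink.commit_epoch(1).await.expect("commit 1");
        sink.begin_epoch(2).await.expect("begin 2");
        sink.commit_epoch(2).await.expect("commit 2");

        assert_eq!(sink.last_committed_epoch(), 2);
    }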

    #[tokio::test]
    async fn test_rollback_clears_buffer() {
        let mut config = test_config();
        config.batch_size = 1000;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(50);
        sink.write_batch(&batch).await.expect("write");
        assert_eq!(sink.buffered_rows(), 50);

        sink.rollback_epoch(0).await.expect("rollback");
        assert_eq!(sink.buffered_rows(), 0);
    }

    // ── Debug output test ──

    #[test]
    fn test_debug_output() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        let debug = format!("{sink:?}");
        assert!(debug.contains("PostgresSink"));
        assert!(debug.contains("public.events"));
    }

    // ── Helper function tests ──

    #[test]
    fn test_build_user_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
            Field::new("_ts_ms", DataType::Int64, false),
        ]));
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 2);
        assert_eq!(user.field(0).name(), "id");
        assert_eq!(user.field(1).name(), "value");
    }
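
    // Note: the two stripped fields above are exactly the underscore-prefixed
    // metadata columns (`_op`, `_ts_ms`); whether `build_user_schema` strips
    // by prefix or by a fixed name list is not pinned down by these tests.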

    #[test]
    fn test_build_user_schema_no_metadata() {
        let schema = test_schema();
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 3);
    }
}