Skip to main content

laminar_connectors/postgres/
sink.rs

//! `PostgreSQL` sink connector implementation.
//!
//! [`PostgresSink`] implements [`SinkConnector`], writing Arrow `RecordBatch`
//! data to `PostgreSQL` tables via two strategies:
//!
//! - **Append mode**: COPY BINARY for maximum throughput (>500K rows/sec)
//! - **Upsert mode**: `INSERT ... ON CONFLICT DO UPDATE` with UNNEST arrays
//!
//! Exactly-once semantics use co-transactional offset storage: data and epoch
//! markers are committed in the same `PostgreSQL` transaction.
//!
//! # Ring Architecture
//!
//! - **Ring 0**: No sink code. Data arrives via SPSC channel (~5ns push).
//! - **Ring 1**: Batch buffering, COPY/INSERT writes, transaction management.
//! - **Ring 2**: Connection pool, table creation, epoch recovery.

18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use arrow_array::{Array, RecordBatch};
22use arrow_schema::{DataType, Field, Schema, SchemaRef};
23use async_trait::async_trait;
24use tracing::{debug, info, warn};
25
26use crate::config::{ConnectorConfig, ConnectorState};
27use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
28use crate::error::ConnectorError;
29
30use super::sink_config::{PostgresSinkConfig, WriteMode};
31use super::sink_metrics::PostgresSinkMetrics;
32use super::types::{arrow_to_pg_ddl_type, arrow_type_to_pg_array_cast, arrow_type_to_pg_sql};
33use crate::connector::DeliveryGuarantee;
34
35#[cfg(feature = "postgres-sink")]
36use super::types::arrow_column_to_pg_array;
37#[cfg(feature = "postgres-sink")]
38use bytes::BytesMut;
39#[cfg(feature = "postgres-sink")]
40use deadpool_postgres::Pool;
41
42/// `PostgreSQL` sink connector.
43///
44/// Writes Arrow `RecordBatch` to `PostgreSQL` tables using COPY BINARY
45/// (append) or UNNEST-based upsert, with optional exactly-once semantics
46/// via co-transactional epoch storage.
47pub struct PostgresSink {
48    /// Sink configuration.
49    config: PostgresSinkConfig,
50    /// Arrow schema for input batches.
51    schema: SchemaRef,
52    /// User-visible schema (metadata columns stripped).
53    user_schema: SchemaRef,
54    /// Connector lifecycle state.
55    state: ConnectorState,
56    /// Current epoch (for exactly-once).
57    current_epoch: u64,
58    /// Last committed epoch.
59    last_committed_epoch: u64,
60    /// Buffered records awaiting flush.
61    buffer: Vec<RecordBatch>,
62    /// Total rows in buffer.
63    buffered_rows: usize,
64    /// Last flush time.
65    last_flush: Instant,
66    /// Sink metrics.
67    metrics: PostgresSinkMetrics,
68    /// Cached upsert SQL statement (for upsert mode).
69    upsert_sql: Option<String>,
70    /// Cached COPY SQL statement (for append mode).
71    copy_sql: Option<String>,
72    /// Cached CREATE TABLE SQL (for auto-create).
73    create_table_sql: Option<String>,
74    /// Cached DELETE SQL (for changelog mode).
75    delete_sql: Option<String>,
76    /// Connection pool (`None` until `open()` is called).
77    #[cfg(feature = "postgres-sink")]
78    pool: Option<Pool>,
79    /// Pre-allocated encode buffer for pgpq COPY BINARY output.
80    #[cfg(feature = "postgres-sink")]
81    encode_buf: BytesMut,
82}
83
84impl PostgresSink {
85    /// Creates a new `PostgreSQL` sink connector.
86    #[must_use]
87    pub fn new(
88        schema: SchemaRef,
89        config: PostgresSinkConfig,
90        registry: Option<&prometheus::Registry>,
91    ) -> Self {
92        let user_schema = build_user_schema(&schema);
93        // Pre-allocate buffer to avoid reallocation during first epoch.
94        let buf_capacity = (config.batch_size / 1024).max(4);
95        Self {
96            config,
97            schema,
98            user_schema,
99            state: ConnectorState::Created,
100            current_epoch: 0,
101            last_committed_epoch: 0,
102            buffer: Vec::with_capacity(buf_capacity),
103            buffered_rows: 0,
104            last_flush: Instant::now(),
105            metrics: PostgresSinkMetrics::new(registry),
106            upsert_sql: None,
107            copy_sql: None,
108            create_table_sql: None,
109            delete_sql: None,
110            #[cfg(feature = "postgres-sink")]
111            pool: None,
112            #[cfg(feature = "postgres-sink")]
113            encode_buf: BytesMut::with_capacity(64 * 1024),
114        }
115    }
116
117    /// Returns the current connector state.
118    #[must_use]
119    pub fn state(&self) -> ConnectorState {
120        self.state
121    }
122
123    /// Returns the current epoch.
124    #[must_use]
125    pub fn current_epoch(&self) -> u64 {
126        self.current_epoch
127    }
128
129    /// Returns the last committed epoch.
130    #[must_use]
131    pub fn last_committed_epoch(&self) -> u64 {
132        self.last_committed_epoch
133    }
134
135    /// Returns the number of buffered rows pending flush.
136    #[must_use]
137    pub fn buffered_rows(&self) -> usize {
138        self.buffered_rows
139    }
140
141    /// Returns a reference to the sink metrics.
142    #[must_use]
143    pub fn sink_metrics(&self) -> &PostgresSinkMetrics {
144        &self.metrics
145    }
146
147    // ── SQL Generation ──────────────────────────────────────────────
148
149    /// Builds the COPY BINARY SQL statement.
150    ///
151    /// ```sql
152    /// COPY public.events (id, value, ts) FROM STDIN BINARY
153    /// ```
154    #[must_use]
155    pub fn build_copy_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
156        let columns = user_columns(schema);
157        let col_list = columns.join(", ");
158        format!(
159            "COPY {} ({}) FROM STDIN BINARY",
160            config.qualified_table_name(),
161            col_list,
162        )
163    }
164
165    /// Builds the UNNEST-based upsert SQL statement.
166    ///
167    /// ```sql
168    /// INSERT INTO public.target (id, value, updated_at)
169    /// SELECT * FROM UNNEST($1::int8[], $2::text[], $3::timestamptz[])
170    /// ON CONFLICT (id) DO UPDATE SET
171    ///     value = EXCLUDED.value,
172    ///     updated_at = EXCLUDED.updated_at
173    /// ```
174    #[must_use]
175    pub fn build_upsert_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
176        let fields = user_fields(schema);
177
178        let columns: Vec<&str> = fields.iter().map(|f| f.name().as_str()).collect();
179
180        let unnest_params: Vec<String> = fields
181            .iter()
182            .enumerate()
183            .map(|(i, f)| arrow_type_to_pg_array_cast(f.data_type(), i + 1))
184            .collect();
185
186        let non_key_columns: Vec<&str> = columns
187            .iter()
188            .copied()
189            .filter(|c| {
190                !config
191                    .primary_key_columns
192                    .iter()
193                    .any(|pk| pk.as_str() == *c)
194            })
195            .collect();
196
197        let update_clause: Vec<String> = non_key_columns
198            .iter()
199            .map(|c| format!("{c} = EXCLUDED.{c}"))
200            .collect();
201
202        let pk_list = config.primary_key_columns.join(", ");
203
204        if update_clause.is_empty() {
205            // Key-only table: use DO NOTHING
206            format!(
207                "INSERT INTO {} ({}) \
208                 SELECT * FROM UNNEST({}) \
209                 ON CONFLICT ({}) DO NOTHING",
210                config.qualified_table_name(),
211                columns.join(", "),
212                unnest_params.join(", "),
213                pk_list,
214            )
215        } else {
216            format!(
217                "INSERT INTO {} ({}) \
218                 SELECT * FROM UNNEST({}) \
219                 ON CONFLICT ({}) DO UPDATE SET {}",
220                config.qualified_table_name(),
221                columns.join(", "),
222                unnest_params.join(", "),
223                pk_list,
224                update_clause.join(", "),
225            )
226        }
227    }
228
229    /// Builds the DELETE SQL for changelog deletes.
230    ///
231    /// ```sql
232    /// DELETE FROM public.events WHERE id = ANY($1::int8[])
233    /// ```
234    #[must_use]
235    pub fn build_delete_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
236        let pk_conditions: Vec<String> = config
237            .primary_key_columns
238            .iter()
239            .enumerate()
240            .map(|(i, col)| {
241                let dt = schema
242                    .field_with_name(col)
243                    .map_or(DataType::Utf8, |f| f.data_type().clone());
244                let pg_type = arrow_type_to_pg_sql(&dt);
245                format!("{col} = ANY(${}::{}[])", i + 1, pg_type)
246            })
247            .collect();
248
249        format!(
250            "DELETE FROM {} WHERE {}",
251            config.qualified_table_name(),
252            pk_conditions.join(" AND "),
253        )
254    }
255
256    /// Builds CREATE TABLE DDL from the Arrow schema.
257    ///
258    /// ```sql
259    /// CREATE TABLE IF NOT EXISTS public.events (
260    ///     id BIGINT NOT NULL,
261    ///     value TEXT,
262    ///     ts TIMESTAMPTZ,
263    ///     PRIMARY KEY (id)
264    /// )
265    /// ```
266    #[must_use]
267    pub fn build_create_table_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
268        let fields = user_fields(schema);
269
270        let column_defs: Vec<String> = fields
271            .iter()
272            .map(|f| {
273                let pg_type = arrow_to_pg_ddl_type(f.data_type());
274                let nullable = if f.is_nullable() { "" } else { " NOT NULL" };
275                format!("    {} {}{}", f.name(), pg_type, nullable)
276            })
277            .collect();
278
279        let mut ddl = format!(
280            "CREATE TABLE IF NOT EXISTS {} (\n{}\n",
281            config.qualified_table_name(),
282            column_defs.join(",\n"),
283        );
284
285        if !config.primary_key_columns.is_empty() {
286            use std::fmt::Write;
287            let _ = write!(
288                ddl,
289                ",\n    PRIMARY KEY ({})\n",
290                config.primary_key_columns.join(", ")
291            );
292        }
293
294        ddl.push(')');
295        ddl
296    }
297
298    /// Builds CREATE TABLE DDL for the offset tracking table.
299    #[must_use]
300    pub fn build_offset_table_sql() -> &'static str {
301        "CREATE TABLE IF NOT EXISTS _laminardb_sink_offsets (\
302         \n    sink_id TEXT PRIMARY KEY,\
303         \n    epoch BIGINT NOT NULL,\
304         \n    source_offsets JSONB,\
305         \n    watermark BIGINT,\
306         \n    updated_at TIMESTAMPTZ DEFAULT NOW()\
307         \n)"
308    }
309
310    /// Builds the epoch commit SQL.
311    #[must_use]
312    pub fn build_epoch_commit_sql() -> &'static str {
313        "INSERT INTO _laminardb_sink_offsets (sink_id, epoch, updated_at) \
314         VALUES ($1, $2, NOW()) \
315         ON CONFLICT (sink_id) DO UPDATE SET epoch = $2, updated_at = NOW()"
316    }
317
318    /// Builds the epoch recovery SQL.
319    #[must_use]
320    pub fn build_epoch_recover_sql() -> &'static str {
321        "SELECT epoch FROM _laminardb_sink_offsets WHERE sink_id = $1"
322    }
323
324    // ── Changelog/Retraction ────────────────────────────────────────
325
326    /// Splits a changelog `RecordBatch` into insert and delete batches.
327    ///
328    /// Uses the `_op` metadata column:
329    /// - `"I"` (insert), `"U"` (update-after), `"r"` (snapshot read) → insert batch
330    /// - `"D"` (delete) → delete batch
331    ///
332    /// The returned batches exclude metadata columns (those starting with `_`).
333    ///
334    /// # Errors
335    ///
336    /// Returns `ConnectorError::ConfigurationError` if the `_op` column is
337    /// missing or not a string type.
338    pub fn split_changelog_batch(
339        batch: &RecordBatch,
340    ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
341        let op_idx = batch.schema().index_of("_op").map_err(|_| {
342            ConnectorError::ConfigurationError(
343                "changelog mode requires '_op' column in input schema".into(),
344            )
345        })?;
346
347        let op_array = batch
348            .column(op_idx)
349            .as_any()
350            .downcast_ref::<arrow_array::StringArray>()
351            .ok_or_else(|| {
352                ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
353            })?;
354
355        let mut insert_indices = Vec::new();
356        let mut delete_indices = Vec::new();
357
358        for i in 0..op_array.len() {
359            if op_array.is_null(i) {
360                continue;
361            }
362            match op_array.value(i) {
363                "I" | "U" | "r" => {
364                    insert_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
365                }
366                "D" => {
367                    delete_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
368                }
369                _ => {} // Skip unknown ops
370            }
371        }
372
373        let insert_batch = filter_batch_by_indices(batch, &insert_indices)?;
374        let delete_batch = filter_batch_by_indices(batch, &delete_indices)?;
375
376        Ok((insert_batch, delete_batch))
377    }
378
379    // ── Internal helpers ────────────────────────────────────────────
380
381    /// Prepares cached SQL statements based on schema and config.
382    fn prepare_statements(&mut self) {
383        self.copy_sql = Some(Self::build_copy_sql(&self.schema, &self.config));
384
385        if self.config.write_mode == WriteMode::Upsert {
386            self.upsert_sql = Some(Self::build_upsert_sql(&self.schema, &self.config));
387        }
388
389        if self.config.auto_create_table {
390            self.create_table_sql = Some(Self::build_create_table_sql(&self.schema, &self.config));
391        }
392
393        if self.config.changelog_mode {
394            self.delete_sql = Some(Self::build_delete_sql(&self.schema, &self.config));
395        }
396    }
397
398    /// Returns a reference to the connection pool, or an error if not initialized.
399    #[cfg(feature = "postgres-sink")]
400    fn pool(&self) -> Result<&Pool, ConnectorError> {
401        self.pool.as_ref().ok_or(ConnectorError::InvalidState {
402            expected: "pool initialized (call open() first)".into(),
403            actual: "pool not initialized".into(),
404        })
405    }
406
407    /// Concatenates all buffered batches and strips metadata columns.
408    #[cfg(feature = "postgres-sink")]
409    fn concat_buffer(&self) -> Result<RecordBatch, ConnectorError> {
410        if self.buffer.is_empty() {
411            return Ok(RecordBatch::new_empty(self.user_schema.clone()));
412        }
413
414        let stripped: Result<Vec<RecordBatch>, ConnectorError> =
415            self.buffer.iter().map(strip_metadata_columns).collect();
416        let stripped = stripped?;
417
418        arrow_select::concat::concat_batches(&self.user_schema, &stripped)
419            .map_err(|e| ConnectorError::Internal(format!("batch concat failed: {e}")))
420    }
421
422    /// Flushes buffered data to `PostgreSQL` using the COPY BINARY protocol.
423    #[cfg(feature = "postgres-sink")]
424    async fn flush_append(
425        &mut self,
426        client: &tokio_postgres::Client,
427    ) -> Result<WriteResult, ConnectorError> {
428        let user_batch = self.concat_buffer()?;
429        if user_batch.num_rows() == 0 {
430            return Ok(WriteResult::new(0, 0));
431        }
432
433        let mut encoder = pgpq::ArrowToPostgresBinaryEncoder::try_new(self.user_schema.as_ref())
434            .map_err(|e| ConnectorError::Internal(format!("pgpq encoder init: {e}")))?;
435
436        self.encode_buf.clear();
437        encoder.write_header(&mut self.encode_buf);
438        encoder
439            .write_batch(&user_batch, &mut self.encode_buf)
440            .map_err(|e| ConnectorError::Internal(format!("pgpq encode: {e}")))?;
441        encoder
442            .write_footer(&mut self.encode_buf)
443            .map_err(|e| ConnectorError::Internal(format!("pgpq footer: {e}")))?;
444
445        let encoded_bytes = self.encode_buf.len();
446        let bytes_to_send = self.encode_buf.split().freeze();
447
448        let copy_sql = self
449            .copy_sql
450            .as_deref()
451            .ok_or_else(|| ConnectorError::Internal("COPY SQL not prepared".into()))?;
452
453        let sink = client
454            .copy_in(copy_sql)
455            .await
456            .map_err(|e| ConnectorError::WriteError(format!("COPY start: {e}")))?;
457
458        {
459            use futures_util::SinkExt;
460            futures_util::pin_mut!(sink);
461            sink.send(bytes_to_send)
462                .await
463                .map_err(|e| ConnectorError::WriteError(format!("COPY send: {e}")))?;
464            sink.close()
465                .await
466                .map_err(|e| ConnectorError::WriteError(format!("COPY finish: {e}")))?;
467        }
468
469        let rows = user_batch.num_rows();
470        self.metrics.record_write(rows as u64, encoded_bytes as u64);
471        self.metrics.record_flush();
472        self.metrics.record_copy();
473
474        Ok(WriteResult::new(rows, encoded_bytes as u64))
475    }
476
477    /// Flushes buffered data to `PostgreSQL` using UNNEST-based upsert.
478    #[cfg(feature = "postgres-sink")]
479    #[allow(clippy::cast_possible_truncation)]
480    async fn flush_upsert(
481        &mut self,
482        client: &tokio_postgres::Client,
483    ) -> Result<WriteResult, ConnectorError> {
484        if self.config.changelog_mode {
485            return self.flush_changelog(client).await;
486        }
487
488        let user_batch = self.concat_buffer()?;
489        if user_batch.num_rows() == 0 {
490            return Ok(WriteResult::new(0, 0));
491        }
492
493        let upsert_sql = self
494            .upsert_sql
495            .as_deref()
496            .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;
497
498        let rows = execute_unnest(client, upsert_sql, &user_batch).await?;
499
500        let byte_estimate = estimate_batch_bytes(&user_batch);
501        self.metrics.record_write(rows, byte_estimate);
502        self.metrics.record_flush();
503        self.metrics.record_upsert();
504
505        Ok(WriteResult::new(rows as usize, byte_estimate))
506    }
507
508    /// Flushes changelog batches: deletes first, then upserts.
509    #[cfg(feature = "postgres-sink")]
510    #[allow(clippy::cast_possible_truncation)]
511    async fn flush_changelog(
512        &mut self,
513        client: &tokio_postgres::Client,
514    ) -> Result<WriteResult, ConnectorError> {
515        let mut total_rows: u64 = 0;
516        let mut total_bytes: u64 = 0;
517
518        // Split each buffered batch into inserts/deletes.
519        let mut all_inserts = Vec::new();
520        let mut all_deletes = Vec::new();
521        for batch in &self.buffer {
522            let (ins, del) = Self::split_changelog_batch(batch)?;
523            if ins.num_rows() > 0 {
524                all_inserts.push(ins);
525            }
526            if del.num_rows() > 0 {
527                all_deletes.push(del);
528            }
529        }
530
531        // Process inserts/updates first, then deletes.
532        // This handles the common CDC pattern where a key is inserted then
533        // later deleted within the same epoch. For the rare case where a key
534        // is deleted then re-inserted in the same batch, the upsert after
535        // delete produces the correct final state.
536        if !all_inserts.is_empty() {
537            let insert_batch =
538                arrow_select::concat::concat_batches(&self.user_schema, &all_inserts)
539                    .map_err(|e| ConnectorError::Internal(format!("concat inserts: {e}")))?;
540
541            let upsert_sql = self
542                .upsert_sql
543                .as_deref()
544                .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;
545
546            let rows = execute_unnest(client, upsert_sql, &insert_batch).await?;
547            let bytes = estimate_batch_bytes(&insert_batch);
548            self.metrics.record_write(rows, bytes);
549            self.metrics.record_upsert();
550            total_rows += rows;
551            total_bytes += bytes;
552        }
553
554        if !all_deletes.is_empty() {
555            let delete_batch =
556                arrow_select::concat::concat_batches(&self.user_schema, &all_deletes)
557                    .map_err(|e| ConnectorError::Internal(format!("concat deletes: {e}")))?;
558            let deleted = self.execute_deletes(client, &delete_batch).await?;
559            self.metrics.record_deletes(deleted as u64);
560            total_rows += deleted as u64;
561        }
562
563        self.metrics.record_flush();
564        Ok(WriteResult::new(total_rows as usize, total_bytes))
565    }
566
567    /// Executes batched DELETE for changelog delete records.
568    #[cfg(feature = "postgres-sink")]
569    #[allow(clippy::cast_possible_truncation)]
570    async fn execute_deletes(
571        &self,
572        client: &tokio_postgres::Client,
573        delete_batch: &RecordBatch,
574    ) -> Result<usize, ConnectorError> {
575        if delete_batch.num_rows() == 0 {
576            return Ok(0);
577        }
578
579        let delete_sql = self
580            .delete_sql
581            .as_deref()
582            .ok_or_else(|| ConnectorError::Internal("DELETE SQL not prepared".into()))?;
583
584        let pk_params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> = self
585            .config
586            .primary_key_columns
587            .iter()
588            .map(|col| {
589                let idx = delete_batch.schema().index_of(col).map_err(|_| {
590                    ConnectorError::ConfigurationError(format!(
591                        "primary key column '{col}' not in delete batch"
592                    ))
593                })?;
594                arrow_column_to_pg_array(delete_batch.column(idx))
595            })
596            .collect::<Result<_, _>>()?;
597
598        let pk_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = pk_params
599            .iter()
600            .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
601            .collect();
602
603        let rows = client
604            .execute(delete_sql, &pk_refs)
605            .await
606            .map_err(|e| ConnectorError::WriteError(format!("DELETE: {e}")))?;
607
608        Ok(rows as usize)
609    }
610
611    /// Dispatches flush to the appropriate write mode.
612    #[cfg(feature = "postgres-sink")]
613    async fn flush_to_client(
614        &mut self,
615        client: &tokio_postgres::Client,
616    ) -> Result<WriteResult, ConnectorError> {
617        client
618            .execute(
619                &format!(
620                    "SET statement_timeout = '{}'",
621                    self.config.statement_timeout.as_millis()
622                ),
623                &[],
624            )
625            .await
626            .ok(); // non-fatal if unsupported
627        match self.config.write_mode {
628            WriteMode::Append => self.flush_append(client).await,
629            WriteMode::Upsert => self.flush_upsert(client).await,
630        }
631    }
632
633    /// Checks whether the given epoch has already been committed to `PostgreSQL`.
634    #[cfg(feature = "postgres-sink")]
635    #[allow(clippy::cast_sign_loss)]
636    async fn is_epoch_committed(
637        &self,
638        client: &tokio_postgres::Client,
639        epoch: u64,
640    ) -> Result<bool, ConnectorError> {
641        let sink_id = self.config.effective_sink_id();
642        let row = client
643            .query_opt(Self::build_epoch_recover_sql(), &[&sink_id])
644            .await
645            .map_err(|e| ConnectorError::Internal(format!("epoch recovery query: {e}")))?;
646
647        if let Some(row) = row {
648            let committed: i64 = row.get(0);
649            Ok(committed as u64 >= epoch)
650        } else {
651            Ok(false)
652        }
653    }
654}
655
656// ── SinkConnector implementation ────────────────────────────────────
657
658// When the postgres-sink feature is enabled, provide the real implementation.
659#[cfg(feature = "postgres-sink")]
660#[async_trait]
661impl SinkConnector for PostgresSink {
662    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
663        self.state = ConnectorState::Initializing;
664
665        if !config.properties().is_empty() {
666            self.config = PostgresSinkConfig::from_config(config)?;
667        }
668
669        // Validate changelog requires upsert.
670        if self.config.changelog_mode && self.config.write_mode != WriteMode::Upsert {
671            return Err(ConnectorError::ConfigurationError(
672                "changelog mode requires write.mode = 'upsert'".into(),
673            ));
674        }
675
676        info!(
677            table = %self.config.qualified_table_name(),
678            mode = %self.config.write_mode,
679            guarantee = %self.config.delivery_guarantee,
680            "opening PostgreSQL sink connector"
681        );
682
683        self.prepare_statements();
684
685        if self.config.sink_id.is_empty() {
686            self.config.sink_id = self.config.effective_sink_id();
687        }
688
689        // Build connection pool.
690        let mut pool_cfg = deadpool_postgres::Config::new();
691        pool_cfg.host = Some(self.config.hostname.clone());
692        pool_cfg.port = Some(self.config.port);
693        pool_cfg.dbname = Some(self.config.database.clone());
694        pool_cfg.user = Some(self.config.username.clone());
695        pool_cfg.password = Some(self.config.password.clone());
696        let mut deadpool_cfg = deadpool_postgres::PoolConfig::new(self.config.pool_size);
697        deadpool_cfg.timeouts.wait = Some(self.config.connect_timeout);
698        deadpool_cfg.timeouts.create = Some(self.config.connect_timeout);
699        pool_cfg.pool = Some(deadpool_cfg);
700
701        let pool = pool_cfg
702            .create_pool(
703                Some(deadpool_postgres::Runtime::Tokio1),
704                tokio_postgres::NoTls,
705            )
706            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool creation failed: {e}")))?;
707
708        // Validate connectivity.
709        let client = pool.get().await.map_err(|e| {
710            ConnectorError::ConnectionFailed(format!("initial connection failed: {e}"))
711        })?;
712
713        client
714            .execute(
715                &format!(
716                    "SET statement_timeout = '{}'",
717                    self.config.statement_timeout.as_millis()
718                ),
719                &[],
720            )
721            .await
722            .ok(); // non-fatal if unsupported
723
724        // Auto-create target table.
725        if self.config.auto_create_table {
726            if let Some(ddl) = &self.create_table_sql {
727                client.batch_execute(ddl.as_str()).await.map_err(|e| {
728                    ConnectorError::Internal(format!("auto-create table failed: {e}"))
729                })?;
730                debug!(table = %self.config.qualified_table_name(), "target table ensured");
731            }
732        }
733
734        // Create offset tracking table for exactly-once.
735        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
736            client
737                .batch_execute(Self::build_offset_table_sql())
738                .await
739                .map_err(|e| {
740                    ConnectorError::Internal(format!("create offset table failed: {e}"))
741                })?;
742
743            // Recover last committed epoch.
744            {
745                let recover_sink_id = self.config.effective_sink_id();
746                let row = client
747                    .query_opt(Self::build_epoch_recover_sql(), &[&recover_sink_id])
748                    .await
749                    .map_err(|e| ConnectorError::Internal(format!("epoch recovery: {e}")))?;
750                if let Some(row) = row {
751                    let epoch: i64 = row.get(0);
752                    #[allow(clippy::cast_sign_loss)]
753                    {
754                        self.last_committed_epoch = epoch as u64;
755                    }
756                    info!(epoch, "recovered last committed epoch");
757                }
758            }
759        }
760
761        self.pool = Some(pool);
762        self.state = ConnectorState::Running;
763
764        info!(
765            table = %self.config.qualified_table_name(),
766            pool_size = self.config.pool_size,
767            "PostgreSQL sink connector opened"
768        );
769
770        Ok(())
771    }
772
773    #[allow(clippy::cast_possible_truncation)]
774    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
775        if self.state != ConnectorState::Running {
776            return Err(ConnectorError::InvalidState {
777                expected: "Running".into(),
778                actual: self.state.to_string(),
779            });
780        }
781
782        if batch.num_rows() == 0 {
783            return Ok(WriteResult::new(0, 0));
784        }
785
786        self.buffer.push(batch.clone());
787        self.buffered_rows += batch.num_rows();
788
789        // In exactly-once mode, defer all flushing to pre_commit.
790        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
791            return Ok(WriteResult::new(0, 0));
792        }
793
794        // At-least-once: auto-flush when thresholds are reached.
795        let should_flush = self.buffered_rows >= self.config.batch_size
796            || self.last_flush.elapsed() >= self.config.flush_interval;
797
798        if should_flush {
799            let client = self
800                .pool()?
801                .get()
802                .await
803                .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
804            let result = self.flush_to_client(&client).await?;
805            self.buffer.clear();
806            self.buffered_rows = 0;
807            self.last_flush = Instant::now();
808            return Ok(result);
809        }
810
811        Ok(WriteResult::new(0, 0))
812    }
813
814    fn schema(&self) -> SchemaRef {
815        self.schema.clone()
816    }
817
818    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
819        self.current_epoch = epoch;
820        debug!(epoch, "PostgreSQL sink epoch started");
821        Ok(())
822    }
823
824    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
825        if epoch != self.current_epoch {
826            return Err(ConnectorError::TransactionError(format!(
827                "epoch mismatch in pre_commit: expected {}, got {epoch}",
828                self.current_epoch
829            )));
830        }
831
832        if self.buffer.is_empty() {
833            return Ok(());
834        }
835
836        let client = self
837            .pool()?
838            .get()
839            .await
840            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
841
842        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
843            // Check for already-committed epoch (recovery replay).
844            if self.is_epoch_committed(&client, epoch).await? {
845                info!(epoch, "epoch already committed in PostgreSQL, skipping");
846                self.buffer.clear();
847                self.buffered_rows = 0;
848                return Ok(());
849            }
850
851            // Co-transactional: data + epoch marker in one PG transaction.
852            client
853                .batch_execute("BEGIN")
854                .await
855                .map_err(|e| ConnectorError::WriteError(format!("BEGIN: {e}")))?;
856
857            match self.flush_to_client(&client).await {
858                Ok(_result) => {
859                    let sink_id = self.config.effective_sink_id();
860                    #[allow(clippy::cast_possible_wrap)]
861                    let epoch_i64 = epoch as i64;
862                    let params: Vec<&(dyn postgres_types::ToSql + Sync)> =
863                        vec![&sink_id, &epoch_i64];
864                    client
865                        .execute(Self::build_epoch_commit_sql(), &params)
866                        .await
867                        .map_err(|e| {
868                            ConnectorError::WriteError(format!("epoch marker write: {e}"))
869                        })?;
870
871                    client
872                        .batch_execute("COMMIT")
873                        .await
874                        .map_err(|e| ConnectorError::WriteError(format!("COMMIT: {e}")))?;
875                }
876                Err(e) => {
877                    let _ = client.batch_execute("ROLLBACK").await;
878                    return Err(e);
879                }
880            }
881        } else {
882            // At-least-once: just flush remaining buffer.
883            self.flush_to_client(&client).await?;
884        }
885
886        self.buffer.clear();
887        self.buffered_rows = 0;
888        self.last_flush = Instant::now();
889
890        debug!(epoch, "PostgreSQL sink pre-committed");
891        Ok(())
892    }
893
894    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
895        if epoch != self.current_epoch {
896            return Err(ConnectorError::TransactionError(format!(
897                "epoch mismatch: expected {}, got {epoch}",
898                self.current_epoch
899            )));
900        }
901
902        self.last_committed_epoch = epoch;
903        self.metrics.record_commit();
904
905        debug!(epoch, "PostgreSQL sink epoch committed");
906        Ok(())
907    }
908
909    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
910        self.buffer.clear();
911        self.buffered_rows = 0;
912
913        self.metrics.record_rollback();
914        warn!(epoch, "PostgreSQL sink epoch rolled back");
915        Ok(())
916    }
917
918    fn capabilities(&self) -> SinkConnectorCapabilities {
919        // statement_timeout + small margin for pool checkout / setup.
920        let write_timeout = self.config.statement_timeout + Duration::from_secs(5);
921        let mut caps = SinkConnectorCapabilities::new(write_timeout).with_idempotent();
922
923        if self.config.write_mode == WriteMode::Upsert {
924            caps = caps.with_upsert();
925        }
926        if self.config.changelog_mode {
927            caps = caps.with_changelog();
928        }
929        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
930            caps = caps.with_exactly_once().with_two_phase_commit();
931        }
932
933        caps
934    }
935
936    async fn flush(&mut self) -> Result<(), ConnectorError> {
937        if self.buffer.is_empty() {
938            return Ok(());
939        }
940        let client = self
941            .pool()?
942            .get()
943            .await
944            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
945        self.flush_to_client(&client).await?;
946        self.buffer.clear();
947        self.buffered_rows = 0;
948        self.last_flush = Instant::now();
949        Ok(())
950    }
951
952    async fn close(&mut self) -> Result<(), ConnectorError> {
953        info!("closing PostgreSQL sink connector");
954
955        if !self.buffer.is_empty() {
956            if let Some(pool) = &self.pool {
957                if let Ok(client) = pool.get().await {
958                    let _ = self.flush_to_client(&client).await;
959                }
960            }
961        }
962
963        self.buffer.clear();
964        self.buffered_rows = 0;
965        self.pool = None;
966        self.state = ConnectorState::Closed;
967
968        info!(
969            table = %self.config.qualified_table_name(),
970            records = self.metrics.records_written.get(),
971            epochs = self.metrics.epochs_committed.get(),
972            "PostgreSQL sink connector closed"
973        );
974
975        Ok(())
976    }
977}
978
979// When postgres-sink feature is NOT enabled, provide a stub that returns UnsupportedOperation.
980#[cfg(not(feature = "postgres-sink"))]
981#[async_trait]
982impl SinkConnector for PostgresSink {
983    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
984        Err(ConnectorError::ConfigurationError(
985            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
986        ))
987    }
988
989    async fn write_batch(&mut self, _batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
990        Err(ConnectorError::ConfigurationError(
991            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
992        ))
993    }
994
995    fn schema(&self) -> SchemaRef {
996        self.schema.clone()
997    }
998
999    fn capabilities(&self) -> SinkConnectorCapabilities {
1000        let write_timeout = self.config.statement_timeout + Duration::from_secs(5);
1001        let mut caps = SinkConnectorCapabilities::new(write_timeout).with_idempotent();
1002        if self.config.write_mode == WriteMode::Upsert {
1003            caps = caps.with_upsert();
1004        }
1005        if self.config.changelog_mode {
1006            caps = caps.with_changelog();
1007        }
1008        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1009            caps = caps.with_exactly_once().with_two_phase_commit();
1010        }
1011        caps
1012    }
1013
1014    async fn close(&mut self) -> Result<(), ConnectorError> {
1015        self.state = ConnectorState::Closed;
1016        Ok(())
1017    }
1018}
1019
1020impl std::fmt::Debug for PostgresSink {
1021    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1022        f.debug_struct("PostgresSink")
1023            .field("state", &self.state)
1024            .field("table", &self.config.qualified_table_name())
1025            .field("mode", &self.config.write_mode)
1026            .field("guarantee", &self.config.delivery_guarantee)
1027            .field("current_epoch", &self.current_epoch)
1028            .field("last_committed_epoch", &self.last_committed_epoch)
1029            .field("buffered_rows", &self.buffered_rows)
1030            .finish_non_exhaustive()
1031    }
1032}
1033
1034// ── Helper functions ────────────────────────────────────────────────
1035
1036/// Returns user-visible column names (excluding metadata columns starting with `_`).
1037fn user_columns(schema: &SchemaRef) -> Vec<String> {
1038    schema
1039        .fields()
1040        .iter()
1041        .filter(|f| !f.name().starts_with('_'))
1042        .map(|f| f.name().clone())
1043        .collect()
1044}
1045
1046/// Returns user-visible fields (excluding metadata columns starting with `_`).
1047fn user_fields(schema: &SchemaRef) -> Vec<&Arc<Field>> {
1048    schema
1049        .fields()
1050        .iter()
1051        .filter(|f| !f.name().starts_with('_'))
1052        .collect()
1053}
1054
1055/// Builds a schema containing only user-visible columns.
1056fn build_user_schema(schema: &SchemaRef) -> SchemaRef {
1057    Arc::new(Schema::new(
1058        schema
1059            .fields()
1060            .iter()
1061            .filter(|f| !f.name().starts_with('_'))
1062            .cloned()
1063            .collect::<Vec<_>>(),
1064    ))
1065}
1066
1067/// Filters a `RecordBatch` to include only rows at the given indices.
1068///
1069/// Also strips metadata columns (those starting with `_`) from the output.
1070fn filter_batch_by_indices(
1071    batch: &RecordBatch,
1072    indices: &[u32],
1073) -> Result<RecordBatch, ConnectorError> {
1074    if indices.is_empty() {
1075        let user_schema = Arc::new(Schema::new(
1076            batch
1077                .schema()
1078                .fields()
1079                .iter()
1080                .filter(|f| !f.name().starts_with('_'))
1081                .cloned()
1082                .collect::<Vec<_>>(),
1083        ));
1084        return Ok(RecordBatch::new_empty(user_schema));
1085    }
1086
1087    let indices_array = arrow_array::UInt32Array::from(indices.to_vec());
1088
1089    let user_schema = Arc::new(Schema::new(
1090        batch
1091            .schema()
1092            .fields()
1093            .iter()
1094            .filter(|f| !f.name().starts_with('_'))
1095            .cloned()
1096            .collect::<Vec<_>>(),
1097    ));
1098
1099    let filtered_columns: Vec<Arc<dyn arrow_array::Array>> = batch
1100        .schema()
1101        .fields()
1102        .iter()
1103        .enumerate()
1104        .filter(|(_, f)| !f.name().starts_with('_'))
1105        .map(|(i, _)| {
1106            arrow_select::take::take(batch.column(i), &indices_array, None)
1107                .map_err(|e| ConnectorError::Internal(format!("arrow take failed: {e}")))
1108        })
1109        .collect::<Result<Vec<_>, _>>()?;
1110
1111    RecordBatch::try_new(user_schema, filtered_columns)
1112        .map_err(|e| ConnectorError::Internal(format!("batch construction failed: {e}")))
1113}
1114
/// Strips metadata columns (starting with `_`) from a `RecordBatch`.
///
/// Returns a clone of `batch` (shared column buffers) when it has no metadata
/// columns. The row count is carried over explicitly, so a batch made up
/// solely of metadata columns yields a zero-column batch with the same number
/// of rows instead of an error.
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if the stripped batch cannot be built.
#[cfg(feature = "postgres-sink")]
fn strip_metadata_columns(batch: &RecordBatch) -> Result<RecordBatch, ConnectorError> {
    let schema = batch.schema();
    let user_indices: Vec<usize> = schema
        .fields()
        .iter()
        .enumerate()
        .filter(|(_, f)| !f.name().starts_with('_'))
        .map(|(i, _)| i)
        .collect();

    if user_indices.len() == schema.fields().len() {
        return Ok(batch.clone());
    }

    // Clone the `Arc<Field>` handles rather than deep-copying each `Field`.
    let user_schema = Arc::new(Schema::new(
        user_indices
            .iter()
            .map(|&i| Arc::clone(&schema.fields()[i]))
            .collect::<Vec<_>>(),
    ));
    let columns: Vec<Arc<dyn Array>> = user_indices
        .iter()
        .map(|&i| Arc::clone(batch.column(i)))
        .collect();

    // `RecordBatch::try_new` rejects an empty column list, so state the row
    // count explicitly.
    let options = arrow_array::RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
    RecordBatch::try_new_with_options(user_schema, columns, &options)
        .map_err(|e| ConnectorError::Internal(format!("strip metadata: {e}")))
}
1145
/// Executes an UNNEST-based INSERT/UPSERT. Each column of `batch` is bound as
/// one array parameter, in column order (`$1`, `$2`, ...).
///
/// Returns the row count reported by `Client::execute`.
///
/// # Errors
///
/// Propagates conversion errors from `arrow_column_to_pg_array`. Execution
/// failures are mapped to `ConnectorError::WriteError`.
#[cfg(feature = "postgres-sink")]
async fn execute_unnest(
    client: &tokio_postgres::Client,
    sql: &str,
    batch: &RecordBatch,
) -> Result<u64, ConnectorError> {
    let mut owned: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> =
        Vec::with_capacity(batch.num_columns());
    for column in batch.columns() {
        owned.push(arrow_column_to_pg_array(column)?);
    }

    // `execute` wants `&(dyn ToSql + Sync)`, so drop the `Send` bound.
    let borrowed: Vec<&(dyn postgres_types::ToSql + Sync)> = owned
        .iter()
        .map(|param| &**param as &(dyn postgres_types::ToSql + Sync))
        .collect();

    let outcome = client.execute(sql, &borrowed).await;
    outcome.map_err(|e| ConnectorError::WriteError(format!("UNNEST execute: {e}")))
}
1167
/// Rough byte-size estimate for metrics: the sum of each column's
/// `get_buffer_memory_size()`.
#[cfg(feature = "postgres-sink")]
fn estimate_batch_bytes(batch: &RecordBatch) -> u64 {
    batch
        .columns()
        .iter()
        .map(|column| column.get_buffer_memory_size() as u64)
        .sum()
}
1175
#[cfg(test)]
mod tests {
    // Unit tests that need no live PostgreSQL server: SQL generation, changelog
    // splitting, buffering, capabilities, epoch bookkeeping, and helpers.
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    fn test_config() -> PostgresSinkConfig {
        PostgresSinkConfig::new("localhost", "mydb", "events")
    }

    fn upsert_config() -> PostgresSinkConfig {
        let mut cfg = test_config();
        cfg.write_mode = WriteMode::Upsert;
        cfg.primary_key_columns = vec!["id".to_string()];
        cfg
    }

    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(arrow_array::Float64Array::from(values)),
            ],
        )
        .expect("test batch creation")
    }

    // ── Constructor tests ──

    #[test]
    fn test_new_defaults() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.state(), ConnectorState::Created);
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert_eq!(sink.buffered_rows(), 0);
        assert!(sink.upsert_sql.is_none());
        assert!(sink.copy_sql.is_none());
    }

    #[test]
    fn test_user_schema_strips_metadata() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let sink = PostgresSink::new(schema, test_config(), None);
        assert_eq!(sink.user_schema.fields().len(), 2);
        assert_eq!(sink.user_schema.field(0).name(), "id");
        assert_eq!(sink.user_schema.field(1).name(), "value");
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let sink = PostgresSink::new(schema.clone(), test_config(), None);
        assert_eq!(sink.schema(), schema);
    }

    // ── SQL generation tests ──

    #[test]
    fn test_build_copy_sql() {
        let schema = test_schema();
        let config = test_config();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert_eq!(
            sql,
            "COPY public.events (id, name, value) FROM STDIN BINARY"
        );
    }

    #[test]
    fn test_build_copy_sql_custom_schema() {
        let schema = test_schema();
        let mut config = test_config();
        config.schema_name = "analytics".to_string();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert!(sql.starts_with("COPY analytics.events"));
    }

    #[test]
    fn test_build_copy_sql_excludes_metadata_columns() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let config = test_config();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert_eq!(sql, "COPY public.events (id, value) FROM STDIN BINARY");
    }

    #[test]
    fn test_build_upsert_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_upsert_sql(&schema, &config);

        assert!(sql.starts_with("INSERT INTO public.events"));
        assert!(sql.contains("SELECT * FROM UNNEST"));
        assert!(sql.contains("$1::int8[]"));
        assert!(sql.contains("$2::text[]"));
        assert!(sql.contains("$3::float8[]"));
        assert!(sql.contains("ON CONFLICT (id)"));
        assert!(sql.contains("DO UPDATE SET"));
        assert!(sql.contains("name = EXCLUDED.name"));
        assert!(sql.contains("value = EXCLUDED.value"));
        assert!(!sql.contains("id = EXCLUDED.id"));
    }

    #[test]
    fn test_build_upsert_sql_composite_key() {
        let schema = test_schema();
        let mut config = upsert_config();
        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
        let sql = PostgresSink::build_upsert_sql(&schema, &config);

        assert!(sql.contains("ON CONFLICT (id, name)"));
        assert!(sql.contains("value = EXCLUDED.value"));
        assert!(!sql.contains("id = EXCLUDED.id"));
        assert!(!sql.contains("name = EXCLUDED.name"));
    }

    #[test]
    fn test_build_upsert_sql_key_only_table() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let mut config = test_config();
        config.write_mode = WriteMode::Upsert;
        config.primary_key_columns = vec!["id".to_string()];

        let sql = PostgresSink::build_upsert_sql(&schema, &config);
        assert!(sql.contains("DO NOTHING"), "sql: {sql}");
    }

    #[test]
    fn test_build_delete_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_delete_sql(&schema, &config);

        assert_eq!(sql, "DELETE FROM public.events WHERE id = ANY($1::int8[])");
    }

    #[test]
    fn test_build_delete_sql_composite_key() {
        let schema = test_schema();
        let mut config = upsert_config();
        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
        let sql = PostgresSink::build_delete_sql(&schema, &config);

        assert!(sql.contains("id = ANY($1::int8[])"));
        assert!(sql.contains("name = ANY($2::text[])"));
        assert!(sql.contains(" AND "));
    }

    #[test]
    fn test_build_create_table_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_create_table_sql(&schema, &config);

        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS public.events"));
        assert!(sql.contains("id BIGINT NOT NULL"));
        assert!(sql.contains("name TEXT"));
        assert!(sql.contains("value DOUBLE PRECISION"));
        assert!(sql.contains("PRIMARY KEY (id)"));
    }

    #[test]
    fn test_build_create_table_sql_no_pk() {
        let schema = test_schema();
        let config = test_config();
        let sql = PostgresSink::build_create_table_sql(&schema, &config);

        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS"));
        assert!(!sql.contains("PRIMARY KEY"));
    }

    #[test]
    fn test_offset_table_sql() {
        let sql = PostgresSink::build_offset_table_sql();
        assert!(sql.contains("_laminardb_sink_offsets"));
        assert!(sql.contains("sink_id TEXT PRIMARY KEY"));
        assert!(sql.contains("epoch BIGINT NOT NULL"));
    }

    #[test]
    fn test_epoch_commit_sql() {
        let sql = PostgresSink::build_epoch_commit_sql();
        assert!(sql.contains("INSERT INTO _laminardb_sink_offsets"));
        assert!(sql.contains("ON CONFLICT (sink_id) DO UPDATE"));
    }

    #[test]
    fn test_epoch_recover_sql() {
        let sql = PostgresSink::build_epoch_recover_sql();
        assert!(sql.contains("SELECT epoch FROM _laminardb_sink_offsets"));
    }

    // ── Changelog splitting tests ──

    fn changelog_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
        ]))
    }

    fn changelog_batch() -> RecordBatch {
        RecordBatch::try_new(
            changelog_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
            ],
        )
        .expect("changelog batch creation")
    }

    #[test]
    fn test_split_changelog_batch() {
        let batch = changelog_batch();
        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");

        assert_eq!(inserts.num_rows(), 3);
        assert_eq!(deletes.num_rows(), 2);
        assert_eq!(inserts.num_columns(), 2);
        assert_eq!(deletes.num_columns(), 2);

        let insert_ids = inserts
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array");
        assert_eq!(insert_ids.value(0), 1);
        assert_eq!(insert_ids.value(1), 2);
        assert_eq!(insert_ids.value(2), 4);

        let delete_ids = deletes
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array");
        assert_eq!(delete_ids.value(0), 3);
        assert_eq!(delete_ids.value(1), 5);
    }

    #[test]
    fn test_split_changelog_all_inserts() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["I", "I"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 2);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_split_changelog_all_deletes() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["D", "D"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 0);
        assert_eq!(deletes.num_rows(), 2);
    }

    #[test]
    fn test_split_changelog_missing_op_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).expect("batch");

        let result = PostgresSink::split_changelog_batch(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_split_changelog_snapshot_read() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(StringArray::from(vec!["a"])),
                Arc::new(StringArray::from(vec!["r"])),
                Arc::new(Int64Array::from(vec![100])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 1);
        assert_eq!(deletes.num_rows(), 0);
    }

    // ── Buffering tests ──

    #[tokio::test]
    async fn test_write_batch_buffering() {
        let mut config = test_config();
        config.batch_size = 100;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }

    #[tokio::test]
    async fn test_write_batch_empty() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(0);
        let result = sink.write_batch(&batch).await.expect("write");
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[tokio::test]
    async fn test_write_batch_not_running() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_exactly_once_defers_flush() {
        let mut config = test_config();
        config.batch_size = 5; // Would normally trigger flush at 10 rows
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        // Exactly-once: no auto-flush, all buffered.
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }

    // ── Capabilities tests ──

    #[test]
    fn test_capabilities_append_at_least_once() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        let caps = sink.capabilities();
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.exactly_once);
    }

    #[test]
    fn test_capabilities_upsert() {
        let sink = PostgresSink::new(test_schema(), upsert_config(), None);
        let caps = sink.capabilities();
        assert!(caps.upsert);
        assert!(caps.idempotent);
    }

    #[test]
    fn test_capabilities_changelog() {
        let mut config = test_config();
        config.changelog_mode = true;
        let sink = PostgresSink::new(test_schema(), config, None);
        let caps = sink.capabilities();
        assert!(caps.changelog);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let mut config = upsert_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = PostgresSink::new(test_schema(), config, None);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
    }

    // ── Epoch lifecycle tests ──

    #[tokio::test]
    async fn test_epoch_lifecycle_state() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        assert_eq!(sink.current_epoch(), 1);

        // commit_epoch updates last_committed_epoch and records metric.
        sink.commit_epoch(1).await.expect("commit");
        assert_eq!(sink.last_committed_epoch(), 1);

        assert_eq!(sink.metrics.epochs_committed.get(), 1);
    }

    #[tokio::test]
    async fn test_epoch_mismatch_rejected() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        let result = sink.commit_epoch(2).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_pre_commit_epoch_mismatch_rejected() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        let result = sink.pre_commit(2).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_pre_commit_empty_buffer_is_noop() {
        // The sink is never opened, so this relies on the empty-buffer fast
        // path returning before any connection checkout.
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        sink.pre_commit(1).await.expect("pre_commit with empty buffer");
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[tokio::test]
    async fn test_rollback_clears_buffer() {
        let mut config = test_config();
        config.batch_size = 1000;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(50);
        sink.write_batch(&batch).await.expect("write");
        assert_eq!(sink.buffered_rows(), 50);

        sink.rollback_epoch(0).await.expect("rollback");
        assert_eq!(sink.buffered_rows(), 0);
    }

    // ── Debug output test ──

    #[test]
    fn test_debug_output() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        let debug = format!("{sink:?}");
        assert!(debug.contains("PostgresSink"));
        assert!(debug.contains("public.events"));
    }

    // ── Helper function tests ──

    #[test]
    fn test_build_user_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
            Field::new("_ts_ms", DataType::Int64, false),
        ]));
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 2);
        assert_eq!(user.field(0).name(), "id");
        assert_eq!(user.field(1).name(), "value");
    }

    #[test]
    fn test_build_user_schema_no_metadata() {
        let schema = test_schema();
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 3);
    }

    #[test]
    fn test_user_columns_and_fields_exclude_metadata() {
        let schema = changelog_schema();
        assert_eq!(
            user_columns(&schema),
            vec!["id".to_string(), "name".to_string()]
        );
        assert_eq!(user_fields(&schema).len(), 2);
    }

    #[test]
    fn test_filter_batch_by_indices_selects_rows_and_strips_metadata() {
        let batch = changelog_batch();
        let filtered = filter_batch_by_indices(&batch, &[0, 2]).expect("filter");

        assert_eq!(filtered.num_rows(), 2);
        assert_eq!(filtered.num_columns(), 2);
        assert_eq!(filtered.schema().field(0).name(), "id");
        assert_eq!(filtered.schema().field(1).name(), "name");

        let ids = filtered
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array");
        assert_eq!(ids.value(0), 1);
        assert_eq!(ids.value(1), 3);
    }

    #[test]
    fn test_filter_batch_by_indices_empty() {
        let batch = changelog_batch();
        let filtered = filter_batch_by_indices(&batch, &[]).expect("filter");
        assert_eq!(filtered.num_rows(), 0);
        assert_eq!(filtered.num_columns(), 2);
    }
}