1use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use arrow_array::{Array, RecordBatch};
22use arrow_schema::{DataType, Field, Schema, SchemaRef};
23use async_trait::async_trait;
24use tracing::{debug, info, warn};
25
26use crate::config::{ConnectorConfig, ConnectorState};
27use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
28use crate::error::ConnectorError;
29
30use super::sink_config::{PostgresSinkConfig, WriteMode};
31use super::sink_metrics::PostgresSinkMetrics;
32use super::types::{arrow_to_pg_ddl_type, arrow_type_to_pg_array_cast, arrow_type_to_pg_sql};
33use crate::connector::DeliveryGuarantee;
34
35#[cfg(feature = "postgres-sink")]
36use super::types::arrow_column_to_pg_array;
37#[cfg(feature = "postgres-sink")]
38use bytes::BytesMut;
39#[cfg(feature = "postgres-sink")]
40use deadpool_postgres::Pool;
41
42pub struct PostgresSink {
48 config: PostgresSinkConfig,
50 schema: SchemaRef,
52 user_schema: SchemaRef,
54 state: ConnectorState,
56 current_epoch: u64,
58 last_committed_epoch: u64,
60 buffer: Vec<RecordBatch>,
62 buffered_rows: usize,
64 last_flush: Instant,
66 metrics: PostgresSinkMetrics,
68 upsert_sql: Option<String>,
70 copy_sql: Option<String>,
72 create_table_sql: Option<String>,
74 delete_sql: Option<String>,
76 #[cfg(feature = "postgres-sink")]
78 pool: Option<Pool>,
79 #[cfg(feature = "postgres-sink")]
81 encode_buf: BytesMut,
82}
83
84impl PostgresSink {
85 #[must_use]
87 pub fn new(
88 schema: SchemaRef,
89 config: PostgresSinkConfig,
90 registry: Option<&prometheus::Registry>,
91 ) -> Self {
92 let user_schema = build_user_schema(&schema);
93 let buf_capacity = (config.batch_size / 1024).max(4);
95 Self {
96 config,
97 schema,
98 user_schema,
99 state: ConnectorState::Created,
100 current_epoch: 0,
101 last_committed_epoch: 0,
102 buffer: Vec::with_capacity(buf_capacity),
103 buffered_rows: 0,
104 last_flush: Instant::now(),
105 metrics: PostgresSinkMetrics::new(registry),
106 upsert_sql: None,
107 copy_sql: None,
108 create_table_sql: None,
109 delete_sql: None,
110 #[cfg(feature = "postgres-sink")]
111 pool: None,
112 #[cfg(feature = "postgres-sink")]
113 encode_buf: BytesMut::with_capacity(64 * 1024),
114 }
115 }
116
117 #[must_use]
119 pub fn state(&self) -> ConnectorState {
120 self.state
121 }
122
123 #[must_use]
125 pub fn current_epoch(&self) -> u64 {
126 self.current_epoch
127 }
128
129 #[must_use]
131 pub fn last_committed_epoch(&self) -> u64 {
132 self.last_committed_epoch
133 }
134
135 #[must_use]
137 pub fn buffered_rows(&self) -> usize {
138 self.buffered_rows
139 }
140
141 #[must_use]
143 pub fn sink_metrics(&self) -> &PostgresSinkMetrics {
144 &self.metrics
145 }
146
147 #[must_use]
155 pub fn build_copy_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
156 let columns = user_columns(schema);
157 let col_list = columns.join(", ");
158 format!(
159 "COPY {} ({}) FROM STDIN BINARY",
160 config.qualified_table_name(),
161 col_list,
162 )
163 }
164
165 #[must_use]
175 pub fn build_upsert_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
176 let fields = user_fields(schema);
177
178 let columns: Vec<&str> = fields.iter().map(|f| f.name().as_str()).collect();
179
180 let unnest_params: Vec<String> = fields
181 .iter()
182 .enumerate()
183 .map(|(i, f)| arrow_type_to_pg_array_cast(f.data_type(), i + 1))
184 .collect();
185
186 let non_key_columns: Vec<&str> = columns
187 .iter()
188 .copied()
189 .filter(|c| {
190 !config
191 .primary_key_columns
192 .iter()
193 .any(|pk| pk.as_str() == *c)
194 })
195 .collect();
196
197 let update_clause: Vec<String> = non_key_columns
198 .iter()
199 .map(|c| format!("{c} = EXCLUDED.{c}"))
200 .collect();
201
202 let pk_list = config.primary_key_columns.join(", ");
203
204 if update_clause.is_empty() {
205 format!(
207 "INSERT INTO {} ({}) \
208 SELECT * FROM UNNEST({}) \
209 ON CONFLICT ({}) DO NOTHING",
210 config.qualified_table_name(),
211 columns.join(", "),
212 unnest_params.join(", "),
213 pk_list,
214 )
215 } else {
216 format!(
217 "INSERT INTO {} ({}) \
218 SELECT * FROM UNNEST({}) \
219 ON CONFLICT ({}) DO UPDATE SET {}",
220 config.qualified_table_name(),
221 columns.join(", "),
222 unnest_params.join(", "),
223 pk_list,
224 update_clause.join(", "),
225 )
226 }
227 }
228
229 #[must_use]
235 pub fn build_delete_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
236 let pk_conditions: Vec<String> = config
237 .primary_key_columns
238 .iter()
239 .enumerate()
240 .map(|(i, col)| {
241 let dt = schema
242 .field_with_name(col)
243 .map_or(DataType::Utf8, |f| f.data_type().clone());
244 let pg_type = arrow_type_to_pg_sql(&dt);
245 format!("{col} = ANY(${}::{}[])", i + 1, pg_type)
246 })
247 .collect();
248
249 format!(
250 "DELETE FROM {} WHERE {}",
251 config.qualified_table_name(),
252 pk_conditions.join(" AND "),
253 )
254 }
255
256 #[must_use]
267 pub fn build_create_table_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
268 let fields = user_fields(schema);
269
270 let column_defs: Vec<String> = fields
271 .iter()
272 .map(|f| {
273 let pg_type = arrow_to_pg_ddl_type(f.data_type());
274 let nullable = if f.is_nullable() { "" } else { " NOT NULL" };
275 format!(" {} {}{}", f.name(), pg_type, nullable)
276 })
277 .collect();
278
279 let mut ddl = format!(
280 "CREATE TABLE IF NOT EXISTS {} (\n{}\n",
281 config.qualified_table_name(),
282 column_defs.join(",\n"),
283 );
284
285 if !config.primary_key_columns.is_empty() {
286 use std::fmt::Write;
287 let _ = write!(
288 ddl,
289 ",\n PRIMARY KEY ({})\n",
290 config.primary_key_columns.join(", ")
291 );
292 }
293
294 ddl.push(')');
295 ddl
296 }
297
/// DDL for `_laminardb_sink_offsets`, which records one row per sink id.
///
/// Only `sink_id`, `epoch` and `updated_at` are written by the statements in
/// this file (see `build_epoch_commit_sql`).
#[must_use]
pub fn build_offset_table_sql() -> &'static str {
    const OFFSET_TABLE_DDL: &str = "CREATE TABLE IF NOT EXISTS _laminardb_sink_offsets (\
        \n sink_id TEXT PRIMARY KEY,\
        \n epoch BIGINT NOT NULL,\
        \n source_offsets JSONB,\
        \n watermark BIGINT,\
        \n updated_at TIMESTAMPTZ DEFAULT NOW()\
        \n)";
    OFFSET_TABLE_DDL
}
309
/// Upsert of the epoch marker: `$1` = sink id, `$2` = epoch (BIGINT).
///
/// Inserts a new row, or overwrites the epoch and `updated_at` of the
/// existing row for that sink id.
#[must_use]
pub fn build_epoch_commit_sql() -> &'static str {
    const EPOCH_COMMIT_SQL: &str = "INSERT INTO _laminardb_sink_offsets (sink_id, epoch, updated_at) \
        VALUES ($1, $2, NOW()) \
        ON CONFLICT (sink_id) DO UPDATE SET epoch = $2, updated_at = NOW()";
    EPOCH_COMMIT_SQL
}
317
/// Reads the stored epoch marker for sink id `$1`; yields zero or one row.
#[must_use]
pub fn build_epoch_recover_sql() -> &'static str {
    const EPOCH_RECOVER_SQL: &str = "SELECT epoch FROM _laminardb_sink_offsets WHERE sink_id = $1";
    EPOCH_RECOVER_SQL
}
323
324 pub fn split_changelog_batch(
339 batch: &RecordBatch,
340 ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
341 let op_idx = batch.schema().index_of("_op").map_err(|_| {
342 ConnectorError::ConfigurationError(
343 "changelog mode requires '_op' column in input schema".into(),
344 )
345 })?;
346
347 let op_array = batch
348 .column(op_idx)
349 .as_any()
350 .downcast_ref::<arrow_array::StringArray>()
351 .ok_or_else(|| {
352 ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
353 })?;
354
355 let mut insert_indices = Vec::new();
356 let mut delete_indices = Vec::new();
357
358 for i in 0..op_array.len() {
359 if op_array.is_null(i) {
360 continue;
361 }
362 match op_array.value(i) {
363 "I" | "U" | "r" => {
364 insert_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
365 }
366 "D" => {
367 delete_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
368 }
369 _ => {} }
371 }
372
373 let insert_batch = filter_batch_by_indices(batch, &insert_indices)?;
374 let delete_batch = filter_batch_by_indices(batch, &delete_indices)?;
375
376 Ok((insert_batch, delete_batch))
377 }
378
379 fn prepare_statements(&mut self) {
383 self.copy_sql = Some(Self::build_copy_sql(&self.schema, &self.config));
384
385 if self.config.write_mode == WriteMode::Upsert {
386 self.upsert_sql = Some(Self::build_upsert_sql(&self.schema, &self.config));
387 }
388
389 if self.config.auto_create_table {
390 self.create_table_sql = Some(Self::build_create_table_sql(&self.schema, &self.config));
391 }
392
393 if self.config.changelog_mode {
394 self.delete_sql = Some(Self::build_delete_sql(&self.schema, &self.config));
395 }
396 }
397
/// Borrows the connection pool, or fails with `InvalidState` before `open`.
#[cfg(feature = "postgres-sink")]
fn pool(&self) -> Result<&Pool, ConnectorError> {
    match &self.pool {
        Some(pool) => Ok(pool),
        None => Err(ConnectorError::InvalidState {
            expected: "pool initialized (call open() first)".into(),
            actual: "pool not initialized".into(),
        }),
    }
}
406
/// Concatenates the buffered batches into one batch of `user_schema`.
///
/// `_`-prefixed metadata columns are stripped from each batch first. An
/// empty buffer yields an empty batch.
#[cfg(feature = "postgres-sink")]
fn concat_buffer(&self) -> Result<RecordBatch, ConnectorError> {
    if self.buffer.is_empty() {
        return Ok(RecordBatch::new_empty(Arc::clone(&self.user_schema)));
    }

    let mut user_batches = Vec::with_capacity(self.buffer.len());
    for batch in &self.buffer {
        user_batches.push(strip_metadata_columns(batch)?);
    }

    arrow_select::concat::concat_batches(&self.user_schema, &user_batches)
        .map_err(|e| ConnectorError::Internal(format!("batch concat failed: {e}")))
}
421
/// Append mode: writes the whole buffer with a single binary `COPY`.
///
/// The buffer is concatenated into one batch and encoded with pgpq (header,
/// rows, footer) into `encode_buf`. The encoded bytes are then streamed
/// through `COPY ... FROM STDIN BINARY`. The caller clears the buffer.
///
/// Returns the row count and the number of encoded bytes sent.
#[cfg(feature = "postgres-sink")]
async fn flush_append(
    &mut self,
    client: &tokio_postgres::Client,
) -> Result<WriteResult, ConnectorError> {
    let batch = self.concat_buffer()?;
    let row_count = batch.num_rows();
    if row_count == 0 {
        return Ok(WriteResult::new(0, 0));
    }

    let mut encoder = pgpq::ArrowToPostgresBinaryEncoder::try_new(self.user_schema.as_ref())
        .map_err(|e| ConnectorError::Internal(format!("pgpq encoder init: {e}")))?;

    self.encode_buf.clear();
    encoder.write_header(&mut self.encode_buf);
    encoder
        .write_batch(&batch, &mut self.encode_buf)
        .map_err(|e| ConnectorError::Internal(format!("pgpq encode: {e}")))?;
    encoder
        .write_footer(&mut self.encode_buf)
        .map_err(|e| ConnectorError::Internal(format!("pgpq footer: {e}")))?;

    // Move the encoded bytes out of the scratch buffer as an immutable payload.
    let payload_len = self.encode_buf.len();
    let payload = self.encode_buf.split().freeze();

    let copy_sql = match self.copy_sql.as_deref() {
        Some(sql) => sql,
        None => return Err(ConnectorError::Internal("COPY SQL not prepared".into())),
    };

    let copy_sink = client
        .copy_in(copy_sql)
        .await
        .map_err(|e| ConnectorError::WriteError(format!("COPY start: {e}")))?;

    {
        use futures_util::SinkExt;
        futures_util::pin_mut!(copy_sink);
        copy_sink
            .send(payload)
            .await
            .map_err(|e| ConnectorError::WriteError(format!("COPY send: {e}")))?;
        // Closing the sink completes the COPY on the server side.
        copy_sink
            .close()
            .await
            .map_err(|e| ConnectorError::WriteError(format!("COPY finish: {e}")))?;
    }

    let bytes_sent = payload_len as u64;
    self.metrics.record_write(row_count as u64, bytes_sent);
    self.metrics.record_flush();
    self.metrics.record_copy();

    Ok(WriteResult::new(row_count, bytes_sent))
}
476
/// Upsert mode: writes the buffer with the cached `UNNEST` upsert statement.
///
/// In changelog mode this delegates to `flush_changelog`, which also applies
/// deletes. The byte count in the result is an in-memory Arrow size estimate,
/// not wire bytes.
#[cfg(feature = "postgres-sink")]
#[allow(clippy::cast_possible_truncation)]
async fn flush_upsert(
    &mut self,
    client: &tokio_postgres::Client,
) -> Result<WriteResult, ConnectorError> {
    if !self.config.changelog_mode {
        let batch = self.concat_buffer()?;
        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let sql = match self.upsert_sql.as_deref() {
            Some(sql) => sql,
            None => return Err(ConnectorError::Internal("upsert SQL not prepared".into())),
        };

        let affected = execute_unnest(client, sql, &batch).await?;
        let approx_bytes = estimate_batch_bytes(&batch);

        self.metrics.record_write(affected, approx_bytes);
        self.metrics.record_flush();
        self.metrics.record_upsert();

        return Ok(WriteResult::new(affected as usize, approx_bytes));
    }

    self.flush_changelog(client).await
}
507
/// Applies the buffered changelog to the target table **in arrival order**.
///
/// Each buffered batch is cut into contiguous runs by `_op`:
/// * `I`, `U` and `r` rows are upserted;
/// * `D` rows are deleted by primary key;
/// * rows with a null or any other `_op` are skipped.
///
/// Adjacent runs of the same kind are merged, including across batch
/// boundaries. The resulting groups are executed strictly in order, so a
/// delete followed by a re-insert of the same key (or the reverse) leaves the
/// state of the last change. The previous implementation ran all upserts
/// first and all deletes second, which lost re-inserted rows.
///
/// All `_op` validation happens before anything is written.
///
/// NOTE(review): PostgreSQL rejects an `ON CONFLICT DO UPDATE` statement that
/// touches the same key twice. If one upsert group can contain duplicate
/// keys, consider de-duplicating it (keeping the last row per key).
///
/// # Errors
/// * `ConfigurationError` if a batch lacks a Utf8 `_op` column.
/// * `Internal` on Arrow concat/take failures or missing prepared SQL.
/// * `WriteError` from the UPSERT or DELETE statements.
#[cfg(feature = "postgres-sink")]
#[allow(clippy::cast_possible_truncation)]
async fn flush_changelog(
    &mut self,
    client: &tokio_postgres::Client,
) -> Result<WriteResult, ConnectorError> {
    // Maximal contiguous runs of one batch as `(is_delete, row_indices)`,
    // in row order. Skipped rows do not break a run.
    fn op_runs(batch: &RecordBatch) -> Result<Vec<(bool, Vec<u32>)>, ConnectorError> {
        let op_idx = batch.schema().index_of("_op").map_err(|_| {
            ConnectorError::ConfigurationError(
                "changelog mode requires '_op' column in input schema".into(),
            )
        })?;
        let ops = batch
            .column(op_idx)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "'_op' column must be String (Utf8) type".into(),
                )
            })?;

        let mut runs: Vec<(bool, Vec<u32>)> = Vec::new();
        for (pos, op) in ops.iter().enumerate() {
            let is_delete = match op {
                Some("I" | "U" | "r") => false,
                Some("D") => true,
                _ => continue,
            };
            let idx = u32::try_from(pos).map_err(|_| {
                ConnectorError::Internal(format!(
                    "changelog batch row index {pos} does not fit in u32"
                ))
            })?;
            if let Some((kind, rows)) = runs.last_mut() {
                if *kind == is_delete {
                    rows.push(idx);
                    continue;
                }
            }
            runs.push((is_delete, vec![idx]));
        }
        Ok(runs)
    }

    // Ordered groups of user-column batches, merging same-kind neighbours.
    let mut groups: Vec<(bool, Vec<RecordBatch>)> = Vec::new();
    for batch in &self.buffer {
        for (is_delete, rows) in op_runs(batch)? {
            let part = filter_batch_by_indices(batch, &rows)?;
            if let Some((kind, parts)) = groups.last_mut() {
                if *kind == is_delete {
                    parts.push(part);
                    continue;
                }
            }
            groups.push((is_delete, vec![part]));
        }
    }

    let mut total_rows: u64 = 0;
    let mut total_bytes: u64 = 0;

    for (is_delete, parts) in groups {
        if is_delete {
            let delete_batch = arrow_select::concat::concat_batches(&self.user_schema, &parts)
                .map_err(|e| ConnectorError::Internal(format!("concat deletes: {e}")))?;
            let deleted = self.execute_deletes(client, &delete_batch).await? as u64;
            self.metrics.record_deletes(deleted);
            total_rows += deleted;
        } else {
            let insert_batch = arrow_select::concat::concat_batches(&self.user_schema, &parts)
                .map_err(|e| ConnectorError::Internal(format!("concat inserts: {e}")))?;
            let upsert_sql = self
                .upsert_sql
                .as_deref()
                .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;

            let rows = execute_unnest(client, upsert_sql, &insert_batch).await?;
            let bytes = estimate_batch_bytes(&insert_batch);
            self.metrics.record_write(rows, bytes);
            self.metrics.record_upsert();
            total_rows += rows;
            total_bytes += bytes;
        }
    }

    self.metrics.record_flush();
    Ok(WriteResult::new(total_rows as usize, total_bytes))
}
566
/// Runs the cached `DELETE` for the primary keys contained in `delete_batch`.
///
/// Each primary-key column of the batch is bound as one array parameter, in
/// the configured key order. Returns the number of rows PostgreSQL reports
/// as deleted; an empty batch returns 0 without touching the database.
///
/// # Errors
/// `Internal` if the DELETE SQL was not prepared, `ConfigurationError` if a
/// key column is missing from the batch, and `WriteError` if execution fails.
#[cfg(feature = "postgres-sink")]
#[allow(clippy::cast_possible_truncation)]
async fn execute_deletes(
    &self,
    client: &tokio_postgres::Client,
    delete_batch: &RecordBatch,
) -> Result<usize, ConnectorError> {
    if delete_batch.num_rows() == 0 {
        return Ok(0);
    }

    let sql = match self.delete_sql.as_deref() {
        Some(sql) => sql,
        None => return Err(ConnectorError::Internal("DELETE SQL not prepared".into())),
    };

    let batch_schema = delete_batch.schema();
    let mut key_arrays: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> =
        Vec::with_capacity(self.config.primary_key_columns.len());
    for col in &self.config.primary_key_columns {
        let idx = batch_schema.index_of(col).map_err(|_| {
            ConnectorError::ConfigurationError(format!(
                "primary key column '{col}' not in delete batch"
            ))
        })?;
        key_arrays.push(arrow_column_to_pg_array(delete_batch.column(idx))?);
    }

    let key_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = key_arrays
        .iter()
        .map(|boxed| boxed.as_ref() as &(dyn postgres_types::ToSql + Sync))
        .collect();

    let affected = client
        .execute(sql, &key_refs)
        .await
        .map_err(|e| ConnectorError::WriteError(format!("DELETE: {e}")))?;

    Ok(affected as usize)
}
610
/// Writes the whole buffer through `client` using the configured write mode.
///
/// First applies the configured `statement_timeout` (in milliseconds) to the
/// session. This is best effort: a failure is logged and the write proceeds
/// with whatever timeout the connection already has (previously the error
/// was silently discarded). The buffer itself is not cleared here.
#[cfg(feature = "postgres-sink")]
async fn flush_to_client(
    &mut self,
    client: &tokio_postgres::Client,
) -> Result<WriteResult, ConnectorError> {
    let set_timeout = format!(
        "SET statement_timeout = '{}'",
        self.config.statement_timeout.as_millis()
    );
    if let Err(e) = client.execute(&set_timeout, &[]).await {
        warn!(error = %e, "failed to set statement_timeout; continuing with the connection's current timeout");
    }

    match self.config.write_mode {
        WriteMode::Append => self.flush_append(client).await,
        WriteMode::Upsert => self.flush_upsert(client).await,
    }
}
632
/// Returns whether the offset table already records `epoch` (or a later one)
/// for this sink id. A missing marker row means "not committed".
///
/// The stored BIGINT is cast back to `u64` with the same wrapping conversion
/// used when `pre_commit` writes it.
#[cfg(feature = "postgres-sink")]
#[allow(clippy::cast_sign_loss)]
async fn is_epoch_committed(
    &self,
    client: &tokio_postgres::Client,
    epoch: u64,
) -> Result<bool, ConnectorError> {
    let sink_id = self.config.effective_sink_id();
    let marker = client
        .query_opt(Self::build_epoch_recover_sql(), &[&sink_id])
        .await
        .map_err(|e| ConnectorError::Internal(format!("epoch recovery query: {e}")))?;

    Ok(if let Some(row) = marker {
        let stored: i64 = row.get(0);
        stored as u64 >= epoch
    } else {
        false
    })
}
654}
655
656#[cfg(feature = "postgres-sink")]
660#[async_trait]
661impl SinkConnector for PostgresSink {
662 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
663 self.state = ConnectorState::Initializing;
664
665 if !config.properties().is_empty() {
666 self.config = PostgresSinkConfig::from_config(config)?;
667 }
668
669 if self.config.changelog_mode && self.config.write_mode != WriteMode::Upsert {
671 return Err(ConnectorError::ConfigurationError(
672 "changelog mode requires write.mode = 'upsert'".into(),
673 ));
674 }
675
676 info!(
677 table = %self.config.qualified_table_name(),
678 mode = %self.config.write_mode,
679 guarantee = %self.config.delivery_guarantee,
680 "opening PostgreSQL sink connector"
681 );
682
683 self.prepare_statements();
684
685 if self.config.sink_id.is_empty() {
686 self.config.sink_id = self.config.effective_sink_id();
687 }
688
689 let mut pool_cfg = deadpool_postgres::Config::new();
691 pool_cfg.host = Some(self.config.hostname.clone());
692 pool_cfg.port = Some(self.config.port);
693 pool_cfg.dbname = Some(self.config.database.clone());
694 pool_cfg.user = Some(self.config.username.clone());
695 pool_cfg.password = Some(self.config.password.clone());
696 let mut deadpool_cfg = deadpool_postgres::PoolConfig::new(self.config.pool_size);
697 deadpool_cfg.timeouts.wait = Some(self.config.connect_timeout);
698 deadpool_cfg.timeouts.create = Some(self.config.connect_timeout);
699 pool_cfg.pool = Some(deadpool_cfg);
700
701 let pool = pool_cfg
702 .create_pool(
703 Some(deadpool_postgres::Runtime::Tokio1),
704 tokio_postgres::NoTls,
705 )
706 .map_err(|e| ConnectorError::ConnectionFailed(format!("pool creation failed: {e}")))?;
707
708 let client = pool.get().await.map_err(|e| {
710 ConnectorError::ConnectionFailed(format!("initial connection failed: {e}"))
711 })?;
712
713 client
714 .execute(
715 &format!(
716 "SET statement_timeout = '{}'",
717 self.config.statement_timeout.as_millis()
718 ),
719 &[],
720 )
721 .await
722 .ok(); if self.config.auto_create_table {
726 if let Some(ddl) = &self.create_table_sql {
727 client.batch_execute(ddl.as_str()).await.map_err(|e| {
728 ConnectorError::Internal(format!("auto-create table failed: {e}"))
729 })?;
730 debug!(table = %self.config.qualified_table_name(), "target table ensured");
731 }
732 }
733
734 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
736 client
737 .batch_execute(Self::build_offset_table_sql())
738 .await
739 .map_err(|e| {
740 ConnectorError::Internal(format!("create offset table failed: {e}"))
741 })?;
742
743 {
745 let recover_sink_id = self.config.effective_sink_id();
746 let row = client
747 .query_opt(Self::build_epoch_recover_sql(), &[&recover_sink_id])
748 .await
749 .map_err(|e| ConnectorError::Internal(format!("epoch recovery: {e}")))?;
750 if let Some(row) = row {
751 let epoch: i64 = row.get(0);
752 #[allow(clippy::cast_sign_loss)]
753 {
754 self.last_committed_epoch = epoch as u64;
755 }
756 info!(epoch, "recovered last committed epoch");
757 }
758 }
759 }
760
761 self.pool = Some(pool);
762 self.state = ConnectorState::Running;
763
764 info!(
765 table = %self.config.qualified_table_name(),
766 pool_size = self.config.pool_size,
767 "PostgreSQL sink connector opened"
768 );
769
770 Ok(())
771 }
772
773 #[allow(clippy::cast_possible_truncation)]
774 async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
775 if self.state != ConnectorState::Running {
776 return Err(ConnectorError::InvalidState {
777 expected: "Running".into(),
778 actual: self.state.to_string(),
779 });
780 }
781
782 if batch.num_rows() == 0 {
783 return Ok(WriteResult::new(0, 0));
784 }
785
786 self.buffer.push(batch.clone());
787 self.buffered_rows += batch.num_rows();
788
789 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
791 return Ok(WriteResult::new(0, 0));
792 }
793
794 let should_flush = self.buffered_rows >= self.config.batch_size
796 || self.last_flush.elapsed() >= self.config.flush_interval;
797
798 if should_flush {
799 let client = self
800 .pool()?
801 .get()
802 .await
803 .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
804 let result = self.flush_to_client(&client).await?;
805 self.buffer.clear();
806 self.buffered_rows = 0;
807 self.last_flush = Instant::now();
808 return Ok(result);
809 }
810
811 Ok(WriteResult::new(0, 0))
812 }
813
814 fn schema(&self) -> SchemaRef {
815 self.schema.clone()
816 }
817
818 async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
819 self.current_epoch = epoch;
820 debug!(epoch, "PostgreSQL sink epoch started");
821 Ok(())
822 }
823
824 async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
825 if epoch != self.current_epoch {
826 return Err(ConnectorError::TransactionError(format!(
827 "epoch mismatch in pre_commit: expected {}, got {epoch}",
828 self.current_epoch
829 )));
830 }
831
832 if self.buffer.is_empty() {
833 return Ok(());
834 }
835
836 let client = self
837 .pool()?
838 .get()
839 .await
840 .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
841
842 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
843 if self.is_epoch_committed(&client, epoch).await? {
845 info!(epoch, "epoch already committed in PostgreSQL, skipping");
846 self.buffer.clear();
847 self.buffered_rows = 0;
848 return Ok(());
849 }
850
851 client
853 .batch_execute("BEGIN")
854 .await
855 .map_err(|e| ConnectorError::WriteError(format!("BEGIN: {e}")))?;
856
857 match self.flush_to_client(&client).await {
858 Ok(_result) => {
859 let sink_id = self.config.effective_sink_id();
860 #[allow(clippy::cast_possible_wrap)]
861 let epoch_i64 = epoch as i64;
862 let params: Vec<&(dyn postgres_types::ToSql + Sync)> =
863 vec![&sink_id, &epoch_i64];
864 client
865 .execute(Self::build_epoch_commit_sql(), ¶ms)
866 .await
867 .map_err(|e| {
868 ConnectorError::WriteError(format!("epoch marker write: {e}"))
869 })?;
870
871 client
872 .batch_execute("COMMIT")
873 .await
874 .map_err(|e| ConnectorError::WriteError(format!("COMMIT: {e}")))?;
875 }
876 Err(e) => {
877 let _ = client.batch_execute("ROLLBACK").await;
878 return Err(e);
879 }
880 }
881 } else {
882 self.flush_to_client(&client).await?;
884 }
885
886 self.buffer.clear();
887 self.buffered_rows = 0;
888 self.last_flush = Instant::now();
889
890 debug!(epoch, "PostgreSQL sink pre-committed");
891 Ok(())
892 }
893
894 async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
895 if epoch != self.current_epoch {
896 return Err(ConnectorError::TransactionError(format!(
897 "epoch mismatch: expected {}, got {epoch}",
898 self.current_epoch
899 )));
900 }
901
902 self.last_committed_epoch = epoch;
903 self.metrics.record_commit();
904
905 debug!(epoch, "PostgreSQL sink epoch committed");
906 Ok(())
907 }
908
909 async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
910 self.buffer.clear();
911 self.buffered_rows = 0;
912
913 self.metrics.record_rollback();
914 warn!(epoch, "PostgreSQL sink epoch rolled back");
915 Ok(())
916 }
917
918 fn capabilities(&self) -> SinkConnectorCapabilities {
919 let write_timeout = self.config.statement_timeout + Duration::from_secs(5);
921 let mut caps = SinkConnectorCapabilities::new(write_timeout).with_idempotent();
922
923 if self.config.write_mode == WriteMode::Upsert {
924 caps = caps.with_upsert();
925 }
926 if self.config.changelog_mode {
927 caps = caps.with_changelog();
928 }
929 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
930 caps = caps.with_exactly_once().with_two_phase_commit();
931 }
932
933 caps
934 }
935
936 async fn flush(&mut self) -> Result<(), ConnectorError> {
937 if self.buffer.is_empty() {
938 return Ok(());
939 }
940 let client = self
941 .pool()?
942 .get()
943 .await
944 .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
945 self.flush_to_client(&client).await?;
946 self.buffer.clear();
947 self.buffered_rows = 0;
948 self.last_flush = Instant::now();
949 Ok(())
950 }
951
952 async fn close(&mut self) -> Result<(), ConnectorError> {
953 info!("closing PostgreSQL sink connector");
954
955 if !self.buffer.is_empty() {
956 if let Some(pool) = &self.pool {
957 if let Ok(client) = pool.get().await {
958 let _ = self.flush_to_client(&client).await;
959 }
960 }
961 }
962
963 self.buffer.clear();
964 self.buffered_rows = 0;
965 self.pool = None;
966 self.state = ConnectorState::Closed;
967
968 info!(
969 table = %self.config.qualified_table_name(),
970 records = self.metrics.records_written.get(),
971 epochs = self.metrics.epochs_committed.get(),
972 "PostgreSQL sink connector closed"
973 );
974
975 Ok(())
976 }
977}
978
979#[cfg(not(feature = "postgres-sink"))]
981#[async_trait]
982impl SinkConnector for PostgresSink {
983 async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
984 Err(ConnectorError::ConfigurationError(
985 "PostgreSQL sink requires the 'postgres-sink' feature".into(),
986 ))
987 }
988
989 async fn write_batch(&mut self, _batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
990 Err(ConnectorError::ConfigurationError(
991 "PostgreSQL sink requires the 'postgres-sink' feature".into(),
992 ))
993 }
994
995 fn schema(&self) -> SchemaRef {
996 self.schema.clone()
997 }
998
999 fn capabilities(&self) -> SinkConnectorCapabilities {
1000 let write_timeout = self.config.statement_timeout + Duration::from_secs(5);
1001 let mut caps = SinkConnectorCapabilities::new(write_timeout).with_idempotent();
1002 if self.config.write_mode == WriteMode::Upsert {
1003 caps = caps.with_upsert();
1004 }
1005 if self.config.changelog_mode {
1006 caps = caps.with_changelog();
1007 }
1008 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1009 caps = caps.with_exactly_once().with_two_phase_commit();
1010 }
1011 caps
1012 }
1013
1014 async fn close(&mut self) -> Result<(), ConnectorError> {
1015 self.state = ConnectorState::Closed;
1016 Ok(())
1017 }
1018}
1019
1020impl std::fmt::Debug for PostgresSink {
1021 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1022 f.debug_struct("PostgresSink")
1023 .field("state", &self.state)
1024 .field("table", &self.config.qualified_table_name())
1025 .field("mode", &self.config.write_mode)
1026 .field("guarantee", &self.config.delivery_guarantee)
1027 .field("current_epoch", &self.current_epoch)
1028 .field("last_committed_epoch", &self.last_committed_epoch)
1029 .field("buffered_rows", &self.buffered_rows)
1030 .finish_non_exhaustive()
1031 }
1032}
1033
1034fn user_columns(schema: &SchemaRef) -> Vec<String> {
1038 schema
1039 .fields()
1040 .iter()
1041 .filter(|f| !f.name().starts_with('_'))
1042 .map(|f| f.name().clone())
1043 .collect()
1044}
1045
1046fn user_fields(schema: &SchemaRef) -> Vec<&Arc<Field>> {
1048 schema
1049 .fields()
1050 .iter()
1051 .filter(|f| !f.name().starts_with('_'))
1052 .collect()
1053}
1054
1055fn build_user_schema(schema: &SchemaRef) -> SchemaRef {
1057 Arc::new(Schema::new(
1058 schema
1059 .fields()
1060 .iter()
1061 .filter(|f| !f.name().starts_with('_'))
1062 .cloned()
1063 .collect::<Vec<_>>(),
1064 ))
1065}
1066
1067fn filter_batch_by_indices(
1071 batch: &RecordBatch,
1072 indices: &[u32],
1073) -> Result<RecordBatch, ConnectorError> {
1074 if indices.is_empty() {
1075 let user_schema = Arc::new(Schema::new(
1076 batch
1077 .schema()
1078 .fields()
1079 .iter()
1080 .filter(|f| !f.name().starts_with('_'))
1081 .cloned()
1082 .collect::<Vec<_>>(),
1083 ));
1084 return Ok(RecordBatch::new_empty(user_schema));
1085 }
1086
1087 let indices_array = arrow_array::UInt32Array::from(indices.to_vec());
1088
1089 let user_schema = Arc::new(Schema::new(
1090 batch
1091 .schema()
1092 .fields()
1093 .iter()
1094 .filter(|f| !f.name().starts_with('_'))
1095 .cloned()
1096 .collect::<Vec<_>>(),
1097 ));
1098
1099 let filtered_columns: Vec<Arc<dyn arrow_array::Array>> = batch
1100 .schema()
1101 .fields()
1102 .iter()
1103 .enumerate()
1104 .filter(|(_, f)| !f.name().starts_with('_'))
1105 .map(|(i, _)| {
1106 arrow_select::take::take(batch.column(i), &indices_array, None)
1107 .map_err(|e| ConnectorError::Internal(format!("arrow take failed: {e}")))
1108 })
1109 .collect::<Result<Vec<_>, _>>()?;
1110
1111 RecordBatch::try_new(user_schema, filtered_columns)
1112 .map_err(|e| ConnectorError::Internal(format!("batch construction failed: {e}")))
1113}
1114
/// Removes `_`-prefixed metadata columns from `batch`.
///
/// A batch with no metadata columns is returned as a cheap clone. Column
/// arrays are shared, not copied.
#[cfg(feature = "postgres-sink")]
fn strip_metadata_columns(batch: &RecordBatch) -> Result<RecordBatch, ConnectorError> {
    let schema = batch.schema();
    let mut user_fields = Vec::new();
    let mut user_columns: Vec<Arc<dyn Array>> = Vec::new();

    for (field, column) in schema.fields().iter().zip(batch.columns()) {
        if !field.name().starts_with('_') {
            user_fields.push(field.as_ref().clone());
            user_columns.push(Arc::clone(column));
        }
    }

    if user_fields.len() == schema.fields().len() {
        return Ok(batch.clone());
    }

    RecordBatch::try_new(Arc::new(Schema::new(user_fields)), user_columns)
        .map_err(|e| ConnectorError::Internal(format!("strip metadata: {e}")))
}
1145
/// Executes `sql` with one PostgreSQL array parameter per column of `batch`.
///
/// Column `i` is bound as parameter `$i+1`, matching the `UNNEST($1::..[], ...)`
/// placeholders built by `build_upsert_sql`. Returns the affected row count
/// reported by the server.
///
/// Fix: the parameter slice was written as the mis-encoded token
/// `¶m_refs`, which does not compile; it is `&param_refs`.
///
/// # Errors
/// Conversion errors from `arrow_column_to_pg_array`, and `WriteError` if
/// the statement fails.
#[cfg(feature = "postgres-sink")]
async fn execute_unnest(
    client: &tokio_postgres::Client,
    sql: &str,
    batch: &RecordBatch,
) -> Result<u64, ConnectorError> {
    let params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> = (0..batch.num_columns())
        .map(|i| arrow_column_to_pg_array(batch.column(i)))
        .collect::<Result<_, _>>()?;

    // tokio-postgres takes `&[&(dyn ToSql + Sync)]`; drop the `Send` bound.
    let param_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = params
        .iter()
        .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
        .collect();

    client
        .execute(sql, &param_refs)
        .await
        .map_err(|e| ConnectorError::WriteError(format!("UNNEST execute: {e}")))
}
1167
/// Approximate batch size: the sum of each column's Arrow buffer memory.
///
/// This is in-memory size, not the number of bytes sent to PostgreSQL.
#[cfg(feature = "postgres-sink")]
fn estimate_batch_bytes(batch: &RecordBatch) -> u64 {
    batch
        .columns()
        .iter()
        .map(|column| column.get_buffer_memory_size() as u64)
        .sum()
}
1175
1176#[cfg(test)]
1177mod tests {
1178 use super::*;
1179 use arrow_array::{Int64Array, StringArray};
1180 use arrow_schema::{DataType, Field, Schema};
1181
1182 fn test_schema() -> SchemaRef {
1183 Arc::new(Schema::new(vec![
1184 Field::new("id", DataType::Int64, false),
1185 Field::new("name", DataType::Utf8, true),
1186 Field::new("value", DataType::Float64, true),
1187 ]))
1188 }
1189
1190 fn test_config() -> PostgresSinkConfig {
1191 PostgresSinkConfig::new("localhost", "mydb", "events")
1192 }
1193
1194 fn upsert_config() -> PostgresSinkConfig {
1195 let mut cfg = test_config();
1196 cfg.write_mode = WriteMode::Upsert;
1197 cfg.primary_key_columns = vec!["id".to_string()];
1198 cfg
1199 }
1200
1201 fn test_batch(n: usize) -> RecordBatch {
1202 let ids: Vec<i64> = (0..n as i64).collect();
1203 let names: Vec<&str> = (0..n).map(|_| "test").collect();
1204 let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
1205
1206 RecordBatch::try_new(
1207 test_schema(),
1208 vec![
1209 Arc::new(Int64Array::from(ids)),
1210 Arc::new(StringArray::from(names)),
1211 Arc::new(arrow_array::Float64Array::from(values)),
1212 ],
1213 )
1214 .expect("test batch creation")
1215 }
1216
1217 #[test]
1220 fn test_new_defaults() {
1221 let sink = PostgresSink::new(test_schema(), test_config(), None);
1222 assert_eq!(sink.state(), ConnectorState::Created);
1223 assert_eq!(sink.current_epoch(), 0);
1224 assert_eq!(sink.last_committed_epoch(), 0);
1225 assert_eq!(sink.buffered_rows(), 0);
1226 assert!(sink.upsert_sql.is_none());
1227 assert!(sink.copy_sql.is_none());
1228 }
1229
1230 #[test]
1231 fn test_user_schema_strips_metadata() {
1232 let schema = Arc::new(Schema::new(vec![
1233 Field::new("id", DataType::Int64, false),
1234 Field::new("_op", DataType::Utf8, false),
1235 Field::new("value", DataType::Utf8, true),
1236 ]));
1237 let sink = PostgresSink::new(schema, test_config(), None);
1238 assert_eq!(sink.user_schema.fields().len(), 2);
1239 assert_eq!(sink.user_schema.field(0).name(), "id");
1240 assert_eq!(sink.user_schema.field(1).name(), "value");
1241 }
1242
/// `schema()` hands back the schema the sink was constructed with.
#[test]
fn test_schema_returned() {
    let expected = test_schema();
    let sink = PostgresSink::new(Arc::clone(&expected), test_config(), None);
    assert_eq!(sink.schema(), expected);
}
1249
/// Default config produces a binary `COPY` into `public.events` naming every
/// user column.
#[test]
fn test_build_copy_sql() {
    let copy = PostgresSink::build_copy_sql(&test_schema(), &test_config());
    assert_eq!(
        copy,
        "COPY public.events (id, name, value) FROM STDIN BINARY"
    );
}
1262
/// A non-default `schema_name` is used as the table qualifier in `COPY`.
#[test]
fn test_build_copy_sql_custom_schema() {
    let mut config = test_config();
    config.schema_name = String::from("analytics");

    let copy = PostgresSink::build_copy_sql(&test_schema(), &config);
    assert!(copy.starts_with("COPY analytics.events"));
}
1271
/// `_op` and `_ts_ms` metadata columns never appear in the `COPY` column list.
#[test]
fn test_build_copy_sql_excludes_metadata_columns() {
    let mixed_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("_op", DataType::Utf8, false),
        Field::new("_ts_ms", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]));

    let copy = PostgresSink::build_copy_sql(&mixed_schema, &test_config());
    assert_eq!(copy, "COPY public.events (id, value) FROM STDIN BINARY");
}
1284
/// Single-key upsert behaviour:
/// - rows are fed through `UNNEST` of typed array parameters;
/// - a conflict on `id` updates every non-key column;
/// - the key column itself is never reassigned.
#[test]
fn test_build_upsert_sql() {
    let upsert = PostgresSink::build_upsert_sql(&test_schema(), &upsert_config());

    assert!(upsert.starts_with("INSERT INTO public.events"));

    let required = [
        "SELECT * FROM UNNEST",
        "$1::int8[]",
        "$2::text[]",
        "$3::float8[]",
        "ON CONFLICT (id)",
        "DO UPDATE SET",
        "name = EXCLUDED.name",
        "value = EXCLUDED.value",
    ];
    for fragment in required {
        assert!(upsert.contains(fragment), "missing `{fragment}` in: {upsert}");
    }

    assert!(!upsert.contains("id = EXCLUDED.id"));
}
1302
/// With a composite `(id, name)` key, both columns form the conflict target
/// and only the remaining `value` column is updated.
#[test]
fn test_build_upsert_sql_composite_key() {
    let mut config = upsert_config();
    config.primary_key_columns = ["id", "name"].iter().map(|c| c.to_string()).collect();

    let upsert = PostgresSink::build_upsert_sql(&test_schema(), &config);

    assert!(upsert.contains("ON CONFLICT (id, name)"));
    assert!(upsert.contains("value = EXCLUDED.value"));
    for key_assignment in ["id = EXCLUDED.id", "name = EXCLUDED.name"] {
        assert!(!upsert.contains(key_assignment));
    }
}
1315
/// A table whose only column is the primary key has nothing to update, so
/// the conflict clause degrades to `DO NOTHING`.
#[test]
fn test_build_upsert_sql_key_only_table() {
    let key_only = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

    // `upsert_config()` is `test_config()` with `WriteMode::Upsert` and `["id"]` as key.
    let upsert = PostgresSink::build_upsert_sql(&key_only, &upsert_config());
    assert!(upsert.contains("DO NOTHING"), "sql: {upsert}");
}
1326
/// Single-key delete matches the key column against a typed array parameter.
#[test]
fn test_build_delete_sql() {
    let delete = PostgresSink::build_delete_sql(&test_schema(), &upsert_config());
    assert_eq!(
        delete,
        "DELETE FROM public.events WHERE id = ANY($1::int8[])"
    );
}
1335
/// Composite-key delete emits one `= ANY(...)` predicate per key column, each
/// bound to its own typed array parameter and joined with `AND`.
///
/// NOTE(review): ANDing independent `= ANY(...)` predicates matches every
/// *combination* of the supplied key values, not only the supplied key tuples.
/// Deleting keys `(1, 'a')` and `(2, 'b')` would also delete `(1, 'b')` and
/// `(2, 'a')` if those rows exist. If tuple semantics are intended,
/// `build_delete_sql` should use something like
/// `(id, name) IN (SELECT * FROM UNNEST($1::int8[], $2::text[]))`. This test
/// currently pins the combinational form.
#[test]
fn test_build_delete_sql_composite_key() {
    let mut config = upsert_config();
    config.primary_key_columns = ["id", "name"].iter().map(|c| c.to_string()).collect();

    let delete = PostgresSink::build_delete_sql(&test_schema(), &config);

    for fragment in ["id = ANY($1::int8[])", "name = ANY($2::text[])", " AND "] {
        assert!(delete.contains(fragment), "missing `{fragment}` in: {delete}");
    }
}
1347
/// DDL generation in upsert mode:
/// - Arrow types map to Postgres column types;
/// - non-nullable fields get `NOT NULL`;
/// - the configured key becomes the `PRIMARY KEY`.
#[test]
fn test_build_create_table_sql() {
    let ddl = PostgresSink::build_create_table_sql(&test_schema(), &upsert_config());

    assert!(ddl.starts_with("CREATE TABLE IF NOT EXISTS public.events"));
    let required = [
        "id BIGINT NOT NULL",
        "name TEXT",
        "value DOUBLE PRECISION",
        "PRIMARY KEY (id)",
    ];
    for fragment in required {
        assert!(ddl.contains(fragment), "missing `{fragment}` in: {ddl}");
    }
}
1360
/// Without configured key columns the generated DDL declares no primary key.
#[test]
fn test_build_create_table_sql_no_pk() {
    let ddl = PostgresSink::build_create_table_sql(&test_schema(), &test_config());

    assert!(ddl.starts_with("CREATE TABLE IF NOT EXISTS"));
    assert!(!ddl.contains("PRIMARY KEY"));
}
1370
/// The epoch-offset bookkeeping table is keyed by `sink_id` and stores a
/// non-null `epoch`.
#[test]
fn test_offset_table_sql() {
    let ddl = PostgresSink::build_offset_table_sql();
    for fragment in [
        "_laminardb_sink_offsets",
        "sink_id TEXT PRIMARY KEY",
        "epoch BIGINT NOT NULL",
    ] {
        assert!(ddl.contains(fragment), "missing `{fragment}` in: {ddl}");
    }
}
1378
/// Committing an epoch upserts the sink's row in the offsets table.
#[test]
fn test_epoch_commit_sql() {
    let commit = PostgresSink::build_epoch_commit_sql();
    assert!(commit.contains("INSERT INTO _laminardb_sink_offsets"));
    assert!(commit.contains("ON CONFLICT (sink_id) DO UPDATE"));
}
1385
/// Recovery reads the last stored epoch back from the offsets table.
#[test]
fn test_epoch_recover_sql() {
    let recover = PostgresSink::build_epoch_recover_sql();
    assert!(recover.contains("SELECT epoch FROM _laminardb_sink_offsets"));
}
1391
1392 fn changelog_schema() -> SchemaRef {
1395 Arc::new(Schema::new(vec![
1396 Field::new("id", DataType::Int64, false),
1397 Field::new("name", DataType::Utf8, true),
1398 Field::new("_op", DataType::Utf8, false),
1399 Field::new("_ts_ms", DataType::Int64, false),
1400 ]))
1401 }
1402
1403 fn changelog_batch() -> RecordBatch {
1404 RecordBatch::try_new(
1405 changelog_schema(),
1406 vec![
1407 Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
1408 Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
1409 Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
1410 Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
1411 ],
1412 )
1413 .expect("changelog batch creation")
1414 }
1415
/// Splitting behaviour:
/// - `I` and `U` rows land in the insert half;
/// - `D` rows land in the delete half;
/// - both halves drop the two metadata columns and keep input row order.
#[test]
fn test_split_changelog_batch() {
    let (inserts, deletes) =
        PostgresSink::split_changelog_batch(&changelog_batch()).expect("split");

    assert_eq!((inserts.num_rows(), deletes.num_rows()), (3, 2));
    assert_eq!((inserts.num_columns(), deletes.num_columns()), (2, 2));

    // Reads the `id` column (index 0) of a split half as plain values.
    let ids_of = |half: &RecordBatch| -> Vec<i64> {
        half.column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array")
            .values()
            .to_vec()
    };

    assert_eq!(ids_of(&inserts), [1, 2, 4]);
    assert_eq!(ids_of(&deletes), [3, 5]);
}
1443
/// A batch containing only `I` rows produces an empty delete half.
#[test]
fn test_split_changelog_all_inserts() {
    let ops = StringArray::from(vec!["I", "I"]);
    let batch = RecordBatch::try_new(
        changelog_schema(),
        vec![
            Arc::new(Int64Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["a", "b"])),
            Arc::new(ops),
            Arc::new(Int64Array::from(vec![100, 200])),
        ],
    )
    .expect("batch");

    let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
    assert_eq!((inserts.num_rows(), deletes.num_rows()), (2, 0));
}
1462
/// A batch containing only `D` rows produces an empty insert half.
#[test]
fn test_split_changelog_all_deletes() {
    let ops = StringArray::from(vec!["D", "D"]);
    let batch = RecordBatch::try_new(
        changelog_schema(),
        vec![
            Arc::new(Int64Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["a", "b"])),
            Arc::new(ops),
            Arc::new(Int64Array::from(vec![100, 200])),
        ],
    )
    .expect("batch");

    let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
    assert_eq!((inserts.num_rows(), deletes.num_rows()), (0, 2));
}
1481
/// Splitting a batch without an `_op` column is an error, not a silent no-op.
#[test]
fn test_split_changelog_missing_op_column() {
    let ids_only_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let ids_only = RecordBatch::try_new(
        ids_only_schema,
        vec![Arc::new(Int64Array::from(vec![1]))],
    )
    .expect("batch");

    assert!(PostgresSink::split_changelog_batch(&ids_only).is_err());
}
1491
/// Snapshot-read rows (`_op = "r"`) are treated as inserts.
#[test]
fn test_split_changelog_snapshot_read() {
    let ops = StringArray::from(vec!["r"]);
    let batch = RecordBatch::try_new(
        changelog_schema(),
        vec![
            Arc::new(Int64Array::from(vec![1])),
            Arc::new(StringArray::from(vec!["a"])),
            Arc::new(ops),
            Arc::new(Int64Array::from(vec![100])),
        ],
    )
    .expect("batch");

    let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
    assert_eq!((inserts.num_rows(), deletes.num_rows()), (1, 0));
}
1510
1511 #[tokio::test]
1514 async fn test_write_batch_buffering() {
1515 let mut config = test_config();
1516 config.batch_size = 100;
1517 let mut sink = PostgresSink::new(test_schema(), config, None);
1518 sink.state = ConnectorState::Running;
1519
1520 let batch = test_batch(10);
1521 let result = sink.write_batch(&batch).await.expect("write");
1522
1523 assert_eq!(result.records_written, 0);
1524 assert_eq!(sink.buffered_rows(), 10);
1525 }
1526
1527 #[tokio::test]
1528 async fn test_write_batch_empty() {
1529 let mut sink = PostgresSink::new(test_schema(), test_config(), None);
1530 sink.state = ConnectorState::Running;
1531
1532 let batch = test_batch(0);
1533 let result = sink.write_batch(&batch).await.expect("write");
1534 assert_eq!(result.records_written, 0);
1535 assert_eq!(sink.buffered_rows(), 0);
1536 }
1537
1538 #[tokio::test]
1539 async fn test_write_batch_not_running() {
1540 let mut sink = PostgresSink::new(test_schema(), test_config(), None);
1541
1542 let batch = test_batch(10);
1543 let result = sink.write_batch(&batch).await;
1544 assert!(result.is_err());
1545 }
1546
1547 #[tokio::test]
1548 async fn test_exactly_once_defers_flush() {
1549 let mut config = test_config();
1550 config.batch_size = 5; config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
1552 let mut sink = PostgresSink::new(test_schema(), config, None);
1553 sink.state = ConnectorState::Running;
1554
1555 let batch = test_batch(10);
1556 let result = sink.write_batch(&batch).await.expect("write");
1557
1558 assert_eq!(result.records_written, 0);
1560 assert_eq!(sink.buffered_rows(), 10);
1561 }
1562
/// Capabilities for the default append mode with at-least-once delivery.
///
/// Idempotent is reported; upsert, changelog and exactly-once are not.
#[test]
fn test_capabilities_append_at_least_once() {
    let sink = PostgresSink::new(test_schema(), test_config(), None);
    let caps = sink.capabilities();

    assert!(caps.idempotent);
    assert!(!caps.exactly_once);
    assert!(!caps.changelog);
    assert!(!caps.upsert);
}
1574
/// Upsert mode advertises both upsert support and idempotence.
#[test]
fn test_capabilities_upsert() {
    let sink = PostgresSink::new(test_schema(), upsert_config(), None);
    let caps = sink.capabilities();

    assert!(caps.idempotent);
    assert!(caps.upsert);
}
1582
/// Enabling `changelog_mode` is reflected in the advertised capabilities.
#[test]
fn test_capabilities_changelog() {
    let config = {
        let mut cfg = test_config();
        cfg.changelog_mode = true;
        cfg
    };
    let sink = PostgresSink::new(test_schema(), config, None);
    assert!(sink.capabilities().changelog);
}
1591
/// Upsert mode with `ExactlyOnce` delivery advertises exactly-once support.
#[test]
fn test_capabilities_exactly_once() {
    let config = {
        let mut cfg = upsert_config();
        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        cfg
    };
    let sink = PostgresSink::new(test_schema(), config, None);
    assert!(sink.capabilities().exactly_once);
}
1600
1601 #[tokio::test]
1604 async fn test_epoch_lifecycle_state() {
1605 let mut sink = PostgresSink::new(test_schema(), test_config(), None);
1606 sink.state = ConnectorState::Running;
1607
1608 sink.begin_epoch(1).await.expect("begin");
1609 assert_eq!(sink.current_epoch(), 1);
1610
1611 sink.commit_epoch(1).await.expect("commit");
1613 assert_eq!(sink.last_committed_epoch(), 1);
1614
1615 assert_eq!(sink.metrics.epochs_committed.get(), 1);
1616 }
1617
1618 #[tokio::test]
1619 async fn test_epoch_mismatch_rejected() {
1620 let mut sink = PostgresSink::new(test_schema(), test_config(), None);
1621 sink.state = ConnectorState::Running;
1622
1623 sink.begin_epoch(1).await.expect("begin");
1624 let result = sink.commit_epoch(2).await;
1625 assert!(result.is_err());
1626 }
1627
1628 #[tokio::test]
1629 async fn test_rollback_clears_buffer() {
1630 let mut config = test_config();
1631 config.batch_size = 1000;
1632 let mut sink = PostgresSink::new(test_schema(), config, None);
1633 sink.state = ConnectorState::Running;
1634
1635 let batch = test_batch(50);
1636 sink.write_batch(&batch).await.expect("write");
1637 assert_eq!(sink.buffered_rows(), 50);
1638
1639 sink.rollback_epoch(0).await.expect("rollback");
1640 assert_eq!(sink.buffered_rows(), 0);
1641 }
1642
/// The `Debug` rendering names the type and the qualified target table.
#[test]
fn test_debug_output() {
    let sink = PostgresSink::new(test_schema(), test_config(), None);
    let rendered = format!("{sink:?}");

    for needle in ["PostgresSink", "public.events"] {
        assert!(rendered.contains(needle), "`{needle}` missing from {rendered}");
    }
}
1652
/// `build_user_schema` drops every metadata column and preserves the
/// relative order of the user columns.
#[test]
fn test_build_user_schema() {
    let mixed = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("_op", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
        Field::new("_ts_ms", DataType::Int64, false),
    ]));

    let user = build_user_schema(&mixed);
    let names: Vec<&str> = user.fields().iter().map(|f| f.name().as_str()).collect();
    assert_eq!(names, ["id", "value"]);
}
1668
/// A schema without metadata columns passes through with all fields intact.
#[test]
fn test_build_user_schema_no_metadata() {
    let user = build_user_schema(&test_schema());
    assert_eq!(user.fields().len(), 3);
}
1675}