use std::sync::Arc;
use std::time::{Duration, Instant};

use arrow_array::{Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use tracing::{debug, info, warn};

use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;

use super::sink_config::{PostgresSinkConfig, WriteMode};
use super::sink_metrics::PostgresSinkMetrics;
use super::types::{arrow_to_pg_ddl_type, arrow_type_to_pg_array_cast, arrow_type_to_pg_sql};
use crate::connector::DeliveryGuarantee;

#[cfg(feature = "postgres-sink")]
use super::types::arrow_column_to_pg_array;
#[cfg(feature = "postgres-sink")]
use bytes::BytesMut;
#[cfg(feature = "postgres-sink")]
use deadpool_postgres::Pool;

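/// Streaming sink that writes Arrow `RecordBatch`es to a PostgreSQL table,
/// using binary `COPY` for append mode and `UNNEST`-based statements for
/// upsert and changelog modes.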
pub struct PostgresSink {
    config: PostgresSinkConfig,
    schema: SchemaRef,
    user_schema: SchemaRef,
    state: ConnectorState,
    current_epoch: u64,
    last_committed_epoch: u64,
    buffer: Vec<RecordBatch>,
    buffered_rows: usize,
    last_flush: Instant,
    metrics: PostgresSinkMetrics,
    upsert_sql: Option<String>,
    copy_sql: Option<String>,
    create_table_sql: Option<String>,
    delete_sql: Option<String>,
    #[cfg(feature = "postgres-sink")]
    pool: Option<Pool>,
    #[cfg(feature = "postgres-sink")]
    encode_buf: BytesMut,
}

impl PostgresSink {
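    /// Creates a sink for the given Arrow schema and configuration.
    ///
    /// A minimal construction sketch (the schema and any metrics registry wiring
    /// are illustrative, not prescribed by this module):
    ///
    /// ```ignore
    /// let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    /// let config = PostgresSinkConfig::new("localhost", "mydb", "events");
    /// let sink = PostgresSink::new(schema, config, None);
    /// ```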
    #[must_use]
    pub fn new(
        schema: SchemaRef,
        config: PostgresSinkConfig,
        registry: Option<&prometheus::Registry>,
    ) -> Self {
        let user_schema = build_user_schema(&schema);
        let buf_capacity = (config.batch_size / 1024).max(4);
        Self {
            config,
            schema,
            user_schema,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            buffer: Vec::with_capacity(buf_capacity),
            buffered_rows: 0,
            last_flush: Instant::now(),
            metrics: PostgresSinkMetrics::new(registry),
            upsert_sql: None,
            copy_sql: None,
            create_table_sql: None,
            delete_sql: None,
            #[cfg(feature = "postgres-sink")]
            pool: None,
            #[cfg(feature = "postgres-sink")]
            encode_buf: BytesMut::with_capacity(64 * 1024),
        }
    }

    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    #[must_use]
    pub fn buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    #[must_use]
    pub fn sink_metrics(&self) -> &PostgresSinkMetrics {
        &self.metrics
    }

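    /// Builds the binary `COPY ... FROM STDIN` statement used by append-mode
    /// flushes, skipping metadata columns (names starting with `_`). For a
    /// `public.events` table with columns `id`, `name`, `value` this yields:
    ///
    /// ```sql
    /// COPY public.events (id, name, value) FROM STDIN BINARY
    /// ```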
    #[must_use]
    pub fn build_copy_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
        let columns = user_columns(schema);
        let col_list = columns.join(", ");
        format!(
            "COPY {} ({}) FROM STDIN BINARY",
            config.qualified_table_name(),
            col_list,
        )
    }

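    /// Builds the `INSERT ... SELECT * FROM UNNEST(...) ON CONFLICT ...` statement
    /// used by upsert-mode flushes. Each column is bound as a single array
    /// parameter, so one statement upserts an entire batch. For `public.events`
    /// with primary key `id` the generated SQL is roughly:
    ///
    /// ```sql
    /// INSERT INTO public.events (id, name, value)
    /// SELECT * FROM UNNEST($1::int8[], $2::text[], $3::float8[])
    /// ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, value = EXCLUDED.value
    /// ```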
    #[must_use]
    pub fn build_upsert_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
        let fields = user_fields(schema);

        let columns: Vec<&str> = fields.iter().map(|f| f.name().as_str()).collect();

        let unnest_params: Vec<String> = fields
            .iter()
            .enumerate()
            .map(|(i, f)| arrow_type_to_pg_array_cast(f.data_type(), i + 1))
            .collect();

        let non_key_columns: Vec<&str> = columns
            .iter()
            .copied()
            .filter(|c| {
                !config
                    .primary_key_columns
                    .iter()
                    .any(|pk| pk.as_str() == *c)
            })
            .collect();

        let update_clause: Vec<String> = non_key_columns
            .iter()
            .map(|c| format!("{c} = EXCLUDED.{c}"))
            .collect();

        let pk_list = config.primary_key_columns.join(", ");

        if update_clause.is_empty() {
            format!(
                "INSERT INTO {} ({}) \
                 SELECT * FROM UNNEST({}) \
                 ON CONFLICT ({}) DO NOTHING",
                config.qualified_table_name(),
                columns.join(", "),
                unnest_params.join(", "),
                pk_list,
            )
        } else {
            format!(
                "INSERT INTO {} ({}) \
                 SELECT * FROM UNNEST({}) \
                 ON CONFLICT ({}) DO UPDATE SET {}",
                config.qualified_table_name(),
                columns.join(", "),
                unnest_params.join(", "),
                pk_list,
                update_clause.join(", "),
            )
        }
    }

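    /// Builds the batched `DELETE` statement used in changelog mode. Primary key
    /// values are bound as one array parameter per key column, e.g. for key `id`:
    ///
    /// ```sql
    /// DELETE FROM public.events WHERE id = ANY($1::int8[])
    /// ```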
    #[must_use]
    pub fn build_delete_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
        let pk_conditions: Vec<String> = config
            .primary_key_columns
            .iter()
            .enumerate()
            .map(|(i, col)| {
                let dt = schema
                    .field_with_name(col)
                    .map_or(DataType::Utf8, |f| f.data_type().clone());
                let pg_type = arrow_type_to_pg_sql(&dt);
                format!("{col} = ANY(${}::{}[])", i + 1, pg_type)
            })
            .collect();

        format!(
            "DELETE FROM {} WHERE {}",
            config.qualified_table_name(),
            pk_conditions.join(" AND "),
        )
    }

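    /// Builds the `CREATE TABLE IF NOT EXISTS` DDL used when auto-create is
    /// enabled, mapping Arrow types to PostgreSQL column types and appending a
    /// `PRIMARY KEY` clause when key columns are configured. The result is
    /// roughly (whitespace approximate):
    ///
    /// ```sql
    /// CREATE TABLE IF NOT EXISTS public.events (
    ///  id BIGINT NOT NULL,
    ///  name TEXT,
    ///  value DOUBLE PRECISION,
    ///  PRIMARY KEY (id)
    /// )
    /// ```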
    #[must_use]
    pub fn build_create_table_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
        let fields = user_fields(schema);

        let column_defs: Vec<String> = fields
            .iter()
            .map(|f| {
                let pg_type = arrow_to_pg_ddl_type(f.data_type());
                let nullable = if f.is_nullable() { "" } else { " NOT NULL" };
                format!(" {} {}{}", f.name(), pg_type, nullable)
            })
            .collect();

        let mut ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (\n{}\n",
            config.qualified_table_name(),
            column_defs.join(",\n"),
        );

        if !config.primary_key_columns.is_empty() {
            use std::fmt::Write;
            let _ = write!(
                ddl,
                ",\n PRIMARY KEY ({})\n",
                config.primary_key_columns.join(", ")
            );
        }

        ddl.push(')');
        ddl
    }

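    /// DDL for the `_laminardb_sink_offsets` bookkeeping table that backs
    /// exactly-once delivery: one row per sink id, recording the highest
    /// committed epoch alongside optional source offsets and watermark.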
    #[must_use]
    pub fn build_offset_table_sql() -> &'static str {
        "CREATE TABLE IF NOT EXISTS _laminardb_sink_offsets (\
         \n sink_id TEXT PRIMARY KEY,\
         \n epoch BIGINT NOT NULL,\
         \n source_offsets JSONB,\
         \n watermark BIGINT,\
         \n updated_at TIMESTAMPTZ DEFAULT NOW()\
         \n)"
    }

    #[must_use]
    pub fn build_epoch_commit_sql() -> &'static str {
        "INSERT INTO _laminardb_sink_offsets (sink_id, epoch, updated_at) \
         VALUES ($1, $2, NOW()) \
         ON CONFLICT (sink_id) DO UPDATE SET epoch = $2, updated_at = NOW()"
    }

    #[must_use]
    pub fn build_epoch_recover_sql() -> &'static str {
        "SELECT epoch FROM _laminardb_sink_offsets WHERE sink_id = $1"
    }

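    /// Splits a changelog batch into (upserts, deletes) using the `_op` column:
    /// `"I"`, `"U"`, and `"r"` (snapshot read) rows become upserts, `"D"` rows
    /// become deletes; null or unrecognized op codes are skipped. Metadata
    /// columns are dropped from both output batches.
    ///
    /// # Errors
    ///
    /// Returns a configuration error if the `_op` column is missing or not Utf8.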
    pub fn split_changelog_batch(
        batch: &RecordBatch,
    ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
        let op_idx = batch.schema().index_of("_op").map_err(|_| {
            ConnectorError::ConfigurationError(
                "changelog mode requires '_op' column in input schema".into(),
            )
        })?;

        let op_array = batch
            .column(op_idx)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .ok_or_else(|| {
                ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
            })?;

        let mut insert_indices = Vec::new();
        let mut delete_indices = Vec::new();

        for i in 0..op_array.len() {
            if op_array.is_null(i) {
                continue;
            }
            match op_array.value(i) {
                "I" | "U" | "r" => {
                    insert_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
                }
                "D" => {
                    delete_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
                }
                _ => {}
            }
        }

        let insert_batch = filter_batch_by_indices(batch, &insert_indices)?;
        let delete_batch = filter_batch_by_indices(batch, &delete_indices)?;

        Ok((insert_batch, delete_batch))
    }

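    /// Pre-builds the SQL statements required by the configured write mode so
    /// that flushes only need to bind parameters.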
    fn prepare_statements(&mut self) {
        self.copy_sql = Some(Self::build_copy_sql(&self.schema, &self.config));

        if self.config.write_mode == WriteMode::Upsert {
            self.upsert_sql = Some(Self::build_upsert_sql(&self.schema, &self.config));
        }

        if self.config.auto_create_table {
            self.create_table_sql = Some(Self::build_create_table_sql(&self.schema, &self.config));
        }

        if self.config.changelog_mode {
            self.delete_sql = Some(Self::build_delete_sql(&self.schema, &self.config));
        }
    }

    #[cfg(feature = "postgres-sink")]
    fn pool(&self) -> Result<&Pool, ConnectorError> {
        self.pool.as_ref().ok_or(ConnectorError::InvalidState {
            expected: "pool initialized (call open() first)".into(),
            actual: "pool not initialized".into(),
        })
    }

    #[cfg(feature = "postgres-sink")]
    fn concat_buffer(&self) -> Result<RecordBatch, ConnectorError> {
        if self.buffer.is_empty() {
            return Ok(RecordBatch::new_empty(self.user_schema.clone()));
        }

        let stripped: Result<Vec<RecordBatch>, ConnectorError> =
            self.buffer.iter().map(strip_metadata_columns).collect();
        let stripped = stripped?;

        arrow_select::concat::concat_batches(&self.user_schema, &stripped)
            .map_err(|e| ConnectorError::Internal(format!("batch concat failed: {e}")))
    }

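    /// Flushes the buffered batches with binary `COPY FROM STDIN`: the user
    /// columns are encoded via `pgpq` into the reusable `encode_buf` and the
    /// bytes are streamed through the `copy_in` sink.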
    #[cfg(feature = "postgres-sink")]
    async fn flush_append(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        let user_batch = self.concat_buffer()?;
        if user_batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let mut encoder = pgpq::ArrowToPostgresBinaryEncoder::try_new(self.user_schema.as_ref())
            .map_err(|e| ConnectorError::Internal(format!("pgpq encoder init: {e}")))?;

        self.encode_buf.clear();
        encoder.write_header(&mut self.encode_buf);
        encoder
            .write_batch(&user_batch, &mut self.encode_buf)
            .map_err(|e| ConnectorError::Internal(format!("pgpq encode: {e}")))?;
        encoder
            .write_footer(&mut self.encode_buf)
            .map_err(|e| ConnectorError::Internal(format!("pgpq footer: {e}")))?;

        let encoded_bytes = self.encode_buf.len();
        let bytes_to_send = self.encode_buf.split().freeze();

        let copy_sql = self
            .copy_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("COPY SQL not prepared".into()))?;

        let sink = client
            .copy_in(copy_sql)
            .await
            .map_err(|e| ConnectorError::WriteError(format!("COPY start: {e}")))?;

        {
            use futures_util::SinkExt;
            futures_util::pin_mut!(sink);
            sink.send(bytes_to_send)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("COPY send: {e}")))?;
            sink.close()
                .await
                .map_err(|e| ConnectorError::WriteError(format!("COPY finish: {e}")))?;
        }

        let rows = user_batch.num_rows();
        self.metrics.record_write(rows as u64, encoded_bytes as u64);
        self.metrics.record_flush();
        self.metrics.record_copy();

        Ok(WriteResult::new(rows, encoded_bytes as u64))
    }

    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn flush_upsert(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        if self.config.changelog_mode {
            return self.flush_changelog(client).await;
        }

        let user_batch = self.concat_buffer()?;
        if user_batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let upsert_sql = self
            .upsert_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;

        let rows = execute_unnest(client, upsert_sql, &user_batch).await?;

        let byte_estimate = estimate_batch_bytes(&user_batch);
        self.metrics.record_write(rows, byte_estimate);
        self.metrics.record_flush();
        self.metrics.record_upsert();

        Ok(WriteResult::new(rows as usize, byte_estimate))
    }

    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn flush_changelog(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        let mut total_rows: u64 = 0;
        let mut total_bytes: u64 = 0;

        let mut all_inserts = Vec::new();
        let mut all_deletes = Vec::new();
        for batch in &self.buffer {
            let (ins, del) = Self::split_changelog_batch(batch)?;
            if ins.num_rows() > 0 {
                all_inserts.push(ins);
            }
            if del.num_rows() > 0 {
                all_deletes.push(del);
            }
        }

        if !all_inserts.is_empty() {
            let insert_batch =
                arrow_select::concat::concat_batches(&self.user_schema, &all_inserts)
                    .map_err(|e| ConnectorError::Internal(format!("concat inserts: {e}")))?;

            let upsert_sql = self
                .upsert_sql
                .as_deref()
                .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;

            let rows = execute_unnest(client, upsert_sql, &insert_batch).await?;
            let bytes = estimate_batch_bytes(&insert_batch);
            self.metrics.record_write(rows, bytes);
            self.metrics.record_upsert();
            total_rows += rows;
            total_bytes += bytes;
        }

        if !all_deletes.is_empty() {
            let delete_batch =
                arrow_select::concat::concat_batches(&self.user_schema, &all_deletes)
                    .map_err(|e| ConnectorError::Internal(format!("concat deletes: {e}")))?;
            let deleted = self.execute_deletes(client, &delete_batch).await?;
            self.metrics.record_deletes(deleted as u64);
            total_rows += deleted as u64;
        }

        self.metrics.record_flush();
        Ok(WriteResult::new(total_rows as usize, total_bytes))
    }

    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn execute_deletes(
        &self,
        client: &tokio_postgres::Client,
        delete_batch: &RecordBatch,
    ) -> Result<usize, ConnectorError> {
        if delete_batch.num_rows() == 0 {
            return Ok(0);
        }

        let delete_sql = self
            .delete_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("DELETE SQL not prepared".into()))?;

        let pk_params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> = self
            .config
            .primary_key_columns
            .iter()
            .map(|col| {
                let idx = delete_batch.schema().index_of(col).map_err(|_| {
                    ConnectorError::ConfigurationError(format!(
                        "primary key column '{col}' not in delete batch"
                    ))
                })?;
                arrow_column_to_pg_array(delete_batch.column(idx))
            })
            .collect::<Result<_, _>>()?;

        let pk_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = pk_params
            .iter()
            .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
            .collect();

        let rows = client
            .execute(delete_sql, &pk_refs)
            .await
            .map_err(|e| ConnectorError::WriteError(format!("DELETE: {e}")))?;

        Ok(rows as usize)
    }

    #[cfg(feature = "postgres-sink")]
    async fn flush_to_client(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        client
            .execute(
                &format!(
                    "SET statement_timeout = '{}'",
                    self.config.statement_timeout.as_millis()
                ),
                &[],
            )
            .await
            .ok();

        match self.config.write_mode {
            WriteMode::Append => self.flush_append(client).await,
            WriteMode::Upsert => self.flush_upsert(client).await,
        }
    }

    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_sign_loss)]
    async fn is_epoch_committed(
        &self,
        client: &tokio_postgres::Client,
        epoch: u64,
    ) -> Result<bool, ConnectorError> {
        let sink_id = self.config.effective_sink_id();
        let row = client
            .query_opt(Self::build_epoch_recover_sql(), &[&sink_id])
            .await
            .map_err(|e| ConnectorError::Internal(format!("epoch recovery query: {e}")))?;

        if let Some(row) = row {
            let committed: i64 = row.get(0);
            Ok(committed as u64 >= epoch)
        } else {
            Ok(false)
        }
    }
}

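// The sink implements the epoch-based two-phase commit protocol of
// `SinkConnector`. A minimal driving sequence looks roughly like this (error
// handling and the surrounding runtime are elided; `connector_config` is
// whatever the caller built):
//
//     sink.open(&connector_config).await?;
//     sink.begin_epoch(1).await?;
//     sink.write_batch(&batch).await?; // buffers; may flush when not exactly-once
//     sink.pre_commit(1).await?;       // flushes; transactional for exactly-once
//     sink.commit_epoch(1).await?;
//     sink.close().await?;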
#[cfg(feature = "postgres-sink")]
#[async_trait]
impl SinkConnector for PostgresSink {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        if !config.properties().is_empty() {
            self.config = PostgresSinkConfig::from_config(config)?;
        }

        if self.config.changelog_mode && self.config.write_mode != WriteMode::Upsert {
            return Err(ConnectorError::ConfigurationError(
                "changelog mode requires write.mode = 'upsert'".into(),
            ));
        }

        info!(
            table = %self.config.qualified_table_name(),
            mode = %self.config.write_mode,
            guarantee = %self.config.delivery_guarantee,
            "opening PostgreSQL sink connector"
        );

        self.prepare_statements();

        if self.config.sink_id.is_empty() {
            self.config.sink_id = self.config.effective_sink_id();
        }

        let mut pool_cfg = deadpool_postgres::Config::new();
        pool_cfg.host = Some(self.config.hostname.clone());
        pool_cfg.port = Some(self.config.port);
        pool_cfg.dbname = Some(self.config.database.clone());
        pool_cfg.user = Some(self.config.username.clone());
        pool_cfg.password = Some(self.config.password.clone());
        let mut deadpool_cfg = deadpool_postgres::PoolConfig::new(self.config.pool_size);
        deadpool_cfg.timeouts.wait = Some(self.config.connect_timeout);
        deadpool_cfg.timeouts.create = Some(self.config.connect_timeout);
        pool_cfg.pool = Some(deadpool_cfg);

        let pool = pool_cfg
            .create_pool(
                Some(deadpool_postgres::Runtime::Tokio1),
                tokio_postgres::NoTls,
            )
            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool creation failed: {e}")))?;

        let client = pool.get().await.map_err(|e| {
            ConnectorError::ConnectionFailed(format!("initial connection failed: {e}"))
        })?;

        client
            .execute(
                &format!(
                    "SET statement_timeout = '{}'",
                    self.config.statement_timeout.as_millis()
                ),
                &[],
            )
            .await
            .ok();

        if self.config.auto_create_table {
            if let Some(ddl) = &self.create_table_sql {
                client.batch_execute(ddl.as_str()).await.map_err(|e| {
                    ConnectorError::Internal(format!("auto-create table failed: {e}"))
                })?;
                debug!(table = %self.config.qualified_table_name(), "target table ensured");
            }
        }

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            client
                .batch_execute(Self::build_offset_table_sql())
                .await
                .map_err(|e| {
                    ConnectorError::Internal(format!("create offset table failed: {e}"))
                })?;

            {
                let recover_sink_id = self.config.effective_sink_id();
                let row = client
                    .query_opt(Self::build_epoch_recover_sql(), &[&recover_sink_id])
                    .await
                    .map_err(|e| ConnectorError::Internal(format!("epoch recovery: {e}")))?;
                if let Some(row) = row {
                    let epoch: i64 = row.get(0);
                    #[allow(clippy::cast_sign_loss)]
                    {
                        self.last_committed_epoch = epoch as u64;
                    }
                    info!(epoch, "recovered last committed epoch");
                }
            }
        }

        self.pool = Some(pool);
        self.state = ConnectorState::Running;

        info!(
            table = %self.config.qualified_table_name(),
            pool_size = self.config.pool_size,
            "PostgreSQL sink connector opened"
        );

        Ok(())
    }

    #[allow(clippy::cast_possible_truncation)]
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        self.buffer.push(batch.clone());
        self.buffered_rows += batch.num_rows();

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            return Ok(WriteResult::new(0, 0));
        }

        let should_flush = self.buffered_rows >= self.config.batch_size
            || self.last_flush.elapsed() >= self.config.flush_interval;

        if should_flush {
            let client = self
                .pool()?
                .get()
                .await
                .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
            let result = self.flush_to_client(&client).await?;
            self.buffer.clear();
            self.buffered_rows = 0;
            self.last_flush = Instant::now();
            return Ok(result);
        }

        Ok(WriteResult::new(0, 0))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;
        debug!(epoch, "PostgreSQL sink epoch started");
        Ok(())
    }

    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch in pre_commit: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        if self.buffer.is_empty() {
            return Ok(());
        }

        let client = self
            .pool()?
            .get()
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            if self.is_epoch_committed(&client, epoch).await? {
                info!(epoch, "epoch already committed in PostgreSQL, skipping");
                self.buffer.clear();
                self.buffered_rows = 0;
                return Ok(());
            }

            client
                .batch_execute("BEGIN")
                .await
                .map_err(|e| ConnectorError::WriteError(format!("BEGIN: {e}")))?;

            match self.flush_to_client(&client).await {
                Ok(_result) => {
                    let sink_id = self.config.effective_sink_id();
                    #[allow(clippy::cast_possible_wrap)]
                    let epoch_i64 = epoch as i64;
                    let params: Vec<&(dyn postgres_types::ToSql + Sync)> =
                        vec![&sink_id, &epoch_i64];
                    client
                        .execute(Self::build_epoch_commit_sql(), &params)
                        .await
                        .map_err(|e| {
                            ConnectorError::WriteError(format!("epoch marker write: {e}"))
                        })?;

                    client
                        .batch_execute("COMMIT")
                        .await
                        .map_err(|e| ConnectorError::WriteError(format!("COMMIT: {e}")))?;
                }
                Err(e) => {
                    let _ = client.batch_execute("ROLLBACK").await;
                    return Err(e);
                }
            }
        } else {
            self.flush_to_client(&client).await?;
        }

        self.buffer.clear();
        self.buffered_rows = 0;
        self.last_flush = Instant::now();

        debug!(epoch, "PostgreSQL sink pre-committed");
        Ok(())
    }

    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        self.last_committed_epoch = epoch;
        self.metrics.record_commit();

        debug!(epoch, "PostgreSQL sink epoch committed");
        Ok(())
    }

    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.buffer.clear();
        self.buffered_rows = 0;

        self.metrics.record_rollback();
        warn!(epoch, "PostgreSQL sink epoch rolled back");
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => {
                if let Some(pool) = &self.pool {
                    let status = pool.status();
                    if status.available > 0 {
                        HealthStatus::Healthy
                    } else {
                        HealthStatus::Degraded(format!(
                            "no available connections ({} in use)",
                            status.size
                        ))
                    }
                } else {
                    HealthStatus::Unhealthy("pool not initialized".into())
                }
            }
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    fn capabilities(&self) -> SinkConnectorCapabilities {
        let write_timeout = self.config.statement_timeout + Duration::from_secs(5);
        let mut caps = SinkConnectorCapabilities::new(write_timeout).with_idempotent();

        if self.config.write_mode == WriteMode::Upsert {
            caps = caps.with_upsert();
        }
        if self.config.changelog_mode {
            caps = caps.with_changelog();
        }
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }

        caps
    }

    async fn flush(&mut self) -> Result<(), ConnectorError> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        let client = self
            .pool()?
            .get()
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
        self.flush_to_client(&client).await?;
        self.buffer.clear();
        self.buffered_rows = 0;
        self.last_flush = Instant::now();
        Ok(())
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing PostgreSQL sink connector");

        if !self.buffer.is_empty() {
            if let Some(pool) = &self.pool {
                if let Ok(client) = pool.get().await {
                    let _ = self.flush_to_client(&client).await;
                }
            }
        }

        self.buffer.clear();
        self.buffered_rows = 0;
        self.pool = None;
        self.state = ConnectorState::Closed;

        info!(
            table = %self.config.qualified_table_name(),
            records = self.metrics.records_written.get(),
            epochs = self.metrics.epochs_committed.get(),
            "PostgreSQL sink connector closed"
        );

        Ok(())
    }
}

#[cfg(not(feature = "postgres-sink"))]
#[async_trait]
impl SinkConnector for PostgresSink {
    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
        Err(ConnectorError::ConfigurationError(
            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
        ))
    }

    async fn write_batch(&mut self, _batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        Err(ConnectorError::ConfigurationError(
            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
        ))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    fn capabilities(&self) -> SinkConnectorCapabilities {
        let write_timeout = self.config.statement_timeout + Duration::from_secs(5);
        let mut caps = SinkConnectorCapabilities::new(write_timeout).with_idempotent();
        if self.config.write_mode == WriteMode::Upsert {
            caps = caps.with_upsert();
        }
        if self.config.changelog_mode {
            caps = caps.with_changelog();
        }
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }
        caps
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Closed;
        Ok(())
    }
}

impl std::fmt::Debug for PostgresSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PostgresSink")
            .field("state", &self.state)
            .field("table", &self.config.qualified_table_name())
            .field("mode", &self.config.write_mode)
            .field("guarantee", &self.config.delivery_guarantee)
            .field("current_epoch", &self.current_epoch)
            .field("last_committed_epoch", &self.last_committed_epoch)
            .field("buffered_rows", &self.buffered_rows)
            .finish_non_exhaustive()
    }
}

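/// Returns the column names that are written to PostgreSQL; columns whose
/// names start with `_` (e.g. `_op`, `_ts_ms`) are treated as metadata and
/// excluded.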
fn user_columns(schema: &SchemaRef) -> Vec<String> {
    schema
        .fields()
        .iter()
        .filter(|f| !f.name().starts_with('_'))
        .map(|f| f.name().clone())
        .collect()
}

fn user_fields(schema: &SchemaRef) -> Vec<&Arc<Field>> {
    schema
        .fields()
        .iter()
        .filter(|f| !f.name().starts_with('_'))
        .collect()
}

fn build_user_schema(schema: &SchemaRef) -> SchemaRef {
    Arc::new(Schema::new(
        schema
            .fields()
            .iter()
            .filter(|f| !f.name().starts_with('_'))
            .cloned()
            .collect::<Vec<_>>(),
    ))
}

fn filter_batch_by_indices(
    batch: &RecordBatch,
    indices: &[u32],
) -> Result<RecordBatch, ConnectorError> {
    if indices.is_empty() {
        let user_schema = Arc::new(Schema::new(
            batch
                .schema()
                .fields()
                .iter()
                .filter(|f| !f.name().starts_with('_'))
                .cloned()
                .collect::<Vec<_>>(),
        ));
        return Ok(RecordBatch::new_empty(user_schema));
    }

    let indices_array = arrow_array::UInt32Array::from(indices.to_vec());

    let user_schema = Arc::new(Schema::new(
        batch
            .schema()
            .fields()
            .iter()
            .filter(|f| !f.name().starts_with('_'))
            .cloned()
            .collect::<Vec<_>>(),
    ));

    let filtered_columns: Vec<Arc<dyn arrow_array::Array>> = batch
        .schema()
        .fields()
        .iter()
        .enumerate()
        .filter(|(_, f)| !f.name().starts_with('_'))
        .map(|(i, _)| {
            arrow_select::take::take(batch.column(i), &indices_array, None)
                .map_err(|e| ConnectorError::Internal(format!("arrow take failed: {e}")))
        })
        .collect::<Result<Vec<_>, _>>()?;

    RecordBatch::try_new(user_schema, filtered_columns)
        .map_err(|e| ConnectorError::Internal(format!("batch construction failed: {e}")))
}

#[cfg(feature = "postgres-sink")]
fn strip_metadata_columns(batch: &RecordBatch) -> Result<RecordBatch, ConnectorError> {
    let schema = batch.schema();
    let user_indices: Vec<usize> = schema
        .fields()
        .iter()
        .enumerate()
        .filter(|(_, f)| !f.name().starts_with('_'))
        .map(|(i, _)| i)
        .collect();

    if user_indices.len() == schema.fields().len() {
        return Ok(batch.clone());
    }

    let user_schema = Arc::new(Schema::new(
        user_indices
            .iter()
            .map(|&i| schema.field(i).clone())
            .collect::<Vec<_>>(),
    ));
    let columns: Vec<Arc<dyn Array>> = user_indices
        .iter()
        .map(|&i| batch.column(i).clone())
        .collect();

    RecordBatch::try_new(user_schema, columns)
        .map_err(|e| ConnectorError::Internal(format!("strip metadata: {e}")))
}

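/// Executes an `UNNEST`-based statement (see `build_upsert_sql`), binding one
/// PostgreSQL array parameter per column of `batch` and returning the number
/// of affected rows.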
#[cfg(feature = "postgres-sink")]
async fn execute_unnest(
    client: &tokio_postgres::Client,
    sql: &str,
    batch: &RecordBatch,
) -> Result<u64, ConnectorError> {
    let params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> = (0..batch.num_columns())
        .map(|i| arrow_column_to_pg_array(batch.column(i)))
        .collect::<Result<_, _>>()?;

    let param_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = params
        .iter()
        .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
        .collect();

    client
        .execute(sql, &param_refs)
        .await
        .map_err(|e| ConnectorError::WriteError(format!("UNNEST execute: {e}")))
}

#[cfg(feature = "postgres-sink")]
fn estimate_batch_bytes(batch: &RecordBatch) -> u64 {
    (0..batch.num_columns())
        .map(|i| batch.column(i).get_buffer_memory_size() as u64)
        .sum()
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    fn test_config() -> PostgresSinkConfig {
        PostgresSinkConfig::new("localhost", "mydb", "events")
    }

    fn upsert_config() -> PostgresSinkConfig {
        let mut cfg = test_config();
        cfg.write_mode = WriteMode::Upsert;
        cfg.primary_key_columns = vec!["id".to_string()];
        cfg
    }

    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(arrow_array::Float64Array::from(values)),
            ],
        )
        .expect("test batch creation")
    }

    #[test]
    fn test_new_defaults() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.state(), ConnectorState::Created);
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert_eq!(sink.buffered_rows(), 0);
        assert!(sink.upsert_sql.is_none());
        assert!(sink.copy_sql.is_none());
    }

    #[test]
    fn test_user_schema_strips_metadata() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let sink = PostgresSink::new(schema, test_config(), None);
        assert_eq!(sink.user_schema.fields().len(), 2);
        assert_eq!(sink.user_schema.field(0).name(), "id");
        assert_eq!(sink.user_schema.field(1).name(), "value");
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let sink = PostgresSink::new(schema.clone(), test_config(), None);
        assert_eq!(sink.schema(), schema);
    }

    #[test]
    fn test_build_copy_sql() {
        let schema = test_schema();
        let config = test_config();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert_eq!(
            sql,
            "COPY public.events (id, name, value) FROM STDIN BINARY"
        );
    }

    #[test]
    fn test_build_copy_sql_custom_schema() {
        let schema = test_schema();
        let mut config = test_config();
        config.schema_name = "analytics".to_string();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert!(sql.starts_with("COPY analytics.events"));
    }

    #[test]
    fn test_build_copy_sql_excludes_metadata_columns() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let config = test_config();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert_eq!(sql, "COPY public.events (id, value) FROM STDIN BINARY");
    }

    #[test]
    fn test_build_upsert_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_upsert_sql(&schema, &config);

        assert!(sql.starts_with("INSERT INTO public.events"));
        assert!(sql.contains("SELECT * FROM UNNEST"));
        assert!(sql.contains("$1::int8[]"));
        assert!(sql.contains("$2::text[]"));
        assert!(sql.contains("$3::float8[]"));
        assert!(sql.contains("ON CONFLICT (id)"));
        assert!(sql.contains("DO UPDATE SET"));
        assert!(sql.contains("name = EXCLUDED.name"));
        assert!(sql.contains("value = EXCLUDED.value"));
        assert!(!sql.contains("id = EXCLUDED.id"));
    }

    #[test]
    fn test_build_upsert_sql_composite_key() {
        let schema = test_schema();
        let mut config = upsert_config();
        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
        let sql = PostgresSink::build_upsert_sql(&schema, &config);

        assert!(sql.contains("ON CONFLICT (id, name)"));
        assert!(sql.contains("value = EXCLUDED.value"));
        assert!(!sql.contains("id = EXCLUDED.id"));
        assert!(!sql.contains("name = EXCLUDED.name"));
    }

    #[test]
    fn test_build_upsert_sql_key_only_table() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let mut config = test_config();
        config.write_mode = WriteMode::Upsert;
        config.primary_key_columns = vec!["id".to_string()];

        let sql = PostgresSink::build_upsert_sql(&schema, &config);
        assert!(sql.contains("DO NOTHING"), "sql: {sql}");
    }

    #[test]
    fn test_build_delete_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_delete_sql(&schema, &config);

        assert_eq!(sql, "DELETE FROM public.events WHERE id = ANY($1::int8[])");
    }

    #[test]
    fn test_build_delete_sql_composite_key() {
        let schema = test_schema();
        let mut config = upsert_config();
        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
        let sql = PostgresSink::build_delete_sql(&schema, &config);

        assert!(sql.contains("id = ANY($1::int8[])"));
        assert!(sql.contains("name = ANY($2::text[])"));
        assert!(sql.contains(" AND "));
    }

    #[test]
    fn test_build_create_table_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_create_table_sql(&schema, &config);

        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS public.events"));
        assert!(sql.contains("id BIGINT NOT NULL"));
        assert!(sql.contains("name TEXT"));
        assert!(sql.contains("value DOUBLE PRECISION"));
        assert!(sql.contains("PRIMARY KEY (id)"));
    }

    #[test]
    fn test_build_create_table_sql_no_pk() {
        let schema = test_schema();
        let config = test_config();
        let sql = PostgresSink::build_create_table_sql(&schema, &config);

        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS"));
        assert!(!sql.contains("PRIMARY KEY"));
    }

    #[test]
    fn test_offset_table_sql() {
        let sql = PostgresSink::build_offset_table_sql();
        assert!(sql.contains("_laminardb_sink_offsets"));
        assert!(sql.contains("sink_id TEXT PRIMARY KEY"));
        assert!(sql.contains("epoch BIGINT NOT NULL"));
    }

    #[test]
    fn test_epoch_commit_sql() {
        let sql = PostgresSink::build_epoch_commit_sql();
        assert!(sql.contains("INSERT INTO _laminardb_sink_offsets"));
        assert!(sql.contains("ON CONFLICT (sink_id) DO UPDATE"));
    }

    #[test]
    fn test_epoch_recover_sql() {
        let sql = PostgresSink::build_epoch_recover_sql();
        assert!(sql.contains("SELECT epoch FROM _laminardb_sink_offsets"));
    }

    fn changelog_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
        ]))
    }

    fn changelog_batch() -> RecordBatch {
        RecordBatch::try_new(
            changelog_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
            ],
        )
        .expect("changelog batch creation")
    }

    #[test]
    fn test_split_changelog_batch() {
        let batch = changelog_batch();
        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");

        assert_eq!(inserts.num_rows(), 3);
        assert_eq!(deletes.num_rows(), 2);
        assert_eq!(inserts.num_columns(), 2);
        assert_eq!(deletes.num_columns(), 2);

        let insert_ids = inserts
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array");
        assert_eq!(insert_ids.value(0), 1);
        assert_eq!(insert_ids.value(1), 2);
        assert_eq!(insert_ids.value(2), 4);

        let delete_ids = deletes
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array");
        assert_eq!(delete_ids.value(0), 3);
        assert_eq!(delete_ids.value(1), 5);
    }

    #[test]
    fn test_split_changelog_all_inserts() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["I", "I"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 2);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_split_changelog_all_deletes() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["D", "D"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 0);
        assert_eq!(deletes.num_rows(), 2);
    }

    #[test]
    fn test_split_changelog_missing_op_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).expect("batch");

        let result = PostgresSink::split_changelog_batch(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_split_changelog_snapshot_read() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(StringArray::from(vec!["a"])),
                Arc::new(StringArray::from(vec!["r"])),
                Arc::new(Int64Array::from(vec![100])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 1);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[tokio::test]
    async fn test_write_batch_buffering() {
        let mut config = test_config();
        config.batch_size = 100;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }

    #[tokio::test]
    async fn test_write_batch_empty() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(0);
        let result = sink.write_batch(&batch).await.expect("write");
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[tokio::test]
    async fn test_write_batch_not_running() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_exactly_once_defers_flush() {
        let mut config = test_config();
        config.batch_size = 5;
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }

    #[test]
    fn test_health_check_created() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_capabilities_append_at_least_once() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        let caps = sink.capabilities();
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.exactly_once);
    }

    #[test]
    fn test_capabilities_upsert() {
        let sink = PostgresSink::new(test_schema(), upsert_config(), None);
        let caps = sink.capabilities();
        assert!(caps.upsert);
        assert!(caps.idempotent);
    }

    #[test]
    fn test_capabilities_changelog() {
        let mut config = test_config();
        config.changelog_mode = true;
        let sink = PostgresSink::new(test_schema(), config, None);
        let caps = sink.capabilities();
        assert!(caps.changelog);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let mut config = upsert_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = PostgresSink::new(test_schema(), config, None);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
    }

    #[test]
    fn test_metrics_initial() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        let m = sink.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    #[tokio::test]
    async fn test_epoch_lifecycle_state() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        assert_eq!(sink.current_epoch(), 1);

        sink.commit_epoch(1).await.expect("commit");
        assert_eq!(sink.last_committed_epoch(), 1);

        let m = sink.metrics();
        let committed = m.custom.iter().find(|(k, _)| k == "pg.epochs_committed");
        assert_eq!(committed.expect("metric").1, 1.0);
    }

    #[tokio::test]
    async fn test_epoch_mismatch_rejected() {
        let mut sink = PostgresSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        let result = sink.commit_epoch(2).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_rollback_clears_buffer() {
        let mut config = test_config();
        config.batch_size = 1000;
        let mut sink = PostgresSink::new(test_schema(), config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(50);
        sink.write_batch(&batch).await.expect("write");
        assert_eq!(sink.buffered_rows(), 50);

        sink.rollback_epoch(0).await.expect("rollback");
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[test]
    fn test_debug_output() {
        let sink = PostgresSink::new(test_schema(), test_config(), None);
        let debug = format!("{sink:?}");
        assert!(debug.contains("PostgresSink"));
        assert!(debug.contains("public.events"));
    }

    #[test]
    fn test_build_user_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
            Field::new("_ts_ms", DataType::Int64, false),
        ]));
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 2);
        assert_eq!(user.field(0).name(), "id");
        assert_eq!(user.field(1).name(), "value");
    }

    #[test]
    fn test_build_user_schema_no_metadata() {
        let schema = test_schema();
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 3);
    }
}