1use std::sync::Arc;
19use std::time::Instant;
20
21use arrow_array::{Array, RecordBatch};
22use arrow_schema::{DataType, Field, Schema, SchemaRef};
23use async_trait::async_trait;
24use tracing::{debug, info, warn};
25
26use crate::config::{ConnectorConfig, ConnectorState};
27use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
28use crate::error::ConnectorError;
29use crate::health::HealthStatus;
30use crate::metrics::ConnectorMetrics;
31
32use super::sink_config::{DeliveryGuarantee, PostgresSinkConfig, WriteMode};
33use super::sink_metrics::PostgresSinkMetrics;
34use super::types::{arrow_to_pg_ddl_type, arrow_type_to_pg_array_cast, arrow_type_to_pg_sql};
35
36#[cfg(feature = "postgres-sink")]
37use super::types::arrow_column_to_pg_array;
38#[cfg(feature = "postgres-sink")]
39use bytes::BytesMut;
40#[cfg(feature = "postgres-sink")]
41use deadpool_postgres::Pool;
42
/// PostgreSQL sink connector that writes Arrow `RecordBatch`es to a table.
///
/// Batches are buffered in memory and flushed either on size/time thresholds
/// (at-least-once) or at epoch pre-commit (exactly-once, two-phase commit).
pub struct PostgresSink {
    /// Sink configuration (connection, table, write mode, delivery guarantee).
    config: PostgresSinkConfig,
    /// Full input schema, possibly including `_`-prefixed metadata columns.
    schema: SchemaRef,
    /// `schema` with `_`-prefixed metadata columns stripped; this is what is
    /// actually written to PostgreSQL.
    user_schema: SchemaRef,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Epoch currently in progress (set by `begin_epoch`).
    current_epoch: u64,
    /// Highest epoch known to be committed (recovered from PG on `open` when
    /// running exactly-once).
    last_committed_epoch: u64,
    /// Buffered input batches awaiting flush.
    buffer: Vec<RecordBatch>,
    /// Total row count across `buffer`.
    buffered_rows: usize,
    /// Time of the last flush; drives interval-based flushing.
    last_flush: Instant,
    /// Sink-specific metrics.
    metrics: PostgresSinkMetrics,
    // Pre-rendered SQL, populated by `prepare_statements` during `open`.
    upsert_sql: Option<String>,
    copy_sql: Option<String>,
    create_table_sql: Option<String>,
    delete_sql: Option<String>,
    /// Connection pool, created in `open`.
    #[cfg(feature = "postgres-sink")]
    pool: Option<Pool>,
    /// Reusable buffer for COPY BINARY encoding.
    #[cfg(feature = "postgres-sink")]
    encode_buf: BytesMut,
}
84
85impl PostgresSink {
86 #[must_use]
88 pub fn new(schema: SchemaRef, config: PostgresSinkConfig) -> Self {
89 let user_schema = build_user_schema(&schema);
90 let buf_capacity = (config.batch_size / 1024).max(4);
92 Self {
93 config,
94 schema,
95 user_schema,
96 state: ConnectorState::Created,
97 current_epoch: 0,
98 last_committed_epoch: 0,
99 buffer: Vec::with_capacity(buf_capacity),
100 buffered_rows: 0,
101 last_flush: Instant::now(),
102 metrics: PostgresSinkMetrics::new(),
103 upsert_sql: None,
104 copy_sql: None,
105 create_table_sql: None,
106 delete_sql: None,
107 #[cfg(feature = "postgres-sink")]
108 pool: None,
109 #[cfg(feature = "postgres-sink")]
110 encode_buf: BytesMut::with_capacity(64 * 1024),
111 }
112 }
113
    /// Current lifecycle state of the connector.
    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    /// Epoch currently in progress (last value passed to `begin_epoch`).
    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Highest epoch that has been fully committed.
    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    /// Number of rows currently buffered and awaiting flush.
    #[must_use]
    pub fn buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Sink-specific metrics (writes, flushes, commits, ...).
    #[must_use]
    pub fn sink_metrics(&self) -> &PostgresSinkMetrics {
        &self.metrics
    }
143
144 #[must_use]
152 pub fn build_copy_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
153 let columns = user_columns(schema);
154 let col_list = columns.join(", ");
155 format!(
156 "COPY {} ({}) FROM STDIN BINARY",
157 config.qualified_table_name(),
158 col_list,
159 )
160 }
161
162 #[must_use]
172 pub fn build_upsert_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
173 let fields = user_fields(schema);
174
175 let columns: Vec<&str> = fields.iter().map(|f| f.name().as_str()).collect();
176
177 let unnest_params: Vec<String> = fields
178 .iter()
179 .enumerate()
180 .map(|(i, f)| arrow_type_to_pg_array_cast(f.data_type(), i + 1))
181 .collect();
182
183 let non_key_columns: Vec<&str> = columns
184 .iter()
185 .copied()
186 .filter(|c| {
187 !config
188 .primary_key_columns
189 .iter()
190 .any(|pk| pk.as_str() == *c)
191 })
192 .collect();
193
194 let update_clause: Vec<String> = non_key_columns
195 .iter()
196 .map(|c| format!("{c} = EXCLUDED.{c}"))
197 .collect();
198
199 let pk_list = config.primary_key_columns.join(", ");
200
201 if update_clause.is_empty() {
202 format!(
204 "INSERT INTO {} ({}) \
205 SELECT * FROM UNNEST({}) \
206 ON CONFLICT ({}) DO NOTHING",
207 config.qualified_table_name(),
208 columns.join(", "),
209 unnest_params.join(", "),
210 pk_list,
211 )
212 } else {
213 format!(
214 "INSERT INTO {} ({}) \
215 SELECT * FROM UNNEST({}) \
216 ON CONFLICT ({}) DO UPDATE SET {}",
217 config.qualified_table_name(),
218 columns.join(", "),
219 unnest_params.join(", "),
220 pk_list,
221 update_clause.join(", "),
222 )
223 }
224 }
225
226 #[must_use]
232 pub fn build_delete_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
233 let pk_conditions: Vec<String> = config
234 .primary_key_columns
235 .iter()
236 .enumerate()
237 .map(|(i, col)| {
238 let dt = schema
239 .field_with_name(col)
240 .map(|f| f.data_type().clone())
241 .unwrap_or(DataType::Utf8);
242 let pg_type = arrow_type_to_pg_sql(&dt);
243 format!("{col} = ANY(${}::{}[])", i + 1, pg_type)
244 })
245 .collect();
246
247 format!(
248 "DELETE FROM {} WHERE {}",
249 config.qualified_table_name(),
250 pk_conditions.join(" AND "),
251 )
252 }
253
254 #[must_use]
265 pub fn build_create_table_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String {
266 let fields = user_fields(schema);
267
268 let column_defs: Vec<String> = fields
269 .iter()
270 .map(|f| {
271 let pg_type = arrow_to_pg_ddl_type(f.data_type());
272 let nullable = if f.is_nullable() { "" } else { " NOT NULL" };
273 format!(" {} {}{}", f.name(), pg_type, nullable)
274 })
275 .collect();
276
277 let mut ddl = format!(
278 "CREATE TABLE IF NOT EXISTS {} (\n{}\n",
279 config.qualified_table_name(),
280 column_defs.join(",\n"),
281 );
282
283 if !config.primary_key_columns.is_empty() {
284 use std::fmt::Write;
285 let _ = write!(
286 ddl,
287 ",\n PRIMARY KEY ({})\n",
288 config.primary_key_columns.join(", ")
289 );
290 }
291
292 ddl.push(')');
293 ddl
294 }
295
    /// DDL for the shared bookkeeping table used by exactly-once delivery;
    /// one row per sink id recording the last committed epoch.
    #[must_use]
    pub fn build_offset_table_sql() -> &'static str {
        "CREATE TABLE IF NOT EXISTS _laminardb_sink_offsets (\
         \n sink_id TEXT PRIMARY KEY,\
         \n epoch BIGINT NOT NULL,\
         \n source_offsets JSONB,\
         \n watermark BIGINT,\
         \n updated_at TIMESTAMPTZ DEFAULT NOW()\
         \n)"
    }

    /// Upsert that records `$2` as the committed epoch for sink id `$1`.
    /// Executed inside the same transaction as the epoch's data writes.
    #[must_use]
    pub fn build_epoch_commit_sql() -> &'static str {
        "INSERT INTO _laminardb_sink_offsets (sink_id, epoch, updated_at) \
         VALUES ($1, $2, NOW()) \
         ON CONFLICT (sink_id) DO UPDATE SET epoch = $2, updated_at = NOW()"
    }

    /// Query for the last committed epoch of sink id `$1` (0 or 1 row).
    #[must_use]
    pub fn build_epoch_recover_sql() -> &'static str {
        "SELECT epoch FROM _laminardb_sink_offsets WHERE sink_id = $1"
    }
321
322 pub fn split_changelog_batch(
337 batch: &RecordBatch,
338 ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
339 let op_idx = batch.schema().index_of("_op").map_err(|_| {
340 ConnectorError::ConfigurationError(
341 "changelog mode requires '_op' column in input schema".into(),
342 )
343 })?;
344
345 let op_array = batch
346 .column(op_idx)
347 .as_any()
348 .downcast_ref::<arrow_array::StringArray>()
349 .ok_or_else(|| {
350 ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
351 })?;
352
353 let mut insert_indices = Vec::new();
354 let mut delete_indices = Vec::new();
355
356 for i in 0..op_array.len() {
357 if op_array.is_null(i) {
358 continue;
359 }
360 match op_array.value(i) {
361 "I" | "U" | "r" => {
362 insert_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
363 }
364 "D" => {
365 delete_indices.push(u32::try_from(i).unwrap_or(u32::MAX));
366 }
367 _ => {} }
369 }
370
371 let insert_batch = filter_batch_by_indices(batch, &insert_indices)?;
372 let delete_batch = filter_batch_by_indices(batch, &delete_indices)?;
373
374 Ok((insert_batch, delete_batch))
375 }
376
377 fn prepare_statements(&mut self) {
381 self.copy_sql = Some(Self::build_copy_sql(&self.schema, &self.config));
382
383 if self.config.write_mode == WriteMode::Upsert {
384 self.upsert_sql = Some(Self::build_upsert_sql(&self.schema, &self.config));
385 }
386
387 if self.config.auto_create_table {
388 self.create_table_sql = Some(Self::build_create_table_sql(&self.schema, &self.config));
389 }
390
391 if self.config.changelog_mode {
392 self.delete_sql = Some(Self::build_delete_sql(&self.schema, &self.config));
393 }
394 }
395
396 #[cfg(feature = "postgres-sink")]
398 fn pool(&self) -> Result<&Pool, ConnectorError> {
399 self.pool.as_ref().ok_or(ConnectorError::InvalidState {
400 expected: "pool initialized (call open() first)".into(),
401 actual: "pool not initialized".into(),
402 })
403 }
404
405 #[cfg(feature = "postgres-sink")]
407 fn concat_buffer(&self) -> Result<RecordBatch, ConnectorError> {
408 if self.buffer.is_empty() {
409 return Ok(RecordBatch::new_empty(self.user_schema.clone()));
410 }
411
412 let stripped: Result<Vec<RecordBatch>, ConnectorError> =
413 self.buffer.iter().map(strip_metadata_columns).collect();
414 let stripped = stripped?;
415
416 arrow_select::concat::concat_batches(&self.user_schema, &stripped)
417 .map_err(|e| ConnectorError::Internal(format!("batch concat failed: {e}")))
418 }
419
    /// Flush the buffer via `COPY ... FROM STDIN BINARY`.
    ///
    /// Encodes the concatenated user batch into the PostgreSQL binary COPY
    /// format with `pgpq`, then streams the bytes through the client's
    /// copy-in sink. The encode buffer is reused across flushes.
    ///
    /// # Errors
    ///
    /// `ConnectorError::Internal` on encoding failures, `WriteError` if any
    /// stage of the COPY protocol fails.
    #[cfg(feature = "postgres-sink")]
    async fn flush_append(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        let user_batch = self.concat_buffer()?;
        if user_batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let mut encoder = pgpq::ArrowToPostgresBinaryEncoder::try_new(self.user_schema.as_ref())
            .map_err(|e| ConnectorError::Internal(format!("pgpq encoder init: {e}")))?;

        // Reuse the long-lived buffer: header + rows + footer must all be
        // written before the length is read.
        self.encode_buf.clear();
        encoder.write_header(&mut self.encode_buf);
        encoder
            .write_batch(&user_batch, &mut self.encode_buf)
            .map_err(|e| ConnectorError::Internal(format!("pgpq encode: {e}")))?;
        encoder
            .write_footer(&mut self.encode_buf)
            .map_err(|e| ConnectorError::Internal(format!("pgpq footer: {e}")))?;

        let encoded_bytes = self.encode_buf.len();
        // split() leaves encode_buf empty but keeps its capacity; freeze()
        // yields an immutable Bytes we can hand to the sink.
        let bytes_to_send = self.encode_buf.split().freeze();

        let copy_sql = self
            .copy_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("COPY SQL not prepared".into()))?;

        let sink = client
            .copy_in(copy_sql)
            .await
            .map_err(|e| ConnectorError::WriteError(format!("COPY start: {e}")))?;

        {
            use futures_util::SinkExt;
            // CopyInSink must be pinned before it can be driven.
            futures_util::pin_mut!(sink);
            sink.send(bytes_to_send)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("COPY send: {e}")))?;
            // close() completes the COPY; without it the rows are not applied.
            sink.close()
                .await
                .map_err(|e| ConnectorError::WriteError(format!("COPY finish: {e}")))?;
        }

        let rows = user_batch.num_rows();
        self.metrics.record_write(rows as u64, encoded_bytes as u64);
        self.metrics.record_flush();
        self.metrics.record_copy();

        Ok(WriteResult::new(rows, encoded_bytes as u64))
    }
474
475 #[cfg(feature = "postgres-sink")]
477 #[allow(clippy::cast_possible_truncation)]
478 async fn flush_upsert(
479 &mut self,
480 client: &tokio_postgres::Client,
481 ) -> Result<WriteResult, ConnectorError> {
482 if self.config.changelog_mode {
483 return self.flush_changelog(client).await;
484 }
485
486 let user_batch = self.concat_buffer()?;
487 if user_batch.num_rows() == 0 {
488 return Ok(WriteResult::new(0, 0));
489 }
490
491 let upsert_sql = self
492 .upsert_sql
493 .as_deref()
494 .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;
495
496 let rows = execute_unnest(client, upsert_sql, &user_batch).await?;
497
498 let byte_estimate = estimate_batch_bytes(&user_batch);
499 self.metrics.record_write(rows, byte_estimate);
500 self.metrics.record_flush();
501 self.metrics.record_upsert();
502
503 Ok(WriteResult::new(rows as usize, byte_estimate))
504 }
505
    /// Flush a changelog buffer: split every batch on `_op`, then apply all
    /// upserts followed by all deletes.
    ///
    /// NOTE(review): applying inserts before deletes assumes a key does not
    /// appear as both upsert and delete within the same flush window with
    /// delete-last ordering mattering — confirm against upstream ordering
    /// guarantees. Reported bytes cover upserted rows only; deletes add to
    /// the row count but not the byte estimate.
    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn flush_changelog(
        &mut self,
        client: &tokio_postgres::Client,
    ) -> Result<WriteResult, ConnectorError> {
        let mut total_rows: u64 = 0;
        let mut total_bytes: u64 = 0;

        // Partition every buffered batch by `_op` before touching the wire.
        let mut all_inserts = Vec::new();
        let mut all_deletes = Vec::new();
        for batch in &self.buffer {
            let (ins, del) = Self::split_changelog_batch(batch)?;
            if ins.num_rows() > 0 {
                all_inserts.push(ins);
            }
            if del.num_rows() > 0 {
                all_deletes.push(del);
            }
        }

        if !all_inserts.is_empty() {
            let insert_batch =
                arrow_select::concat::concat_batches(&self.user_schema, &all_inserts)
                    .map_err(|e| ConnectorError::Internal(format!("concat inserts: {e}")))?;

            let upsert_sql = self
                .upsert_sql
                .as_deref()
                .ok_or_else(|| ConnectorError::Internal("upsert SQL not prepared".into()))?;

            let rows = execute_unnest(client, upsert_sql, &insert_batch).await?;
            let bytes = estimate_batch_bytes(&insert_batch);
            self.metrics.record_write(rows, bytes);
            self.metrics.record_upsert();
            total_rows += rows;
            total_bytes += bytes;
        }

        if !all_deletes.is_empty() {
            let delete_batch =
                arrow_select::concat::concat_batches(&self.user_schema, &all_deletes)
                    .map_err(|e| ConnectorError::Internal(format!("concat deletes: {e}")))?;
            let deleted = self.execute_deletes(client, &delete_batch).await?;
            self.metrics.record_deletes(deleted as u64);
            total_rows += deleted as u64;
        }

        self.metrics.record_flush();
        Ok(WriteResult::new(total_rows as usize, total_bytes))
    }
564
    /// Execute the prepared batched DELETE, binding each primary key column
    /// of `delete_batch` as one PostgreSQL array parameter (matching the
    /// `= ANY($n::type[])` predicates built by `build_delete_sql`).
    ///
    /// Returns the number of rows PostgreSQL reports as deleted.
    #[cfg(feature = "postgres-sink")]
    #[allow(clippy::cast_possible_truncation)]
    async fn execute_deletes(
        &self,
        client: &tokio_postgres::Client,
        delete_batch: &RecordBatch,
    ) -> Result<usize, ConnectorError> {
        if delete_batch.num_rows() == 0 {
            return Ok(0);
        }

        let delete_sql = self
            .delete_sql
            .as_deref()
            .ok_or_else(|| ConnectorError::Internal("DELETE SQL not prepared".into()))?;

        // One boxed array parameter per configured PK column, in the same
        // order as the $1..$N placeholders of the DELETE statement.
        let pk_params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> = self
            .config
            .primary_key_columns
            .iter()
            .map(|col| {
                let idx = delete_batch.schema().index_of(col).map_err(|_| {
                    ConnectorError::ConfigurationError(format!(
                        "primary key column '{col}' not in delete batch"
                    ))
                })?;
                arrow_column_to_pg_array(delete_batch.column(idx))
            })
            .collect::<Result<_, _>>()?;

        // tokio-postgres wants &[&dyn ToSql]; re-borrow the boxed params.
        let pk_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = pk_params
            .iter()
            .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
            .collect();

        let rows = client
            .execute(delete_sql, &pk_refs)
            .await
            .map_err(|e| ConnectorError::WriteError(format!("DELETE: {e}")))?;

        Ok(rows as usize)
    }
608
609 #[cfg(feature = "postgres-sink")]
611 async fn flush_to_client(
612 &mut self,
613 client: &tokio_postgres::Client,
614 ) -> Result<WriteResult, ConnectorError> {
615 match self.config.write_mode {
616 WriteMode::Append => self.flush_append(client).await,
617 WriteMode::Upsert => self.flush_upsert(client).await,
618 }
619 }
620
621 #[cfg(feature = "postgres-sink")]
623 #[allow(clippy::cast_sign_loss)]
624 async fn is_epoch_committed(
625 &self,
626 client: &tokio_postgres::Client,
627 epoch: u64,
628 ) -> Result<bool, ConnectorError> {
629 let sink_id = self.config.effective_sink_id();
630 let row = client
631 .query_opt(Self::build_epoch_recover_sql(), &[&sink_id])
632 .await
633 .map_err(|e| ConnectorError::Internal(format!("epoch recovery query: {e}")))?;
634
635 if let Some(row) = row {
636 let committed: i64 = row.get(0);
637 Ok(committed as u64 >= epoch)
638 } else {
639 Ok(false)
640 }
641 }
642}
643
644#[cfg(feature = "postgres-sink")]
648#[async_trait]
649impl SinkConnector for PostgresSink {
    /// Open the sink: validate config, pre-render SQL, build the connection
    /// pool, optionally create the target/offset tables, and recover the last
    /// committed epoch when running exactly-once.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // A non-empty runtime config fully replaces the constructor config.
        if !config.properties().is_empty() {
            self.config = PostgresSinkConfig::from_config(config)?;
        }

        // Changelog handling is implemented only on the upsert path.
        if self.config.changelog_mode && self.config.write_mode != WriteMode::Upsert {
            return Err(ConnectorError::ConfigurationError(
                "changelog mode requires write.mode = 'upsert'".into(),
            ));
        }

        info!(
            table = %self.config.qualified_table_name(),
            mode = %self.config.write_mode,
            guarantee = %self.config.delivery_guarantee,
            "opening PostgreSQL sink connector"
        );

        self.prepare_statements();

        // Pin the sink id so later calls to effective_sink_id() are stable.
        if self.config.sink_id.is_empty() {
            self.config.sink_id = self.config.effective_sink_id();
        }

        let mut pool_cfg = deadpool_postgres::Config::new();
        pool_cfg.host = Some(self.config.hostname.clone());
        pool_cfg.port = Some(self.config.port);
        pool_cfg.dbname = Some(self.config.database.clone());
        pool_cfg.user = Some(self.config.username.clone());
        pool_cfg.password = Some(self.config.password.clone());
        pool_cfg.pool = Some(deadpool_postgres::PoolConfig::new(self.config.pool_size));

        // NOTE(review): connections are created with NoTls — confirm TLS is
        // not required for the target deployments.
        let pool = pool_cfg
            .create_pool(
                Some(deadpool_postgres::Runtime::Tokio1),
                tokio_postgres::NoTls,
            )
            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool creation failed: {e}")))?;

        // Eagerly check out one connection so open() fails fast on bad
        // credentials/host instead of failing on the first write.
        let client = pool.get().await.map_err(|e| {
            ConnectorError::ConnectionFailed(format!("initial connection failed: {e}"))
        })?;

        if self.config.auto_create_table {
            if let Some(ddl) = &self.create_table_sql {
                client.batch_execute(ddl.as_str()).await.map_err(|e| {
                    ConnectorError::Internal(format!("auto-create table failed: {e}"))
                })?;
                debug!(table = %self.config.qualified_table_name(), "target table ensured");
            }
        }

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            // Bookkeeping table for epoch markers (shared across sinks).
            client
                .batch_execute(Self::build_offset_table_sql())
                .await
                .map_err(|e| {
                    ConnectorError::Internal(format!("create offset table failed: {e}"))
                })?;

            // Recover the last committed epoch so replays can be skipped.
            {
                let recover_sink_id = self.config.effective_sink_id();
                let row = client
                    .query_opt(Self::build_epoch_recover_sql(), &[&recover_sink_id])
                    .await
                    .map_err(|e| ConnectorError::Internal(format!("epoch recovery: {e}")))?;
                if let Some(row) = row {
                    let epoch: i64 = row.get(0);
                    #[allow(clippy::cast_sign_loss)]
                    {
                        self.last_committed_epoch = epoch as u64;
                    }
                    info!(epoch, "recovered last committed epoch");
                }
            }
        }

        self.pool = Some(pool);
        self.state = ConnectorState::Running;

        info!(
            table = %self.config.qualified_table_name(),
            pool_size = self.config.pool_size,
            "PostgreSQL sink connector opened"
        );

        Ok(())
    }
746
    /// Buffer a batch; under at-least-once delivery, flush when the size or
    /// interval threshold is hit. Under exactly-once, rows are only buffered
    /// here — the flush happens inside the pre_commit transaction.
    ///
    /// The returned `WriteResult` reports (0, 0) unless a flush happened.
    #[allow(clippy::cast_possible_truncation)]
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        self.buffer.push(batch.clone());
        self.buffered_rows += batch.num_rows();

        // Exactly-once: defer all writes to the pre_commit transaction.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            return Ok(WriteResult::new(0, 0));
        }

        let should_flush = self.buffered_rows >= self.config.batch_size
            || self.last_flush.elapsed() >= self.config.flush_interval;

        if should_flush {
            let client = self
                .pool()?
                .get()
                .await
                .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
            // On flush failure the buffer is kept; a retry may re-send the
            // same rows (at-least-once semantics).
            let result = self.flush_to_client(&client).await?;
            self.buffer.clear();
            self.buffered_rows = 0;
            self.last_flush = Instant::now();
            return Ok(result);
        }

        Ok(WriteResult::new(0, 0))
    }
787
    /// Full input schema (including any `_`-prefixed metadata columns).
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Mark `epoch` as the epoch currently being written.
    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;
        debug!(epoch, "PostgreSQL sink epoch started");
        Ok(())
    }
797
    /// First phase of the two-phase commit: flush everything buffered for
    /// `epoch`.
    ///
    /// Exactly-once: data writes and the epoch marker are issued inside one
    /// explicit PostgreSQL transaction, after an idempotency check against
    /// the offsets table (so a replayed epoch is skipped). At-least-once:
    /// a plain flush, no transaction.
    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch in pre_commit: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        // Nothing buffered: nothing to flush (no epoch marker is written for
        // an empty epoch either).
        if self.buffer.is_empty() {
            return Ok(());
        }

        let client = self
            .pool()?
            .get()
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            // Replay after a crash: the epoch may already be durable.
            if self.is_epoch_committed(&client, epoch).await? {
                info!(epoch, "epoch already committed in PostgreSQL, skipping");
                self.buffer.clear();
                self.buffered_rows = 0;
                return Ok(());
            }

            // All statements below run on this same pooled connection, so
            // they share the explicit transaction.
            client
                .batch_execute("BEGIN")
                .await
                .map_err(|e| ConnectorError::WriteError(format!("BEGIN: {e}")))?;

            match self.flush_to_client(&client).await {
                Ok(_result) => {
                    // Record the epoch marker atomically with the data.
                    let sink_id = self.config.effective_sink_id();
                    #[allow(clippy::cast_possible_wrap)]
                    let epoch_i64 = epoch as i64;
                    let params: Vec<&(dyn postgres_types::ToSql + Sync)> =
                        vec![&sink_id, &epoch_i64];
                    client
                        .execute(Self::build_epoch_commit_sql(), &params)
                        .await
                        .map_err(|e| {
                            ConnectorError::WriteError(format!("epoch marker write: {e}"))
                        })?;

                    client
                        .batch_execute("COMMIT")
                        .await
                        .map_err(|e| ConnectorError::WriteError(format!("COMMIT: {e}")))?;
                }
                Err(e) => {
                    // Best-effort rollback; the original error is what matters.
                    let _ = client.batch_execute("ROLLBACK").await;
                    return Err(e);
                }
            }
        } else {
            self.flush_to_client(&client).await?;
        }

        self.buffer.clear();
        self.buffered_rows = 0;
        self.last_flush = Instant::now();

        debug!(epoch, "PostgreSQL sink pre-committed");
        Ok(())
    }
867
    /// Second phase of the commit: record `epoch` as durable. The actual
    /// writes (and, for exactly-once, the epoch marker) happened in
    /// `pre_commit`; this only advances local state.
    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        self.last_committed_epoch = epoch;
        self.metrics.record_commit();

        debug!(epoch, "PostgreSQL sink epoch committed");
        Ok(())
    }

    /// Abandon `epoch`: drop all buffered (not-yet-flushed) rows.
    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.buffer.clear();
        self.buffered_rows = 0;

        self.metrics.record_rollback();
        warn!(epoch, "PostgreSQL sink epoch rolled back");
        Ok(())
    }
891
892 fn health_check(&self) -> HealthStatus {
893 match self.state {
894 ConnectorState::Running => {
895 if let Some(pool) = &self.pool {
896 let status = pool.status();
897 if status.available > 0 {
898 HealthStatus::Healthy
899 } else {
900 HealthStatus::Degraded(format!(
901 "no available connections ({} in use)",
902 status.size
903 ))
904 }
905 } else {
906 HealthStatus::Unhealthy("pool not initialized".into())
907 }
908 }
909 ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
910 ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
911 ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
912 ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
913 ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
914 }
915 }
916
    /// Snapshot of the sink metrics in the generic connector format.
    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }
920
921 fn capabilities(&self) -> SinkConnectorCapabilities {
922 let mut caps = SinkConnectorCapabilities::default().with_idempotent();
923
924 if self.config.write_mode == WriteMode::Upsert {
925 caps = caps.with_upsert();
926 }
927 if self.config.changelog_mode {
928 caps = caps.with_changelog();
929 }
930 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
931 caps = caps.with_exactly_once().with_two_phase_commit();
932 }
933
934 caps
935 }
936
937 async fn flush(&mut self) -> Result<(), ConnectorError> {
938 if self.buffer.is_empty() {
939 return Ok(());
940 }
941 let client = self
942 .pool()?
943 .get()
944 .await
945 .map_err(|e| ConnectorError::ConnectionFailed(format!("pool checkout: {e}")))?;
946 self.flush_to_client(&client).await?;
947 self.buffer.clear();
948 self.buffered_rows = 0;
949 self.last_flush = Instant::now();
950 Ok(())
951 }
952
953 async fn close(&mut self) -> Result<(), ConnectorError> {
954 info!("closing PostgreSQL sink connector");
955
956 if !self.buffer.is_empty() {
957 if let Some(pool) = &self.pool {
958 if let Ok(client) = pool.get().await {
959 let _ = self.flush_to_client(&client).await;
960 }
961 }
962 }
963
964 self.buffer.clear();
965 self.buffered_rows = 0;
966 self.pool = None;
967 self.state = ConnectorState::Closed;
968
969 info!(
970 table = %self.config.qualified_table_name(),
971 records = self.metrics.records_written.load(
972 std::sync::atomic::Ordering::Relaxed
973 ),
974 epochs = self.metrics.epochs_committed.load(
975 std::sync::atomic::Ordering::Relaxed
976 ),
977 "PostgreSQL sink connector closed"
978 );
979
980 Ok(())
981 }
982}
983
// Stub implementation used when the `postgres-sink` feature is disabled:
// the type still exists and reports config-derived capabilities, but any
// attempt to open or write fails with UnsupportedOperation.
#[cfg(not(feature = "postgres-sink"))]
#[async_trait]
impl SinkConnector for PostgresSink {
    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
        Err(ConnectorError::UnsupportedOperation(
            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
        ))
    }

    async fn write_batch(&mut self, _batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        Err(ConnectorError::UnsupportedOperation(
            "PostgreSQL sink requires the 'postgres-sink' feature".into(),
        ))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    // Without a pool, health is derived from the lifecycle state alone.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    // Capabilities mirror the feature-enabled implementation so planning
    // logic behaves consistently regardless of the build configuration.
    fn capabilities(&self) -> SinkConnectorCapabilities {
        let mut caps = SinkConnectorCapabilities::default().with_idempotent();
        if self.config.write_mode == WriteMode::Upsert {
            caps = caps.with_upsert();
        }
        if self.config.changelog_mode {
            caps = caps.with_changelog();
        }
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }
        caps
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Closed;
        Ok(())
    }
}
1038
// Manual Debug: the config holds credentials and the pool/encode buffer are
// not meaningfully printable, so only operational state is exposed.
impl std::fmt::Debug for PostgresSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PostgresSink")
            .field("state", &self.state)
            .field("table", &self.config.qualified_table_name())
            .field("mode", &self.config.write_mode)
            .field("guarantee", &self.config.delivery_guarantee)
            .field("current_epoch", &self.current_epoch)
            .field("last_committed_epoch", &self.last_committed_epoch)
            .field("buffered_rows", &self.buffered_rows)
            .finish_non_exhaustive()
    }
}
1052
1053fn user_columns(schema: &SchemaRef) -> Vec<String> {
1057 schema
1058 .fields()
1059 .iter()
1060 .filter(|f| !f.name().starts_with('_'))
1061 .map(|f| f.name().clone())
1062 .collect()
1063}
1064
1065fn user_fields(schema: &SchemaRef) -> Vec<&Arc<Field>> {
1067 schema
1068 .fields()
1069 .iter()
1070 .filter(|f| !f.name().starts_with('_'))
1071 .collect()
1072}
1073
1074fn build_user_schema(schema: &SchemaRef) -> SchemaRef {
1076 Arc::new(Schema::new(
1077 schema
1078 .fields()
1079 .iter()
1080 .filter(|f| !f.name().starts_with('_'))
1081 .cloned()
1082 .collect::<Vec<_>>(),
1083 ))
1084}
1085
1086fn filter_batch_by_indices(
1090 batch: &RecordBatch,
1091 indices: &[u32],
1092) -> Result<RecordBatch, ConnectorError> {
1093 if indices.is_empty() {
1094 let user_schema = Arc::new(Schema::new(
1095 batch
1096 .schema()
1097 .fields()
1098 .iter()
1099 .filter(|f| !f.name().starts_with('_'))
1100 .cloned()
1101 .collect::<Vec<_>>(),
1102 ));
1103 return Ok(RecordBatch::new_empty(user_schema));
1104 }
1105
1106 let indices_array = arrow_array::UInt32Array::from(indices.to_vec());
1107
1108 let user_schema = Arc::new(Schema::new(
1109 batch
1110 .schema()
1111 .fields()
1112 .iter()
1113 .filter(|f| !f.name().starts_with('_'))
1114 .cloned()
1115 .collect::<Vec<_>>(),
1116 ));
1117
1118 let filtered_columns: Vec<Arc<dyn arrow_array::Array>> = batch
1119 .schema()
1120 .fields()
1121 .iter()
1122 .enumerate()
1123 .filter(|(_, f)| !f.name().starts_with('_'))
1124 .map(|(i, _)| {
1125 arrow_select::take::take(batch.column(i), &indices_array, None)
1126 .map_err(|e| ConnectorError::Internal(format!("arrow take failed: {e}")))
1127 })
1128 .collect::<Result<Vec<_>, _>>()?;
1129
1130 RecordBatch::try_new(user_schema, filtered_columns)
1131 .map_err(|e| ConnectorError::Internal(format!("batch construction failed: {e}")))
1132}
1133
/// Drop `_`-prefixed metadata columns from `batch`; cheap clone when there
/// are none.
#[cfg(feature = "postgres-sink")]
fn strip_metadata_columns(batch: &RecordBatch) -> Result<RecordBatch, ConnectorError> {
    let schema = batch.schema();

    let keep: Vec<usize> = (0..schema.fields().len())
        .filter(|&i| !schema.field(i).name().starts_with('_'))
        .collect();

    // Fast path: nothing to strip (RecordBatch::clone is Arc-cheap).
    if keep.len() == schema.fields().len() {
        return Ok(batch.clone());
    }

    let fields: Vec<Field> = keep.iter().map(|&i| schema.field(i).clone()).collect();
    let columns: Vec<Arc<dyn Array>> = keep.iter().map(|&i| batch.column(i).clone()).collect();

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
        .map_err(|e| ConnectorError::Internal(format!("strip metadata: {e}")))
}
1164
/// Execute an UNNEST-based statement, binding each batch column as one
/// PostgreSQL array parameter ($1..$N in column order). Returns the number
/// of rows PostgreSQL reports as affected.
#[cfg(feature = "postgres-sink")]
async fn execute_unnest(
    client: &tokio_postgres::Client,
    sql: &str,
    batch: &RecordBatch,
) -> Result<u64, ConnectorError> {
    let mut owned_params: Vec<Box<dyn postgres_types::ToSql + Sync + Send>> =
        Vec::with_capacity(batch.num_columns());
    for column in batch.columns() {
        owned_params.push(arrow_column_to_pg_array(column)?);
    }

    // tokio-postgres wants &[&dyn ToSql]; re-borrow the boxed params.
    let param_refs: Vec<&(dyn postgres_types::ToSql + Sync)> = owned_params
        .iter()
        .map(|p| p.as_ref() as &(dyn postgres_types::ToSql + Sync))
        .collect();

    client
        .execute(sql, &param_refs)
        .await
        .map_err(|e| ConnectorError::WriteError(format!("UNNEST execute: {e}")))
}
1186
/// Rough in-memory size of a batch (sum of column buffer sizes), used as a
/// bytes-written estimate for the non-COPY write paths.
#[cfg(feature = "postgres-sink")]
fn estimate_batch_bytes(batch: &RecordBatch) -> u64 {
    batch
        .columns()
        .iter()
        .map(|c| c.get_buffer_memory_size() as u64)
        .sum()
}
1194
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    // ---- fixtures ----

    /// Three-column user schema (`id`, `name`, `value`) used by most tests.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Minimal append-mode config targeting `public.events`.
    fn test_config() -> PostgresSinkConfig {
        PostgresSinkConfig::new("localhost", "mydb", "events")
    }

    /// Same as [`test_config`] but in upsert mode keyed on `id`.
    fn upsert_config() -> PostgresSinkConfig {
        let mut cfg = test_config();
        cfg.write_mode = WriteMode::Upsert;
        cfg.primary_key_columns = vec!["id".to_string()];
        cfg
    }

    /// Builds an `n`-row batch matching [`test_schema`].
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(arrow_array::Float64Array::from(values)),
            ],
        )
        .expect("test batch creation")
    }

    // ---- construction ----

    #[test]
    fn test_new_defaults() {
        let sink = PostgresSink::new(test_schema(), test_config());
        assert_eq!(sink.state(), ConnectorState::Created);
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert_eq!(sink.buffered_rows(), 0);
        assert!(sink.upsert_sql.is_none());
        assert!(sink.copy_sql.is_none());
    }

    #[test]
    fn test_user_schema_strips_metadata() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let sink = PostgresSink::new(schema, test_config());
        assert_eq!(sink.user_schema.fields().len(), 2);
        assert_eq!(sink.user_schema.field(0).name(), "id");
        assert_eq!(sink.user_schema.field(1).name(), "value");
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let sink = PostgresSink::new(schema.clone(), test_config());
        assert_eq!(sink.schema(), schema);
    }

    // ---- SQL generation ----

    #[test]
    fn test_build_copy_sql() {
        let schema = test_schema();
        let config = test_config();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert_eq!(
            sql,
            "COPY public.events (id, name, value) FROM STDIN BINARY"
        );
    }

    #[test]
    fn test_build_copy_sql_custom_schema() {
        let schema = test_schema();
        let mut config = test_config();
        config.schema_name = "analytics".to_string();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert!(sql.starts_with("COPY analytics.events"));
    }

    #[test]
    fn test_build_copy_sql_excludes_metadata_columns() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let config = test_config();
        let sql = PostgresSink::build_copy_sql(&schema, &config);
        assert_eq!(sql, "COPY public.events (id, value) FROM STDIN BINARY");
    }

    #[test]
    fn test_build_upsert_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_upsert_sql(&schema, &config);

        assert!(sql.starts_with("INSERT INTO public.events"));
        assert!(sql.contains("SELECT * FROM UNNEST"));
        assert!(sql.contains("$1::int8[]"));
        assert!(sql.contains("$2::text[]"));
        assert!(sql.contains("$3::float8[]"));
        assert!(sql.contains("ON CONFLICT (id)"));
        assert!(sql.contains("DO UPDATE SET"));
        assert!(sql.contains("name = EXCLUDED.name"));
        assert!(sql.contains("value = EXCLUDED.value"));
        assert!(!sql.contains("id = EXCLUDED.id"));
    }

    #[test]
    fn test_build_upsert_sql_composite_key() {
        let schema = test_schema();
        let mut config = upsert_config();
        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
        let sql = PostgresSink::build_upsert_sql(&schema, &config);

        assert!(sql.contains("ON CONFLICT (id, name)"));
        assert!(sql.contains("value = EXCLUDED.value"));
        assert!(!sql.contains("id = EXCLUDED.id"));
        assert!(!sql.contains("name = EXCLUDED.name"));
    }

    #[test]
    fn test_build_upsert_sql_key_only_table() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let mut config = test_config();
        config.write_mode = WriteMode::Upsert;
        config.primary_key_columns = vec!["id".to_string()];

        // Every column is part of the key, so there is nothing to update.
        let sql = PostgresSink::build_upsert_sql(&schema, &config);
        assert!(sql.contains("DO NOTHING"), "sql: {sql}");
    }

    #[test]
    fn test_build_delete_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_delete_sql(&schema, &config);

        assert_eq!(sql, "DELETE FROM public.events WHERE id = ANY($1::int8[])");
    }

    #[test]
    fn test_build_delete_sql_composite_key() {
        let schema = test_schema();
        let mut config = upsert_config();
        config.primary_key_columns = vec!["id".to_string(), "name".to_string()];
        let sql = PostgresSink::build_delete_sql(&schema, &config);

        assert!(sql.contains("id = ANY($1::int8[])"));
        assert!(sql.contains("name = ANY($2::text[])"));
        assert!(sql.contains(" AND "));
    }

    #[test]
    fn test_build_create_table_sql() {
        let schema = test_schema();
        let config = upsert_config();
        let sql = PostgresSink::build_create_table_sql(&schema, &config);

        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS public.events"));
        assert!(sql.contains("id BIGINT NOT NULL"));
        assert!(sql.contains("name TEXT"));
        assert!(sql.contains("value DOUBLE PRECISION"));
        assert!(sql.contains("PRIMARY KEY (id)"));
    }

    #[test]
    fn test_build_create_table_sql_no_pk() {
        let schema = test_schema();
        let config = test_config();
        let sql = PostgresSink::build_create_table_sql(&schema, &config);

        assert!(sql.starts_with("CREATE TABLE IF NOT EXISTS"));
        assert!(!sql.contains("PRIMARY KEY"));
    }

    #[test]
    fn test_offset_table_sql() {
        let sql = PostgresSink::build_offset_table_sql();
        assert!(sql.contains("_laminardb_sink_offsets"));
        assert!(sql.contains("sink_id TEXT PRIMARY KEY"));
        assert!(sql.contains("epoch BIGINT NOT NULL"));
    }

    #[test]
    fn test_epoch_commit_sql() {
        let sql = PostgresSink::build_epoch_commit_sql();
        assert!(sql.contains("INSERT INTO _laminardb_sink_offsets"));
        assert!(sql.contains("ON CONFLICT (sink_id) DO UPDATE"));
    }

    #[test]
    fn test_epoch_recover_sql() {
        let sql = PostgresSink::build_epoch_recover_sql();
        assert!(sql.contains("SELECT epoch FROM _laminardb_sink_offsets"));
    }

    // ---- changelog splitting ----

    /// Schema carrying the `_op` / `_ts_ms` changelog metadata columns.
    fn changelog_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
        ]))
    }

    /// Five-row changelog batch mixing inserts, an update, and deletes.
    fn changelog_batch() -> RecordBatch {
        RecordBatch::try_new(
            changelog_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
            ],
        )
        .expect("changelog batch creation")
    }

    #[test]
    fn test_split_changelog_batch() {
        let batch = changelog_batch();
        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");

        assert_eq!(inserts.num_rows(), 3);
        assert_eq!(deletes.num_rows(), 2);
        assert_eq!(inserts.num_columns(), 2);
        assert_eq!(deletes.num_columns(), 2);

        let insert_ids = inserts
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array");
        assert_eq!(insert_ids.value(0), 1);
        assert_eq!(insert_ids.value(1), 2);
        assert_eq!(insert_ids.value(2), 4);

        let delete_ids = deletes
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("i64 array");
        assert_eq!(delete_ids.value(0), 3);
        assert_eq!(delete_ids.value(1), 5);
    }

    #[test]
    fn test_split_changelog_all_inserts() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["I", "I"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 2);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_split_changelog_all_deletes() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["D", "D"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 0);
        assert_eq!(deletes.num_rows(), 2);
    }

    #[test]
    fn test_split_changelog_missing_op_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).expect("batch");

        let result = PostgresSink::split_changelog_batch(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_split_changelog_snapshot_read() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(StringArray::from(vec!["a"])),
                Arc::new(StringArray::from(vec!["r"])),
                Arc::new(Int64Array::from(vec![100])),
            ],
        )
        .expect("batch");

        let (inserts, deletes) = PostgresSink::split_changelog_batch(&batch).expect("split");
        assert_eq!(inserts.num_rows(), 1);
        assert_eq!(deletes.num_rows(), 0);
    }

    // ---- write path ----

    #[tokio::test]
    async fn test_write_batch_buffering() {
        let mut config = test_config();
        config.batch_size = 100;
        let mut sink = PostgresSink::new(test_schema(), config);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }

    #[tokio::test]
    async fn test_write_batch_empty() {
        let mut sink = PostgresSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Running;

        let batch = test_batch(0);
        let result = sink.write_batch(&batch).await.expect("write");
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[tokio::test]
    async fn test_write_batch_not_running() {
        let mut sink = PostgresSink::new(test_schema(), test_config());

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_exactly_once_defers_flush() {
        let mut config = test_config();
        config.batch_size = 5;
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let mut sink = PostgresSink::new(test_schema(), config);
        sink.state = ConnectorState::Running;

        // batch_size is exceeded, but exactly-once defers the flush to commit.
        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.expect("write");

        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
    }

    // ---- health & capabilities ----

    #[test]
    fn test_health_check_created() {
        let sink = PostgresSink::new(test_schema(), test_config());
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_capabilities_append_at_least_once() {
        let sink = PostgresSink::new(test_schema(), test_config());
        let caps = sink.capabilities();
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.exactly_once);
    }

    #[test]
    fn test_capabilities_upsert() {
        let sink = PostgresSink::new(test_schema(), upsert_config());
        let caps = sink.capabilities();
        assert!(caps.upsert);
        assert!(caps.idempotent);
    }

    #[test]
    fn test_capabilities_changelog() {
        let mut config = test_config();
        config.changelog_mode = true;
        let sink = PostgresSink::new(test_schema(), config);
        let caps = sink.capabilities();
        assert!(caps.changelog);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let mut config = upsert_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = PostgresSink::new(test_schema(), config);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
    }

    // ---- metrics ----

    #[test]
    fn test_metrics_initial() {
        let sink = PostgresSink::new(test_schema(), test_config());
        let m = sink.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    // ---- epoch lifecycle ----

    #[tokio::test]
    async fn test_epoch_lifecycle_state() {
        let mut sink = PostgresSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        assert_eq!(sink.current_epoch(), 1);

        sink.commit_epoch(1).await.expect("commit");
        assert_eq!(sink.last_committed_epoch(), 1);

        let m = sink.metrics();
        let committed = m.custom.iter().find(|(k, _)| k == "pg.epochs_committed");
        assert_eq!(committed.expect("metric").1, 1.0);
    }

    #[tokio::test]
    async fn test_epoch_mismatch_rejected() {
        let mut sink = PostgresSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.expect("begin");
        let result = sink.commit_epoch(2).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_rollback_clears_buffer() {
        let mut config = test_config();
        config.batch_size = 1000;
        let mut sink = PostgresSink::new(test_schema(), config);
        sink.state = ConnectorState::Running;

        let batch = test_batch(50);
        sink.write_batch(&batch).await.expect("write");
        assert_eq!(sink.buffered_rows(), 50);

        sink.rollback_epoch(0).await.expect("rollback");
        assert_eq!(sink.buffered_rows(), 0);
    }

    // ---- misc ----

    #[test]
    fn test_debug_output() {
        let sink = PostgresSink::new(test_schema(), test_config());
        let debug = format!("{sink:?}");
        assert!(debug.contains("PostgresSink"));
        assert!(debug.contains("public.events"));
    }

    #[test]
    fn test_build_user_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("_op", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
            Field::new("_ts_ms", DataType::Int64, false),
        ]));
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 2);
        assert_eq!(user.field(0).name(), "id");
        assert_eq!(user.field(1).name(), "value");
    }

    #[test]
    fn test_build_user_schema_no_metadata() {
        let schema = test_schema();
        let user = build_user_schema(&schema);
        assert_eq!(user.fields().len(), 3);
    }
}