1use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::Instant;
15
16use arrow_array::RecordBatch;
17use arrow_schema::SchemaRef;
18use async_trait::async_trait;
19use tokio::sync::Notify;
20
21use crate::checkpoint::SourceCheckpoint;
22use crate::config::{ConnectorConfig, ConnectorState};
23use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
24use crate::error::ConnectorError;
25use crate::health::HealthStatus;
26use crate::metrics::ConnectorMetrics;
27
28use super::changelog::{events_to_record_batch, tuple_to_json, CdcOperation, ChangeEvent};
29use super::config::PostgresCdcConfig;
30use super::decoder::{decode_message, WalMessage};
31use super::lsn::Lsn;
32use super::metrics::CdcMetrics;
33use super::schema::{cdc_envelope_schema, RelationCache, RelationInfo};
/// PostgreSQL change-data-capture source built on logical replication
/// (pgoutput). Decoded change events are staged per transaction and only
/// exposed to `poll_batch` once their Commit message is seen.
pub struct PostgresCdcSource {
    /// Parsed connector configuration (host, slot, publication, filters, …).
    config: PostgresCdcConfig,

    /// Lifecycle state: Created → Initializing → Running → Closed/Failed.
    state: ConnectorState,

    /// Fixed Arrow schema of the CDC envelope (6 columns; tests show
    /// col 0 = table, col 1 = op, col 4 = before, col 5 = after).
    schema: SchemaRef,

    /// CDC-specific counters, shared with background tasks.
    metrics: Arc<CdcMetrics>,

    /// Table metadata keyed by relation OID, populated from Relation messages.
    relation_cache: RelationCache,

    /// Committed change events waiting to be drained into record batches.
    event_buffer: VecDeque<ChangeEvent>,

    /// In-flight transaction; its events are held here until Commit.
    current_txn: Option<TransactionState>,

    /// LSN acknowledged back to the server (or restored from a checkpoint).
    confirmed_flush_lsn: Lsn,

    /// Most recently observed WAL write position.
    write_lsn: Lsn,

    /// Time of the last keepalive from the server (updated in the
    /// feature-gated payload path; unread elsewhere, hence dead_code).
    #[allow(dead_code)]
    last_keepalive: Instant,

    /// Raw pgoutput messages queued via `enqueue_wal_data`, decoded lazily.
    pending_messages: VecDeque<Vec<u8>>,

    /// Signalled by the WAL reader task when new data is available.
    data_ready: Arc<Notify>,

    /// Handle for the control-connection driver task.
    #[cfg(feature = "postgres-cdc")]
    connection_handle: Option<tokio::task::JoinHandle<()>>,

    /// Receiver side of the WAL payload channel fed by the reader task.
    #[cfg(feature = "postgres-cdc")]
    wal_rx: Option<tokio::sync::mpsc::Receiver<WalPayload>>,

    /// Handle for the background WAL reader task.
    #[cfg(feature = "postgres-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Watch sender used to ask the reader task to shut down.
    #[cfg(feature = "postgres-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
}
104
/// State of the transaction currently being decoded; events accumulate here
/// between Begin and Commit so that only committed changes are emitted.
#[derive(Debug, Clone)]
struct TransactionState {
    /// Transaction ID from the Begin message (kept for diagnostics).
    _xid: u32,
    /// Final LSN of the transaction, stamped onto every buffered event.
    final_lsn: Lsn,
    /// Commit timestamp (Unix ms), stamped onto every buffered event.
    commit_ts_ms: i64,
    /// Change events decoded so far for this transaction.
    events: Vec<ChangeEvent>,
}
117
/// Message passed from the background WAL reader task to `poll_batch`.
/// Only constructed when the `postgres-cdc` feature is active, hence the
/// dead_code allowance for builds without it.
#[allow(dead_code)]
enum WalPayload {
    /// Transaction start (raw LSN / PostgreSQL-epoch microsecond timestamp).
    Begin {
        final_lsn: u64,
        commit_ts_us: i64,
        xid: u32,
    },
    /// Transaction commit.
    Commit {
        end_lsn: u64,
        commit_ts_us: i64,
        lsn: u64,
    },
    /// Raw pgoutput message bytes plus the server's WAL-end position.
    XLogData {
        wal_end: u64,
        data: Vec<u8>,
    },
    /// Server keepalive carrying only the WAL-end position.
    KeepAlive {
        wal_end: u64,
    },
}
139
impl PostgresCdcSource {
    /// Creates a new source from a parsed CDC configuration.
    ///
    /// The source starts in `ConnectorState::Created`; call `open` to begin
    /// streaming.
    #[must_use]
    pub fn new(config: PostgresCdcConfig) -> Self {
        Self {
            config,
            state: ConnectorState::Created,
            schema: cdc_envelope_schema(),
            metrics: Arc::new(CdcMetrics::new()),
            relation_cache: RelationCache::new(),
            event_buffer: VecDeque::new(),
            current_txn: None,
            confirmed_flush_lsn: Lsn::ZERO,
            write_lsn: Lsn::ZERO,
            last_keepalive: Instant::now(),
            pending_messages: VecDeque::new(),
            data_ready: Arc::new(Notify::new()),
            #[cfg(feature = "postgres-cdc")]
            connection_handle: None,
            #[cfg(feature = "postgres-cdc")]
            wal_rx: None,
            #[cfg(feature = "postgres-cdc")]
            reader_handle: None,
            #[cfg(feature = "postgres-cdc")]
            reader_shutdown: None,
        }
    }

    /// Builds a source from a generic `ConnectorConfig`.
    ///
    /// # Errors
    /// Returns an error if the properties cannot be parsed into a valid
    /// `PostgresCdcConfig`.
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let pg_config = PostgresCdcConfig::from_config(config)?;
        Ok(Self::new(pg_config))
    }

    /// Returns the active configuration.
    #[must_use]
    pub fn config(&self) -> &PostgresCdcConfig {
        &self.config
    }

    /// LSN confirmed as flushed (checkpointed) back to the server.
    #[must_use]
    pub fn confirmed_flush_lsn(&self) -> Lsn {
        self.confirmed_flush_lsn
    }

    /// Most recently observed WAL write position.
    #[must_use]
    pub fn write_lsn(&self) -> Lsn {
        self.write_lsn
    }

    /// Replication lag in bytes: `write_lsn - confirmed_flush_lsn`.
    #[must_use]
    pub fn replication_lag_bytes(&self) -> u64 {
        self.write_lsn.diff(self.confirmed_flush_lsn)
    }

    /// Cache of relation (table) metadata keyed by relation OID.
    #[must_use]
    pub fn relation_cache(&self) -> &RelationCache {
        &self.relation_cache
    }

    /// Number of committed events buffered but not yet drained into a batch.
    #[must_use]
    pub fn buffered_events(&self) -> usize {
        self.event_buffer.len()
    }

    /// Queues one raw pgoutput message for later decoding by
    /// `process_pending_messages` (used directly by tests).
    pub fn enqueue_wal_data(&mut self, data: Vec<u8>) {
        self.pending_messages.push_back(data);
    }

    /// Decodes and applies every queued raw message in FIFO order.
    ///
    /// # Errors
    /// Returns `ConnectorError::ReadError` on a pgoutput decode failure, or
    /// any error produced while applying the decoded message.
    pub fn process_pending_messages(&mut self) -> Result<(), ConnectorError> {
        while let Some(data) = self.pending_messages.pop_front() {
            self.metrics.record_bytes(data.len() as u64);
            let msg = decode_message(&data)
                .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
            self.process_wal_message(msg)?;
        }
        Ok(())
    }

    /// Applies a decoded logical-replication message to connector state.
    fn process_wal_message(&mut self, msg: WalMessage) -> Result<(), ConnectorError> {
        match msg {
            // Begin opens a transaction scope: subsequent DML events are
            // staged in `current_txn` until Commit arrives.
            WalMessage::Begin(begin) => {
                self.current_txn = Some(TransactionState {
                    _xid: begin.xid,
                    final_lsn: begin.final_lsn,
                    commit_ts_ms: begin.commit_ts_ms,
                    events: Vec::new(),
                });
            }
            // Commit flushes the staged events into `event_buffer` and
            // advances the write LSN to the transaction's end LSN.
            WalMessage::Commit(commit) => {
                if let Some(txn) = self.current_txn.take() {
                    for event in txn.events {
                        self.event_buffer.push_back(event);
                    }
                    self.write_lsn = commit.end_lsn;
                    self.metrics.record_transaction();
                    self.metrics
                        .set_replication_lag_bytes(self.replication_lag_bytes());
                }
            }
            // Relation messages describe table schemas; cache them so that
            // later DML messages can be decoded into named columns.
            WalMessage::Relation(rel) => {
                let info = RelationInfo {
                    relation_id: rel.relation_id,
                    namespace: rel.namespace,
                    name: rel.name,
                    replica_identity: rel.replica_identity as char,
                    columns: rel.columns,
                };
                self.relation_cache.insert(info);
            }
            WalMessage::Insert(ins) => {
                self.process_insert(ins.relation_id, &ins.new_tuple)?;
            }
            WalMessage::Update(upd) => {
                self.process_update(upd.relation_id, upd.old_tuple.as_ref(), &upd.new_tuple)?;
            }
            WalMessage::Delete(del) => {
                self.process_delete(del.relation_id, &del.old_tuple)?;
            }
            // Intentionally ignored message kinds.
            WalMessage::Truncate(_) | WalMessage::Origin(_) | WalMessage::Type(_) => {
            }
        }
        Ok(())
    }

    /// Converts an Insert message into a `ChangeEvent` (after-image only),
    /// honoring the configured table include/exclude filters.
    fn process_insert(
        &mut self,
        relation_id: u32,
        new_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.get_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let after_json = tuple_to_json(new_tuple, &relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Insert,
            lsn,
            ts_ms,
            before: None,
            after: Some(after_json),
        };

        self.push_event(event);
        self.metrics.record_insert();
        Ok(())
    }

    /// Converts an Update message into a `ChangeEvent`. The before-image is
    /// optional: pgoutput only ships the old tuple for certain replica
    /// identity settings.
    fn process_update(
        &mut self,
        relation_id: u32,
        old_tuple: Option<&super::decoder::TupleData>,
        new_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.get_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let before_json = old_tuple.map(|t| tuple_to_json(t, &relation));
        let after_json = tuple_to_json(new_tuple, &relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Update,
            lsn,
            ts_ms,
            before: before_json,
            after: Some(after_json),
        };

        self.push_event(event);
        self.metrics.record_update();
        Ok(())
    }

    /// Converts a Delete message into a `ChangeEvent` (before-image only).
    fn process_delete(
        &mut self,
        relation_id: u32,
        old_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.get_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let before_json = tuple_to_json(old_tuple, &relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Delete,
            lsn,
            ts_ms,
            before: Some(before_json),
            after: None,
        };

        self.push_event(event);
        self.metrics.record_delete();
        Ok(())
    }

    /// Looks up (and clones) the cached relation metadata for an OID.
    ///
    /// Fails if no Relation message has been seen for this OID yet — the
    /// server is expected to send one before any DML for the table.
    fn get_relation(&self, relation_id: u32) -> Result<RelationInfo, ConnectorError> {
        self.relation_cache
            .get(relation_id)
            .cloned()
            .ok_or_else(|| {
                ConnectorError::ReadError(format!(
                    "unknown relation ID {relation_id} (no Relation message received yet)"
                ))
            })
    }

    /// LSN/timestamp pair to stamp onto an event: the enclosing
    /// transaction's if one is open, otherwise the current write LSN with a
    /// zero timestamp.
    fn current_txn_context(&self) -> (Lsn, i64) {
        match &self.current_txn {
            Some(txn) => (txn.final_lsn, txn.commit_ts_ms),
            None => (self.write_lsn, 0),
        }
    }

    /// Stages an event in the open transaction, or emits it directly when no
    /// transaction is open (e.g. events injected by tests).
    fn push_event(&mut self, event: ChangeEvent) {
        if let Some(txn) = &mut self.current_txn {
            txn.events.push(event);
        } else {
            self.event_buffer.push_back(event);
        }
    }

    /// Applies one payload forwarded by the background WAL reader task.
    #[cfg(feature = "postgres-cdc")]
    fn process_wal_payload(&mut self, payload: WalPayload) -> Result<(), ConnectorError> {
        use super::decoder::pg_timestamp_to_unix_ms;

        match payload {
            WalPayload::Begin {
                final_lsn,
                commit_ts_us,
                xid,
            } => {
                let begin = super::decoder::BeginMessage {
                    final_lsn: Lsn::new(final_lsn),
                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
                    xid,
                };
                self.process_wal_message(WalMessage::Begin(begin))
            }
            WalPayload::Commit {
                end_lsn,
                commit_ts_us,
                lsn,
            } => {
                let commit = super::decoder::CommitMessage {
                    flags: 0,
                    commit_lsn: Lsn::new(lsn),
                    end_lsn: Lsn::new(end_lsn),
                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
                };
                self.process_wal_message(WalMessage::Commit(commit))
            }
            WalPayload::XLogData { wal_end, data } => {
                // The reader already delivers Begin/Commit as structured
                // payloads, so raw 'B'/'C' messages are skipped here to
                // avoid double-processing; only the LSN is advanced.
                if !data.is_empty() && (data[0] == b'B' || data[0] == b'C') {
                    self.write_lsn = Lsn::new(wal_end);
                    return Ok(());
                }
                let msg = decode_message(&data)
                    .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
                self.process_wal_message(msg)?;
                self.write_lsn = Lsn::new(wal_end);
                Ok(())
            }
            WalPayload::KeepAlive { wal_end } => {
                self.write_lsn = Lsn::new(wal_end);
                self.last_keepalive = Instant::now();
                Ok(())
            }
        }
    }

    /// Drains up to `max` committed events into one Arrow record batch.
    /// Returns `Ok(None)` when nothing is buffered.
    fn drain_events(&mut self, max: usize) -> Result<Option<RecordBatch>, ConnectorError> {
        if self.event_buffer.is_empty() {
            return Ok(None);
        }

        let count = max.min(self.event_buffer.len());
        let events: Vec<ChangeEvent> = self.event_buffer.drain(..count).collect();

        let batch = events_to_record_batch(&events)
            .map_err(|e| ConnectorError::Internal(format!("Arrow batch build: {e}")))?;

        self.metrics.record_batch();
        Ok(Some(batch))
    }
}
470
471#[async_trait]
472#[allow(clippy::too_many_lines)]
473impl SourceConnector for PostgresCdcSource {
474 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
475 self.state = ConnectorState::Initializing;
476
477 if !config.properties().is_empty() {
479 self.config = PostgresCdcConfig::from_config(config)?;
480 }
481
482 if let Some(lsn) = self.config.start_lsn {
484 self.confirmed_flush_lsn = lsn;
485 self.write_lsn = lsn;
486 }
487
488 #[cfg(not(feature = "postgres-cdc"))]
491 {
492 return Err(ConnectorError::ConfigurationError(
493 "PostgreSQL CDC source requires the `postgres-cdc` feature flag. \
494 Rebuild with `--features postgres-cdc` to enable."
495 .to_string(),
496 ));
497 }
498
499 #[cfg(all(feature = "postgres-cdc", not(test)))]
500 {
501 use super::postgres_io;
502
503 let (client, handle) = postgres_io::connect(&self.config).await?;
505 self.connection_handle = Some(handle);
506
507 let slot_lsn = postgres_io::ensure_replication_slot(
509 &client,
510 &self.config.slot_name,
511 &self.config.output_plugin,
512 )
513 .await?;
514
515 if self.config.start_lsn.is_none() {
517 if let Some(lsn) = slot_lsn {
518 self.confirmed_flush_lsn = lsn;
519 self.write_lsn = lsn;
520 }
521 }
522
523 let mut repl_config = postgres_io::build_replication_config(&self.config);
525 if self.confirmed_flush_lsn != Lsn::ZERO {
527 repl_config.start_lsn =
528 pgwire_replication::Lsn::from_u64(self.confirmed_flush_lsn.as_u64());
529 }
530
531 let repl_client = pgwire_replication::ReplicationClient::connect(repl_config)
532 .await
533 .map_err(|e| {
534 ConnectorError::ConnectionFailed(format!("pgwire-replication connect: {e}"))
535 })?;
536
537 let (wal_tx, wal_rx) = tokio::sync::mpsc::channel(4096);
539 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
540 let data_ready = Arc::clone(&self.data_ready);
541
542 let reader_handle = tokio::spawn(async move {
543 let mut repl_client = repl_client;
544 loop {
545 tokio::select! {
546 biased;
547 _ = shutdown_rx.changed() => break,
548 event = repl_client.recv() => {
549 match event {
550 Ok(Some(event)) => {
551 let payload = match &event {
552 pgwire_replication::ReplicationEvent::Begin {
553 final_lsn,
554 xid,
555 commit_time_micros,
556 } => Some(WalPayload::Begin {
557 final_lsn: final_lsn.as_u64(),
558 commit_ts_us: *commit_time_micros,
559 xid: *xid,
560 }),
561 pgwire_replication::ReplicationEvent::Commit {
562 end_lsn,
563 commit_time_micros,
564 lsn,
565 } => {
566 repl_client.update_applied_lsn(*end_lsn);
567 Some(WalPayload::Commit {
568 end_lsn: end_lsn.as_u64(),
569 commit_ts_us: *commit_time_micros,
570 lsn: lsn.as_u64(),
571 })
572 }
573 pgwire_replication::ReplicationEvent::XLogData {
574 wal_end,
575 data,
576 ..
577 } => Some(WalPayload::XLogData {
578 wal_end: wal_end.as_u64(),
579 data: data.to_vec(),
580 }),
581 pgwire_replication::ReplicationEvent::KeepAlive {
582 wal_end,
583 ..
584 } => Some(WalPayload::KeepAlive {
585 wal_end: wal_end.as_u64(),
586 }),
587 _ => None,
588 };
589 if let Some(p) = payload {
590 if wal_tx.send(p).await.is_err() {
591 break;
592 }
593 data_ready.notify_one();
594 }
595 }
596 Ok(None) => break,
597 Err(e) => {
598 tracing::warn!(error = %e, "WAL reader error");
599 break;
600 }
601 }
602 }
603 }
604 }
605 let _ = repl_client.shutdown().await;
606 });
607
608 self.wal_rx = Some(wal_rx);
609 self.reader_handle = Some(reader_handle);
610 self.reader_shutdown = Some(shutdown_tx);
611 self.last_keepalive = Instant::now();
612 }
613
614 self.state = ConnectorState::Running;
615 Ok(())
616 }
617
618 async fn poll_batch(
619 &mut self,
620 max_records: usize,
621 ) -> Result<Option<SourceBatch>, ConnectorError> {
622 if self.state != ConnectorState::Running {
623 return Err(ConnectorError::InvalidState {
624 expected: "Running".to_string(),
625 actual: self.state.to_string(),
626 });
627 }
628
629 #[cfg(feature = "postgres-cdc")]
631 if let Some(mut rx) = self.wal_rx.take() {
632 while self.event_buffer.len() < max_records {
633 match rx.try_recv() {
634 Ok(payload) => self.process_wal_payload(payload)?,
635 Err(_) => break,
636 }
637 }
638 self.wal_rx = Some(rx);
639 }
640
641 self.process_pending_messages()?;
643
644 match self.drain_events(max_records)? {
651 Some(batch) => {
652 let lsn_str = self.write_lsn.to_string();
653 let partition = PartitionInfo::new(&self.config.slot_name, lsn_str);
654 Ok(Some(SourceBatch::with_partition(batch, partition)))
655 }
656 None => Ok(None),
657 }
658 }
659
660 fn schema(&self) -> SchemaRef {
661 Arc::clone(&self.schema)
662 }
663
664 fn checkpoint(&self) -> SourceCheckpoint {
665 let mut cp = SourceCheckpoint::new(0);
666 cp.set_offset("lsn", self.confirmed_flush_lsn.to_string());
667 cp.set_offset("write_lsn", self.write_lsn.to_string());
668 cp.set_metadata("slot_name", &self.config.slot_name);
669 cp.set_metadata("publication", &self.config.publication);
670 cp
671 }
672
673 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
674 if let Some(lsn_str) = checkpoint.get_offset("lsn") {
675 let lsn: Lsn = lsn_str.parse().map_err(|e| {
676 ConnectorError::CheckpointError(format!("invalid LSN in checkpoint: {e}"))
677 })?;
678 self.confirmed_flush_lsn = lsn;
679 self.metrics.set_confirmed_flush_lsn(lsn.as_u64());
680 }
681 if let Some(write_lsn_str) = checkpoint.get_offset("write_lsn") {
682 if let Ok(lsn) = write_lsn_str.parse::<Lsn>() {
683 self.write_lsn = lsn;
684 }
685 }
686 Ok(())
687 }
688
689 fn health_check(&self) -> HealthStatus {
690 match self.state {
691 ConnectorState::Running => {
692 let lag = self.replication_lag_bytes();
693 if lag > 100_000_000 {
694 HealthStatus::Degraded(format!("replication lag: {lag} bytes"))
696 } else {
697 HealthStatus::Healthy
698 }
699 }
700 ConnectorState::Failed => HealthStatus::Unhealthy("connector failed".to_string()),
701 _ => HealthStatus::Unknown,
702 }
703 }
704
705 fn metrics(&self) -> ConnectorMetrics {
706 self.metrics.to_connector_metrics()
707 }
708
709 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
710 Some(Arc::clone(&self.data_ready))
711 }
712
713 async fn close(&mut self) -> Result<(), ConnectorError> {
714 #[cfg(feature = "postgres-cdc")]
716 {
717 if let Some(tx) = self.reader_shutdown.take() {
718 let _ = tx.send(true);
719 }
720 if let Some(handle) = self.reader_handle.take() {
721 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
722 }
723 self.wal_rx = None;
724 }
725
726 #[cfg(feature = "postgres-cdc")]
728 if let Some(handle) = self.connection_handle.take() {
729 handle.abort();
730 }
731
732 self.state = ConnectorState::Closed;
733 self.event_buffer.clear();
734 self.pending_messages.clear();
735 Ok(())
736 }
737}
738
/// Test-only helpers for building raw pgoutput wire messages. The previous
/// version duplicated the TupleData encoding loop in four places; it is now
/// factored into `encode_tuple_values`.
#[cfg(test)]
impl PostgresCdcSource {
    /// Pushes a pre-built event directly into the committed-event buffer,
    /// bypassing transaction staging.
    fn inject_event(&mut self, event: ChangeEvent) {
        self.event_buffer.push_back(event);
    }

    /// Appends a pgoutput TupleData section: a big-endian i16 column count
    /// followed by, per value, either `t` + i32 length + text bytes, or `n`
    /// for SQL NULL.
    fn encode_tuple_values(buf: &mut Vec<u8>, values: &[Option<&str>]) {
        buf.extend_from_slice(&(values.len() as i16).to_be_bytes());
        for val in values {
            match val {
                Some(s) => {
                    buf.push(b't');
                    buf.extend_from_slice(&(s.len() as i32).to_be_bytes());
                    buf.extend_from_slice(s.as_bytes());
                }
                None => buf.push(b'n'),
            }
        }
    }

    /// Builds a Relation ('R') message describing a table.
    /// `columns` entries are (flags, name, type OID, type modifier).
    fn build_relation_message(
        relation_id: u32,
        namespace: &str,
        name: &str,
        columns: &[(u8, &str, u32, i32)],
    ) -> Vec<u8> {
        let mut buf = vec![b'R'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.extend_from_slice(namespace.as_bytes());
        buf.push(0);
        buf.extend_from_slice(name.as_bytes());
        buf.push(0);
        // Replica identity byte: 'd' = default (primary key).
        buf.push(b'd');
        buf.extend_from_slice(&(columns.len() as i16).to_be_bytes());
        for (flags, col_name, oid, modifier) in columns {
            buf.push(*flags);
            buf.extend_from_slice(col_name.as_bytes());
            buf.push(0);
            buf.extend_from_slice(&oid.to_be_bytes());
            buf.extend_from_slice(&modifier.to_be_bytes());
        }
        buf
    }

    /// Builds a Begin ('B') message: final LSN, commit timestamp (µs), XID.
    fn build_begin_message(final_lsn: u64, commit_ts_us: i64, xid: u32) -> Vec<u8> {
        let mut buf = vec![b'B'];
        buf.extend_from_slice(&final_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf.extend_from_slice(&xid.to_be_bytes());
        buf
    }

    /// Builds a Commit ('C') message: flags, commit LSN, end LSN, timestamp.
    fn build_commit_message(commit_lsn: u64, end_lsn: u64, commit_ts_us: i64) -> Vec<u8> {
        let mut buf = vec![b'C'];
        // Flags byte (always 0).
        buf.push(0);
        buf.extend_from_slice(&commit_lsn.to_be_bytes());
        buf.extend_from_slice(&end_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf
    }

    /// Builds an Insert ('I') message carrying a new ('N') tuple.
    fn build_insert_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'I'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.push(b'N');
        Self::encode_tuple_values(&mut buf, values);
        buf
    }

    /// Builds a Delete ('D') message carrying a key ('K') tuple.
    fn build_delete_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'D'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.push(b'K');
        Self::encode_tuple_values(&mut buf, values);
        buf
    }

    /// Builds an Update ('U') message with old ('O') and new ('N') tuples.
    fn build_update_message(
        relation_id: u32,
        old_values: &[Option<&str>],
        new_values: &[Option<&str>],
    ) -> Vec<u8> {
        let mut buf = vec![b'U'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.push(b'O');
        Self::encode_tuple_values(&mut buf, old_values);
        buf.push(b'N');
        Self::encode_tuple_values(&mut buf, new_values);
        buf
    }
}
867
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};
    use arrow_array::cast::AsArray;

    // Source in its freshly constructed state.
    fn default_source() -> PostgresCdcSource {
        PostgresCdcSource::new(PostgresCdcConfig::default())
    }

    // Source forced into Running so poll_batch accepts calls.
    fn running_source() -> PostgresCdcSource {
        let mut src = default_source();
        src.state = ConnectorState::Running;
        src
    }

    // --- construction & configuration ---

    #[test]
    fn test_new_source() {
        let src = default_source();
        assert_eq!(src.state, ConnectorState::Created);
        assert!(src.confirmed_flush_lsn.is_zero());
        assert_eq!(src.event_buffer.len(), 0);
        // The CDC envelope has six columns.
        assert_eq!(src.schema().fields().len(), 6);
    }

    #[test]
    fn test_from_config() {
        let mut config = ConnectorConfig::new("postgres-cdc");
        config.set("host", "pg.local");
        config.set("database", "testdb");
        config.set("slot.name", "my_slot");
        config.set("publication", "my_pub");

        let src = PostgresCdcSource::from_config(&config).unwrap();
        assert_eq!(src.config().host, "pg.local");
        assert_eq!(src.config().database, "testdb");
    }

    #[test]
    fn test_from_config_invalid() {
        let config = ConnectorConfig::new("postgres-cdc");
        assert!(PostgresCdcSource::from_config(&config).is_err());
    }

    // open() must fail fast when compiled without the feature flag.
    #[cfg(not(feature = "postgres-cdc"))]
    #[tokio::test]
    async fn test_open_fails_without_feature() {
        let mut src = default_source();
        let config = ConnectorConfig::new("postgres-cdc");
        let result = src.open(&config).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("postgres-cdc"),
            "error should mention feature flag: {err}"
        );
    }

    #[tokio::test]
    async fn test_close() {
        let mut src = running_source();
        src.inject_event(ChangeEvent {
            table: "t".to_string(),
            op: CdcOperation::Insert,
            lsn: Lsn::ZERO,
            ts_ms: 0,
            before: None,
            after: Some("{}".to_string()),
        });

        src.close().await.unwrap();
        assert_eq!(src.state, ConnectorState::Closed);
        // close() must drop buffered events.
        assert_eq!(src.event_buffer.len(), 0);
    }

    // --- checkpoint / restore ---

    #[test]
    fn test_checkpoint() {
        let mut src = default_source();
        src.confirmed_flush_lsn = "1/ABCD".parse().unwrap();
        src.write_lsn = "1/ABCE".parse().unwrap();

        let cp = src.checkpoint();
        assert_eq!(cp.get_offset("lsn"), Some("1/ABCD"));
        assert_eq!(cp.get_offset("write_lsn"), Some("1/ABCE"));
        assert_eq!(cp.get_metadata("slot_name"), Some("laminar_slot"));
    }

    #[tokio::test]
    async fn test_restore() {
        let mut src = default_source();
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset("lsn", "2/FF00");
        cp.set_offset("write_lsn", "2/FF10");

        src.restore(&cp).await.unwrap();
        assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
        assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
    }

    #[tokio::test]
    async fn test_restore_invalid_lsn() {
        let mut src = default_source();
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset("lsn", "not_an_lsn");

        assert!(src.restore(&cp).await.is_err());
    }

    // --- polling ---

    #[tokio::test]
    async fn test_poll_empty() {
        let mut src = running_source();
        let result = src.poll_batch(100).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_poll_not_running() {
        let mut src = default_source();
        assert!(src.poll_batch(100).await.is_err());
    }

    // Full Relation/Begin/Insert/Commit round trip through the decoder.
    #[tokio::test]
    async fn test_process_insert_transaction() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
        );
        let begin_msg = PostgresCdcSource::build_begin_message(0x100, 0, 1);
        let insert_msg =
            PostgresCdcSource::build_insert_message(16384, &[Some("42"), Some("Alice")]);
        let commit_msg = PostgresCdcSource::build_commit_message(0x100, 0x200, 0);

        src.enqueue_wal_data(rel_msg);
        src.enqueue_wal_data(begin_msg);
        src.enqueue_wal_data(insert_msg);
        src.enqueue_wal_data(commit_msg);

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let records = &batch.records;
        let table_col = records.column(0).as_string::<i32>();
        assert_eq!(table_col.value(0), "users");

        let op_col = records.column(1).as_string::<i32>();
        assert_eq!(op_col.value(0), "I");

        let after_col = records.column(5).as_string::<i32>();
        let after_json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
        assert_eq!(after_json["id"], "42");
        assert_eq!(after_json["name"], "Alice");

        // Inserts carry no before-image.
        assert!(records.column(4).is_null(0));
    }

    // Several inserts in one transaction surface as one committed batch.
    #[tokio::test]
    async fn test_multi_event_transaction() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x300, 0, 2));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            16384,
            &[Some("1"), Some("Alice")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            16384,
            &[Some("2"), Some("Bob")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            16384,
            &[Some("3"), Some("Charlie")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x300, 0x400, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
    }

    // Events inside an open transaction are invisible until Commit arrives.
    #[tokio::test]
    async fn test_events_buffered_until_commit() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));

        // No commit yet: nothing to drain.
        let result = src.poll_batch(100).await.unwrap();
        assert!(result.is_none());

        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);
    }

    #[tokio::test]
    async fn test_process_update() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_update_message(
            16384,
            &[Some("42"), Some("Alice")],
            &[Some("42"), Some("Bob")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let op_col = batch.records.column(1).as_string::<i32>();
        assert_eq!(op_col.value(0), "U");

        // Updates carry both before- and after-images here.
        assert!(!batch.records.column(4).is_null(0));
        assert!(!batch.records.column(5).is_null(0));
    }

    #[tokio::test]
    async fn test_process_delete() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_delete_message(
            16384,
            &[Some("42")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        let op_col = batch.records.column(1).as_string::<i32>();
        assert_eq!(op_col.value(0), "D");

        // Deletes carry only the before-image.
        assert!(!batch.records.column(4).is_null(0));
        assert!(batch.records.column(5).is_null(0));
    }

    // Excluded tables produce no events at all.
    #[tokio::test]
    async fn test_table_exclude_filter() {
        let mut config = PostgresCdcConfig::default();
        config.table_exclude = vec!["users".to_string()];
        let mut src = PostgresCdcSource::new(config);
        src.state = ConnectorState::Running;

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let result = src.poll_batch(100).await.unwrap();
        assert!(result.is_none());
    }

    // max_records caps one batch; leftovers stay buffered for the next poll.
    #[tokio::test]
    async fn test_max_poll_records() {
        let mut src = running_source();

        for i in 0..5 {
            src.inject_event(ChangeEvent {
                table: "t".to_string(),
                op: CdcOperation::Insert,
                lsn: Lsn::new(i as u64),
                ts_ms: 0,
                before: None,
                after: Some(format!("{{\"id\":\"{i}\"}}")),
            });
        }

        let batch = src.poll_batch(2).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(src.buffered_events(), 3);

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(src.buffered_events(), 0);
    }

    // Batches are tagged with slot name and current write LSN.
    #[tokio::test]
    async fn test_partition_info() {
        let mut src = running_source();
        src.write_lsn = "1/ABCD".parse().unwrap();

        src.inject_event(ChangeEvent {
            table: "t".to_string(),
            op: CdcOperation::Insert,
            lsn: Lsn::ZERO,
            ts_ms: 0,
            before: None,
            after: Some("{}".to_string()),
        });

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        let partition = batch.partition.unwrap();
        assert_eq!(partition.id, "laminar_slot");
        assert_eq!(partition.offset, "1/ABCD");
    }

    // --- health & metrics ---

    #[test]
    fn test_health_check_healthy() {
        let src = running_source();
        assert!(src.health_check().is_healthy());
    }

    #[test]
    fn test_health_check_degraded() {
        let mut src = running_source();
        // 200 MB of lag is above the 100 MB degradation threshold.
        src.write_lsn = Lsn::new(200_000_000);
        src.confirmed_flush_lsn = Lsn::ZERO;
        assert!(matches!(src.health_check(), HealthStatus::Degraded(_)));
    }

    #[test]
    fn test_health_check_unknown_when_created() {
        let src = default_source();
        assert!(matches!(src.health_check(), HealthStatus::Unknown));
    }

    #[tokio::test]
    async fn test_metrics_after_processing() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("2")]));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let _ = src.poll_batch(100).await.unwrap();

        let metrics = src.metrics();
        assert_eq!(metrics.records_total, 2);
    }

    #[test]
    fn test_replication_lag() {
        let mut src = default_source();
        src.write_lsn = Lsn::new(1000);
        src.confirmed_flush_lsn = Lsn::new(500);
        assert_eq!(src.replication_lag_bytes(), 500);
    }

    // DML before any Relation message for the OID is a hard error.
    #[tokio::test]
    async fn test_unknown_relation_error() {
        let mut src = running_source();

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(99999, &[Some("1")]));

        let result = src.poll_batch(100).await;
        assert!(result.is_err());
    }

    // One transaction touching two tables keeps event order.
    #[tokio::test]
    async fn test_multi_table_transaction() {
        let mut src = running_source();

        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            100,
            "public",
            "users",
            &[(1, "id", INT4_OID, -1)],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            200,
            "public",
            "orders",
            &[(1, "order_id", INT4_OID, -1)],
        ));

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x500, 0, 5));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            200,
            &[Some("1001")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x500, 0x600, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);

        let table_col = batch.records.column(0).as_string::<i32>();
        assert_eq!(table_col.value(0), "users");
        assert_eq!(table_col.value(1), "orders");
    }

    // A re-sent Relation message (schema change) replaces the cached schema.
    #[tokio::test]
    async fn test_schema_change_mid_stream() {
        let mut src = running_source();

        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            100,
            "public",
            "users",
            &[(1, "id", INT4_OID, -1)],
        ));

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let batch1 = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 1);

        // New Relation message adds an `email` column.
        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            100,
            "public",
            "users",
            &[(1, "id", INT4_OID, -1), (0, "email", TEXT_OID, -1)],
        ));

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x200, 0, 2));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            100,
            &[Some("2"), Some("alice@example.com")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x200, 0x300, 0));

        let batch2 = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 1);

        let after_col = batch2.records.column(5).as_string::<i32>();
        let json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
        assert_eq!(json["email"], "alice@example.com");
    }

    // Commit advances write_lsn to the transaction's end LSN.
    #[tokio::test]
    async fn test_write_lsn_advances() {
        let mut src = running_source();

        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            100,
            "public",
            "t",
            &[(1, "id", INT4_OID, -1)],
        ));

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));

        let _ = src.poll_batch(100).await;
        assert_eq!(src.write_lsn().as_u64(), 0x500);
    }
}