1use arrow_array::RecordBatch;
13use arrow_schema::SchemaRef;
14use async_trait::async_trait;
15use std::collections::VecDeque;
16use std::sync::Arc;
17use tokio::sync::Notify;
18
19use crate::checkpoint::SourceCheckpoint;
20use crate::config::{ConnectorConfig, ConnectorState};
21use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
22use crate::error::ConnectorError;
23use crate::health::HealthStatus;
24use crate::metrics::ConnectorMetrics;
25
26use super::changelog::{events_to_record_batch, tuple_to_json, CdcOperation, ChangeEvent};
27use super::config::PostgresCdcConfig;
28use super::decoder::{decode_message, WalMessage};
29use super::lsn::Lsn;
30use super::metrics::PostgresCdcMetrics;
31use super::schema::{cdc_envelope_schema, RelationCache, RelationInfo};
/// PostgreSQL CDC source connector that decodes pgoutput logical-replication
/// messages into Arrow `RecordBatch`es using the CDC envelope schema.
pub struct PostgresCdcSource {
    /// Parsed connector configuration.
    config: PostgresCdcConfig,

    /// Connector lifecycle state (Created → Initializing → Running → Closed/Failed).
    state: ConnectorState,

    /// CDC envelope schema returned by `schema()`.
    schema: SchemaRef,

    /// Prometheus-backed metrics for this source.
    metrics: Arc<PostgresCdcMetrics>,

    /// Cache of `Relation` messages, keyed by relation OID.
    relation_cache: RelationCache,

    /// Committed change events waiting to be drained by `poll_batch`.
    event_buffer: VecDeque<ChangeEvent>,

    /// In-flight transaction; events accumulate here until a Commit arrives.
    current_txn: Option<TransactionState>,

    /// LSN restored from checkpoints / the replication slot; basis for lag.
    confirmed_flush_lsn: Lsn,

    /// Highest WAL position observed from the server.
    write_lsn: Lsn,

    /// LSN up to which events have been fully handed out by `poll_batch`.
    polled_lsn: Lsn,

    /// Raw pgoutput frames queued via `enqueue_wal_data` (test/manual path).
    pending_messages: VecDeque<Vec<u8>>,

    /// Notified by the reader task whenever new payloads are available.
    data_ready: Arc<Notify>,

    /// Control-plane (non-replication) connection task handle.
    #[cfg(feature = "postgres-cdc")]
    connection_handle: Option<tokio::task::JoinHandle<()>>,

    /// Receiving half of the bounded WAL payload channel.
    #[cfg(feature = "postgres-cdc")]
    wal_rx: Option<WalPayloadRx>,

    /// Background WAL reader task handle.
    #[cfg(feature = "postgres-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Signals the WAL reader task to stop.
    #[cfg(feature = "postgres-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,

    /// Publishes the latest checkpointed LSN so the reader can report it upstream.
    #[cfg(feature = "postgres-cdc")]
    confirmed_lsn_tx: Option<tokio::sync::watch::Sender<u64>>,
}
109
/// State of the transaction currently being decoded; events are buffered
/// here between Begin and Commit so only committed changes are emitted.
#[derive(Debug, Clone)]
struct TransactionState {
    /// Final LSN carried by the Begin message.
    final_lsn: Lsn,
    /// Commit timestamp in Unix milliseconds (from the Begin message).
    commit_ts_ms: i64,
    /// Change events accumulated so far in this transaction.
    events: Vec<ChangeEvent>,
}
120
/// Receiver half of the bounded channel fed by the background WAL reader task.
#[cfg(feature = "postgres-cdc")]
type WalPayloadRx = crossfire::AsyncRx<crossfire::mpsc::Array<WalPayload>>;
124
/// Replication-stream payloads forwarded from the background WAL reader task
/// to the connector. Unlike `WalPayloadRx` this type is not `#[cfg]`-gated,
/// so it is unused when the `postgres-cdc` feature is off — hence the allow.
#[allow(dead_code)] enum WalPayload {
    /// Transaction start (decoded Begin event).
    Begin {
        final_lsn: u64,
        commit_ts_us: i64,
        xid: u32,
    },
    /// Transaction end (decoded Commit event).
    Commit {
        end_lsn: u64,
        commit_ts_us: i64,
        lsn: u64,
    },
    /// Raw pgoutput message body plus the server-reported WAL end position.
    XLogData {
        wal_end: u64,
        data: Vec<u8>,
    },
    /// Keep-alive carrying only the WAL end position.
    KeepAlive {
        wal_end: u64,
    },
    /// Fatal reader-side failure; surfaced by `process_wal_payload` as a `ReadError`.
    Error(String),
}
148
impl PostgresCdcSource {
    /// Creates a new source from a parsed config.
    ///
    /// Metrics are registered against `registry` when one is supplied.
    #[must_use]
    pub fn new(config: PostgresCdcConfig, registry: Option<&prometheus::Registry>) -> Self {
        Self {
            config,
            state: ConnectorState::Created,
            schema: cdc_envelope_schema(),
            metrics: Arc::new(PostgresCdcMetrics::new(registry)),
            relation_cache: RelationCache::new(),
            event_buffer: VecDeque::new(),
            current_txn: None,
            confirmed_flush_lsn: Lsn::ZERO,
            write_lsn: Lsn::ZERO,
            polled_lsn: Lsn::ZERO,
            pending_messages: VecDeque::new(),
            data_ready: Arc::new(Notify::new()),
            #[cfg(feature = "postgres-cdc")]
            connection_handle: None,
            #[cfg(feature = "postgres-cdc")]
            wal_rx: None,
            #[cfg(feature = "postgres-cdc")]
            reader_handle: None,
            #[cfg(feature = "postgres-cdc")]
            reader_shutdown: None,
            #[cfg(feature = "postgres-cdc")]
            confirmed_lsn_tx: None,
        }
    }

    /// Builds a source from a generic `ConnectorConfig` (no metrics registry).
    ///
    /// # Errors
    /// Returns an error when the config cannot be parsed into a `PostgresCdcConfig`.
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let pg_config = PostgresCdcConfig::from_config(config)?;
        Ok(Self::new(pg_config, None))
    }

    /// The parsed connector configuration.
    #[must_use]
    pub fn config(&self) -> &PostgresCdcConfig {
        &self.config
    }

    /// LSN confirmed via checkpoints / the replication slot.
    #[must_use]
    pub fn confirmed_flush_lsn(&self) -> Lsn {
        self.confirmed_flush_lsn
    }

    /// Highest WAL position observed from the server.
    #[must_use]
    pub fn write_lsn(&self) -> Lsn {
        self.write_lsn
    }

    /// Replication lag in bytes: distance between the server's write position
    /// and the confirmed flush position.
    #[must_use]
    pub fn replication_lag_bytes(&self) -> u64 {
        self.write_lsn.diff(self.confirmed_flush_lsn)
    }

    /// Read-only access to the cached `Relation` metadata.
    #[must_use]
    pub fn relation_cache(&self) -> &RelationCache {
        &self.relation_cache
    }

    /// Number of committed events waiting to be drained.
    #[must_use]
    pub fn buffered_events(&self) -> usize {
        self.event_buffer.len()
    }

    /// Queues a raw pgoutput frame for decoding on the next poll.
    /// Used by tests and any caller driving the decoder manually.
    pub fn enqueue_wal_data(&mut self, data: Vec<u8>) {
        self.pending_messages.push_back(data);
    }

    /// Decodes and applies every queued raw frame in FIFO order.
    ///
    /// # Errors
    /// Fails on the first frame that cannot be decoded or processed;
    /// remaining frames stay queued.
    pub fn process_pending_messages(&mut self) -> Result<(), ConnectorError> {
        while let Some(data) = self.pending_messages.pop_front() {
            self.metrics.record_bytes(data.len() as u64);
            let msg = decode_message(&data)
                .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
            self.process_wal_message(msg)?;
        }
        Ok(())
    }

    /// Applies one decoded pgoutput message to the connector state machine.
    ///
    /// Begin opens a transaction; DML events accumulate in it; Commit moves
    /// them into `event_buffer` and advances `write_lsn`.
    ///
    /// # Errors
    /// Returns `ReadError` for unknown relations, failed DML processing, or a
    /// TRUNCATE (which cannot be represented as retraction events).
    fn process_wal_message(&mut self, msg: WalMessage) -> Result<(), ConnectorError> {
        match msg {
            WalMessage::Begin(begin) => {
                self.current_txn = Some(TransactionState {
                    final_lsn: begin.final_lsn,
                    commit_ts_ms: begin.commit_ts_ms,
                    events: Vec::new(),
                });
            }
            WalMessage::Commit(commit) => {
                // Only a committed transaction's events become visible.
                if let Some(txn) = self.current_txn.take() {
                    self.event_buffer.extend(txn.events);
                    self.write_lsn = commit.end_lsn;
                    self.metrics.record_transaction();
                    self.metrics
                        .set_replication_lag_bytes(self.replication_lag_bytes());
                }
            }
            WalMessage::Relation(rel) => {
                // Cache table metadata so later DML messages can be decoded.
                let info = RelationInfo {
                    relation_id: rel.relation_id,
                    namespace: rel.namespace,
                    name: rel.name,
                    replica_identity: rel.replica_identity as char,
                    columns: rel.columns,
                };
                self.relation_cache.insert(info);
            }
            WalMessage::Insert(ins) => {
                self.process_insert(ins.relation_id, &ins.new_tuple)?;
            }
            WalMessage::Update(upd) => {
                self.process_update(upd.relation_id, upd.old_tuple.as_ref(), &upd.new_tuple)?;
            }
            WalMessage::Delete(del) => {
                self.process_delete(del.relation_id, &del.old_tuple)?;
            }
            WalMessage::Truncate(trunc) => {
                let table_names: Vec<String> = trunc
                    .relation_ids
                    .iter()
                    .map(|id| {
                        self.relation_cache
                            .get(*id)
                            .map_or_else(|| format!("oid:{id}"), RelationInfo::full_name)
                    })
                    .collect();
                return Err(ConnectorError::ReadError(format!(
                    "TRUNCATE received on table(s): {}. \
                     Cannot produce retraction events — restart the pipeline with a fresh snapshot.",
                    table_names.join(", ")
                )));
            }
            WalMessage::Origin(_) | WalMessage::Type(_) => {
                // Intentionally ignored: these carry no row data.
            }
        }
        Ok(())
    }

    /// Converts an Insert message into a `ChangeEvent` (after table filtering).
    fn process_insert(
        &mut self,
        relation_id: u32,
        new_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.require_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let after_json = tuple_to_json(new_tuple, relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Insert,
            lsn,
            ts_ms,
            before: None,
            after: Some(after_json),
        };

        self.push_event(event);
        self.metrics.record_insert();
        Ok(())
    }

    /// Converts an Update message into a `ChangeEvent`.
    /// `old_tuple` is only present when the table's replica identity provides it.
    fn process_update(
        &mut self,
        relation_id: u32,
        old_tuple: Option<&super::decoder::TupleData>,
        new_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.require_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let before_json = old_tuple.map(|t| tuple_to_json(t, relation));
        let after_json = tuple_to_json(new_tuple, relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Update,
            lsn,
            ts_ms,
            before: before_json,
            after: Some(after_json),
        };

        self.push_event(event);
        self.metrics.record_update();
        Ok(())
    }

    /// Converts a Delete message into a `ChangeEvent` (before image only).
    fn process_delete(
        &mut self,
        relation_id: u32,
        old_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.require_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let before_json = tuple_to_json(old_tuple, relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Delete,
            lsn,
            ts_ms,
            before: Some(before_json),
            after: None,
        };

        self.push_event(event);
        self.metrics.record_delete();
        Ok(())
    }

    /// Looks up cached relation metadata, failing if no Relation message for
    /// this OID has been seen on the stream yet.
    fn require_relation(&self, relation_id: u32) -> Result<&RelationInfo, ConnectorError> {
        self.relation_cache.get(relation_id).ok_or_else(|| {
            ConnectorError::ReadError(format!(
                "unknown relation ID {relation_id} (no Relation message received yet)"
            ))
        })
    }

    /// (LSN, commit-timestamp-ms) for the event being built. Falls back to the
    /// current write LSN and timestamp 0 when outside a transaction.
    fn current_txn_context(&self) -> (Lsn, i64) {
        match &self.current_txn {
            Some(txn) => (txn.final_lsn, txn.commit_ts_ms),
            None => (self.write_lsn, 0),
        }
    }

    /// Buffers an event in the open transaction, or directly in the output
    /// buffer when no transaction is open.
    fn push_event(&mut self, event: ChangeEvent) {
        if let Some(txn) = &mut self.current_txn {
            txn.events.push(event);
        } else {
            self.event_buffer.push_back(event);
        }
    }

    /// Applies one payload from the background WAL reader task.
    ///
    /// Begin/Commit arrive as dedicated variants (already decoded by the
    /// reader), so raw XLogData frames starting with 'B'/'C' are skipped here
    /// to avoid processing the same transaction boundary twice.
    #[cfg(feature = "postgres-cdc")]
    fn process_wal_payload(&mut self, payload: WalPayload) -> Result<(), ConnectorError> {
        use super::decoder::pg_timestamp_to_unix_ms;

        match payload {
            WalPayload::Begin {
                final_lsn,
                commit_ts_us,
                xid,
            } => {
                let begin = super::decoder::BeginMessage {
                    final_lsn: Lsn::new(final_lsn),
                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
                    xid,
                };
                self.process_wal_message(WalMessage::Begin(begin))
            }
            WalPayload::Commit {
                end_lsn,
                commit_ts_us,
                lsn,
            } => {
                let commit = super::decoder::CommitMessage {
                    flags: 0,
                    commit_lsn: Lsn::new(lsn),
                    end_lsn: Lsn::new(end_lsn),
                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
                };
                self.process_wal_message(WalMessage::Commit(commit))
            }
            WalPayload::XLogData { wal_end, data } => {
                if !data.is_empty() && (data[0] == b'B' || data[0] == b'C') {
                    // Duplicate of a decoded Begin/Commit — only track position.
                    self.write_lsn = Lsn::new(wal_end);
                    return Ok(());
                }
                let msg = decode_message(&data)
                    .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
                self.process_wal_message(msg)?;
                self.write_lsn = Lsn::new(wal_end);
                Ok(())
            }
            WalPayload::KeepAlive { wal_end } => {
                self.write_lsn = Lsn::new(wal_end);
                Ok(())
            }
            WalPayload::Error(msg) => Err(ConnectorError::ReadError(msg)),
        }
    }

    /// Drains up to `max` buffered events into an Arrow batch.
    /// Returns `Ok(None)` when the buffer is empty.
    fn drain_events(&mut self, max: usize) -> Result<Option<RecordBatch>, ConnectorError> {
        if self.event_buffer.is_empty() {
            return Ok(None);
        }

        let count = max.min(self.event_buffer.len());
        let events: Vec<ChangeEvent> = self.event_buffer.drain(..count).collect();

        let batch = events_to_record_batch(&events)
            .map_err(|e| ConnectorError::Internal(format!("Arrow batch build: {e}")))?;

        self.metrics.record_batch();
        Ok(Some(batch))
    }
}
495
#[async_trait]
#[allow(clippy::too_many_lines)]
impl SourceConnector for PostgresCdcSource {
    /// Connects to PostgreSQL, ensures the replication slot exists, and spawns
    /// the background WAL reader task.
    ///
    /// LSN precedence: an explicit `start_lsn` in the config wins; otherwise
    /// the slot's confirmed flush LSN (if any) is used.
    ///
    /// # Errors
    /// Fails when the `postgres-cdc` feature is disabled, the config is
    /// invalid, or any connection/slot setup step fails.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // A non-empty property map overrides the config given at construction.
        if !config.properties().is_empty() {
            self.config = PostgresCdcConfig::from_config(config)?;
        }

        if let Some(lsn) = self.config.start_lsn {
            self.confirmed_flush_lsn = lsn;
            self.write_lsn = lsn;
            self.polled_lsn = lsn;
        }

        #[cfg(not(feature = "postgres-cdc"))]
        {
            return Err(ConnectorError::ConfigurationError(
                "PostgreSQL CDC source requires the `postgres-cdc` feature flag. \
                 Rebuild with `--features postgres-cdc` to enable."
                    .to_string(),
            ));
        }

        // Real I/O is skipped under `cfg(test)`; unit tests drive the decoder
        // through `enqueue_wal_data` instead.
        #[cfg(all(feature = "postgres-cdc", not(test)))]
        {
            use super::postgres_io;

            let (client, handle) = postgres_io::connect(&self.config).await?;
            self.connection_handle = Some(handle);

            let slot_lsn = postgres_io::ensure_replication_slot(
                &client,
                &self.config.slot_name,
                &self.config.output_plugin,
            )
            .await?;

            // Only adopt the slot's LSN when no explicit start LSN was given.
            if self.config.start_lsn.is_none() {
                if let Some(lsn) = slot_lsn {
                    self.confirmed_flush_lsn = lsn;
                    self.write_lsn = lsn;
                    self.polled_lsn = lsn;
                }
            }

            let mut repl_config = postgres_io::build_replication_config(&self.config);
            if self.confirmed_flush_lsn != Lsn::ZERO {
                repl_config.start_lsn =
                    pgwire_replication::Lsn::from_u64(self.confirmed_flush_lsn.as_u64());
            }

            let repl_client = pgwire_replication::ReplicationClient::connect(repl_config)
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("pgwire-replication connect: {e}"))
                })?;

            // Bounded channel provides natural backpressure on the reader.
            let (wal_tx, wal_rx) = crossfire::mpsc::bounded_async::<WalPayload>(4096);
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
            let (confirmed_lsn_tx, mut confirmed_lsn_rx) =
                tokio::sync::watch::channel(self.confirmed_flush_lsn.as_u64());
            let data_ready = Arc::clone(&self.data_ready);
            let reader_config = self.config.clone();

            // Background task: pulls replication events, forwards them as
            // `WalPayload`s, and reconnects with exponential backoff on error.
            let reader_handle = tokio::spawn(async move {
                const MAX_FAILURES: u32 = 10;
                let mut repl_client = repl_client;
                let mut consecutive_failures: u32 = 0;

                'reconnect: loop {
                    'recv: loop {
                        // Relay the latest checkpointed LSN to the server so
                        // the slot can advance.
                        if confirmed_lsn_rx.has_changed().unwrap_or(false) {
                            let confirmed = *confirmed_lsn_rx.borrow_and_update();
                            if confirmed > 0 {
                                repl_client.update_applied_lsn(pgwire_replication::Lsn::from_u64(
                                    confirmed,
                                ));
                            }
                        }

                        tokio::select! {
                            biased;
                            _ = shutdown_rx.changed() => break 'reconnect,
                            event = repl_client.recv() => {
                                match event {
                                    Ok(Some(event)) => {
                                        consecutive_failures = 0;
                                        let payload = match &event {
                                            pgwire_replication::ReplicationEvent::Begin {
                                                final_lsn,
                                                xid,
                                                commit_time_micros,
                                            } => Some(WalPayload::Begin {
                                                final_lsn: final_lsn.as_u64(),
                                                commit_ts_us: *commit_time_micros,
                                                xid: *xid,
                                            }),
                                            pgwire_replication::ReplicationEvent::Commit {
                                                end_lsn,
                                                commit_time_micros,
                                                lsn,
                                            } => Some(WalPayload::Commit {
                                                end_lsn: end_lsn.as_u64(),
                                                commit_ts_us: *commit_time_micros,
                                                lsn: lsn.as_u64(),
                                            }),
                                            pgwire_replication::ReplicationEvent::XLogData {
                                                wal_end,
                                                data,
                                                ..
                                            } => Some(WalPayload::XLogData {
                                                wal_end: wal_end.as_u64(),
                                                data: data.to_vec(),
                                            }),
                                            pgwire_replication::ReplicationEvent::KeepAlive {
                                                wal_end,
                                                ..
                                            } => Some(WalPayload::KeepAlive {
                                                wal_end: wal_end.as_u64(),
                                            }),
                                            _ => None,
                                        };
                                        if let Some(p) = payload {
                                            // send fails only when the receiver
                                            // was dropped → connector closed.
                                            if wal_tx.send(p).await.is_err() {
                                                break 'reconnect;
                                            }
                                            data_ready.notify_one();
                                        }
                                    }
                                    Ok(None) => break 'recv,
                                    Err(e) => {
                                        tracing::error!(error = %e, "WAL reader error");
                                        break 'recv;
                                    }
                                }
                            }
                        }
                    }

                    // Stream ended or errored: tear down and decide whether to retry.
                    let _ = repl_client.shutdown().await;
                    consecutive_failures += 1;

                    if consecutive_failures >= MAX_FAILURES {
                        tracing::error!(
                            failures = consecutive_failures,
                            "WAL reader exhausted reconnect attempts"
                        );
                        let _ = wal_tx.send(WalPayload::Error(format!(
                            "WAL reader failed after {consecutive_failures} consecutive reconnect attempts"
                        ))).await;
                        data_ready.notify_one();
                        break 'reconnect;
                    }

                    // Exponential backoff, capped at 30s; shutdown can
                    // interrupt the sleep.
                    let backoff_secs = (1u64 << consecutive_failures).min(30);
                    tracing::warn!(
                        attempt = consecutive_failures,
                        backoff_secs,
                        "WAL reader reconnecting"
                    );
                    tokio::select! {
                        biased;
                        _ = shutdown_rx.changed() => break 'reconnect,
                        () = tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)) => {}
                    }

                    // Resume from the last checkpointed LSN, if any.
                    let resume_lsn = *confirmed_lsn_rx.borrow();
                    let mut new_config = postgres_io::build_replication_config(&reader_config);
                    if resume_lsn > 0 {
                        new_config.start_lsn = pgwire_replication::Lsn::from_u64(resume_lsn);
                    }

                    match pgwire_replication::ReplicationClient::connect(new_config).await {
                        Ok(new_client) => {
                            tracing::info!("WAL reader reconnected");
                            repl_client = new_client;
                        }
                        Err(e) => {
                            tracing::error!(error = %e, "WAL reader reconnect failed");
                        }
                    }
                }
            });

            self.wal_rx = Some(wal_rx);
            self.reader_handle = Some(reader_handle);
            self.reader_shutdown = Some(shutdown_tx);
            self.confirmed_lsn_tx = Some(confirmed_lsn_tx);
        }

        self.state = ConnectorState::Running;
        Ok(())
    }

    /// Drains WAL payloads from the reader (bounded by `max_records` and the
    /// backpressure high-watermark), processes any manually enqueued frames,
    /// and returns up to `max_records` committed events as one batch.
    ///
    /// # Errors
    /// Fails when the connector is not `Running`, when decoding fails, or when
    /// the reader task has terminated and no buffered events remain.
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".to_string(),
                actual: self.state.to_string(),
            });
        }

        #[cfg(feature = "postgres-cdc")]
        {
            let high_watermark = self.config.backpressure_high_watermark();
            let mut payloads = Vec::new();
            let mut reader_closed = false;

            if self.event_buffer.len() < high_watermark {
                if let Some(ref mut rx) = self.wal_rx {
                    // Non-blocking drain, bounded by both limits.
                    while payloads.len() + self.event_buffer.len() < max_records
                        && self.event_buffer.len() + payloads.len() < high_watermark
                    {
                        match rx.try_recv() {
                            Ok(payload) => payloads.push(payload),
                            Err(crossfire::TryRecvError::Empty) => break,
                            Err(crossfire::TryRecvError::Disconnected) => {
                                reader_closed = true;
                                break;
                            }
                        }
                    }
                }
            } else {
                tracing::debug!(
                    buffered = self.event_buffer.len(),
                    high_watermark,
                    "CDC backpressure active — pausing WAL reader drain"
                );
            }

            // Processed outside the borrow of `wal_rx`.
            for payload in payloads {
                self.process_wal_payload(payload)?;
            }
            // Only fail once every already-received event has been delivered.
            if reader_closed && self.event_buffer.is_empty() {
                self.state = ConnectorState::Failed;
                return Err(ConnectorError::ReadError(
                    "WAL reader task terminated unexpectedly — replication stream lost".to_string(),
                ));
            }
        }

        self.process_pending_messages()?;

        match self.drain_events(max_records)? {
            Some(batch) => {
                // Only advance the checkpointable LSN when everything decoded
                // so far has been handed out.
                if self.event_buffer.is_empty() {
                    self.polled_lsn = self.write_lsn;
                }
                self.metrics
                    .set_confirmed_flush_lsn(self.confirmed_flush_lsn.as_u64());
                self.metrics
                    .set_replication_lag_bytes(self.replication_lag_bytes());

                let lsn_str = self.write_lsn.to_string();
                let partition = PartitionInfo::new(&self.config.slot_name, lsn_str);
                Ok(Some(SourceBatch::with_partition(batch, partition)))
            }
            None => Ok(None),
        }
    }

    /// The CDC envelope schema shared by all emitted batches.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Snapshots the polled/write LSNs plus slot metadata, and publishes the
    /// polled LSN to the reader task so it can be acknowledged upstream.
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("lsn", self.polled_lsn.to_string());
        cp.set_offset("write_lsn", self.write_lsn.to_string());
        cp.set_metadata("slot_name", &self.config.slot_name);
        cp.set_metadata("publication", &self.config.publication);

        #[cfg(feature = "postgres-cdc")]
        if let Some(ref tx) = self.confirmed_lsn_tx {
            let _ = tx.send(self.polled_lsn.as_u64());
        }

        cp
    }

    /// Restores LSN positions from a checkpoint.
    ///
    /// # Errors
    /// Fails when the required "lsn" offset is present but unparseable; an
    /// invalid "write_lsn" is ignored (best-effort).
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        if let Some(lsn_str) = checkpoint.get_offset("lsn") {
            let lsn: Lsn = lsn_str
                .parse()
                .map_err(|e| ConnectorError::Internal(format!("invalid LSN in checkpoint: {e}")))?;
            self.confirmed_flush_lsn = lsn;
            self.polled_lsn = lsn;
            self.metrics.set_confirmed_flush_lsn(lsn.as_u64());
        }
        if let Some(write_lsn_str) = checkpoint.get_offset("write_lsn") {
            if let Ok(lsn) = write_lsn_str.parse::<Lsn>() {
                self.write_lsn = lsn;
            }
        }
        Ok(())
    }

    /// Healthy while running; degraded above 100 MB of replication lag.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => {
                let lag = self.replication_lag_bytes();
                if lag > 100_000_000 {
                    HealthStatus::Degraded(format!("replication lag: {lag} bytes"))
                } else {
                    HealthStatus::Healthy
                }
            }
            ConnectorState::Failed => HealthStatus::Unhealthy("connector failed".to_string()),
            _ => HealthStatus::Unknown,
        }
    }

    /// Generic connector-metrics snapshot.
    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    /// Handle the runtime can await to learn when new data arrived.
    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        Some(Arc::clone(&self.data_ready))
    }

    /// Shuts down the reader (graceful, 5s) and control-plane task (2s, then
    /// abort), then clears all buffers and marks the connector closed.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        #[cfg(feature = "postgres-cdc")]
        {
            if let Some(tx) = self.reader_shutdown.take() {
                let _ = tx.send(true);
            }
            if let Some(handle) = self.reader_handle.take() {
                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
            }
            self.wal_rx = None;
            self.confirmed_lsn_tx = None;
        }

        #[cfg(feature = "postgres-cdc")]
        if let Some(handle) = self.connection_handle.take() {
            let abort = handle.abort_handle();
            match tokio::time::timeout(std::time::Duration::from_secs(2), handle).await {
                Ok(Ok(())) => {}
                Ok(Err(join_err)) => {
                    tracing::warn!(
                        error = %join_err,
                        "[postgres-cdc] control-plane task exited with error"
                    );
                }
                Err(_elapsed) => {
                    tracing::warn!(
                        "[postgres-cdc] control-plane task did not exit within 2s; aborting"
                    );
                    abort.abort();
                }
            }
        }

        self.state = ConnectorState::Closed;
        self.event_buffer.clear();
        self.pending_messages.clear();
        Ok(())
    }
}
905
/// Test-only helpers: inject events directly and build raw pgoutput frames.
///
/// The builders produce byte-exact pgoutput v1 messages so the decoder path
/// can be exercised without a live PostgreSQL connection.
#[cfg(test)]
impl PostgresCdcSource {
    /// Pushes an event straight into the output buffer, bypassing decoding.
    fn inject_event(&mut self, event: ChangeEvent) {
        self.event_buffer.push_back(event);
    }

    /// Appends a pgoutput TupleData section to `buf`: i16 column count, then
    /// per value either `t` + i32 length + bytes, or `n` for NULL.
    ///
    /// Shared by the insert/delete/update builders (was duplicated 4x).
    fn encode_tuple_values(buf: &mut Vec<u8>, values: &[Option<&str>]) {
        buf.extend_from_slice(&(values.len() as i16).to_be_bytes());
        for val in values {
            match val {
                Some(s) => {
                    buf.push(b't');
                    buf.extend_from_slice(&(s.len() as i32).to_be_bytes());
                    buf.extend_from_slice(s.as_bytes());
                }
                None => buf.push(b'n'),
            }
        }
    }

    /// Builds a Relation ('R') message; replica identity is fixed to 'd'
    /// (default). `columns` entries are (flags, name, type OID, type modifier).
    fn build_relation_message(
        relation_id: u32,
        namespace: &str,
        name: &str,
        columns: &[(u8, &str, u32, i32)],
    ) -> Vec<u8> {
        let mut buf = vec![b'R'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.extend_from_slice(namespace.as_bytes());
        buf.push(0); // namespace NUL terminator
        buf.extend_from_slice(name.as_bytes());
        buf.push(0); // name NUL terminator
        buf.push(b'd'); // replica identity: default
        buf.extend_from_slice(&(columns.len() as i16).to_be_bytes());
        for (flags, col_name, oid, modifier) in columns {
            buf.push(*flags);
            buf.extend_from_slice(col_name.as_bytes());
            buf.push(0);
            buf.extend_from_slice(&oid.to_be_bytes());
            buf.extend_from_slice(&modifier.to_be_bytes());
        }
        buf
    }

    /// Builds a Begin ('B') message: final LSN, commit timestamp (µs), XID.
    fn build_begin_message(final_lsn: u64, commit_ts_us: i64, xid: u32) -> Vec<u8> {
        let mut buf = vec![b'B'];
        buf.extend_from_slice(&final_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf.extend_from_slice(&xid.to_be_bytes());
        buf
    }

    /// Builds a Commit ('C') message; the flags byte is always 0.
    fn build_commit_message(commit_lsn: u64, end_lsn: u64, commit_ts_us: i64) -> Vec<u8> {
        let mut buf = vec![b'C'];
        buf.push(0); // flags
        buf.extend_from_slice(&commit_lsn.to_be_bytes());
        buf.extend_from_slice(&end_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf
    }

    /// Builds an Insert ('I') message carrying a new ('N') tuple.
    fn build_insert_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'I'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.push(b'N'); // new tuple follows
        Self::encode_tuple_values(&mut buf, values);
        buf
    }

    /// Builds a Delete ('D') message carrying a key ('K') tuple.
    fn build_delete_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'D'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.push(b'K'); // key tuple follows
        Self::encode_tuple_values(&mut buf, values);
        buf
    }

    /// Builds a Truncate ('T') message: i32 relation count, options byte,
    /// then the relation OIDs.
    fn build_truncate_message(relation_ids: &[u32], options: u8) -> Vec<u8> {
        let mut buf = vec![b'T'];
        buf.extend_from_slice(&(relation_ids.len() as i32).to_be_bytes());
        buf.push(options);
        for id in relation_ids {
            buf.extend_from_slice(&id.to_be_bytes());
        }
        buf
    }

    /// Builds an Update ('U') message with an old ('O') and a new ('N') tuple.
    fn build_update_message(
        relation_id: u32,
        old_values: &[Option<&str>],
        new_values: &[Option<&str>],
    ) -> Vec<u8> {
        let mut buf = vec![b'U'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.push(b'O'); // old tuple follows
        Self::encode_tuple_values(&mut buf, old_values);
        buf.push(b'N'); // new tuple follows
        Self::encode_tuple_values(&mut buf, new_values);
        buf
    }
}
1045
1046#[cfg(test)]
1047mod tests {
1048 use super::*;
1049 use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};
1050 use arrow_array::cast::AsArray;
1051
1052 fn default_source() -> PostgresCdcSource {
1053 PostgresCdcSource::new(PostgresCdcConfig::default(), None)
1054 }
1055
1056 fn running_source() -> PostgresCdcSource {
1057 let mut src = default_source();
1058 src.state = ConnectorState::Running;
1059 src
1060 }
1061
1062 #[test]
1065 fn test_new_source() {
1066 let src = default_source();
1067 assert_eq!(src.state, ConnectorState::Created);
1068 assert!(src.confirmed_flush_lsn.is_zero());
1069 assert_eq!(src.event_buffer.len(), 0);
1070 assert_eq!(src.schema().fields().len(), 6);
1071 }
1072
1073 #[test]
1074 fn test_from_config() {
1075 let mut config = ConnectorConfig::new("postgres-cdc");
1076 config.set("host", "pg.local");
1077 config.set("database", "testdb");
1078 config.set("slot.name", "my_slot");
1079 config.set("publication", "my_pub");
1080
1081 let src = PostgresCdcSource::from_config(&config).unwrap();
1082 assert_eq!(src.config().host, "pg.local");
1083 assert_eq!(src.config().database, "testdb");
1084 }
1085
1086 #[test]
1087 fn test_from_config_invalid() {
1088 let config = ConnectorConfig::new("postgres-cdc");
1089 assert!(PostgresCdcSource::from_config(&config).is_err());
1090 }
1091
1092 #[cfg(not(feature = "postgres-cdc"))]
1096 #[tokio::test]
1097 async fn test_open_fails_without_feature() {
1098 let mut src = default_source();
1099 let config = ConnectorConfig::new("postgres-cdc");
1100 let result = src.open(&config).await;
1101 assert!(result.is_err());
1102 let err = result.unwrap_err().to_string();
1103 assert!(
1104 err.contains("postgres-cdc"),
1105 "error should mention feature flag: {err}"
1106 );
1107 }
1108
1109 #[tokio::test]
1110 async fn test_close() {
1111 let mut src = running_source();
1112 src.inject_event(ChangeEvent {
1113 table: "t".to_string(),
1114 op: CdcOperation::Insert,
1115 lsn: Lsn::ZERO,
1116 ts_ms: 0,
1117 before: None,
1118 after: Some("{}".to_string()),
1119 });
1120
1121 src.close().await.unwrap();
1122 assert_eq!(src.state, ConnectorState::Closed);
1123 assert_eq!(src.event_buffer.len(), 0);
1124 }
1125
1126 #[test]
1129 fn test_checkpoint() {
1130 let mut src = default_source();
1131 src.confirmed_flush_lsn = "1/ABCD".parse().unwrap();
1132 src.polled_lsn = "1/ABCD".parse().unwrap();
1133 src.write_lsn = "1/ABCE".parse().unwrap();
1134
1135 let cp = src.checkpoint();
1136 assert_eq!(cp.get_offset("lsn"), Some("1/ABCD"));
1137 assert_eq!(cp.get_offset("write_lsn"), Some("1/ABCE"));
1138 assert_eq!(cp.get_metadata("slot_name"), Some("laminar_slot"));
1139 }
1140
1141 #[tokio::test]
1142 async fn test_restore() {
1143 let mut src = default_source();
1144 let mut cp = SourceCheckpoint::new(1);
1145 cp.set_offset("lsn", "2/FF00");
1146 cp.set_offset("write_lsn", "2/FF10");
1147
1148 src.restore(&cp).await.unwrap();
1149 assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
1150 assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
1151 }
1152
1153 #[tokio::test]
1154 async fn test_restore_invalid_lsn() {
1155 let mut src = default_source();
1156 let mut cp = SourceCheckpoint::new(1);
1157 cp.set_offset("lsn", "not_an_lsn");
1158
1159 assert!(src.restore(&cp).await.is_err());
1160 }
1161
1162 #[tokio::test]
1165 async fn test_poll_empty() {
1166 let mut src = running_source();
1167 let result = src.poll_batch(100).await.unwrap();
1168 assert!(result.is_none());
1169 }
1170
1171 #[tokio::test]
1172 async fn test_poll_not_running() {
1173 let mut src = default_source();
1174 assert!(src.poll_batch(100).await.is_err());
1175 }
1176
1177 #[tokio::test]
1180 async fn test_process_insert_transaction() {
1181 let mut src = running_source();
1182
1183 let rel_msg = PostgresCdcSource::build_relation_message(
1184 16384,
1185 "public",
1186 "users",
1187 &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1188 );
1189 let begin_msg = PostgresCdcSource::build_begin_message(0x100, 0, 1);
1190 let insert_msg =
1191 PostgresCdcSource::build_insert_message(16384, &[Some("42"), Some("Alice")]);
1192 let commit_msg = PostgresCdcSource::build_commit_message(0x100, 0x200, 0);
1193
1194 src.enqueue_wal_data(rel_msg);
1195 src.enqueue_wal_data(begin_msg);
1196 src.enqueue_wal_data(insert_msg);
1197 src.enqueue_wal_data(commit_msg);
1198
1199 let batch = src.poll_batch(100).await.unwrap().unwrap();
1200 assert_eq!(batch.num_rows(), 1);
1201
1202 let records = &batch.records;
1203 let table_col = records.column(0).as_string::<i32>();
1204 assert_eq!(table_col.value(0), "users");
1205
1206 let op_col = records.column(1).as_string::<i32>();
1207 assert_eq!(op_col.value(0), "I");
1208
1209 let after_col = records.column(5).as_string::<i32>();
1210 let after_json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
1211 assert_eq!(after_json["id"], "42");
1212 assert_eq!(after_json["name"], "Alice");
1213
1214 assert!(records.column(4).is_null(0));
1216 }
1217
1218 #[tokio::test]
1221 async fn test_multi_event_transaction() {
1222 let mut src = running_source();
1223
1224 let rel_msg = PostgresCdcSource::build_relation_message(
1226 16384,
1227 "public",
1228 "users",
1229 &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1230 );
1231 src.enqueue_wal_data(rel_msg);
1232
1233 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x300, 0, 2));
1235 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1236 16384,
1237 &[Some("1"), Some("Alice")],
1238 ));
1239 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1240 16384,
1241 &[Some("2"), Some("Bob")],
1242 ));
1243 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1244 16384,
1245 &[Some("3"), Some("Charlie")],
1246 ));
1247 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x300, 0x400, 0));
1248
1249 let batch = src.poll_batch(100).await.unwrap().unwrap();
1250 assert_eq!(batch.num_rows(), 3);
1251 }
1252
1253 #[tokio::test]
1256 async fn test_events_buffered_until_commit() {
1257 let mut src = running_source();
1258
1259 let rel_msg = PostgresCdcSource::build_relation_message(
1260 16384,
1261 "public",
1262 "users",
1263 &[(1, "id", INT8_OID, -1)],
1264 );
1265 src.enqueue_wal_data(rel_msg);
1266
1267 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1269 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1270
1271 let result = src.poll_batch(100).await.unwrap();
1273 assert!(result.is_none());
1274
1275 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1277
1278 let batch = src.poll_batch(100).await.unwrap().unwrap();
1279 assert_eq!(batch.num_rows(), 1);
1280 }
1281
1282 #[tokio::test]
1285 async fn test_process_update() {
1286 let mut src = running_source();
1287
1288 let rel_msg = PostgresCdcSource::build_relation_message(
1289 16384,
1290 "public",
1291 "users",
1292 &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1293 );
1294 src.enqueue_wal_data(rel_msg);
1295
1296 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1297 src.enqueue_wal_data(PostgresCdcSource::build_update_message(
1298 16384,
1299 &[Some("42"), Some("Alice")],
1300 &[Some("42"), Some("Bob")],
1301 ));
1302 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1303
1304 let batch = src.poll_batch(100).await.unwrap().unwrap();
1305 assert_eq!(batch.num_rows(), 1);
1306
1307 let op_col = batch.records.column(1).as_string::<i32>();
1308 assert_eq!(op_col.value(0), "U");
1309
1310 assert!(!batch.records.column(4).is_null(0)); assert!(!batch.records.column(5).is_null(0)); }
1314
1315 #[tokio::test]
1318 async fn test_process_delete() {
1319 let mut src = running_source();
1320
1321 let rel_msg = PostgresCdcSource::build_relation_message(
1322 16384,
1323 "public",
1324 "users",
1325 &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1326 );
1327 src.enqueue_wal_data(rel_msg);
1328
1329 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1330 src.enqueue_wal_data(PostgresCdcSource::build_delete_message(
1331 16384,
1332 &[Some("42")],
1333 ));
1334 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1335
1336 let batch = src.poll_batch(100).await.unwrap().unwrap();
1337 let op_col = batch.records.column(1).as_string::<i32>();
1338 assert_eq!(op_col.value(0), "D");
1339
1340 assert!(!batch.records.column(4).is_null(0));
1342 assert!(batch.records.column(5).is_null(0));
1343 }
1344
1345 #[tokio::test]
1348 async fn test_table_exclude_filter() {
1349 let mut config = PostgresCdcConfig::default();
1350 config.table_exclude = vec!["users".to_string()];
1351 let mut src = PostgresCdcSource::new(config, None);
1352 src.state = ConnectorState::Running;
1353
1354 let rel_msg = PostgresCdcSource::build_relation_message(
1355 16384,
1356 "public",
1357 "users",
1358 &[(1, "id", INT8_OID, -1)],
1359 );
1360 src.enqueue_wal_data(rel_msg);
1361
1362 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1363 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1364 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1365
1366 let result = src.poll_batch(100).await.unwrap();
1367 assert!(result.is_none()); }
1369
1370 #[tokio::test]
1373 async fn test_max_poll_records() {
1374 let mut src = running_source();
1375
1376 for i in 0..5 {
1378 src.inject_event(ChangeEvent {
1379 table: "t".to_string(),
1380 op: CdcOperation::Insert,
1381 lsn: Lsn::new(i as u64),
1382 ts_ms: 0,
1383 before: None,
1384 after: Some(format!("{{\"id\":\"{i}\"}}")),
1385 });
1386 }
1387
1388 let batch = src.poll_batch(2).await.unwrap().unwrap();
1390 assert_eq!(batch.num_rows(), 2);
1391 assert_eq!(src.buffered_events(), 3);
1392
1393 let batch = src.poll_batch(100).await.unwrap().unwrap();
1395 assert_eq!(batch.num_rows(), 3);
1396 assert_eq!(src.buffered_events(), 0);
1397 }
1398
1399 #[tokio::test]
1402 async fn test_partition_info() {
1403 let mut src = running_source();
1404 src.write_lsn = "1/ABCD".parse().unwrap();
1405
1406 src.inject_event(ChangeEvent {
1407 table: "t".to_string(),
1408 op: CdcOperation::Insert,
1409 lsn: Lsn::ZERO,
1410 ts_ms: 0,
1411 before: None,
1412 after: Some("{}".to_string()),
1413 });
1414
1415 let batch = src.poll_batch(100).await.unwrap().unwrap();
1416 let partition = batch.partition.unwrap();
1417 assert_eq!(partition.id, "laminar_slot");
1418 assert_eq!(partition.offset, "1/ABCD");
1419 }
1420
1421 #[test]
1424 fn test_health_check_healthy() {
1425 let src = running_source();
1426 assert!(src.health_check().is_healthy());
1427 }
1428
1429 #[test]
1430 fn test_health_check_degraded() {
1431 let mut src = running_source();
1432 src.write_lsn = Lsn::new(200_000_000);
1433 src.confirmed_flush_lsn = Lsn::ZERO;
1434 assert!(matches!(src.health_check(), HealthStatus::Degraded(_)));
1435 }
1436
1437 #[test]
1438 fn test_health_check_unknown_when_created() {
1439 let src = default_source();
1440 assert!(matches!(src.health_check(), HealthStatus::Unknown));
1441 }
1442
1443 #[tokio::test]
1446 async fn test_metrics_after_processing() {
1447 let mut src = running_source();
1448
1449 let rel_msg = PostgresCdcSource::build_relation_message(
1450 16384,
1451 "public",
1452 "users",
1453 &[(1, "id", INT8_OID, -1)],
1454 );
1455 src.enqueue_wal_data(rel_msg);
1456
1457 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1458 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1459 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("2")]));
1460 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1461
1462 let _ = src.poll_batch(100).await.unwrap();
1463
1464 let metrics = src.metrics();
1465 assert_eq!(metrics.records_total, 2); }
1467
1468 #[test]
1471 fn test_replication_lag() {
1472 let mut src = default_source();
1473 src.write_lsn = Lsn::new(1000);
1474 src.confirmed_flush_lsn = Lsn::new(500);
1475 assert_eq!(src.replication_lag_bytes(), 500);
1476 }
1477
1478 #[tokio::test]
1481 async fn test_unknown_relation_error() {
1482 let mut src = running_source();
1483
1484 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1486 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(99999, &[Some("1")]));
1487
1488 let result = src.poll_batch(100).await;
1489 assert!(result.is_err());
1490 }
1491
1492 #[tokio::test]
1495 async fn test_multi_table_transaction() {
1496 let mut src = running_source();
1497
1498 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1500 100,
1501 "public",
1502 "users",
1503 &[(1, "id", INT4_OID, -1)],
1504 ));
1505 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1506 200,
1507 "public",
1508 "orders",
1509 &[(1, "order_id", INT4_OID, -1)],
1510 ));
1511
1512 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x500, 0, 5));
1513 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1514 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1515 200,
1516 &[Some("1001")],
1517 ));
1518 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x500, 0x600, 0));
1519
1520 let batch = src.poll_batch(100).await.unwrap().unwrap();
1521 assert_eq!(batch.num_rows(), 2);
1522
1523 let table_col = batch.records.column(0).as_string::<i32>();
1524 assert_eq!(table_col.value(0), "users");
1525 assert_eq!(table_col.value(1), "orders");
1526 }
1527
1528 #[tokio::test]
1531 async fn test_schema_change_mid_stream() {
1532 let mut src = running_source();
1533
1534 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1536 100,
1537 "public",
1538 "users",
1539 &[(1, "id", INT4_OID, -1)],
1540 ));
1541
1542 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1543 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1544 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1545
1546 let batch1 = src.poll_batch(100).await.unwrap().unwrap();
1547 assert_eq!(batch1.num_rows(), 1);
1548
1549 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1551 100,
1552 "public",
1553 "users",
1554 &[(1, "id", INT4_OID, -1), (0, "email", TEXT_OID, -1)],
1555 ));
1556
1557 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x200, 0, 2));
1558 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1559 100,
1560 &[Some("2"), Some("alice@example.com")],
1561 ));
1562 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x200, 0x300, 0));
1563
1564 let batch2 = src.poll_batch(100).await.unwrap().unwrap();
1565 assert_eq!(batch2.num_rows(), 1);
1566
1567 let after_col = batch2.records.column(5).as_string::<i32>();
1569 let json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
1570 assert_eq!(json["email"], "alice@example.com");
1571 }
1572
1573 #[tokio::test]
1576 async fn test_write_lsn_advances() {
1577 let mut src = running_source();
1578
1579 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1580 100,
1581 "public",
1582 "t",
1583 &[(1, "id", INT4_OID, -1)],
1584 ));
1585
1586 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1587 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1588 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));
1589
1590 let _ = src.poll_batch(100).await;
1591 assert_eq!(src.write_lsn().as_u64(), 0x500);
1592 }
1593
1594 #[tokio::test]
1597 async fn test_truncate_returns_error() {
1598 let mut src = running_source();
1599
1600 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1602 16384,
1603 "public",
1604 "users",
1605 &[(1, "id", INT8_OID, -1)],
1606 ));
1607
1608 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1609 src.enqueue_wal_data(PostgresCdcSource::build_truncate_message(&[16384], 0));
1610
1611 let result = src.poll_batch(100).await;
1612 assert!(result.is_err());
1613 let err = result.unwrap_err().to_string();
1614 assert!(
1615 err.contains("TRUNCATE"),
1616 "error should mention TRUNCATE: {err}"
1617 );
1618 assert!(
1619 err.contains("users"),
1620 "error should mention table name: {err}"
1621 );
1622 }
1623
1624 #[tokio::test]
1625 async fn test_truncate_unknown_relation_uses_oid() {
1626 let mut src = running_source();
1627
1628 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1630 src.enqueue_wal_data(PostgresCdcSource::build_truncate_message(&[99999], 0));
1631
1632 let result = src.poll_batch(100).await;
1633 assert!(result.is_err());
1634 let err = result.unwrap_err().to_string();
1635 assert!(err.contains("oid:99999"), "error should mention oid: {err}");
1636 }
1637
1638 #[tokio::test]
1641 async fn test_confirmed_lsn_not_advanced_until_checkpoint() {
1642 let mut src = running_source();
1643
1644 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1645 100,
1646 "public",
1647 "t",
1648 &[(1, "id", INT4_OID, -1)],
1649 ));
1650
1651 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1652 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1653 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));
1654
1655 assert!(src.confirmed_flush_lsn().is_zero());
1657
1658 let _ = src.poll_batch(100).await.unwrap().unwrap();
1660 assert!(
1661 src.confirmed_flush_lsn().is_zero(),
1662 "confirmed_flush_lsn should not advance on poll, got {}",
1663 src.confirmed_flush_lsn()
1664 );
1665
1666 assert_eq!(src.polled_lsn.as_u64(), 0x500);
1668
1669 let cp = src.checkpoint();
1671 assert_eq!(cp.get_offset("lsn"), Some("0/500"));
1672 }
1673
1674 #[tokio::test]
1677 async fn test_restore_sets_polled_lsn() {
1678 let mut src = default_source();
1679 let mut cp = SourceCheckpoint::new(1);
1680 cp.set_offset("lsn", "2/FF00");
1681 cp.set_offset("write_lsn", "2/FF10");
1682
1683 src.restore(&cp).await.unwrap();
1684 assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
1685 assert_eq!(src.polled_lsn.as_u64(), 0x2_0000_FF00);
1686 assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
1687 }
1688
1689 #[tokio::test]
1692 async fn test_backpressure_does_not_drop_buffered_events() {
1693 let mut src = running_source();
1694 src.config.max_buffered_events = 100;
1695
1696 for i in 0..200u64 {
1701 src.inject_event(ChangeEvent {
1702 table: "public.t".to_string(),
1703 op: CdcOperation::Insert,
1704 before: None,
1705 after: Some(format!("{{\"id\": {i}}}")),
1706 ts_ms: i as i64,
1707 lsn: Lsn::new(i),
1708 });
1709 }
1710 assert_eq!(src.event_buffer.len(), 200);
1711
1712 let batch = src.poll_batch(50).await.unwrap().unwrap();
1714 assert_eq!(batch.records.num_rows(), 50);
1715 assert_eq!(src.event_buffer.len(), 150);
1717 assert_eq!(src.metrics.events_dropped.get(), 0);
1718 }
1719}