1use arrow_array::RecordBatch;
13use arrow_schema::SchemaRef;
14use async_trait::async_trait;
15use std::collections::VecDeque;
16use std::sync::Arc;
17use tokio::sync::Notify;
18
19use crate::checkpoint::SourceCheckpoint;
20use crate::config::{ConnectorConfig, ConnectorState};
21use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
22use crate::error::ConnectorError;
23
24use super::changelog::{events_to_record_batch, tuple_to_json, CdcOperation, ChangeEvent};
25use super::config::PostgresCdcConfig;
26use super::decoder::{decode_message, WalMessage};
27use super::lsn::Lsn;
28use super::metrics::PostgresCdcMetrics;
29use super::schema::{cdc_envelope_schema, RelationCache, RelationInfo};
30pub struct PostgresCdcSource {
47 config: PostgresCdcConfig,
49
50 state: ConnectorState,
52
53 schema: SchemaRef,
55
56 metrics: Arc<PostgresCdcMetrics>,
58
59 relation_cache: RelationCache,
61
62 event_buffer: VecDeque<ChangeEvent>,
64
65 current_txn: Option<TransactionState>,
67
68 confirmed_flush_lsn: Lsn,
70
71 write_lsn: Lsn,
73
74 polled_lsn: Lsn,
78
79 pending_messages: VecDeque<Vec<u8>>,
81
82 data_ready: Arc<Notify>,
84
85 #[cfg(feature = "postgres-cdc")]
87 connection_handle: Option<tokio::task::JoinHandle<()>>,
88
89 #[cfg(feature = "postgres-cdc")]
91 wal_rx: Option<WalPayloadRx>,
92
93 #[cfg(feature = "postgres-cdc")]
95 reader_handle: Option<tokio::task::JoinHandle<()>>,
96
97 #[cfg(feature = "postgres-cdc")]
99 reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
100
101 #[cfg(feature = "postgres-cdc")]
105 confirmed_lsn_tx: Option<tokio::sync::watch::Sender<u64>>,
106}
107
108#[derive(Debug, Clone)]
110struct TransactionState {
111 final_lsn: Lsn,
113 commit_ts_ms: i64,
115 events: Vec<ChangeEvent>,
117}
118
/// Receiver side of the bounded channel that carries [`WalPayload`] values from
/// the background WAL reader task to `poll_batch`.
#[cfg(feature = "postgres-cdc")]
type WalPayloadRx = crossfire::AsyncRx<crossfire::mpsc::Array<WalPayload>>;
122
/// Message handed from the WAL reader task to the polling side.
///
/// Values are plain integers and byte buffers so the payload does not depend on
/// the replication client's own types.
// The variants are only constructed and consumed in code gated on the
// `postgres-cdc` feature, so without that feature the enum is unused.
#[allow(dead_code)]
#[derive(Debug)]
enum WalPayload {
    /// Start of a transaction.
    Begin {
        /// Final LSN of the transaction.
        final_lsn: u64,
        /// Commit timestamp in microseconds (converted to Unix ms by the consumer).
        commit_ts_us: i64,
        /// Transaction id.
        xid: u32,
    },
    /// End of a transaction.
    Commit {
        /// End LSN of the transaction; becomes the source's write LSN.
        end_lsn: u64,
        /// Commit timestamp in microseconds (converted to Unix ms by the consumer).
        commit_ts_us: i64,
        /// Commit LSN.
        lsn: u64,
    },
    /// A raw pgoutput message together with the WAL end position reported with it.
    XLogData {
        /// WAL end position reported alongside the data.
        wal_end: u64,
        /// Encoded pgoutput message bytes.
        data: Vec<u8>,
    },
    /// Keep-alive carrying only the WAL end position.
    KeepAlive {
        /// WAL end position reported by the keep-alive.
        wal_end: u64,
    },
    /// Fatal reader error, surfaced to the caller as a read error.
    Error(String),
}
146
147impl PostgresCdcSource {
148 #[must_use]
150 pub fn new(config: PostgresCdcConfig, registry: Option<&prometheus::Registry>) -> Self {
151 Self {
152 config,
153 state: ConnectorState::Created,
154 schema: cdc_envelope_schema(),
155 metrics: Arc::new(PostgresCdcMetrics::new(registry)),
156 relation_cache: RelationCache::new(),
157 event_buffer: VecDeque::new(),
158 current_txn: None,
159 confirmed_flush_lsn: Lsn::ZERO,
160 write_lsn: Lsn::ZERO,
161 polled_lsn: Lsn::ZERO,
162 pending_messages: VecDeque::new(),
163 data_ready: Arc::new(Notify::new()),
164 #[cfg(feature = "postgres-cdc")]
165 connection_handle: None,
166 #[cfg(feature = "postgres-cdc")]
167 wal_rx: None,
168 #[cfg(feature = "postgres-cdc")]
169 reader_handle: None,
170 #[cfg(feature = "postgres-cdc")]
171 reader_shutdown: None,
172 #[cfg(feature = "postgres-cdc")]
173 confirmed_lsn_tx: None,
174 }
175 }
176
177 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
183 let pg_config = PostgresCdcConfig::from_config(config)?;
184 Ok(Self::new(pg_config, None))
185 }
186
187 #[must_use]
189 pub fn config(&self) -> &PostgresCdcConfig {
190 &self.config
191 }
192
193 #[must_use]
195 pub fn confirmed_flush_lsn(&self) -> Lsn {
196 self.confirmed_flush_lsn
197 }
198
199 #[must_use]
201 pub fn write_lsn(&self) -> Lsn {
202 self.write_lsn
203 }
204
205 #[must_use]
207 pub fn replication_lag_bytes(&self) -> u64 {
208 self.write_lsn.diff(self.confirmed_flush_lsn)
209 }
210
211 #[must_use]
213 pub fn relation_cache(&self) -> &RelationCache {
214 &self.relation_cache
215 }
216
217 #[must_use]
219 pub fn buffered_events(&self) -> usize {
220 self.event_buffer.len()
221 }
222
223 pub fn enqueue_wal_data(&mut self, data: Vec<u8>) {
228 self.pending_messages.push_back(data);
229 }
230
231 pub fn process_pending_messages(&mut self) -> Result<(), ConnectorError> {
237 while let Some(data) = self.pending_messages.pop_front() {
238 self.metrics.record_bytes(data.len() as u64);
239 let msg = decode_message(&data)
240 .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
241 self.process_wal_message(msg)?;
242 }
243 Ok(())
244 }
245
246 fn process_wal_message(&mut self, msg: WalMessage) -> Result<(), ConnectorError> {
248 match msg {
249 WalMessage::Begin(begin) => {
250 self.current_txn = Some(TransactionState {
251 final_lsn: begin.final_lsn,
252 commit_ts_ms: begin.commit_ts_ms,
253 events: Vec::new(),
254 });
255 }
256 WalMessage::Commit(commit) => {
257 if let Some(txn) = self.current_txn.take() {
258 self.event_buffer.extend(txn.events);
259 self.write_lsn = commit.end_lsn;
260 self.metrics.record_transaction();
261 self.metrics
262 .set_replication_lag_bytes(self.replication_lag_bytes());
263 }
264 }
265 WalMessage::Relation(rel) => {
266 let info = RelationInfo {
267 relation_id: rel.relation_id,
268 namespace: rel.namespace,
269 name: rel.name,
270 replica_identity: rel.replica_identity as char,
271 columns: rel.columns,
272 };
273 self.relation_cache.insert(info);
274 }
275 WalMessage::Insert(ins) => {
276 self.process_insert(ins.relation_id, &ins.new_tuple)?;
277 }
278 WalMessage::Update(upd) => {
279 self.process_update(upd.relation_id, upd.old_tuple.as_ref(), &upd.new_tuple)?;
280 }
281 WalMessage::Delete(del) => {
282 self.process_delete(del.relation_id, &del.old_tuple)?;
283 }
284 WalMessage::Truncate(trunc) => {
285 let table_names: Vec<String> = trunc
286 .relation_ids
287 .iter()
288 .map(|id| {
289 self.relation_cache
290 .get(*id)
291 .map_or_else(|| format!("oid:{id}"), RelationInfo::full_name)
292 })
293 .collect();
294 return Err(ConnectorError::ReadError(format!(
295 "TRUNCATE received on table(s): {}. \
296 Cannot produce retraction events — restart the pipeline with a fresh snapshot.",
297 table_names.join(", ")
298 )));
299 }
300 WalMessage::Origin(_) | WalMessage::Type(_) => {
301 }
304 }
305 Ok(())
306 }
307
308 fn process_insert(
309 &mut self,
310 relation_id: u32,
311 new_tuple: &super::decoder::TupleData,
312 ) -> Result<(), ConnectorError> {
313 let relation = self.require_relation(relation_id)?;
314 let table = relation.full_name();
315
316 if !self.config.should_include_table(&table) {
317 return Ok(());
318 }
319
320 let after_json = tuple_to_json(new_tuple, relation);
321 let (lsn, ts_ms) = self.current_txn_context();
322
323 let event = ChangeEvent {
324 table,
325 op: CdcOperation::Insert,
326 lsn,
327 ts_ms,
328 before: None,
329 after: Some(after_json),
330 };
331
332 self.push_event(event);
333 self.metrics.record_insert();
334 Ok(())
335 }
336
337 fn process_update(
338 &mut self,
339 relation_id: u32,
340 old_tuple: Option<&super::decoder::TupleData>,
341 new_tuple: &super::decoder::TupleData,
342 ) -> Result<(), ConnectorError> {
343 let relation = self.require_relation(relation_id)?;
344 let table = relation.full_name();
345
346 if !self.config.should_include_table(&table) {
347 return Ok(());
348 }
349
350 let before_json = old_tuple.map(|t| tuple_to_json(t, relation));
351 let after_json = tuple_to_json(new_tuple, relation);
352 let (lsn, ts_ms) = self.current_txn_context();
353
354 let event = ChangeEvent {
355 table,
356 op: CdcOperation::Update,
357 lsn,
358 ts_ms,
359 before: before_json,
360 after: Some(after_json),
361 };
362
363 self.push_event(event);
364 self.metrics.record_update();
365 Ok(())
366 }
367
368 fn process_delete(
369 &mut self,
370 relation_id: u32,
371 old_tuple: &super::decoder::TupleData,
372 ) -> Result<(), ConnectorError> {
373 let relation = self.require_relation(relation_id)?;
374 let table = relation.full_name();
375
376 if !self.config.should_include_table(&table) {
377 return Ok(());
378 }
379
380 let before_json = tuple_to_json(old_tuple, relation);
381 let (lsn, ts_ms) = self.current_txn_context();
382
383 let event = ChangeEvent {
384 table,
385 op: CdcOperation::Delete,
386 lsn,
387 ts_ms,
388 before: Some(before_json),
389 after: None,
390 };
391
392 self.push_event(event);
393 self.metrics.record_delete();
394 Ok(())
395 }
396
397 fn require_relation(&self, relation_id: u32) -> Result<&RelationInfo, ConnectorError> {
403 self.relation_cache.get(relation_id).ok_or_else(|| {
404 ConnectorError::ReadError(format!(
405 "unknown relation ID {relation_id} (no Relation message received yet)"
406 ))
407 })
408 }
409
410 fn current_txn_context(&self) -> (Lsn, i64) {
411 match &self.current_txn {
412 Some(txn) => (txn.final_lsn, txn.commit_ts_ms),
413 None => (self.write_lsn, 0),
414 }
415 }
416
417 fn push_event(&mut self, event: ChangeEvent) {
418 if let Some(txn) = &mut self.current_txn {
419 txn.events.push(event);
420 } else {
421 self.event_buffer.push_back(event);
422 }
423 }
424
/// Applies one payload received from the WAL reader task.
///
/// Begin and Commit payloads are rebuilt into decoder messages (timestamps
/// converted from microseconds to Unix milliseconds). XLogData is decoded and
/// applied unless its first byte marks a Begin (`B`) or Commit (`C`) message,
/// which arrive as dedicated payloads. XLogData and KeepAlive both move the
/// write LSN to the reported WAL end.
///
/// # Errors
///
/// Returns a `ReadError` for an `Error` payload or an undecodable message, and
/// propagates failures from `process_wal_message`. On failure the write LSN is
/// left untouched.
#[cfg(feature = "postgres-cdc")]
fn process_wal_payload(&mut self, payload: WalPayload) -> Result<(), ConnectorError> {
    use super::decoder::pg_timestamp_to_unix_ms;

    match payload {
        WalPayload::Error(msg) => Err(ConnectorError::ReadError(msg)),
        WalPayload::KeepAlive { wal_end } => {
            self.write_lsn = Lsn::new(wal_end);
            Ok(())
        }
        WalPayload::XLogData { wal_end, data } => {
            let is_txn_boundary = matches!(data.first().copied(), Some(b'B' | b'C'));
            if !is_txn_boundary {
                let decoded = decode_message(&data)
                    .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
                self.process_wal_message(decoded)?;
            }
            self.write_lsn = Lsn::new(wal_end);
            Ok(())
        }
        WalPayload::Begin {
            final_lsn,
            commit_ts_us,
            xid,
        } => self.process_wal_message(WalMessage::Begin(super::decoder::BeginMessage {
            final_lsn: Lsn::new(final_lsn),
            commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
            xid,
        })),
        WalPayload::Commit {
            end_lsn,
            commit_ts_us,
            lsn,
        } => self.process_wal_message(WalMessage::Commit(super::decoder::CommitMessage {
            flags: 0,
            commit_lsn: Lsn::new(lsn),
            end_lsn: Lsn::new(end_lsn),
            commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
        })),
    }
}
476
477 fn drain_events(&mut self, max: usize) -> Result<Option<RecordBatch>, ConnectorError> {
479 if self.event_buffer.is_empty() {
480 return Ok(None);
481 }
482
483 let count = max.min(self.event_buffer.len());
484 let events: Vec<ChangeEvent> = self.event_buffer.drain(..count).collect();
485
486 let batch = events_to_record_batch(&events)
487 .map_err(|e| ConnectorError::Internal(format!("Arrow batch build: {e}")))?;
488
489 self.metrics.record_batch();
490 Ok(Some(batch))
491 }
492}
493
494#[async_trait]
495#[allow(clippy::too_many_lines)]
496impl SourceConnector for PostgresCdcSource {
/// Opens the source: applies configuration, seeds the LSN state and, when the
/// `postgres-cdc` feature is enabled outside of test builds, connects to
/// PostgreSQL and spawns the background WAL reader task.
///
/// # Errors
///
/// * Configuration parsing errors when `config` carries properties.
/// * `ConfigurationError` when built without the `postgres-cdc` feature.
/// * Errors from the control-plane connection or slot setup, and
///   `ConnectionFailed` when the replication client cannot connect.
async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
    self.state = ConnectorState::Initializing;

    // Only re-parse when the runtime config actually carries properties;
    // otherwise the config given at construction stays in effect.
    if !config.properties().is_empty() {
        self.config = PostgresCdcConfig::from_config(config)?;
    }

    // An explicitly configured start LSN seeds all three positions.
    if let Some(lsn) = self.config.start_lsn {
        self.confirmed_flush_lsn = lsn;
        self.write_lsn = lsn;
        self.polled_lsn = lsn;
    }

    // Without the feature there is no replication client to use.
    #[cfg(not(feature = "postgres-cdc"))]
    {
        return Err(ConnectorError::ConfigurationError(
            "PostgreSQL CDC source requires the `postgres-cdc` feature flag. \
             Rebuild with `--features postgres-cdc` to enable."
                .to_string(),
        ));
    }

    // Skipped in test builds, so tests reach `Running` without a database.
    #[cfg(all(feature = "postgres-cdc", not(test)))]
    {
        use super::postgres_io;

        // Control-plane connection; its task handle is awaited/aborted in `close`.
        let (client, handle) = postgres_io::connect(&self.config).await?;
        self.connection_handle = Some(handle);

        let slot_lsn = postgres_io::ensure_replication_slot(
            &client,
            &self.config.slot_name,
            &self.config.output_plugin,
        )
        .await?;

        // With no configured start LSN, fall back to the LSN reported for the slot.
        if self.config.start_lsn.is_none() {
            if let Some(lsn) = slot_lsn {
                self.confirmed_flush_lsn = lsn;
                self.write_lsn = lsn;
                self.polled_lsn = lsn;
            }
        }

        let mut repl_config = postgres_io::build_replication_config(&self.config);
        // A zero LSN means "no known position"; keep the builder's default then.
        if self.confirmed_flush_lsn != Lsn::ZERO {
            repl_config.start_lsn =
                pgwire_replication::Lsn::from_u64(self.confirmed_flush_lsn.as_u64());
        }

        let repl_client = pgwire_replication::ReplicationClient::connect(repl_config)
            .await
            .map_err(|e| {
                ConnectorError::ConnectionFailed(format!("pgwire-replication connect: {e}"))
            })?;

        // Bounded channel (capacity 4096) from the reader task to `poll_batch`.
        let (wal_tx, wal_rx) = crossfire::mpsc::bounded_async::<WalPayload>(4096);
        // Watch channels: shutdown request, and the LSN published by `checkpoint`.
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
        let (confirmed_lsn_tx, mut confirmed_lsn_rx) =
            tokio::sync::watch::channel(self.confirmed_flush_lsn.as_u64());
        let data_ready = Arc::clone(&self.data_ready);
        let reader_config = self.config.clone();

        let reader_handle = tokio::spawn(async move {
            const MAX_FAILURES: u32 = 10;
            let mut repl_client = repl_client;
            let mut consecutive_failures: u32 = 0;

            // Outer loop: one iteration per (re)connection.
            // Inner loop: receive events until the stream ends or errors.
            'reconnect: loop {
                'recv: loop {
                    // Forward a newly published confirmed LSN to the client
                    // before waiting for the next event; zero is never forwarded.
                    if confirmed_lsn_rx.has_changed().unwrap_or(false) {
                        let confirmed = *confirmed_lsn_rx.borrow_and_update();
                        if confirmed > 0 {
                            repl_client.update_applied_lsn(pgwire_replication::Lsn::from_u64(
                                confirmed,
                            ));
                        }
                    }

                    tokio::select! {
                        // `biased` polls the shutdown branch first.
                        biased;
                        _ = shutdown_rx.changed() => break 'reconnect,
                        event = repl_client.recv() => {
                            match event {
                                Ok(Some(event)) => {
                                    // Any received event resets the failure streak.
                                    consecutive_failures = 0;
                                    let payload = match &event {
                                        pgwire_replication::ReplicationEvent::Begin {
                                            final_lsn,
                                            xid,
                                            commit_time_micros,
                                        } => Some(WalPayload::Begin {
                                            final_lsn: final_lsn.as_u64(),
                                            commit_ts_us: *commit_time_micros,
                                            xid: *xid,
                                        }),
                                        pgwire_replication::ReplicationEvent::Commit {
                                            end_lsn,
                                            commit_time_micros,
                                            lsn,
                                        } => Some(WalPayload::Commit {
                                            end_lsn: end_lsn.as_u64(),
                                            commit_ts_us: *commit_time_micros,
                                            lsn: lsn.as_u64(),
                                        }),
                                        pgwire_replication::ReplicationEvent::XLogData {
                                            wal_end,
                                            data,
                                            ..
                                        } => Some(WalPayload::XLogData {
                                            wal_end: wal_end.as_u64(),
                                            data: data.to_vec(),
                                        }),
                                        pgwire_replication::ReplicationEvent::KeepAlive {
                                            wal_end,
                                            ..
                                        } => Some(WalPayload::KeepAlive {
                                            wal_end: wal_end.as_u64(),
                                        }),
                                        // Any other event kind is dropped.
                                        _ => None,
                                    };
                                    if let Some(p) = payload {
                                        // A failed send means the receiver is gone;
                                        // stop the task for good.
                                        if wal_tx.send(p).await.is_err() {
                                            break 'reconnect;
                                        }
                                        data_ready.notify_one();
                                    }
                                }
                                // Stream ended: fall through to the reconnect path.
                                Ok(None) => break 'recv,
                                Err(e) => {
                                    tracing::error!(error = %e, "WAL reader error");
                                    break 'recv;
                                }
                            }
                        }
                    }
                }

                // Best-effort shutdown of the old client; the result is ignored.
                let _ = repl_client.shutdown().await;
                consecutive_failures += 1;

                // Give up after MAX_FAILURES and report the failure to the poller.
                if consecutive_failures >= MAX_FAILURES {
                    tracing::error!(
                        failures = consecutive_failures,
                        "WAL reader exhausted reconnect attempts"
                    );
                    let _ = wal_tx.send(WalPayload::Error(format!(
                        "WAL reader failed after {consecutive_failures} consecutive reconnect attempts"
                    ))).await;
                    data_ready.notify_one();
                    break 'reconnect;
                }

                let backoff =
                    crate::retry::Backoff::broker_reconnect().delay(consecutive_failures);
                tracing::warn!(
                    attempt = consecutive_failures,
                    ?backoff,
                    "WAL reader reconnecting"
                );
                // The backoff sleep can be cut short by a shutdown request.
                tokio::select! {
                    biased;
                    _ = shutdown_rx.changed() => break 'reconnect,
                    () = tokio::time::sleep(backoff) => {}
                }

                // Resume from the last LSN published on the confirmed-LSN channel.
                let resume_lsn = *confirmed_lsn_rx.borrow();
                let mut new_config = postgres_io::build_replication_config(&reader_config);
                if resume_lsn > 0 {
                    new_config.start_lsn = pgwire_replication::Lsn::from_u64(resume_lsn);
                }

                match pgwire_replication::ReplicationClient::connect(new_config).await {
                    Ok(new_client) => {
                        tracing::info!("WAL reader reconnected");
                        repl_client = new_client;
                    }
                    Err(e) => {
                        // NOTE(review): the old, already shut-down client is kept and
                        // the loop re-enters `'recv` with it; presumably `recv` then
                        // fails at once and counts as the next failure — confirm.
                        tracing::error!(error = %e, "WAL reader reconnect failed");
                    }
                }
            }
        });

        self.wal_rx = Some(wal_rx);
        self.reader_handle = Some(reader_handle);
        self.reader_shutdown = Some(shutdown_tx);
        self.confirmed_lsn_tx = Some(confirmed_lsn_tx);
    }

    self.state = ConnectorState::Running;
    Ok(())
}
709
710 async fn poll_batch(
711 &mut self,
712 max_records: usize,
713 ) -> Result<Option<SourceBatch>, ConnectorError> {
714 if self.state != ConnectorState::Running {
715 return Err(ConnectorError::InvalidState {
716 expected: "Running".to_string(),
717 actual: self.state.to_string(),
718 });
719 }
720
721 #[cfg(feature = "postgres-cdc")]
731 {
732 let high_watermark = self.config.backpressure_high_watermark();
733 let mut payloads = Vec::new();
734 let mut reader_closed = false;
735
736 if self.event_buffer.len() < high_watermark {
737 if let Some(ref mut rx) = self.wal_rx {
738 while payloads.len() + self.event_buffer.len() < max_records
739 && self.event_buffer.len() + payloads.len() < high_watermark
740 {
741 match rx.try_recv() {
742 Ok(payload) => payloads.push(payload),
743 Err(crossfire::TryRecvError::Empty) => break,
744 Err(crossfire::TryRecvError::Disconnected) => {
745 reader_closed = true;
746 break;
747 }
748 }
749 }
750 }
751 } else {
752 tracing::debug!(
753 buffered = self.event_buffer.len(),
754 high_watermark,
755 "CDC backpressure active — pausing WAL reader drain"
756 );
757 }
758
759 for payload in payloads {
760 self.process_wal_payload(payload)?;
761 }
762 if reader_closed && self.event_buffer.is_empty() {
763 self.state = ConnectorState::Failed;
764 return Err(ConnectorError::ReadError(
765 "WAL reader task terminated unexpectedly — replication stream lost".to_string(),
766 ));
767 }
768 }
769
770 self.process_pending_messages()?;
772
773 match self.drain_events(max_records)? {
780 Some(batch) => {
781 if self.event_buffer.is_empty() {
784 self.polled_lsn = self.write_lsn;
785 }
786 self.metrics
787 .set_confirmed_flush_lsn(self.confirmed_flush_lsn.as_u64());
788 self.metrics
789 .set_replication_lag_bytes(self.replication_lag_bytes());
790
791 let lsn_str = self.write_lsn.to_string();
792 let partition = PartitionInfo::new(&self.config.slot_name, lsn_str);
793 Ok(Some(SourceBatch::with_partition(batch, partition)))
794 }
795 None => Ok(None),
796 }
797 }
798
799 fn schema(&self) -> SchemaRef {
800 Arc::clone(&self.schema)
801 }
802
803 fn checkpoint(&self) -> SourceCheckpoint {
804 let mut cp = SourceCheckpoint::new(0);
805 cp.set_offset("lsn", self.polled_lsn.to_string());
809 cp.set_offset("write_lsn", self.write_lsn.to_string());
810 cp.set_metadata("slot_name", &self.config.slot_name);
811 cp.set_metadata("publication", &self.config.publication);
812
813 #[cfg(feature = "postgres-cdc")]
816 if let Some(ref tx) = self.confirmed_lsn_tx {
817 let _ = tx.send(self.polled_lsn.as_u64());
818 }
819
820 cp
821 }
822
823 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
824 if let Some(lsn_str) = checkpoint.get_offset("lsn") {
825 let lsn: Lsn = lsn_str
826 .parse()
827 .map_err(|e| ConnectorError::Internal(format!("invalid LSN in checkpoint: {e}")))?;
828 self.confirmed_flush_lsn = lsn;
829 self.polled_lsn = lsn;
830 self.metrics.set_confirmed_flush_lsn(lsn.as_u64());
831 }
832 if let Some(write_lsn_str) = checkpoint.get_offset("write_lsn") {
833 if let Ok(lsn) = write_lsn_str.parse::<Lsn>() {
834 self.write_lsn = lsn;
835 }
836 }
837 Ok(())
838 }
839
840 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
841 Some(Arc::clone(&self.data_ready))
842 }
843
844 async fn close(&mut self) -> Result<(), ConnectorError> {
845 #[cfg(feature = "postgres-cdc")]
847 {
848 if let Some(tx) = self.reader_shutdown.take() {
849 let _ = tx.send(true);
850 }
851 if let Some(handle) = self.reader_handle.take() {
852 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
853 }
854 self.wal_rx = None;
855 self.confirmed_lsn_tx = None;
856 }
857
858 #[cfg(feature = "postgres-cdc")]
861 if let Some(handle) = self.connection_handle.take() {
862 let abort = handle.abort_handle();
863 match tokio::time::timeout(std::time::Duration::from_secs(2), handle).await {
864 Ok(Ok(())) => {}
865 Ok(Err(join_err)) => {
866 tracing::warn!(
867 error = %join_err,
868 "[postgres-cdc] control-plane task exited with error"
869 );
870 }
871 Err(_elapsed) => {
872 tracing::warn!(
873 "[postgres-cdc] control-plane task did not exit within 2s; aborting"
874 );
875 abort.abort();
876 }
877 }
878 }
879
880 self.state = ConnectorState::Closed;
881 self.event_buffer.clear();
882 self.pending_messages.clear();
883 Ok(())
884 }
885}
886
/// Test-only helpers: direct event injection and builders for raw pgoutput
/// messages in the byte layout the tests feed to `enqueue_wal_data`.
#[cfg(test)]
impl PostgresCdcSource {
    /// Pushes an event straight into the committed event buffer.
    fn inject_event(&mut self, event: ChangeEvent) {
        self.event_buffer.push_back(event);
    }

    /// Appends one tuple section: the `kind` tag byte, a big-endian `i16`
    /// column count, then per column either `t` + big-endian `i32` length +
    /// text bytes, or `n` for NULL.
    ///
    /// Shared by the insert, delete and update builders, which previously
    /// each carried their own copy of this loop.
    fn push_tuple(buf: &mut Vec<u8>, kind: u8, values: &[Option<&str>]) {
        buf.push(kind);
        buf.extend_from_slice(&(values.len() as i16).to_be_bytes());
        for value in values {
            match value {
                Some(text) => {
                    buf.push(b't');
                    buf.extend_from_slice(&(text.len() as i32).to_be_bytes());
                    buf.extend_from_slice(text.as_bytes());
                }
                None => buf.push(b'n'),
            }
        }
    }

    /// Builds a Relation (`R`) message. Each column is
    /// `(flags, name, type oid, type modifier)`; replica identity is fixed to `d`.
    fn build_relation_message(
        relation_id: u32,
        namespace: &str,
        name: &str,
        columns: &[(u8, &str, u32, i32)],
    ) -> Vec<u8> {
        let mut buf = vec![b'R'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.extend_from_slice(namespace.as_bytes());
        buf.push(0);
        buf.extend_from_slice(name.as_bytes());
        buf.push(0);
        buf.push(b'd');
        buf.extend_from_slice(&(columns.len() as i16).to_be_bytes());
        for (flags, col_name, oid, modifier) in columns {
            buf.push(*flags);
            buf.extend_from_slice(col_name.as_bytes());
            buf.push(0);
            buf.extend_from_slice(&oid.to_be_bytes());
            buf.extend_from_slice(&modifier.to_be_bytes());
        }
        buf
    }

    /// Builds a Begin (`B`) message: final LSN, commit timestamp, xid.
    fn build_begin_message(final_lsn: u64, commit_ts_us: i64, xid: u32) -> Vec<u8> {
        let mut buf = vec![b'B'];
        buf.extend_from_slice(&final_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf.extend_from_slice(&xid.to_be_bytes());
        buf
    }

    /// Builds a Commit (`C`) message: zero flags byte, commit LSN, end LSN,
    /// commit timestamp.
    fn build_commit_message(commit_lsn: u64, end_lsn: u64, commit_ts_us: i64) -> Vec<u8> {
        let mut buf = vec![b'C'];
        buf.push(0);
        buf.extend_from_slice(&commit_lsn.to_be_bytes());
        buf.extend_from_slice(&end_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf
    }

    /// Builds an Insert (`I`) message with a new-tuple (`N`) section.
    fn build_insert_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'I'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        Self::push_tuple(&mut buf, b'N', values);
        buf
    }

    /// Builds a Delete (`D`) message with a key-tuple (`K`) section.
    fn build_delete_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'D'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        Self::push_tuple(&mut buf, b'K', values);
        buf
    }

    /// Builds a Truncate (`T`) message: relation count, options byte, relation ids.
    fn build_truncate_message(relation_ids: &[u32], options: u8) -> Vec<u8> {
        let mut buf = vec![b'T'];
        buf.extend_from_slice(&(relation_ids.len() as i32).to_be_bytes());
        buf.push(options);
        for id in relation_ids {
            buf.extend_from_slice(&id.to_be_bytes());
        }
        buf
    }

    /// Builds an Update (`U`) message with an old-tuple (`O`) section followed
    /// by a new-tuple (`N`) section.
    fn build_update_message(
        relation_id: u32,
        old_values: &[Option<&str>],
        new_values: &[Option<&str>],
    ) -> Vec<u8> {
        let mut buf = vec![b'U'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        Self::push_tuple(&mut buf, b'O', old_values);
        Self::push_tuple(&mut buf, b'N', new_values);
        buf
    }
}
1026
1027#[cfg(test)]
1028mod tests {
1029 use super::*;
1030 use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};
1031 use arrow_array::cast::AsArray;
1032
1033 fn default_source() -> PostgresCdcSource {
1034 PostgresCdcSource::new(PostgresCdcConfig::default(), None)
1035 }
1036
1037 fn running_source() -> PostgresCdcSource {
1038 let mut src = default_source();
1039 src.state = ConnectorState::Running;
1040 src
1041 }
1042
// A fresh source is `Created`, has a zero LSN, no events and a 6-field schema.
#[test]
fn test_new_source() {
    let source = default_source();
    assert_eq!(source.state, ConnectorState::Created);
    assert!(source.confirmed_flush_lsn.is_zero());
    assert_eq!(source.event_buffer.len(), 0);

    let schema = source.schema();
    assert_eq!(schema.fields().len(), 6);
}
1053
// Host and database set on the generic config show up on the parsed config.
#[test]
fn test_from_config() {
    let mut generic = ConnectorConfig::new("postgres-cdc");
    generic.set("host", "pg.local");
    generic.set("database", "testdb");
    generic.set("slot.name", "my_slot");
    generic.set("publication", "my_pub");

    let source = PostgresCdcSource::from_config(&generic).unwrap();
    let parsed = source.config();
    assert_eq!(parsed.host, "pg.local");
    assert_eq!(parsed.database, "testdb");
}
1066
// A config with no properties is rejected.
#[test]
fn test_from_config_invalid() {
    let empty = ConnectorConfig::new("postgres-cdc");
    let outcome = PostgresCdcSource::from_config(&empty);
    assert!(outcome.is_err());
}
1072
1073 #[cfg(not(feature = "postgres-cdc"))]
1077 #[tokio::test]
1078 async fn test_open_fails_without_feature() {
1079 let mut src = default_source();
1080 let config = ConnectorConfig::new("postgres-cdc");
1081 let result = src.open(&config).await;
1082 assert!(result.is_err());
1083 let err = result.unwrap_err().to_string();
1084 assert!(
1085 err.contains("postgres-cdc"),
1086 "error should mention feature flag: {err}"
1087 );
1088 }
1089
1090 #[tokio::test]
1091 async fn test_close() {
1092 let mut src = running_source();
1093 src.inject_event(ChangeEvent {
1094 table: "t".to_string(),
1095 op: CdcOperation::Insert,
1096 lsn: Lsn::ZERO,
1097 ts_ms: 0,
1098 before: None,
1099 after: Some("{}".to_string()),
1100 });
1101
1102 src.close().await.unwrap();
1103 assert_eq!(src.state, ConnectorState::Closed);
1104 assert_eq!(src.event_buffer.len(), 0);
1105 }
1106
// The checkpoint carries the polled LSN, the write LSN and the slot name.
#[test]
fn test_checkpoint() {
    let mut source = default_source();
    source.confirmed_flush_lsn = "1/ABCD".parse().unwrap();
    source.polled_lsn = "1/ABCD".parse().unwrap();
    source.write_lsn = "1/ABCE".parse().unwrap();

    let snapshot = source.checkpoint();

    assert_eq!(snapshot.get_offset("lsn"), Some("1/ABCD"));
    assert_eq!(snapshot.get_offset("write_lsn"), Some("1/ABCE"));
    assert_eq!(snapshot.get_metadata("slot_name"), Some("laminar_slot"));
}
1121
1122 #[tokio::test]
1123 async fn test_restore() {
1124 let mut src = default_source();
1125 let mut cp = SourceCheckpoint::new(1);
1126 cp.set_offset("lsn", "2/FF00");
1127 cp.set_offset("write_lsn", "2/FF10");
1128
1129 src.restore(&cp).await.unwrap();
1130 assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
1131 assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
1132 }
1133
1134 #[tokio::test]
1135 async fn test_restore_invalid_lsn() {
1136 let mut src = default_source();
1137 let mut cp = SourceCheckpoint::new(1);
1138 cp.set_offset("lsn", "not_an_lsn");
1139
1140 assert!(src.restore(&cp).await.is_err());
1141 }
1142
1143 #[tokio::test]
1146 async fn test_poll_empty() {
1147 let mut src = running_source();
1148 let result = src.poll_batch(100).await.unwrap();
1149 assert!(result.is_none());
1150 }
1151
1152 #[tokio::test]
1153 async fn test_poll_not_running() {
1154 let mut src = default_source();
1155 assert!(src.poll_batch(100).await.is_err());
1156 }
1157
1158 #[tokio::test]
1161 async fn test_process_insert_transaction() {
1162 let mut src = running_source();
1163
1164 let rel_msg = PostgresCdcSource::build_relation_message(
1165 16384,
1166 "public",
1167 "users",
1168 &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1169 );
1170 let begin_msg = PostgresCdcSource::build_begin_message(0x100, 0, 1);
1171 let insert_msg =
1172 PostgresCdcSource::build_insert_message(16384, &[Some("42"), Some("Alice")]);
1173 let commit_msg = PostgresCdcSource::build_commit_message(0x100, 0x200, 0);
1174
1175 src.enqueue_wal_data(rel_msg);
1176 src.enqueue_wal_data(begin_msg);
1177 src.enqueue_wal_data(insert_msg);
1178 src.enqueue_wal_data(commit_msg);
1179
1180 let batch = src.poll_batch(100).await.unwrap().unwrap();
1181 assert_eq!(batch.num_rows(), 1);
1182
1183 let records = &batch.records;
1184 let table_col = records.column(0).as_string::<i32>();
1185 assert_eq!(table_col.value(0), "users");
1186
1187 let op_col = records.column(1).as_string::<i32>();
1188 assert_eq!(op_col.value(0), "I");
1189
1190 let after_col = records.column(5).as_string::<i32>();
1191 let after_json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
1192 assert_eq!(after_json["id"], "42");
1193 assert_eq!(after_json["name"], "Alice");
1194
1195 assert!(records.column(4).is_null(0));
1197 }
1198
1199 #[tokio::test]
1202 async fn test_multi_event_transaction() {
1203 let mut src = running_source();
1204
1205 let rel_msg = PostgresCdcSource::build_relation_message(
1207 16384,
1208 "public",
1209 "users",
1210 &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1211 );
1212 src.enqueue_wal_data(rel_msg);
1213
1214 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x300, 0, 2));
1216 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1217 16384,
1218 &[Some("1"), Some("Alice")],
1219 ));
1220 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1221 16384,
1222 &[Some("2"), Some("Bob")],
1223 ));
1224 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1225 16384,
1226 &[Some("3"), Some("Charlie")],
1227 ));
1228 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x300, 0x400, 0));
1229
1230 let batch = src.poll_batch(100).await.unwrap().unwrap();
1231 assert_eq!(batch.num_rows(), 3);
1232 }
1233
1234 #[tokio::test]
1237 async fn test_events_buffered_until_commit() {
1238 let mut src = running_source();
1239
1240 let rel_msg = PostgresCdcSource::build_relation_message(
1241 16384,
1242 "public",
1243 "users",
1244 &[(1, "id", INT8_OID, -1)],
1245 );
1246 src.enqueue_wal_data(rel_msg);
1247
1248 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1250 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1251
1252 let result = src.poll_batch(100).await.unwrap();
1254 assert!(result.is_none());
1255
1256 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1258
1259 let batch = src.poll_batch(100).await.unwrap().unwrap();
1260 assert_eq!(batch.num_rows(), 1);
1261 }
1262
1263 #[tokio::test]
1266 async fn test_process_update() {
1267 let mut src = running_source();
1268
1269 let rel_msg = PostgresCdcSource::build_relation_message(
1270 16384,
1271 "public",
1272 "users",
1273 &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1274 );
1275 src.enqueue_wal_data(rel_msg);
1276
1277 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1278 src.enqueue_wal_data(PostgresCdcSource::build_update_message(
1279 16384,
1280 &[Some("42"), Some("Alice")],
1281 &[Some("42"), Some("Bob")],
1282 ));
1283 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1284
1285 let batch = src.poll_batch(100).await.unwrap().unwrap();
1286 assert_eq!(batch.num_rows(), 1);
1287
1288 let op_col = batch.records.column(1).as_string::<i32>();
1289 assert_eq!(op_col.value(0), "U");
1290
1291 assert!(!batch.records.column(4).is_null(0)); assert!(!batch.records.column(5).is_null(0)); }
1295
1296 #[tokio::test]
1299 async fn test_process_delete() {
1300 let mut src = running_source();
1301
1302 let rel_msg = PostgresCdcSource::build_relation_message(
1303 16384,
1304 "public",
1305 "users",
1306 &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1307 );
1308 src.enqueue_wal_data(rel_msg);
1309
1310 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1311 src.enqueue_wal_data(PostgresCdcSource::build_delete_message(
1312 16384,
1313 &[Some("42")],
1314 ));
1315 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1316
1317 let batch = src.poll_batch(100).await.unwrap().unwrap();
1318 let op_col = batch.records.column(1).as_string::<i32>();
1319 assert_eq!(op_col.value(0), "D");
1320
1321 assert!(!batch.records.column(4).is_null(0));
1323 assert!(batch.records.column(5).is_null(0));
1324 }
1325
1326 #[tokio::test]
1329 async fn test_table_exclude_filter() {
1330 let mut config = PostgresCdcConfig::default();
1331 config.table_exclude = vec!["users".to_string()];
1332 let mut src = PostgresCdcSource::new(config, None);
1333 src.state = ConnectorState::Running;
1334
1335 let rel_msg = PostgresCdcSource::build_relation_message(
1336 16384,
1337 "public",
1338 "users",
1339 &[(1, "id", INT8_OID, -1)],
1340 );
1341 src.enqueue_wal_data(rel_msg);
1342
1343 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1344 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1345 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1346
1347 let result = src.poll_batch(100).await.unwrap();
1348 assert!(result.is_none()); }
1350
1351 #[tokio::test]
1354 async fn test_max_poll_records() {
1355 let mut src = running_source();
1356
1357 for i in 0..5 {
1359 src.inject_event(ChangeEvent {
1360 table: "t".to_string(),
1361 op: CdcOperation::Insert,
1362 lsn: Lsn::new(i as u64),
1363 ts_ms: 0,
1364 before: None,
1365 after: Some(format!("{{\"id\":\"{i}\"}}")),
1366 });
1367 }
1368
1369 let batch = src.poll_batch(2).await.unwrap().unwrap();
1371 assert_eq!(batch.num_rows(), 2);
1372 assert_eq!(src.buffered_events(), 3);
1373
1374 let batch = src.poll_batch(100).await.unwrap().unwrap();
1376 assert_eq!(batch.num_rows(), 3);
1377 assert_eq!(src.buffered_events(), 0);
1378 }
1379
1380 #[tokio::test]
1383 async fn test_partition_info() {
1384 let mut src = running_source();
1385 src.write_lsn = "1/ABCD".parse().unwrap();
1386
1387 src.inject_event(ChangeEvent {
1388 table: "t".to_string(),
1389 op: CdcOperation::Insert,
1390 lsn: Lsn::ZERO,
1391 ts_ms: 0,
1392 before: None,
1393 after: Some("{}".to_string()),
1394 });
1395
1396 let batch = src.poll_batch(100).await.unwrap().unwrap();
1397 let partition = batch.partition.unwrap();
1398 assert_eq!(partition.id, "laminar_slot");
1399 assert_eq!(partition.offset, "1/ABCD");
1400 }
1401
/// With `write_lsn` at 1000 and `confirmed_flush_lsn` at 500, the reported
/// replication lag must be 500 bytes.
#[test]
fn test_replication_lag() {
    let mut source = default_source();
    source.confirmed_flush_lsn = Lsn::new(500);
    source.write_lsn = Lsn::new(1000);

    let lag = source.replication_lag_bytes();
    assert_eq!(lag, 500);
}
1411
1412 #[tokio::test]
1415 async fn test_unknown_relation_error() {
1416 let mut src = running_source();
1417
1418 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1420 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(99999, &[Some("1")]));
1421
1422 let result = src.poll_batch(100).await;
1423 assert!(result.is_err());
1424 }
1425
1426 #[tokio::test]
1429 async fn test_multi_table_transaction() {
1430 let mut src = running_source();
1431
1432 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1434 100,
1435 "public",
1436 "users",
1437 &[(1, "id", INT4_OID, -1)],
1438 ));
1439 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1440 200,
1441 "public",
1442 "orders",
1443 &[(1, "order_id", INT4_OID, -1)],
1444 ));
1445
1446 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x500, 0, 5));
1447 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1448 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1449 200,
1450 &[Some("1001")],
1451 ));
1452 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x500, 0x600, 0));
1453
1454 let batch = src.poll_batch(100).await.unwrap().unwrap();
1455 assert_eq!(batch.num_rows(), 2);
1456
1457 let table_col = batch.records.column(0).as_string::<i32>();
1458 assert_eq!(table_col.value(0), "users");
1459 assert_eq!(table_col.value(1), "orders");
1460 }
1461
1462 #[tokio::test]
1465 async fn test_schema_change_mid_stream() {
1466 let mut src = running_source();
1467
1468 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1470 100,
1471 "public",
1472 "users",
1473 &[(1, "id", INT4_OID, -1)],
1474 ));
1475
1476 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1477 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1478 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1479
1480 let batch1 = src.poll_batch(100).await.unwrap().unwrap();
1481 assert_eq!(batch1.num_rows(), 1);
1482
1483 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1485 100,
1486 "public",
1487 "users",
1488 &[(1, "id", INT4_OID, -1), (0, "email", TEXT_OID, -1)],
1489 ));
1490
1491 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x200, 0, 2));
1492 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1493 100,
1494 &[Some("2"), Some("alice@example.com")],
1495 ));
1496 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x200, 0x300, 0));
1497
1498 let batch2 = src.poll_batch(100).await.unwrap().unwrap();
1499 assert_eq!(batch2.num_rows(), 1);
1500
1501 let after_col = batch2.records.column(5).as_string::<i32>();
1503 let json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
1504 assert_eq!(json["email"], "alice@example.com");
1505 }
1506
1507 #[tokio::test]
1510 async fn test_write_lsn_advances() {
1511 let mut src = running_source();
1512
1513 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1514 100,
1515 "public",
1516 "t",
1517 &[(1, "id", INT4_OID, -1)],
1518 ));
1519
1520 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1521 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1522 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));
1523
1524 let _ = src.poll_batch(100).await;
1525 assert_eq!(src.write_lsn().as_u64(), 0x500);
1526 }
1527
1528 #[tokio::test]
1531 async fn test_truncate_returns_error() {
1532 let mut src = running_source();
1533
1534 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1536 16384,
1537 "public",
1538 "users",
1539 &[(1, "id", INT8_OID, -1)],
1540 ));
1541
1542 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1543 src.enqueue_wal_data(PostgresCdcSource::build_truncate_message(&[16384], 0));
1544
1545 let result = src.poll_batch(100).await;
1546 assert!(result.is_err());
1547 let err = result.unwrap_err().to_string();
1548 assert!(
1549 err.contains("TRUNCATE"),
1550 "error should mention TRUNCATE: {err}"
1551 );
1552 assert!(
1553 err.contains("users"),
1554 "error should mention table name: {err}"
1555 );
1556 }
1557
1558 #[tokio::test]
1559 async fn test_truncate_unknown_relation_uses_oid() {
1560 let mut src = running_source();
1561
1562 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1564 src.enqueue_wal_data(PostgresCdcSource::build_truncate_message(&[99999], 0));
1565
1566 let result = src.poll_batch(100).await;
1567 assert!(result.is_err());
1568 let err = result.unwrap_err().to_string();
1569 assert!(err.contains("oid:99999"), "error should mention oid: {err}");
1570 }
1571
1572 #[tokio::test]
1575 async fn test_confirmed_lsn_not_advanced_until_checkpoint() {
1576 let mut src = running_source();
1577
1578 src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1579 100,
1580 "public",
1581 "t",
1582 &[(1, "id", INT4_OID, -1)],
1583 ));
1584
1585 src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1586 src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1587 src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));
1588
1589 assert!(src.confirmed_flush_lsn().is_zero());
1591
1592 let _ = src.poll_batch(100).await.unwrap().unwrap();
1594 assert!(
1595 src.confirmed_flush_lsn().is_zero(),
1596 "confirmed_flush_lsn should not advance on poll, got {}",
1597 src.confirmed_flush_lsn()
1598 );
1599
1600 assert_eq!(src.polled_lsn.as_u64(), 0x500);
1602
1603 let cp = src.checkpoint();
1605 assert_eq!(cp.get_offset("lsn"), Some("0/500"));
1606 }
1607
1608 #[tokio::test]
1611 async fn test_restore_sets_polled_lsn() {
1612 let mut src = default_source();
1613 let mut cp = SourceCheckpoint::new(1);
1614 cp.set_offset("lsn", "2/FF00");
1615 cp.set_offset("write_lsn", "2/FF10");
1616
1617 src.restore(&cp).await.unwrap();
1618 assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
1619 assert_eq!(src.polled_lsn.as_u64(), 0x2_0000_FF00);
1620 assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
1621 }
1622
1623 #[tokio::test]
1626 async fn test_backpressure_does_not_drop_buffered_events() {
1627 let mut src = running_source();
1628 src.config.max_buffered_events = 100;
1629
1630 for i in 0..200u64 {
1635 src.inject_event(ChangeEvent {
1636 table: "public.t".to_string(),
1637 op: CdcOperation::Insert,
1638 before: None,
1639 after: Some(format!("{{\"id\": {i}}}")),
1640 ts_ms: i as i64,
1641 lsn: Lsn::new(i),
1642 });
1643 }
1644 assert_eq!(src.event_buffer.len(), 200);
1645
1646 let batch = src.poll_batch(50).await.unwrap().unwrap();
1648 assert_eq!(batch.records.num_rows(), 50);
1649 assert_eq!(src.event_buffer.len(), 150);
1651 assert_eq!(src.metrics.events_dropped.get(), 0);
1652 }
1653}