Skip to main content

laminar_connectors/cdc/postgres/
source.rs

1//! `PostgreSQL` CDC source connector implementation.
2//!
3//! Implements [`SourceConnector`] for streaming logical replication changes
4//! from `PostgreSQL` into `LaminarDB` as Arrow `RecordBatch`es.
5//!
6//! # Architecture
7//!
8//! - **Ring 0**: No CDC code — just SPSC channel pop (~5ns)
9//! - **Ring 1**: WAL consumption, pgoutput parsing, Arrow conversion
10//! - **Ring 2**: Slot management, schema discovery, health checks
11
12use arrow_array::RecordBatch;
13use arrow_schema::SchemaRef;
14use async_trait::async_trait;
15use std::collections::VecDeque;
16use std::sync::Arc;
17use tokio::sync::Notify;
18
19use crate::checkpoint::SourceCheckpoint;
20use crate::config::{ConnectorConfig, ConnectorState};
21use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
22use crate::error::ConnectorError;
23
24use super::changelog::{events_to_record_batch, tuple_to_json, CdcOperation, ChangeEvent};
25use super::config::PostgresCdcConfig;
26use super::decoder::{decode_message, WalMessage};
27use super::lsn::Lsn;
28use super::metrics::PostgresCdcMetrics;
29use super::schema::{cdc_envelope_schema, RelationCache, RelationInfo};
/// `PostgreSQL` CDC source connector.
///
/// Streams row-level changes from `PostgreSQL` using logical replication
/// (`pgoutput` plugin). Changes are emitted as Arrow `RecordBatch`es
/// in the CDC envelope format.
///
/// Changes decoded inside a transaction are held back until that
/// transaction's Commit message has been processed; only then do they
/// become visible to `poll_batch()`.
///
/// # Envelope Schema
///
/// | Column   | Type          | Nullable | Description                 |
/// |----------|---------------|----------|-----------------------------|
/// | `_table` | Utf8          | no       | Schema-qualified table name |
/// | `_op`    | Utf8          | no       | Operation: I, U, D          |
/// | `_lsn`   | UInt64        | no       | WAL position                |
/// | `_ts_ms` | Timestamp(ms) | no       | Commit timestamp            |
/// | `_before`| Utf8          | yes      | Old row JSON (for U, D)     |
/// | `_after` | Utf8          | yes      | New row JSON (for I, U)     |
///
/// # LSN tracking
///
/// Three WAL positions are kept:
/// - `write_lsn`: the latest position received from the server;
/// - `polled_lsn`: the latest position whose events have all been drained
///   into batches;
/// - `confirmed_flush_lsn`: the last acknowledged position.
///
/// `replication_lag_bytes()` reports the distance between `write_lsn` and
/// `confirmed_flush_lsn`.
pub struct PostgresCdcSource {
    /// Connector configuration (may be replaced by `open()` when the
    /// runtime `ConnectorConfig` carries properties).
    config: PostgresCdcConfig,

    /// Current lifecycle state (`Created` after `new`, `Initializing` /
    /// `Running` during and after `open`, `Failed` on fatal poll errors).
    state: ConnectorState,

    /// Output schema (CDC envelope), built once by `cdc_envelope_schema()`.
    schema: SchemaRef,

    /// Lock-free metrics, shared via `Arc`.
    metrics: Arc<PostgresCdcMetrics>,

    /// Cached relation (table) schemas from Relation messages; needed to
    /// turn tuple data into JSON and to resolve table names.
    relation_cache: RelationCache,

    /// Committed (or transaction-less) change events awaiting `poll_batch()`.
    event_buffer: VecDeque<ChangeEvent>,

    /// Transaction currently being received (between Begin and Commit).
    current_txn: Option<TransactionState>,

    /// Confirmed flush LSN (last acknowledged position).
    confirmed_flush_lsn: Lsn,

    /// Write LSN (latest position received from server).
    write_lsn: Lsn,

    /// Polled LSN — tracks the latest position drained into a batch.
    /// Decoupled from `confirmed_flush_lsn` so the PG replication slot
    /// is only advanced when the pipeline actually checkpoints.
    polled_lsn: Lsn,

    /// Raw pgoutput messages queued via `enqueue_wal_data` (test injection
    /// and batch processing); decoded by `process_pending_messages`.
    pending_messages: VecDeque<Vec<u8>>,

    /// Notification handle signalled when WAL data arrives from the reader task.
    data_ready: Arc<Notify>,

    /// Background connection task handle for the control-plane client
    /// (feature-gated).
    #[cfg(feature = "postgres-cdc")]
    connection_handle: Option<tokio::task::JoinHandle<()>>,

    /// Channel receiver for WAL events from the background reader task.
    #[cfg(feature = "postgres-cdc")]
    wal_rx: Option<WalPayloadRx>,

    /// Background WAL reader task handle.
    #[cfg(feature = "postgres-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Shutdown signal for the background reader task.
    #[cfg(feature = "postgres-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,

    /// Sender for feeding confirmed flush LSN back to the reader task.
    /// The reader uses this to call `update_applied_lsn` only for
    /// durably-checkpointed positions (prevents at-least-once violation).
    /// The reader also reads it as the resume position when reconnecting.
    #[cfg(feature = "postgres-cdc")]
    confirmed_lsn_tx: Option<tokio::sync::watch::Sender<u64>>,
}
107
/// In-progress transaction state.
///
/// Created when a Begin message arrives. Its events are moved into the
/// connector's event buffer when the matching Commit is processed, so
/// changes of an uncommitted transaction are never emitted.
#[derive(Debug, Clone)]
struct TransactionState {
    /// Final LSN of the transaction (from Begin); stamped as the `lsn` of
    /// every change event recorded while this transaction is open.
    final_lsn: Lsn,
    /// Commit timestamp in milliseconds (from Begin); stamped as the
    /// `ts_ms` of every change event recorded while this transaction is open.
    commit_ts_ms: i64,
    /// Change events accumulated in this transaction, in arrival order.
    events: Vec<ChangeEvent>,
}
118
/// Single-consumer async receiver for the WAL reader → `poll_batch` queue.
///
/// The matching sender comes from the bounded channel created in `open()`;
/// `poll_batch` drains this end with non-blocking `try_recv`.
#[cfg(feature = "postgres-cdc")]
type WalPayloadRx = crossfire::AsyncRx<crossfire::mpsc::Array<WalPayload>>;
122
/// WAL event payload sent from the background reader task to [`PostgresCdcSource::poll_batch`].
///
/// LSNs are carried as raw `u64` and timestamps as raw microseconds; the
/// conversion to `Lsn` / milliseconds happens in `process_wal_payload`.
#[allow(dead_code)] // constructed + consumed only with feature = "postgres-cdc"
enum WalPayload {
    /// Start of a transaction.
    Begin {
        /// Final LSN of the transaction.
        final_lsn: u64,
        /// Commit time in microseconds as delivered by the replication
        /// client; converted with `pg_timestamp_to_unix_ms` (presumably
        /// `PostgreSQL`-epoch based — confirm against pgwire-replication).
        commit_ts_us: i64,
        /// Transaction ID.
        xid: u32,
    },
    /// End of a transaction.
    Commit {
        /// End LSN of the commit; becomes the source's `write_lsn`.
        end_lsn: u64,
        /// Commit time in microseconds (same encoding as in `Begin`).
        commit_ts_us: i64,
        /// LSN of the commit record.
        lsn: u64,
    },
    /// A raw pgoutput message plus the `wal_end` reported with it.
    XLogData {
        wal_end: u64,
        data: Vec<u8>,
    },
    /// Server keepalive; only advances `write_lsn`.
    KeepAlive {
        wal_end: u64,
    },
    /// Fatal error from the reader task (e.g. reconnect exhaustion).
    Error(String),
}
146
147impl PostgresCdcSource {
148    /// Creates a new `PostgreSQL` CDC source with the given configuration.
149    #[must_use]
150    pub fn new(config: PostgresCdcConfig, registry: Option<&prometheus::Registry>) -> Self {
151        Self {
152            config,
153            state: ConnectorState::Created,
154            schema: cdc_envelope_schema(),
155            metrics: Arc::new(PostgresCdcMetrics::new(registry)),
156            relation_cache: RelationCache::new(),
157            event_buffer: VecDeque::new(),
158            current_txn: None,
159            confirmed_flush_lsn: Lsn::ZERO,
160            write_lsn: Lsn::ZERO,
161            polled_lsn: Lsn::ZERO,
162            pending_messages: VecDeque::new(),
163            data_ready: Arc::new(Notify::new()),
164            #[cfg(feature = "postgres-cdc")]
165            connection_handle: None,
166            #[cfg(feature = "postgres-cdc")]
167            wal_rx: None,
168            #[cfg(feature = "postgres-cdc")]
169            reader_handle: None,
170            #[cfg(feature = "postgres-cdc")]
171            reader_shutdown: None,
172            #[cfg(feature = "postgres-cdc")]
173            confirmed_lsn_tx: None,
174        }
175    }
176
177    /// Creates a new source from a generic [`ConnectorConfig`].
178    ///
179    /// # Errors
180    ///
181    /// Returns `ConnectorError` if the configuration is invalid.
182    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
183        let pg_config = PostgresCdcConfig::from_config(config)?;
184        Ok(Self::new(pg_config, None))
185    }
186
187    /// Returns a reference to the CDC configuration.
188    #[must_use]
189    pub fn config(&self) -> &PostgresCdcConfig {
190        &self.config
191    }
192
193    /// Returns the current confirmed flush LSN.
194    #[must_use]
195    pub fn confirmed_flush_lsn(&self) -> Lsn {
196        self.confirmed_flush_lsn
197    }
198
199    /// Returns the current write LSN.
200    #[must_use]
201    pub fn write_lsn(&self) -> Lsn {
202        self.write_lsn
203    }
204
205    /// Returns the current replication lag in bytes.
206    #[must_use]
207    pub fn replication_lag_bytes(&self) -> u64 {
208        self.write_lsn.diff(self.confirmed_flush_lsn)
209    }
210
211    /// Returns a reference to the relation cache.
212    #[must_use]
213    pub fn relation_cache(&self) -> &RelationCache {
214        &self.relation_cache
215    }
216
217    /// Returns the number of buffered events.
218    #[must_use]
219    pub fn buffered_events(&self) -> usize {
220        self.event_buffer.len()
221    }
222
223    /// Enqueues raw WAL message bytes for processing.
224    ///
225    /// Used by the replication stream handler to feed data into the source,
226    /// and by tests to inject synthetic messages.
227    pub fn enqueue_wal_data(&mut self, data: Vec<u8>) {
228        self.pending_messages.push_back(data);
229    }
230
231    /// Processes all pending WAL messages, converting them to change events.
232    ///
233    /// # Errors
234    ///
235    /// Returns `ConnectorError::ReadError` if decoding fails.
236    pub fn process_pending_messages(&mut self) -> Result<(), ConnectorError> {
237        while let Some(data) = self.pending_messages.pop_front() {
238            self.metrics.record_bytes(data.len() as u64);
239            let msg = decode_message(&data)
240                .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
241            self.process_wal_message(msg)?;
242        }
243        Ok(())
244    }
245
246    /// Processes a single decoded WAL message.
247    fn process_wal_message(&mut self, msg: WalMessage) -> Result<(), ConnectorError> {
248        match msg {
249            WalMessage::Begin(begin) => {
250                self.current_txn = Some(TransactionState {
251                    final_lsn: begin.final_lsn,
252                    commit_ts_ms: begin.commit_ts_ms,
253                    events: Vec::new(),
254                });
255            }
256            WalMessage::Commit(commit) => {
257                if let Some(txn) = self.current_txn.take() {
258                    self.event_buffer.extend(txn.events);
259                    self.write_lsn = commit.end_lsn;
260                    self.metrics.record_transaction();
261                    self.metrics
262                        .set_replication_lag_bytes(self.replication_lag_bytes());
263                }
264            }
265            WalMessage::Relation(rel) => {
266                let info = RelationInfo {
267                    relation_id: rel.relation_id,
268                    namespace: rel.namespace,
269                    name: rel.name,
270                    replica_identity: rel.replica_identity as char,
271                    columns: rel.columns,
272                };
273                self.relation_cache.insert(info);
274            }
275            WalMessage::Insert(ins) => {
276                self.process_insert(ins.relation_id, &ins.new_tuple)?;
277            }
278            WalMessage::Update(upd) => {
279                self.process_update(upd.relation_id, upd.old_tuple.as_ref(), &upd.new_tuple)?;
280            }
281            WalMessage::Delete(del) => {
282                self.process_delete(del.relation_id, &del.old_tuple)?;
283            }
284            WalMessage::Truncate(trunc) => {
285                let table_names: Vec<String> = trunc
286                    .relation_ids
287                    .iter()
288                    .map(|id| {
289                        self.relation_cache
290                            .get(*id)
291                            .map_or_else(|| format!("oid:{id}"), RelationInfo::full_name)
292                    })
293                    .collect();
294                return Err(ConnectorError::ReadError(format!(
295                    "TRUNCATE received on table(s): {}. \
296                     Cannot produce retraction events — restart the pipeline with a fresh snapshot.",
297                    table_names.join(", ")
298                )));
299            }
300            WalMessage::Origin(_) | WalMessage::Type(_) => {
301                // Origin and Type messages are noted but don't
302                // produce change events in the current implementation.
303            }
304        }
305        Ok(())
306    }
307
308    fn process_insert(
309        &mut self,
310        relation_id: u32,
311        new_tuple: &super::decoder::TupleData,
312    ) -> Result<(), ConnectorError> {
313        let relation = self.require_relation(relation_id)?;
314        let table = relation.full_name();
315
316        if !self.config.should_include_table(&table) {
317            return Ok(());
318        }
319
320        let after_json = tuple_to_json(new_tuple, relation);
321        let (lsn, ts_ms) = self.current_txn_context();
322
323        let event = ChangeEvent {
324            table,
325            op: CdcOperation::Insert,
326            lsn,
327            ts_ms,
328            before: None,
329            after: Some(after_json),
330        };
331
332        self.push_event(event);
333        self.metrics.record_insert();
334        Ok(())
335    }
336
337    fn process_update(
338        &mut self,
339        relation_id: u32,
340        old_tuple: Option<&super::decoder::TupleData>,
341        new_tuple: &super::decoder::TupleData,
342    ) -> Result<(), ConnectorError> {
343        let relation = self.require_relation(relation_id)?;
344        let table = relation.full_name();
345
346        if !self.config.should_include_table(&table) {
347            return Ok(());
348        }
349
350        let before_json = old_tuple.map(|t| tuple_to_json(t, relation));
351        let after_json = tuple_to_json(new_tuple, relation);
352        let (lsn, ts_ms) = self.current_txn_context();
353
354        let event = ChangeEvent {
355            table,
356            op: CdcOperation::Update,
357            lsn,
358            ts_ms,
359            before: before_json,
360            after: Some(after_json),
361        };
362
363        self.push_event(event);
364        self.metrics.record_update();
365        Ok(())
366    }
367
368    fn process_delete(
369        &mut self,
370        relation_id: u32,
371        old_tuple: &super::decoder::TupleData,
372    ) -> Result<(), ConnectorError> {
373        let relation = self.require_relation(relation_id)?;
374        let table = relation.full_name();
375
376        if !self.config.should_include_table(&table) {
377            return Ok(());
378        }
379
380        let before_json = tuple_to_json(old_tuple, relation);
381        let (lsn, ts_ms) = self.current_txn_context();
382
383        let event = ChangeEvent {
384            table,
385            op: CdcOperation::Delete,
386            lsn,
387            ts_ms,
388            before: Some(before_json),
389            after: None,
390        };
391
392        self.push_event(event);
393        self.metrics.record_delete();
394        Ok(())
395    }
396
397    /// Looks up a relation by ID, returning a reference (no clone).
398    ///
399    /// The caller must extract all needed data (table name, JSON) from
400    /// the reference before calling `push_event` or other `&mut self`
401    /// methods (Rust's borrow rules require disjoint access).
402    fn require_relation(&self, relation_id: u32) -> Result<&RelationInfo, ConnectorError> {
403        self.relation_cache.get(relation_id).ok_or_else(|| {
404            ConnectorError::ReadError(format!(
405                "unknown relation ID {relation_id} (no Relation message received yet)"
406            ))
407        })
408    }
409
410    fn current_txn_context(&self) -> (Lsn, i64) {
411        match &self.current_txn {
412            Some(txn) => (txn.final_lsn, txn.commit_ts_ms),
413            None => (self.write_lsn, 0),
414        }
415    }
416
417    fn push_event(&mut self, event: ChangeEvent) {
418        if let Some(txn) = &mut self.current_txn {
419            txn.events.push(event);
420        } else {
421            self.event_buffer.push_back(event);
422        }
423    }
424
425    /// Processes a [`WalPayload`] received from the background reader task.
426    #[cfg(feature = "postgres-cdc")]
427    fn process_wal_payload(&mut self, payload: WalPayload) -> Result<(), ConnectorError> {
428        use super::decoder::pg_timestamp_to_unix_ms;
429
430        match payload {
431            WalPayload::Begin {
432                final_lsn,
433                commit_ts_us,
434                xid,
435            } => {
436                let begin = super::decoder::BeginMessage {
437                    final_lsn: Lsn::new(final_lsn),
438                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
439                    xid,
440                };
441                self.process_wal_message(WalMessage::Begin(begin))
442            }
443            WalPayload::Commit {
444                end_lsn,
445                commit_ts_us,
446                lsn,
447            } => {
448                let commit = super::decoder::CommitMessage {
449                    flags: 0,
450                    commit_lsn: Lsn::new(lsn),
451                    end_lsn: Lsn::new(end_lsn),
452                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
453                };
454                self.process_wal_message(WalMessage::Commit(commit))
455            }
456            WalPayload::XLogData { wal_end, data } => {
457                // Skip raw Begin/Commit bytes to avoid double-processing
458                // (pgwire-replication delivers these as separate events).
459                if !data.is_empty() && (data[0] == b'B' || data[0] == b'C') {
460                    self.write_lsn = Lsn::new(wal_end);
461                    return Ok(());
462                }
463                let msg = decode_message(&data)
464                    .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
465                self.process_wal_message(msg)?;
466                self.write_lsn = Lsn::new(wal_end);
467                Ok(())
468            }
469            WalPayload::KeepAlive { wal_end } => {
470                self.write_lsn = Lsn::new(wal_end);
471                Ok(())
472            }
473            WalPayload::Error(msg) => Err(ConnectorError::ReadError(msg)),
474        }
475    }
476
477    /// Drains up to `max` events from the buffer and converts to a `RecordBatch`.
478    fn drain_events(&mut self, max: usize) -> Result<Option<RecordBatch>, ConnectorError> {
479        if self.event_buffer.is_empty() {
480            return Ok(None);
481        }
482
483        let count = max.min(self.event_buffer.len());
484        let events: Vec<ChangeEvent> = self.event_buffer.drain(..count).collect();
485
486        let batch = events_to_record_batch(&events)
487            .map_err(|e| ConnectorError::Internal(format!("Arrow batch build: {e}")))?;
488
489        self.metrics.record_batch();
490        Ok(Some(batch))
491    }
492}
493
494#[async_trait]
495#[allow(clippy::too_many_lines)]
496impl SourceConnector for PostgresCdcSource {
    /// Prepares the source and, when built with `postgres-cdc`, starts
    /// streaming logical replication changes.
    ///
    /// Steps:
    /// 1. Re-parses `config` when it carries properties (runtime config via
    ///    SQL `WITH`); otherwise the config passed to `new` is kept.
    /// 2. Seeds `confirmed_flush_lsn`, `write_lsn` and `polled_lsn` from
    ///    `start_lsn` when one is configured.
    /// 3. With `postgres-cdc` (outside `cfg(test)`): connects a control-plane
    ///    client, ensures the replication slot exists, resumes from the
    ///    slot's LSN when no explicit `start_lsn` was given, opens the
    ///    replication connection and spawns the background WAL reader task.
    ///
    /// With `postgres-cdc` under `cfg(test)`, no connection is made and the
    /// state goes straight to `Running`.
    ///
    /// # Errors
    ///
    /// - `ConfigurationError` when built without the `postgres-cdc` feature.
    /// - Config parse errors from `PostgresCdcConfig::from_config`.
    /// - Errors from `postgres_io::connect` / `ensure_replication_slot`.
    /// - `ConnectionFailed` if the replication connection cannot be opened.
    ///
    /// On any error the state is left at `Initializing`; it is set to
    /// `Running` only on success.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // If config has properties, re-parse (supports runtime config via SQL WITH).
        if !config.properties().is_empty() {
            self.config = PostgresCdcConfig::from_config(config)?;
        }

        // Set start LSN if configured
        if let Some(lsn) = self.config.start_lsn {
            self.confirmed_flush_lsn = lsn;
            self.write_lsn = lsn;
            self.polled_lsn = lsn;
        }

        // Without postgres-cdc feature, open() must fail loudly to prevent
        // silent data loss (poll_batch would return Ok(None) forever).
        // NOTE(review): in this configuration the statements after this
        // block are unreachable; rustc likely reports `unreachable_code`.
        #[cfg(not(feature = "postgres-cdc"))]
        {
            return Err(ConnectorError::ConfigurationError(
                "PostgreSQL CDC source requires the `postgres-cdc` feature flag. \
                 Rebuild with `--features postgres-cdc` to enable."
                    .to_string(),
            ));
        }

        #[cfg(all(feature = "postgres-cdc", not(test)))]
        {
            use super::postgres_io;

            // 1. Connect control-plane for slot management
            // NOTE(review): `handle` is stored before the later fallible steps;
            // if one of them fails, the connection task keeps running until
            // the source is dropped/closed — confirm that is intended.
            let (client, handle) = postgres_io::connect(&self.config).await?;
            self.connection_handle = Some(handle);

            // 2. Ensure replication slot exists
            let slot_lsn = postgres_io::ensure_replication_slot(
                &client,
                &self.config.slot_name,
                &self.config.output_plugin,
            )
            .await?;

            // Use slot's confirmed_flush_lsn if no explicit start LSN
            if self.config.start_lsn.is_none() {
                if let Some(lsn) = slot_lsn {
                    self.confirmed_flush_lsn = lsn;
                    self.write_lsn = lsn;
                    self.polled_lsn = lsn;
                }
            }

            // 3. Build pgwire-replication config and start WAL streaming
            let mut repl_config = postgres_io::build_replication_config(&self.config);
            // If we resolved a slot LSN, override start_lsn so we resume correctly
            if self.confirmed_flush_lsn != Lsn::ZERO {
                repl_config.start_lsn =
                    pgwire_replication::Lsn::from_u64(self.confirmed_flush_lsn.as_u64());
            }

            let repl_client = pgwire_replication::ReplicationClient::connect(repl_config)
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("pgwire-replication connect: {e}"))
                })?;

            // Spawn background reader task for event-driven wake-up.
            // - wal_tx/wal_rx: bounded (4096) queue of WalPayloads; a full
            //   queue makes the reader wait in `send().await`.
            // - shutdown_tx/rx: stop flag for the reader.
            // - confirmed_lsn_tx/rx: checkpointed LSN, fed back to the server
            //   and used as the resume point on reconnect.
            let (wal_tx, wal_rx) = crossfire::mpsc::bounded_async::<WalPayload>(4096);
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
            let (confirmed_lsn_tx, mut confirmed_lsn_rx) =
                tokio::sync::watch::channel(self.confirmed_flush_lsn.as_u64());
            let data_ready = Arc::clone(&self.data_ready);
            let reader_config = self.config.clone();

            let reader_handle = tokio::spawn(async move {
                const MAX_FAILURES: u32 = 10;
                let mut repl_client = repl_client;
                // Reset to 0 whenever an event is received; the task gives up
                // once this reaches MAX_FAILURES.
                let mut consecutive_failures: u32 = 0;

                'reconnect: loop {
                    // Inner recv loop — processes events from the current connection.
                    'recv: loop {
                        // Feed back confirmed LSN to PG (non-blocking check).
                        // Only positions that the main thread has checkpointed
                        // are reported as "applied", preserving at-least-once.
                        if confirmed_lsn_rx.has_changed().unwrap_or(false) {
                            let confirmed = *confirmed_lsn_rx.borrow_and_update();
                            if confirmed > 0 {
                                repl_client.update_applied_lsn(pgwire_replication::Lsn::from_u64(
                                    confirmed,
                                ));
                            }
                        }

                        // `biased`: a pending shutdown wins over new events.
                        // `changed()` also completes (with Err) once the
                        // sender is dropped, which likewise ends the task.
                        tokio::select! {
                            biased;
                            _ = shutdown_rx.changed() => break 'reconnect,
                            event = repl_client.recv() => {
                                match event {
                                    Ok(Some(event)) => {
                                        consecutive_failures = 0;
                                        // Translate replication events into
                                        // WalPayloads; other event kinds are ignored.
                                        let payload = match &event {
                                            pgwire_replication::ReplicationEvent::Begin {
                                                final_lsn,
                                                xid,
                                                commit_time_micros,
                                            } => Some(WalPayload::Begin {
                                                final_lsn: final_lsn.as_u64(),
                                                commit_ts_us: *commit_time_micros,
                                                xid: *xid,
                                            }),
                                            pgwire_replication::ReplicationEvent::Commit {
                                                end_lsn,
                                                commit_time_micros,
                                                lsn,
                                            } => Some(WalPayload::Commit {
                                                end_lsn: end_lsn.as_u64(),
                                                commit_ts_us: *commit_time_micros,
                                                lsn: lsn.as_u64(),
                                            }),
                                            pgwire_replication::ReplicationEvent::XLogData {
                                                wal_end,
                                                data,
                                                ..
                                            } => Some(WalPayload::XLogData {
                                                wal_end: wal_end.as_u64(),
                                                data: data.to_vec(),
                                            }),
                                            pgwire_replication::ReplicationEvent::KeepAlive {
                                                wal_end,
                                                ..
                                            } => Some(WalPayload::KeepAlive {
                                                wal_end: wal_end.as_u64(),
                                            }),
                                            _ => None,
                                        };
                                        if let Some(p) = payload {
                                            // A send error means the receiver
                                            // (the source) is gone: stop.
                                            if wal_tx.send(p).await.is_err() {
                                                break 'reconnect;
                                            }
                                            data_ready.notify_one();
                                        }
                                    }
                                    // Stream ended or errored: fall through to
                                    // the reconnect path below.
                                    Ok(None) => break 'recv,
                                    Err(e) => {
                                        tracing::error!(error = %e, "WAL reader error");
                                        break 'recv;
                                    }
                                }
                            }
                        }
                    }

                    // Shut down old client before reconnecting.
                    let _ = repl_client.shutdown().await;
                    consecutive_failures += 1;

                    if consecutive_failures >= MAX_FAILURES {
                        tracing::error!(
                            failures = consecutive_failures,
                            "WAL reader exhausted reconnect attempts"
                        );
                        // Surface the failure to poll_batch, then exit; the
                        // dropped sender also signals disconnection.
                        let _ = wal_tx.send(WalPayload::Error(format!(
                            "WAL reader failed after {consecutive_failures} consecutive reconnect attempts"
                        ))).await;
                        data_ready.notify_one();
                        break 'reconnect;
                    }

                    // Jittered exponential backoff: capped at 30 s and
                    // shift-overflow-safe even if MAX_FAILURES is raised
                    // past 63 in a future refactor.
                    let backoff =
                        crate::retry::Backoff::broker_reconnect().delay(consecutive_failures);
                    tracing::warn!(
                        attempt = consecutive_failures,
                        ?backoff,
                        "WAL reader reconnecting"
                    );
                    tokio::select! {
                        biased;
                        _ = shutdown_rx.changed() => break 'reconnect,
                        () = tokio::time::sleep(backoff) => {}
                    }

                    // Rebuild replication config with resume LSN from checkpoint.
                    let resume_lsn = *confirmed_lsn_rx.borrow();
                    let mut new_config = postgres_io::build_replication_config(&reader_config);
                    if resume_lsn > 0 {
                        new_config.start_lsn = pgwire_replication::Lsn::from_u64(resume_lsn);
                    }

                    // NOTE(review): on failure `repl_client` still refers to the
                    // client that was just shut down; the next `'recv` pass
                    // presumably errors immediately and counts as another
                    // failure, so MAX_FAILURES still bounds the retries —
                    // confirm against pgwire-replication's shutdown semantics.
                    match pgwire_replication::ReplicationClient::connect(new_config).await {
                        Ok(new_client) => {
                            tracing::info!("WAL reader reconnected");
                            repl_client = new_client;
                        }
                        Err(e) => {
                            tracing::error!(error = %e, "WAL reader reconnect failed");
                        }
                    }
                }
            });

            self.wal_rx = Some(wal_rx);
            self.reader_handle = Some(reader_handle);
            self.reader_shutdown = Some(shutdown_tx);
            self.confirmed_lsn_tx = Some(confirmed_lsn_tx);
        }

        self.state = ConnectorState::Running;
        Ok(())
    }
709
710    async fn poll_batch(
711        &mut self,
712        max_records: usize,
713    ) -> Result<Option<SourceBatch>, ConnectorError> {
714        if self.state != ConnectorState::Running {
715            return Err(ConnectorError::InvalidState {
716                expected: "Running".to_string(),
717                actual: self.state.to_string(),
718            });
719        }
720
721        // Drain WAL events from background reader task.
722        // Collect into a temp vec first to avoid holding a mutable
723        // borrow on self.wal_rx while calling self.process_wal_payload().
724        //
725        // Backpressure: when the event buffer exceeds the high watermark,
726        // stop draining the reader channel. The bounded mpsc channel (4096)
727        // propagates backpressure to the WAL reader task, which in turn
728        // applies TCP backpressure to the replication slot. This prevents
729        // data loss from dropping events.
730        #[cfg(feature = "postgres-cdc")]
731        {
732            let high_watermark = self.config.backpressure_high_watermark();
733            let mut payloads = Vec::new();
734            let mut reader_closed = false;
735
736            if self.event_buffer.len() < high_watermark {
737                if let Some(ref mut rx) = self.wal_rx {
738                    while payloads.len() + self.event_buffer.len() < max_records
739                        && self.event_buffer.len() + payloads.len() < high_watermark
740                    {
741                        match rx.try_recv() {
742                            Ok(payload) => payloads.push(payload),
743                            Err(crossfire::TryRecvError::Empty) => break,
744                            Err(crossfire::TryRecvError::Disconnected) => {
745                                reader_closed = true;
746                                break;
747                            }
748                        }
749                    }
750                }
751            } else {
752                tracing::debug!(
753                    buffered = self.event_buffer.len(),
754                    high_watermark,
755                    "CDC backpressure active — pausing WAL reader drain"
756                );
757            }
758
759            for payload in payloads {
760                self.process_wal_payload(payload)?;
761            }
762            if reader_closed && self.event_buffer.is_empty() {
763                self.state = ConnectorState::Failed;
764                return Err(ConnectorError::ReadError(
765                    "WAL reader task terminated unexpectedly — replication stream lost".to_string(),
766                ));
767            }
768        }
769
770        // Process any pending WAL messages (test injection path)
771        self.process_pending_messages()?;
772
773        // Drain buffered events into a RecordBatch.
774        // Watermark advancement: the batch contains `_ts_ms` (commit timestamp)
775        // which downstream pipeline watermark extractors should use. The LSN
776        // in PartitionInfo tracks replication progress for offset management.
777        // TODO: extract max(_ts_ms) and expose as source-level watermark for
778        // windowed aggregations that depend on CDC event time.
779        match self.drain_events(max_records)? {
780            Some(batch) => {
781                // Advance polled_lsn (NOT confirmed_flush_lsn). PG slot
782                // feedback happens in checkpoint(), not here.
783                if self.event_buffer.is_empty() {
784                    self.polled_lsn = self.write_lsn;
785                }
786                self.metrics
787                    .set_confirmed_flush_lsn(self.confirmed_flush_lsn.as_u64());
788                self.metrics
789                    .set_replication_lag_bytes(self.replication_lag_bytes());
790
791                let lsn_str = self.write_lsn.to_string();
792                let partition = PartitionInfo::new(&self.config.slot_name, lsn_str);
793                Ok(Some(SourceBatch::with_partition(batch, partition)))
794            }
795            None => Ok(None),
796        }
797    }
798
799    fn schema(&self) -> SchemaRef {
800        Arc::clone(&self.schema)
801    }
802
803    fn checkpoint(&self) -> SourceCheckpoint {
804        let mut cp = SourceCheckpoint::new(0);
805        // polled_lsn = latest position drained into a batch. The PG
806        // slot is only advanced here (not in poll_batch) to prevent
807        // data loss on crash.
808        cp.set_offset("lsn", self.polled_lsn.to_string());
809        cp.set_offset("write_lsn", self.write_lsn.to_string());
810        cp.set_metadata("slot_name", &self.config.slot_name);
811        cp.set_metadata("publication", &self.config.publication);
812
813        // Feed polled LSN to reader task so PG can reclaim WAL.
814        // watch::Sender::send is &self, so this works from checkpoint().
815        #[cfg(feature = "postgres-cdc")]
816        if let Some(ref tx) = self.confirmed_lsn_tx {
817            let _ = tx.send(self.polled_lsn.as_u64());
818        }
819
820        cp
821    }
822
823    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
824        if let Some(lsn_str) = checkpoint.get_offset("lsn") {
825            let lsn: Lsn = lsn_str
826                .parse()
827                .map_err(|e| ConnectorError::Internal(format!("invalid LSN in checkpoint: {e}")))?;
828            self.confirmed_flush_lsn = lsn;
829            self.polled_lsn = lsn;
830            self.metrics.set_confirmed_flush_lsn(lsn.as_u64());
831        }
832        if let Some(write_lsn_str) = checkpoint.get_offset("write_lsn") {
833            if let Ok(lsn) = write_lsn_str.parse::<Lsn>() {
834                self.write_lsn = lsn;
835            }
836        }
837        Ok(())
838    }
839
840    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
841        Some(Arc::clone(&self.data_ready))
842    }
843
844    async fn close(&mut self) -> Result<(), ConnectorError> {
845        // Signal reader task to shut down (it calls repl_client.shutdown() internally).
846        #[cfg(feature = "postgres-cdc")]
847        {
848            if let Some(tx) = self.reader_shutdown.take() {
849                let _ = tx.send(true);
850            }
851            if let Some(handle) = self.reader_handle.take() {
852                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
853            }
854            self.wal_rx = None;
855            self.confirmed_lsn_tx = None;
856        }
857
858        // Await the control-plane connection task briefly so it can
859        // finish any in-flight work before we abort.
860        #[cfg(feature = "postgres-cdc")]
861        if let Some(handle) = self.connection_handle.take() {
862            let abort = handle.abort_handle();
863            match tokio::time::timeout(std::time::Duration::from_secs(2), handle).await {
864                Ok(Ok(())) => {}
865                Ok(Err(join_err)) => {
866                    tracing::warn!(
867                        error = %join_err,
868                        "[postgres-cdc] control-plane task exited with error"
869                    );
870                }
871                Err(_elapsed) => {
872                    tracing::warn!(
873                        "[postgres-cdc] control-plane task did not exit within 2s; aborting"
874                    );
875                    abort.abort();
876                }
877            }
878        }
879
880        self.state = ConnectorState::Closed;
881        self.event_buffer.clear();
882        self.pending_messages.clear();
883        Ok(())
884    }
885}
886
887// ── Test helpers ──
888
#[cfg(test)]
impl PostgresCdcSource {
    /// Injects a pre-built change event directly into the event buffer,
    /// bypassing WAL decoding.
    fn inject_event(&mut self, event: ChangeEvent) {
        self.event_buffer.push_back(event);
    }

    /// Appends a pgoutput TupleData section to `buf`, preceded by the
    /// submessage `tag` byte (`N` new, `K` key, `O` old).
    ///
    /// The encoding is an Int16 column count, then per column either `n`
    /// (NULL) or `t` + Int32 byte length + the UTF-8 text value.
    ///
    /// # Panics
    ///
    /// Panics if the tuple has more than `i16::MAX` columns, or if a value
    /// is longer than `i32::MAX` bytes (both are fixture bugs).
    fn write_test_tuple(buf: &mut Vec<u8>, tag: u8, values: &[Option<&str>]) {
        buf.push(tag);
        let column_count =
            i16::try_from(values.len()).expect("test tuple has more than i16::MAX columns");
        buf.extend_from_slice(&column_count.to_be_bytes());
        for value in values {
            match value {
                Some(text) => {
                    let len =
                        i32::try_from(text.len()).expect("test value longer than i32::MAX bytes");
                    buf.push(b't');
                    buf.extend_from_slice(&len.to_be_bytes());
                    buf.extend_from_slice(text.as_bytes());
                }
                None => buf.push(b'n'),
            }
        }
    }

    /// Builds a binary pgoutput Relation message for testing.
    ///
    /// `columns` entries are `(flags, name, type_oid, type_modifier)`.
    /// Replica identity is always `d` (default).
    fn build_relation_message(
        relation_id: u32,
        namespace: &str,
        name: &str,
        columns: &[(u8, &str, u32, i32)], // (flags, name, oid, modifier)
    ) -> Vec<u8> {
        let mut buf = vec![b'R'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.extend_from_slice(namespace.as_bytes());
        buf.push(0);
        buf.extend_from_slice(name.as_bytes());
        buf.push(0);
        buf.push(b'd'); // replica identity = default
        let column_count =
            i16::try_from(columns.len()).expect("test relation has more than i16::MAX columns");
        buf.extend_from_slice(&column_count.to_be_bytes());
        for (flags, col_name, oid, modifier) in columns {
            buf.push(*flags);
            buf.extend_from_slice(col_name.as_bytes());
            buf.push(0);
            buf.extend_from_slice(&oid.to_be_bytes());
            buf.extend_from_slice(&modifier.to_be_bytes());
        }
        buf
    }

    /// Builds a binary pgoutput Begin message for testing.
    fn build_begin_message(final_lsn: u64, commit_ts_us: i64, xid: u32) -> Vec<u8> {
        let mut buf = vec![b'B'];
        buf.extend_from_slice(&final_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf.extend_from_slice(&xid.to_be_bytes());
        buf
    }

    /// Builds a binary pgoutput Commit message for testing.
    fn build_commit_message(commit_lsn: u64, end_lsn: u64, commit_ts_us: i64) -> Vec<u8> {
        let mut buf = vec![b'C'];
        buf.push(0); // flags
        buf.extend_from_slice(&commit_lsn.to_be_bytes());
        buf.extend_from_slice(&end_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf
    }

    /// Builds a binary pgoutput Insert message (new tuple only) for testing.
    fn build_insert_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'I'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        Self::write_test_tuple(&mut buf, b'N', values);
        buf
    }

    /// Builds a binary pgoutput Delete message (key tuple) for testing.
    fn build_delete_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'D'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        Self::write_test_tuple(&mut buf, b'K', values); // key identity
        buf
    }

    /// Builds a binary pgoutput Truncate message for testing.
    fn build_truncate_message(relation_ids: &[u32], options: u8) -> Vec<u8> {
        let mut buf = vec![b'T'];
        let relation_count =
            i32::try_from(relation_ids.len()).expect("test truncate has more than i32::MAX ids");
        buf.extend_from_slice(&relation_count.to_be_bytes());
        buf.push(options);
        for id in relation_ids {
            buf.extend_from_slice(&id.to_be_bytes());
        }
        buf
    }

    /// Builds a binary pgoutput Update message (with old tuple) for testing.
    ///
    /// The old tuple uses the `O` tag (REPLICA IDENTITY FULL) and is
    /// followed by the `N` new tuple.
    fn build_update_message(
        relation_id: u32,
        old_values: &[Option<&str>],
        new_values: &[Option<&str>],
    ) -> Vec<u8> {
        let mut buf = vec![b'U'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        Self::write_test_tuple(&mut buf, b'O', old_values);
        Self::write_test_tuple(&mut buf, b'N', new_values);
        buf
    }
}
1026
1027#[cfg(test)]
1028mod tests {
1029    use super::*;
1030    use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};
1031    use arrow_array::cast::AsArray;
1032
1033    fn default_source() -> PostgresCdcSource {
1034        PostgresCdcSource::new(PostgresCdcConfig::default(), None)
1035    }
1036
1037    fn running_source() -> PostgresCdcSource {
1038        let mut src = default_source();
1039        src.state = ConnectorState::Running;
1040        src
1041    }
1042
1043    // ── Construction ──
1044
1045    #[test]
1046    fn test_new_source() {
1047        let src = default_source();
1048        assert_eq!(src.state, ConnectorState::Created);
1049        assert!(src.confirmed_flush_lsn.is_zero());
1050        assert_eq!(src.event_buffer.len(), 0);
1051        assert_eq!(src.schema().fields().len(), 6);
1052    }
1053
1054    #[test]
1055    fn test_from_config() {
1056        let mut config = ConnectorConfig::new("postgres-cdc");
1057        config.set("host", "pg.local");
1058        config.set("database", "testdb");
1059        config.set("slot.name", "my_slot");
1060        config.set("publication", "my_pub");
1061
1062        let src = PostgresCdcSource::from_config(&config).unwrap();
1063        assert_eq!(src.config().host, "pg.local");
1064        assert_eq!(src.config().database, "testdb");
1065    }
1066
1067    #[test]
1068    fn test_from_config_invalid() {
1069        let config = ConnectorConfig::new("postgres-cdc");
1070        assert!(PostgresCdcSource::from_config(&config).is_err());
1071    }
1072
1073    // ── Lifecycle ──
1074
1075    // Without postgres-cdc feature, open() must return an error.
1076    #[cfg(not(feature = "postgres-cdc"))]
1077    #[tokio::test]
1078    async fn test_open_fails_without_feature() {
1079        let mut src = default_source();
1080        let config = ConnectorConfig::new("postgres-cdc");
1081        let result = src.open(&config).await;
1082        assert!(result.is_err());
1083        let err = result.unwrap_err().to_string();
1084        assert!(
1085            err.contains("postgres-cdc"),
1086            "error should mention feature flag: {err}"
1087        );
1088    }
1089
1090    #[tokio::test]
1091    async fn test_close() {
1092        let mut src = running_source();
1093        src.inject_event(ChangeEvent {
1094            table: "t".to_string(),
1095            op: CdcOperation::Insert,
1096            lsn: Lsn::ZERO,
1097            ts_ms: 0,
1098            before: None,
1099            after: Some("{}".to_string()),
1100        });
1101
1102        src.close().await.unwrap();
1103        assert_eq!(src.state, ConnectorState::Closed);
1104        assert_eq!(src.event_buffer.len(), 0);
1105    }
1106
1107    // ── Checkpoint / Restore ──
1108
1109    #[test]
1110    fn test_checkpoint() {
1111        let mut src = default_source();
1112        src.confirmed_flush_lsn = "1/ABCD".parse().unwrap();
1113        src.polled_lsn = "1/ABCD".parse().unwrap();
1114        src.write_lsn = "1/ABCE".parse().unwrap();
1115
1116        let cp = src.checkpoint();
1117        assert_eq!(cp.get_offset("lsn"), Some("1/ABCD"));
1118        assert_eq!(cp.get_offset("write_lsn"), Some("1/ABCE"));
1119        assert_eq!(cp.get_metadata("slot_name"), Some("laminar_slot"));
1120    }
1121
1122    #[tokio::test]
1123    async fn test_restore() {
1124        let mut src = default_source();
1125        let mut cp = SourceCheckpoint::new(1);
1126        cp.set_offset("lsn", "2/FF00");
1127        cp.set_offset("write_lsn", "2/FF10");
1128
1129        src.restore(&cp).await.unwrap();
1130        assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
1131        assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
1132    }
1133
1134    #[tokio::test]
1135    async fn test_restore_invalid_lsn() {
1136        let mut src = default_source();
1137        let mut cp = SourceCheckpoint::new(1);
1138        cp.set_offset("lsn", "not_an_lsn");
1139
1140        assert!(src.restore(&cp).await.is_err());
1141    }
1142
1143    // ── Poll (empty) ──
1144
1145    #[tokio::test]
1146    async fn test_poll_empty() {
1147        let mut src = running_source();
1148        let result = src.poll_batch(100).await.unwrap();
1149        assert!(result.is_none());
1150    }
1151
1152    #[tokio::test]
1153    async fn test_poll_not_running() {
1154        let mut src = default_source();
1155        assert!(src.poll_batch(100).await.is_err());
1156    }
1157
1158    // ── WAL message processing: full transaction ──
1159
1160    #[tokio::test]
1161    async fn test_process_insert_transaction() {
1162        let mut src = running_source();
1163
1164        let rel_msg = PostgresCdcSource::build_relation_message(
1165            16384,
1166            "public",
1167            "users",
1168            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1169        );
1170        let begin_msg = PostgresCdcSource::build_begin_message(0x100, 0, 1);
1171        let insert_msg =
1172            PostgresCdcSource::build_insert_message(16384, &[Some("42"), Some("Alice")]);
1173        let commit_msg = PostgresCdcSource::build_commit_message(0x100, 0x200, 0);
1174
1175        src.enqueue_wal_data(rel_msg);
1176        src.enqueue_wal_data(begin_msg);
1177        src.enqueue_wal_data(insert_msg);
1178        src.enqueue_wal_data(commit_msg);
1179
1180        let batch = src.poll_batch(100).await.unwrap().unwrap();
1181        assert_eq!(batch.num_rows(), 1);
1182
1183        let records = &batch.records;
1184        let table_col = records.column(0).as_string::<i32>();
1185        assert_eq!(table_col.value(0), "users");
1186
1187        let op_col = records.column(1).as_string::<i32>();
1188        assert_eq!(op_col.value(0), "I");
1189
1190        let after_col = records.column(5).as_string::<i32>();
1191        let after_json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
1192        assert_eq!(after_json["id"], "42");
1193        assert_eq!(after_json["name"], "Alice");
1194
1195        // before should be null for INSERT
1196        assert!(records.column(4).is_null(0));
1197    }
1198
1199    // ── Multiple events in one transaction ──
1200
1201    #[tokio::test]
1202    async fn test_multi_event_transaction() {
1203        let mut src = running_source();
1204
1205        // Register relation
1206        let rel_msg = PostgresCdcSource::build_relation_message(
1207            16384,
1208            "public",
1209            "users",
1210            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1211        );
1212        src.enqueue_wal_data(rel_msg);
1213
1214        // Transaction with 3 events
1215        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x300, 0, 2));
1216        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1217            16384,
1218            &[Some("1"), Some("Alice")],
1219        ));
1220        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1221            16384,
1222            &[Some("2"), Some("Bob")],
1223        ));
1224        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1225            16384,
1226            &[Some("3"), Some("Charlie")],
1227        ));
1228        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x300, 0x400, 0));
1229
1230        let batch = src.poll_batch(100).await.unwrap().unwrap();
1231        assert_eq!(batch.num_rows(), 3);
1232    }
1233
1234    // ── Events buffered until commit ──
1235
1236    #[tokio::test]
1237    async fn test_events_buffered_until_commit() {
1238        let mut src = running_source();
1239
1240        let rel_msg = PostgresCdcSource::build_relation_message(
1241            16384,
1242            "public",
1243            "users",
1244            &[(1, "id", INT8_OID, -1)],
1245        );
1246        src.enqueue_wal_data(rel_msg);
1247
1248        // Begin + Insert but NO commit
1249        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1250        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1251
1252        // Poll should return nothing (events in txn buffer)
1253        let result = src.poll_batch(100).await.unwrap();
1254        assert!(result.is_none());
1255
1256        // Now commit
1257        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1258
1259        let batch = src.poll_batch(100).await.unwrap().unwrap();
1260        assert_eq!(batch.num_rows(), 1);
1261    }
1262
1263    // ── Update with old tuple ──
1264
1265    #[tokio::test]
1266    async fn test_process_update() {
1267        let mut src = running_source();
1268
1269        let rel_msg = PostgresCdcSource::build_relation_message(
1270            16384,
1271            "public",
1272            "users",
1273            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1274        );
1275        src.enqueue_wal_data(rel_msg);
1276
1277        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1278        src.enqueue_wal_data(PostgresCdcSource::build_update_message(
1279            16384,
1280            &[Some("42"), Some("Alice")],
1281            &[Some("42"), Some("Bob")],
1282        ));
1283        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1284
1285        let batch = src.poll_batch(100).await.unwrap().unwrap();
1286        assert_eq!(batch.num_rows(), 1);
1287
1288        let op_col = batch.records.column(1).as_string::<i32>();
1289        assert_eq!(op_col.value(0), "U");
1290
1291        // Both before and after should be present
1292        assert!(!batch.records.column(4).is_null(0)); // before
1293        assert!(!batch.records.column(5).is_null(0)); // after
1294    }
1295
1296    // ── Delete ──
1297
1298    #[tokio::test]
1299    async fn test_process_delete() {
1300        let mut src = running_source();
1301
1302        let rel_msg = PostgresCdcSource::build_relation_message(
1303            16384,
1304            "public",
1305            "users",
1306            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1307        );
1308        src.enqueue_wal_data(rel_msg);
1309
1310        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1311        src.enqueue_wal_data(PostgresCdcSource::build_delete_message(
1312            16384,
1313            &[Some("42")],
1314        ));
1315        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1316
1317        let batch = src.poll_batch(100).await.unwrap().unwrap();
1318        let op_col = batch.records.column(1).as_string::<i32>();
1319        assert_eq!(op_col.value(0), "D");
1320
1321        // before present, after null
1322        assert!(!batch.records.column(4).is_null(0));
1323        assert!(batch.records.column(5).is_null(0));
1324    }
1325
1326    // ── Table filtering ──
1327
1328    #[tokio::test]
1329    async fn test_table_exclude_filter() {
1330        let mut config = PostgresCdcConfig::default();
1331        config.table_exclude = vec!["users".to_string()];
1332        let mut src = PostgresCdcSource::new(config, None);
1333        src.state = ConnectorState::Running;
1334
1335        let rel_msg = PostgresCdcSource::build_relation_message(
1336            16384,
1337            "public",
1338            "users",
1339            &[(1, "id", INT8_OID, -1)],
1340        );
1341        src.enqueue_wal_data(rel_msg);
1342
1343        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1344        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1345        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1346
1347        let result = src.poll_batch(100).await.unwrap();
1348        assert!(result.is_none()); // filtered out
1349    }
1350
1351    // ── Max poll records batching ──
1352
1353    #[tokio::test]
1354    async fn test_max_poll_records() {
1355        let mut src = running_source();
1356
1357        // Inject 5 events directly
1358        for i in 0..5 {
1359            src.inject_event(ChangeEvent {
1360                table: "t".to_string(),
1361                op: CdcOperation::Insert,
1362                lsn: Lsn::new(i as u64),
1363                ts_ms: 0,
1364                before: None,
1365                after: Some(format!("{{\"id\":\"{i}\"}}")),
1366            });
1367        }
1368
1369        // Poll only 2
1370        let batch = src.poll_batch(2).await.unwrap().unwrap();
1371        assert_eq!(batch.num_rows(), 2);
1372        assert_eq!(src.buffered_events(), 3);
1373
1374        // Poll remaining
1375        let batch = src.poll_batch(100).await.unwrap().unwrap();
1376        assert_eq!(batch.num_rows(), 3);
1377        assert_eq!(src.buffered_events(), 0);
1378    }
1379
1380    // ── Partition info ──
1381
1382    #[tokio::test]
1383    async fn test_partition_info() {
1384        let mut src = running_source();
1385        src.write_lsn = "1/ABCD".parse().unwrap();
1386
1387        src.inject_event(ChangeEvent {
1388            table: "t".to_string(),
1389            op: CdcOperation::Insert,
1390            lsn: Lsn::ZERO,
1391            ts_ms: 0,
1392            before: None,
1393            after: Some("{}".to_string()),
1394        });
1395
1396        let batch = src.poll_batch(100).await.unwrap().unwrap();
1397        let partition = batch.partition.unwrap();
1398        assert_eq!(partition.id, "laminar_slot");
1399        assert_eq!(partition.offset, "1/ABCD");
1400    }
1401
1402    // ── Replication lag ──
1403
1404    #[test]
1405    fn test_replication_lag() {
1406        let mut src = default_source();
1407        src.write_lsn = Lsn::new(1000);
1408        src.confirmed_flush_lsn = Lsn::new(500);
1409        assert_eq!(src.replication_lag_bytes(), 500);
1410    }
1411
1412    // ── Unknown relation ID ──
1413
1414    #[tokio::test]
1415    async fn test_unknown_relation_error() {
1416        let mut src = running_source();
1417
1418        // Insert without prior Relation message
1419        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1420        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(99999, &[Some("1")]));
1421
1422        let result = src.poll_batch(100).await;
1423        assert!(result.is_err());
1424    }
1425
1426    // ── Multi-table in one transaction ──
1427
1428    #[tokio::test]
1429    async fn test_multi_table_transaction() {
1430        let mut src = running_source();
1431
1432        // Two relations
1433        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1434            100,
1435            "public",
1436            "users",
1437            &[(1, "id", INT4_OID, -1)],
1438        ));
1439        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1440            200,
1441            "public",
1442            "orders",
1443            &[(1, "order_id", INT4_OID, -1)],
1444        ));
1445
1446        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x500, 0, 5));
1447        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1448        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1449            200,
1450            &[Some("1001")],
1451        ));
1452        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x500, 0x600, 0));
1453
1454        let batch = src.poll_batch(100).await.unwrap().unwrap();
1455        assert_eq!(batch.num_rows(), 2);
1456
1457        let table_col = batch.records.column(0).as_string::<i32>();
1458        assert_eq!(table_col.value(0), "users");
1459        assert_eq!(table_col.value(1), "orders");
1460    }
1461
1462    // ── Relation cache update (schema change) ──
1463
1464    #[tokio::test]
1465    async fn test_schema_change_mid_stream() {
1466        let mut src = running_source();
1467
1468        // Initial schema: 1 column
1469        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1470            100,
1471            "public",
1472            "users",
1473            &[(1, "id", INT4_OID, -1)],
1474        ));
1475
1476        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1477        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1478        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1479
1480        let batch1 = src.poll_batch(100).await.unwrap().unwrap();
1481        assert_eq!(batch1.num_rows(), 1);
1482
1483        // Schema changes: add a column
1484        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1485            100,
1486            "public",
1487            "users",
1488            &[(1, "id", INT4_OID, -1), (0, "email", TEXT_OID, -1)],
1489        ));
1490
1491        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x200, 0, 2));
1492        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1493            100,
1494            &[Some("2"), Some("alice@example.com")],
1495        ));
1496        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x200, 0x300, 0));
1497
1498        let batch2 = src.poll_batch(100).await.unwrap().unwrap();
1499        assert_eq!(batch2.num_rows(), 1);
1500
1501        // Verify the new column appears in JSON
1502        let after_col = batch2.records.column(5).as_string::<i32>();
1503        let json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
1504        assert_eq!(json["email"], "alice@example.com");
1505    }
1506
1507    // ── Write LSN advances on commit ──
1508
1509    #[tokio::test]
1510    async fn test_write_lsn_advances() {
1511        let mut src = running_source();
1512
1513        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1514            100,
1515            "public",
1516            "t",
1517            &[(1, "id", INT4_OID, -1)],
1518        ));
1519
1520        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1521        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1522        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));
1523
1524        let _ = src.poll_batch(100).await;
1525        assert_eq!(src.write_lsn().as_u64(), 0x500);
1526    }
1527
1528    // ── TRUNCATE returns error ──
1529
1530    #[tokio::test]
1531    async fn test_truncate_returns_error() {
1532        let mut src = running_source();
1533
1534        // Register relation so the error message includes the table name.
1535        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1536            16384,
1537            "public",
1538            "users",
1539            &[(1, "id", INT8_OID, -1)],
1540        ));
1541
1542        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1543        src.enqueue_wal_data(PostgresCdcSource::build_truncate_message(&[16384], 0));
1544
1545        let result = src.poll_batch(100).await;
1546        assert!(result.is_err());
1547        let err = result.unwrap_err().to_string();
1548        assert!(
1549            err.contains("TRUNCATE"),
1550            "error should mention TRUNCATE: {err}"
1551        );
1552        assert!(
1553            err.contains("users"),
1554            "error should mention table name: {err}"
1555        );
1556    }
1557
1558    #[tokio::test]
1559    async fn test_truncate_unknown_relation_uses_oid() {
1560        let mut src = running_source();
1561
1562        // No relation registered for ID 99999
1563        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1564        src.enqueue_wal_data(PostgresCdcSource::build_truncate_message(&[99999], 0));
1565
1566        let result = src.poll_batch(100).await;
1567        assert!(result.is_err());
1568        let err = result.unwrap_err().to_string();
1569        assert!(err.contains("oid:99999"), "error should mention oid: {err}");
1570    }
1571
1572    // ── confirmed_flush_lsn not advanced until checkpoint ──
1573
1574    #[tokio::test]
1575    async fn test_confirmed_lsn_not_advanced_until_checkpoint() {
1576        let mut src = running_source();
1577
1578        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1579            100,
1580            "public",
1581            "t",
1582            &[(1, "id", INT4_OID, -1)],
1583        ));
1584
1585        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1586        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1587        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));
1588
1589        // Before poll: confirmed_flush_lsn is ZERO.
1590        assert!(src.confirmed_flush_lsn().is_zero());
1591
1592        // After poll: confirmed_flush_lsn must NOT have advanced.
1593        let _ = src.poll_batch(100).await.unwrap().unwrap();
1594        assert!(
1595            src.confirmed_flush_lsn().is_zero(),
1596            "confirmed_flush_lsn should not advance on poll, got {}",
1597            src.confirmed_flush_lsn()
1598        );
1599
1600        // polled_lsn should have advanced.
1601        assert_eq!(src.polled_lsn.as_u64(), 0x500);
1602
1603        // After checkpoint: the checkpoint offset should use polled_lsn.
1604        let cp = src.checkpoint();
1605        assert_eq!(cp.get_offset("lsn"), Some("0/500"));
1606    }
1607
1608    // ── Restore sets polled_lsn ──
1609
1610    #[tokio::test]
1611    async fn test_restore_sets_polled_lsn() {
1612        let mut src = default_source();
1613        let mut cp = SourceCheckpoint::new(1);
1614        cp.set_offset("lsn", "2/FF00");
1615        cp.set_offset("write_lsn", "2/FF10");
1616
1617        src.restore(&cp).await.unwrap();
1618        assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
1619        assert_eq!(src.polled_lsn.as_u64(), 0x2_0000_FF00);
1620        assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
1621    }
1622
1623    // ── Backpressure (no event dropping) ──
1624
1625    #[tokio::test]
1626    async fn test_backpressure_does_not_drop_buffered_events() {
1627        let mut src = running_source();
1628        src.config.max_buffered_events = 100;
1629
1630        // Inject 200 events directly into the event buffer.
1631        // With backpressure, existing buffered events are never dropped —
1632        // only channel draining is paused when the buffer exceeds the
1633        // high watermark. Direct-injected events are already in the buffer.
1634        for i in 0..200u64 {
1635            src.inject_event(ChangeEvent {
1636                table: "public.t".to_string(),
1637                op: CdcOperation::Insert,
1638                before: None,
1639                after: Some(format!("{{\"id\": {i}}}")),
1640                ts_ms: i as i64,
1641                lsn: Lsn::new(i),
1642            });
1643        }
1644        assert_eq!(src.event_buffer.len(), 200);
1645
1646        // poll_batch drains events from the buffer — no dropping.
1647        let batch = src.poll_batch(50).await.unwrap().unwrap();
1648        assert_eq!(batch.records.num_rows(), 50);
1649        // 200 - 50 drained = 150 remaining. No events dropped.
1650        assert_eq!(src.event_buffer.len(), 150);
1651        assert_eq!(src.metrics.events_dropped.get(), 0);
1652    }
1653}