// laminar_connectors/cdc/postgres/source.rs
1//! `PostgreSQL` CDC source connector implementation.
2//!
3//! Implements [`SourceConnector`] for streaming logical replication changes
4//! from `PostgreSQL` into `LaminarDB` as Arrow `RecordBatch`es.
5//!
6//! # Architecture
7//!
8//! - **Ring 0**: No CDC code — just SPSC channel pop (~5ns)
9//! - **Ring 1**: WAL consumption, pgoutput parsing, Arrow conversion
10//! - **Ring 2**: Slot management, schema discovery, health checks
11
12use arrow_array::RecordBatch;
13use arrow_schema::SchemaRef;
14use async_trait::async_trait;
15use std::collections::VecDeque;
16use std::sync::Arc;
17use tokio::sync::Notify;
18
19use crate::checkpoint::SourceCheckpoint;
20use crate::config::{ConnectorConfig, ConnectorState};
21use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
22use crate::error::ConnectorError;
23use crate::health::HealthStatus;
24use crate::metrics::ConnectorMetrics;
25
26use super::changelog::{events_to_record_batch, tuple_to_json, CdcOperation, ChangeEvent};
27use super::config::PostgresCdcConfig;
28use super::decoder::{decode_message, WalMessage};
29use super::lsn::Lsn;
30use super::metrics::PostgresCdcMetrics;
31use super::schema::{cdc_envelope_schema, RelationCache, RelationInfo};
/// `PostgreSQL` CDC source connector.
///
/// Streams row-level changes from `PostgreSQL` using logical replication
/// (`pgoutput` plugin). Changes are emitted as Arrow `RecordBatch`es
/// in the CDC envelope format.
///
/// # Envelope Schema
///
/// | Column   | Type          | Nullable | Description                 |
/// |----------|---------------|----------|-----------------------------|
/// | `_table` | Utf8          | no       | Schema-qualified table name |
/// | `_op`    | Utf8          | no       | Operation: I, U, D          |
/// | `_lsn`   | UInt64        | no       | WAL position                |
/// | `_ts_ms` | Timestamp(ms) | no       | Commit timestamp            |
/// | `_before`| Utf8          | yes      | Old row JSON (for U, D)     |
/// | `_after` | Utf8          | yes      | New row JSON (for I, U)     |
pub struct PostgresCdcSource {
    /// Connector configuration.
    config: PostgresCdcConfig,

    /// Current lifecycle state.
    state: ConnectorState,

    /// Output schema (CDC envelope).
    schema: SchemaRef,

    /// Lock-free metrics.
    metrics: Arc<PostgresCdcMetrics>,

    /// Cached relation (table) schemas from Relation messages.
    relation_cache: RelationCache,

    /// Buffered change events awaiting `poll_batch()`.
    /// Only events from *committed* transactions land here.
    event_buffer: VecDeque<ChangeEvent>,

    /// Current transaction state (between a Begin and its Commit).
    current_txn: Option<TransactionState>,

    /// Confirmed flush LSN (last acknowledged position). Advanced only
    /// via checkpointing, never by `poll_batch` itself.
    confirmed_flush_lsn: Lsn,

    /// Write LSN (latest position received from server, updated on
    /// Commit / XLogData / KeepAlive).
    write_lsn: Lsn,

    /// Polled LSN — tracks the latest position drained into a batch.
    /// Decoupled from `confirmed_flush_lsn` so the PG replication slot
    /// is only advanced when the pipeline actually checkpoints.
    polled_lsn: Lsn,

    /// Pending WAL messages to process (for testing and batch processing).
    pending_messages: VecDeque<Vec<u8>>,

    /// Notification handle signalled when WAL data arrives from the reader task.
    data_ready: Arc<Notify>,

    /// Background connection task handle (feature-gated).
    #[cfg(feature = "postgres-cdc")]
    connection_handle: Option<tokio::task::JoinHandle<()>>,

    /// Channel receiver for WAL events from the background reader task.
    #[cfg(feature = "postgres-cdc")]
    wal_rx: Option<WalPayloadRx>,

    /// Background WAL reader task handle.
    #[cfg(feature = "postgres-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Shutdown signal for the background reader task.
    #[cfg(feature = "postgres-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,

    /// Sender for feeding confirmed flush LSN back to the reader task.
    /// The reader uses this to call `update_applied_lsn` only for
    /// durably-checkpointed positions (prevents at-least-once violation).
    #[cfg(feature = "postgres-cdc")]
    confirmed_lsn_tx: Option<tokio::sync::watch::Sender<u64>>,
}
109
/// In-progress transaction state.
///
/// Accumulates events between a `Begin` message and its matching `Commit`;
/// the events are flushed into the shared output buffer only when the
/// commit arrives, so only committed transactions reach `poll_batch`.
#[derive(Debug, Clone)]
struct TransactionState {
    /// Final LSN of the transaction (stamped on every event in it).
    final_lsn: Lsn,
    /// Commit timestamp in milliseconds.
    commit_ts_ms: i64,
    /// Change events accumulated in this transaction.
    events: Vec<ChangeEvent>,
}
120
/// Single-consumer async receiver for the WAL reader → `poll_batch` queue.
///
/// Backed by a *bounded* crossfire MPSC array channel, so a slow consumer
/// backpressures the reader task instead of growing memory unboundedly.
#[cfg(feature = "postgres-cdc")]
type WalPayloadRx = crossfire::AsyncRx<crossfire::mpsc::Array<WalPayload>>;
124
/// WAL event payload sent from the background reader task to [`PostgresCdcSource::poll_batch`].
#[allow(dead_code)] // constructed + consumed only with feature = "postgres-cdc"
enum WalPayload {
    /// Transaction start (pgoutput `Begin`).
    Begin {
        /// Final LSN of the transaction.
        final_lsn: u64,
        /// Commit timestamp in PG epoch microseconds
        /// (converted downstream via `pg_timestamp_to_unix_ms`).
        commit_ts_us: i64,
        /// Transaction ID.
        xid: u32,
    },
    /// Transaction end (pgoutput `Commit`).
    Commit {
        /// End LSN reported by the Commit message.
        end_lsn: u64,
        /// Commit timestamp in PG epoch microseconds.
        commit_ts_us: i64,
        /// LSN of the commit record itself.
        lsn: u64,
    },
    /// Raw XLogData bytes carrying one pgoutput message.
    XLogData {
        /// Server's WAL end position at send time.
        wal_end: u64,
        /// Raw pgoutput message bytes (decoded on the consumer side).
        data: Vec<u8>,
    },
    /// Keep-alive carrying only the server's WAL position.
    KeepAlive {
        wal_end: u64,
    },
    /// Fatal error from the reader task (e.g. reconnect exhaustion).
    Error(String),
}
148
impl PostgresCdcSource {
    /// Creates a new `PostgreSQL` CDC source with the given configuration.
    ///
    /// `registry` is passed through to [`PostgresCdcMetrics::new`]; pass
    /// `None` for unregistered (e.g. test) metrics.
    #[must_use]
    pub fn new(config: PostgresCdcConfig, registry: Option<&prometheus::Registry>) -> Self {
        Self {
            config,
            state: ConnectorState::Created,
            schema: cdc_envelope_schema(),
            metrics: Arc::new(PostgresCdcMetrics::new(registry)),
            relation_cache: RelationCache::new(),
            event_buffer: VecDeque::new(),
            current_txn: None,
            confirmed_flush_lsn: Lsn::ZERO,
            write_lsn: Lsn::ZERO,
            polled_lsn: Lsn::ZERO,
            pending_messages: VecDeque::new(),
            data_ready: Arc::new(Notify::new()),
            #[cfg(feature = "postgres-cdc")]
            connection_handle: None,
            #[cfg(feature = "postgres-cdc")]
            wal_rx: None,
            #[cfg(feature = "postgres-cdc")]
            reader_handle: None,
            #[cfg(feature = "postgres-cdc")]
            reader_shutdown: None,
            #[cfg(feature = "postgres-cdc")]
            confirmed_lsn_tx: None,
        }
    }

    /// Creates a new source from a generic [`ConnectorConfig`].
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the configuration is invalid.
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let pg_config = PostgresCdcConfig::from_config(config)?;
        Ok(Self::new(pg_config, None))
    }

    /// Returns a reference to the CDC configuration.
    #[must_use]
    pub fn config(&self) -> &PostgresCdcConfig {
        &self.config
    }

    /// Returns the current confirmed flush LSN.
    #[must_use]
    pub fn confirmed_flush_lsn(&self) -> Lsn {
        self.confirmed_flush_lsn
    }

    /// Returns the current write LSN.
    #[must_use]
    pub fn write_lsn(&self) -> Lsn {
        self.write_lsn
    }

    /// Returns the current replication lag in bytes
    /// (distance between the server's write LSN and our confirmed flush LSN).
    #[must_use]
    pub fn replication_lag_bytes(&self) -> u64 {
        self.write_lsn.diff(self.confirmed_flush_lsn)
    }

    /// Returns a reference to the relation cache.
    #[must_use]
    pub fn relation_cache(&self) -> &RelationCache {
        &self.relation_cache
    }

    /// Returns the number of buffered events.
    #[must_use]
    pub fn buffered_events(&self) -> usize {
        self.event_buffer.len()
    }

    /// Enqueues raw WAL message bytes for processing.
    ///
    /// Used by the replication stream handler to feed data into the source,
    /// and by tests to inject synthetic messages.
    pub fn enqueue_wal_data(&mut self, data: Vec<u8>) {
        self.pending_messages.push_back(data);
    }

    /// Processes all pending WAL messages, converting them to change events.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ReadError` if decoding fails.
    pub fn process_pending_messages(&mut self) -> Result<(), ConnectorError> {
        while let Some(data) = self.pending_messages.pop_front() {
            self.metrics.record_bytes(data.len() as u64);
            let msg = decode_message(&data)
                .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
            self.process_wal_message(msg)?;
        }
        Ok(())
    }

    /// Processes a single decoded WAL message.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ReadError` on an unknown relation ID or a
    /// TRUNCATE message (which cannot be represented as retractions).
    fn process_wal_message(&mut self, msg: WalMessage) -> Result<(), ConnectorError> {
        match msg {
            WalMessage::Begin(begin) => {
                // Open a transaction scope; DML events buffer here until Commit.
                self.current_txn = Some(TransactionState {
                    final_lsn: begin.final_lsn,
                    commit_ts_ms: begin.commit_ts_ms,
                    events: Vec::new(),
                });
            }
            WalMessage::Commit(commit) => {
                // Flush the committed transaction's events to the output
                // buffer and advance the write LSN past the transaction.
                if let Some(txn) = self.current_txn.take() {
                    self.event_buffer.extend(txn.events);
                    self.write_lsn = commit.end_lsn;
                    self.metrics.record_transaction();
                    self.metrics
                        .set_replication_lag_bytes(self.replication_lag_bytes());
                }
            }
            WalMessage::Relation(rel) => {
                // Relation messages describe table schemas; cache them so
                // later Insert/Update/Delete tuples can be interpreted.
                let info = RelationInfo {
                    relation_id: rel.relation_id,
                    namespace: rel.namespace,
                    name: rel.name,
                    replica_identity: rel.replica_identity as char,
                    columns: rel.columns,
                };
                self.relation_cache.insert(info);
            }
            WalMessage::Insert(ins) => {
                self.process_insert(ins.relation_id, &ins.new_tuple)?;
            }
            WalMessage::Update(upd) => {
                self.process_update(upd.relation_id, upd.old_tuple.as_ref(), &upd.new_tuple)?;
            }
            WalMessage::Delete(del) => {
                self.process_delete(del.relation_id, &del.old_tuple)?;
            }
            WalMessage::Truncate(trunc) => {
                // TRUNCATE is unrepresentable as per-row retractions; fail
                // loudly with the affected table names.
                let table_names: Vec<String> = trunc
                    .relation_ids
                    .iter()
                    .map(|id| {
                        self.relation_cache
                            .get(*id)
                            .map_or_else(|| format!("oid:{id}"), RelationInfo::full_name)
                    })
                    .collect();
                return Err(ConnectorError::ReadError(format!(
                    "TRUNCATE received on table(s): {}. \
                     Cannot produce retraction events — restart the pipeline with a fresh snapshot.",
                    table_names.join(", ")
                )));
            }
            WalMessage::Origin(_) | WalMessage::Type(_) => {
                // Origin and Type messages are noted but don't
                // produce change events in the current implementation.
            }
        }
        Ok(())
    }

    /// Converts an Insert message into an `I` change event
    /// (after-image only), honoring the table include filter.
    fn process_insert(
        &mut self,
        relation_id: u32,
        new_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.require_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let after_json = tuple_to_json(new_tuple, relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Insert,
            lsn,
            ts_ms,
            before: None,
            after: Some(after_json),
        };

        self.push_event(event);
        self.metrics.record_insert();
        Ok(())
    }

    /// Converts an Update message into a `U` change event.
    /// The before-image is only present when the table's replica identity
    /// caused PG to send an old tuple.
    fn process_update(
        &mut self,
        relation_id: u32,
        old_tuple: Option<&super::decoder::TupleData>,
        new_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.require_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let before_json = old_tuple.map(|t| tuple_to_json(t, relation));
        let after_json = tuple_to_json(new_tuple, relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Update,
            lsn,
            ts_ms,
            before: before_json,
            after: Some(after_json),
        };

        self.push_event(event);
        self.metrics.record_update();
        Ok(())
    }

    /// Converts a Delete message into a `D` change event (before-image only).
    fn process_delete(
        &mut self,
        relation_id: u32,
        old_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.require_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let before_json = tuple_to_json(old_tuple, relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Delete,
            lsn,
            ts_ms,
            before: Some(before_json),
            after: None,
        };

        self.push_event(event);
        self.metrics.record_delete();
        Ok(())
    }

    /// Looks up a relation by ID, returning a reference (no clone).
    ///
    /// The caller must extract all needed data (table name, JSON) from
    /// the reference before calling `push_event` or other `&mut self`
    /// methods (Rust's borrow rules require disjoint access).
    fn require_relation(&self, relation_id: u32) -> Result<&RelationInfo, ConnectorError> {
        self.relation_cache.get(relation_id).ok_or_else(|| {
            ConnectorError::ReadError(format!(
                "unknown relation ID {relation_id} (no Relation message received yet)"
            ))
        })
    }

    /// Returns the `(lsn, commit_ts_ms)` pair to stamp on the next event.
    ///
    /// Inside a transaction this is the transaction's final LSN and commit
    /// time; outside one it falls back to the current write LSN with a
    /// zero timestamp.
    fn current_txn_context(&self) -> (Lsn, i64) {
        match &self.current_txn {
            Some(txn) => (txn.final_lsn, txn.commit_ts_ms),
            None => (self.write_lsn, 0),
        }
    }

    /// Routes an event into the open transaction (flushed on Commit), or
    /// straight to the output buffer when no transaction is in progress.
    fn push_event(&mut self, event: ChangeEvent) {
        if let Some(txn) = &mut self.current_txn {
            txn.events.push(event);
        } else {
            self.event_buffer.push_back(event);
        }
    }

    /// Processes a [`WalPayload`] received from the background reader task.
    ///
    /// Begin/Commit payloads are re-materialized into decoder message types;
    /// XLogData payloads are decoded as pgoutput; keep-alives only advance
    /// the write LSN.
    #[cfg(feature = "postgres-cdc")]
    fn process_wal_payload(&mut self, payload: WalPayload) -> Result<(), ConnectorError> {
        use super::decoder::pg_timestamp_to_unix_ms;

        match payload {
            WalPayload::Begin {
                final_lsn,
                commit_ts_us,
                xid,
            } => {
                let begin = super::decoder::BeginMessage {
                    final_lsn: Lsn::new(final_lsn),
                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
                    xid,
                };
                self.process_wal_message(WalMessage::Begin(begin))
            }
            WalPayload::Commit {
                end_lsn,
                commit_ts_us,
                lsn,
            } => {
                let commit = super::decoder::CommitMessage {
                    flags: 0,
                    commit_lsn: Lsn::new(lsn),
                    end_lsn: Lsn::new(end_lsn),
                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
                };
                self.process_wal_message(WalMessage::Commit(commit))
            }
            WalPayload::XLogData { wal_end, data } => {
                // Skip raw Begin/Commit bytes to avoid double-processing
                // (pgwire-replication delivers these as separate events).
                if !data.is_empty() && (data[0] == b'B' || data[0] == b'C') {
                    self.write_lsn = Lsn::new(wal_end);
                    return Ok(());
                }
                let msg = decode_message(&data)
                    .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
                self.process_wal_message(msg)?;
                self.write_lsn = Lsn::new(wal_end);
                Ok(())
            }
            WalPayload::KeepAlive { wal_end } => {
                self.write_lsn = Lsn::new(wal_end);
                Ok(())
            }
            WalPayload::Error(msg) => Err(ConnectorError::ReadError(msg)),
        }
    }

    /// Drains up to `max` events from the buffer and converts to a `RecordBatch`.
    ///
    /// Returns `Ok(None)` when the buffer is empty.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::Internal` if Arrow batch construction fails.
    fn drain_events(&mut self, max: usize) -> Result<Option<RecordBatch>, ConnectorError> {
        if self.event_buffer.is_empty() {
            return Ok(None);
        }

        let count = max.min(self.event_buffer.len());
        let events: Vec<ChangeEvent> = self.event_buffer.drain(..count).collect();

        let batch = events_to_record_batch(&events)
            .map_err(|e| ConnectorError::Internal(format!("Arrow batch build: {e}")))?;

        self.metrics.record_batch();
        Ok(Some(batch))
    }
}
495
496#[async_trait]
497#[allow(clippy::too_many_lines)]
498impl SourceConnector for PostgresCdcSource {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // If config has properties, re-parse (supports runtime config via SQL WITH).
        if !config.properties().is_empty() {
            self.config = PostgresCdcConfig::from_config(config)?;
        }

        // Set start LSN if configured; all three LSN cursors start aligned.
        if let Some(lsn) = self.config.start_lsn {
            self.confirmed_flush_lsn = lsn;
            self.write_lsn = lsn;
            self.polled_lsn = lsn;
        }

        // Without postgres-cdc feature, open() must fail loudly to prevent
        // silent data loss (poll_batch would return Ok(None) forever).
        #[cfg(not(feature = "postgres-cdc"))]
        {
            return Err(ConnectorError::ConfigurationError(
                "PostgreSQL CDC source requires the `postgres-cdc` feature flag. \
                 Rebuild with `--features postgres-cdc` to enable."
                    .to_string(),
            ));
        }

        // Real I/O path: excluded under `test` so unit tests can exercise
        // open() without a live PostgreSQL instance.
        #[cfg(all(feature = "postgres-cdc", not(test)))]
        {
            use super::postgres_io;

            // 1. Connect control-plane for slot management
            let (client, handle) = postgres_io::connect(&self.config).await?;
            self.connection_handle = Some(handle);

            // 2. Ensure replication slot exists
            let slot_lsn = postgres_io::ensure_replication_slot(
                &client,
                &self.config.slot_name,
                &self.config.output_plugin,
            )
            .await?;

            // Use slot's confirmed_flush_lsn if no explicit start LSN
            if self.config.start_lsn.is_none() {
                if let Some(lsn) = slot_lsn {
                    self.confirmed_flush_lsn = lsn;
                    self.write_lsn = lsn;
                    self.polled_lsn = lsn;
                }
            }

            // 3. Build pgwire-replication config and start WAL streaming
            let mut repl_config = postgres_io::build_replication_config(&self.config);
            // If we resolved a slot LSN, override start_lsn so we resume correctly
            if self.confirmed_flush_lsn != Lsn::ZERO {
                repl_config.start_lsn =
                    pgwire_replication::Lsn::from_u64(self.confirmed_flush_lsn.as_u64());
            }

            let repl_client = pgwire_replication::ReplicationClient::connect(repl_config)
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("pgwire-replication connect: {e}"))
                })?;

            // Spawn background reader task for event-driven wake-up.
            // The channel is bounded (4096) so a slow consumer backpressures
            // the reader instead of buffering WAL payloads without limit.
            let (wal_tx, wal_rx) = crossfire::mpsc::bounded_async::<WalPayload>(4096);
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
            let (confirmed_lsn_tx, mut confirmed_lsn_rx) =
                tokio::sync::watch::channel(self.confirmed_flush_lsn.as_u64());
            let data_ready = Arc::clone(&self.data_ready);
            let reader_config = self.config.clone();

            let reader_handle = tokio::spawn(async move {
                const MAX_FAILURES: u32 = 10;
                let mut repl_client = repl_client;
                let mut consecutive_failures: u32 = 0;

                'reconnect: loop {
                    // Inner recv loop — processes events from the current connection.
                    'recv: loop {
                        // Feed back confirmed LSN to PG (non-blocking check).
                        // Only positions that the main thread has checkpointed
                        // are reported as "applied", preserving at-least-once.
                        if confirmed_lsn_rx.has_changed().unwrap_or(false) {
                            let confirmed = *confirmed_lsn_rx.borrow_and_update();
                            if confirmed > 0 {
                                repl_client.update_applied_lsn(pgwire_replication::Lsn::from_u64(
                                    confirmed,
                                ));
                            }
                        }

                        // `biased`: always check shutdown before pulling more
                        // WAL data so close() is honored promptly.
                        tokio::select! {
                            biased;
                            _ = shutdown_rx.changed() => break 'reconnect,
                            event = repl_client.recv() => {
                                match event {
                                    Ok(Some(event)) => {
                                        consecutive_failures = 0;
                                        let payload = match &event {
                                            pgwire_replication::ReplicationEvent::Begin {
                                                final_lsn,
                                                xid,
                                                commit_time_micros,
                                            } => Some(WalPayload::Begin {
                                                final_lsn: final_lsn.as_u64(),
                                                commit_ts_us: *commit_time_micros,
                                                xid: *xid,
                                            }),
                                            pgwire_replication::ReplicationEvent::Commit {
                                                end_lsn,
                                                commit_time_micros,
                                                lsn,
                                            } => Some(WalPayload::Commit {
                                                end_lsn: end_lsn.as_u64(),
                                                commit_ts_us: *commit_time_micros,
                                                lsn: lsn.as_u64(),
                                            }),
                                            pgwire_replication::ReplicationEvent::XLogData {
                                                wal_end,
                                                data,
                                                ..
                                            } => Some(WalPayload::XLogData {
                                                wal_end: wal_end.as_u64(),
                                                data: data.to_vec(),
                                            }),
                                            pgwire_replication::ReplicationEvent::KeepAlive {
                                                wal_end,
                                                ..
                                            } => Some(WalPayload::KeepAlive {
                                                wal_end: wal_end.as_u64(),
                                            }),
                                            _ => None,
                                        };
                                        if let Some(p) = payload {
                                            // send() blocks when the channel is full —
                                            // this is the backpressure point.
                                            if wal_tx.send(p).await.is_err() {
                                                break 'reconnect;
                                            }
                                            data_ready.notify_one();
                                        }
                                    }
                                    Ok(None) => break 'recv,
                                    Err(e) => {
                                        tracing::error!(error = %e, "WAL reader error");
                                        break 'recv;
                                    }
                                }
                            }
                        }
                    }

                    // Shut down old client before reconnecting.
                    let _ = repl_client.shutdown().await;
                    consecutive_failures += 1;

                    if consecutive_failures >= MAX_FAILURES {
                        tracing::error!(
                            failures = consecutive_failures,
                            "WAL reader exhausted reconnect attempts"
                        );
                        // Surface a fatal payload so poll_batch fails instead
                        // of silently returning empty batches forever.
                        let _ = wal_tx.send(WalPayload::Error(format!(
                            "WAL reader failed after {consecutive_failures} consecutive reconnect attempts"
                        ))).await;
                        data_ready.notify_one();
                        break 'reconnect;
                    }

                    // Exponential backoff: 2^n seconds, capped at 30s.
                    let backoff_secs = (1u64 << consecutive_failures).min(30);
                    tracing::warn!(
                        attempt = consecutive_failures,
                        backoff_secs,
                        "WAL reader reconnecting"
                    );
                    // Remain responsive to shutdown while backing off.
                    tokio::select! {
                        biased;
                        _ = shutdown_rx.changed() => break 'reconnect,
                        () = tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)) => {}
                    }

                    // Rebuild replication config with resume LSN from checkpoint.
                    let resume_lsn = *confirmed_lsn_rx.borrow();
                    let mut new_config = postgres_io::build_replication_config(&reader_config);
                    if resume_lsn > 0 {
                        new_config.start_lsn = pgwire_replication::Lsn::from_u64(resume_lsn);
                    }

                    match pgwire_replication::ReplicationClient::connect(new_config).await {
                        Ok(new_client) => {
                            tracing::info!("WAL reader reconnected");
                            repl_client = new_client;
                        }
                        Err(e) => {
                            tracing::error!(error = %e, "WAL reader reconnect failed");
                        }
                    }
                }
            });

            self.wal_rx = Some(wal_rx);
            self.reader_handle = Some(reader_handle);
            self.reader_shutdown = Some(shutdown_tx);
            self.confirmed_lsn_tx = Some(confirmed_lsn_tx);
        }

        self.state = ConnectorState::Running;
        Ok(())
    }
708
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        // Guard: polling is only valid while the connector is Running.
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".to_string(),
                actual: self.state.to_string(),
            });
        }

        // Drain WAL events from background reader task.
        // Collect into a temp vec first to avoid holding a mutable
        // borrow on self.wal_rx while calling self.process_wal_payload().
        //
        // Backpressure: when the event buffer exceeds the high watermark,
        // stop draining the reader channel. The bounded mpsc channel (4096)
        // propagates backpressure to the WAL reader task, which in turn
        // applies TCP backpressure to the replication slot. This prevents
        // data loss from dropping events.
        #[cfg(feature = "postgres-cdc")]
        {
            let high_watermark = self.config.backpressure_high_watermark();
            let mut payloads = Vec::new();
            let mut reader_closed = false;

            if self.event_buffer.len() < high_watermark {
                if let Some(ref mut rx) = self.wal_rx {
                    // NOTE(review): the drain budget counts every payload,
                    // including KeepAlive/Begin/Commit which produce no
                    // events, so batches can under-fill slightly.
                    while payloads.len() + self.event_buffer.len() < max_records
                        && self.event_buffer.len() + payloads.len() < high_watermark
                    {
                        match rx.try_recv() {
                            Ok(payload) => payloads.push(payload),
                            Err(crossfire::TryRecvError::Empty) => break,
                            Err(crossfire::TryRecvError::Disconnected) => {
                                reader_closed = true;
                                break;
                            }
                        }
                    }
                }
            } else {
                tracing::debug!(
                    buffered = self.event_buffer.len(),
                    high_watermark,
                    "CDC backpressure active — pausing WAL reader drain"
                );
            }

            for payload in payloads {
                self.process_wal_payload(payload)?;
            }
            // Only fail once buffered events are fully delivered, so no
            // already-received data is lost on reader termination.
            if reader_closed && self.event_buffer.is_empty() {
                self.state = ConnectorState::Failed;
                return Err(ConnectorError::ReadError(
                    "WAL reader task terminated unexpectedly — replication stream lost".to_string(),
                ));
            }
        }

        // Process any pending WAL messages (test injection path)
        self.process_pending_messages()?;

        // Drain buffered events into a RecordBatch.
        // Watermark advancement: the batch contains `_ts_ms` (commit timestamp)
        // which downstream pipeline watermark extractors should use. The LSN
        // in PartitionInfo tracks replication progress for offset management.
        // TODO: extract max(_ts_ms) and expose as source-level watermark for
        // windowed aggregations that depend on CDC event time.
        match self.drain_events(max_records)? {
            Some(batch) => {
                // Advance polled_lsn (NOT confirmed_flush_lsn). PG slot
                // feedback happens in checkpoint(), not here.
                if self.event_buffer.is_empty() {
                    self.polled_lsn = self.write_lsn;
                }
                self.metrics
                    .set_confirmed_flush_lsn(self.confirmed_flush_lsn.as_u64());
                self.metrics
                    .set_replication_lag_bytes(self.replication_lag_bytes());

                let lsn_str = self.write_lsn.to_string();
                let partition = PartitionInfo::new(&self.config.slot_name, lsn_str);
                Ok(Some(SourceBatch::with_partition(batch, partition)))
            }
            None => Ok(None),
        }
    }
797
    /// Returns the Arrow schema of emitted batches — the fixed CDC
    /// envelope schema (see the module-level table), shared via `Arc`.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
801
802    fn checkpoint(&self) -> SourceCheckpoint {
803        let mut cp = SourceCheckpoint::new(0);
804        // polled_lsn = latest position drained into a batch. The PG
805        // slot is only advanced here (not in poll_batch) to prevent
806        // data loss on crash.
807        cp.set_offset("lsn", self.polled_lsn.to_string());
808        cp.set_offset("write_lsn", self.write_lsn.to_string());
809        cp.set_metadata("slot_name", &self.config.slot_name);
810        cp.set_metadata("publication", &self.config.publication);
811
812        // Feed polled LSN to reader task so PG can reclaim WAL.
813        // watch::Sender::send is &self, so this works from checkpoint().
814        #[cfg(feature = "postgres-cdc")]
815        if let Some(ref tx) = self.confirmed_lsn_tx {
816            let _ = tx.send(self.polled_lsn.as_u64());
817        }
818
819        cp
820    }
821
822    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
823        if let Some(lsn_str) = checkpoint.get_offset("lsn") {
824            let lsn: Lsn = lsn_str
825                .parse()
826                .map_err(|e| ConnectorError::Internal(format!("invalid LSN in checkpoint: {e}")))?;
827            self.confirmed_flush_lsn = lsn;
828            self.polled_lsn = lsn;
829            self.metrics.set_confirmed_flush_lsn(lsn.as_u64());
830        }
831        if let Some(write_lsn_str) = checkpoint.get_offset("write_lsn") {
832            if let Ok(lsn) = write_lsn_str.parse::<Lsn>() {
833                self.write_lsn = lsn;
834            }
835        }
836        Ok(())
837    }
838
839    fn health_check(&self) -> HealthStatus {
840        match self.state {
841            ConnectorState::Running => {
842                let lag = self.replication_lag_bytes();
843                if lag > 100_000_000 {
844                    // > 100MB lag
845                    HealthStatus::Degraded(format!("replication lag: {lag} bytes"))
846                } else {
847                    HealthStatus::Healthy
848                }
849            }
850            ConnectorState::Failed => HealthStatus::Unhealthy("connector failed".to_string()),
851            _ => HealthStatus::Unknown,
852        }
853    }
854
    /// Converts the connector's internal Postgres CDC metrics into the
    /// generic [`ConnectorMetrics`] structure used by the framework.
    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }
858
    /// Returns the shared [`Notify`] handle pollers can await for data
    /// availability. Always `Some` for this connector.
    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        Some(Arc::clone(&self.data_ready))
    }
862
    /// Shuts the connector down: stops the WAL reader task, drops the
    /// channel endpoints, awaits the control-plane task, and clears all
    /// buffered state. All task handles are `take()`n, so a second call
    /// finds nothing left to stop.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        // Signal reader task to shut down (it calls repl_client.shutdown() internally).
        #[cfg(feature = "postgres-cdc")]
        {
            if let Some(tx) = self.reader_shutdown.take() {
                let _ = tx.send(true);
            }
            // Give the reader up to 5s to exit cleanly; the timeout result
            // is deliberately ignored — shutdown proceeds either way.
            if let Some(handle) = self.reader_handle.take() {
                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
            }
            // Drop channel endpoints so a still-running reader observes
            // disconnection instead of buffering into a closed source.
            self.wal_rx = None;
            self.confirmed_lsn_tx = None;
        }

        // Await the control-plane connection task briefly so it can
        // finish any in-flight work before we abort.
        #[cfg(feature = "postgres-cdc")]
        if let Some(handle) = self.connection_handle.take() {
            // Grab the abort handle first: awaiting `handle` consumes it.
            let abort = handle.abort_handle();
            match tokio::time::timeout(std::time::Duration::from_secs(2), handle).await {
                Ok(Ok(())) => {}
                Ok(Err(join_err)) => {
                    tracing::warn!(
                        error = %join_err,
                        "[postgres-cdc] control-plane task exited with error"
                    );
                }
                Err(_elapsed) => {
                    tracing::warn!(
                        "[postgres-cdc] control-plane task did not exit within 2s; aborting"
                    );
                    abort.abort();
                }
            }
        }

        self.state = ConnectorState::Closed;
        // Drop any undelivered events: the slot is only advanced at
        // checkpoint(), so anything dropped here is re-read on restart.
        self.event_buffer.clear();
        self.pending_messages.clear();
        Ok(())
    }
904}
905
906// ── Test helpers ──
907
#[cfg(test)]
impl PostgresCdcSource {
    /// Injects a pre-built change event directly into the event buffer.
    fn inject_event(&mut self, event: ChangeEvent) {
        self.event_buffer.push_back(event);
    }

    /// Appends pgoutput tuple data to `buf`: a big-endian i16 column
    /// count, then per column either `t` + i32 length + text bytes for a
    /// present value, or `n` for NULL.
    ///
    /// Shared by the Insert/Delete/Update builders below, which previously
    /// duplicated this encoding loop four times.
    fn push_tuple_data(buf: &mut Vec<u8>, values: &[Option<&str>]) {
        buf.extend_from_slice(&(values.len() as i16).to_be_bytes());
        for val in values {
            match val {
                Some(s) => {
                    buf.push(b't');
                    buf.extend_from_slice(&(s.len() as i32).to_be_bytes());
                    buf.extend_from_slice(s.as_bytes());
                }
                None => buf.push(b'n'),
            }
        }
    }

    /// Builds a binary pgoutput Relation message for testing.
    fn build_relation_message(
        relation_id: u32,
        namespace: &str,
        name: &str,
        columns: &[(u8, &str, u32, i32)], // (flags, name, oid, modifier)
    ) -> Vec<u8> {
        let mut buf = vec![b'R'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.extend_from_slice(namespace.as_bytes());
        buf.push(0); // NUL terminator for namespace
        buf.extend_from_slice(name.as_bytes());
        buf.push(0); // NUL terminator for relation name
        buf.push(b'd'); // replica identity = default
        buf.extend_from_slice(&(columns.len() as i16).to_be_bytes());
        for (flags, col_name, oid, modifier) in columns {
            buf.push(*flags);
            buf.extend_from_slice(col_name.as_bytes());
            buf.push(0);
            buf.extend_from_slice(&oid.to_be_bytes());
            buf.extend_from_slice(&modifier.to_be_bytes());
        }
        buf
    }

    /// Builds a binary pgoutput Begin message for testing.
    fn build_begin_message(final_lsn: u64, commit_ts_us: i64, xid: u32) -> Vec<u8> {
        let mut buf = vec![b'B'];
        buf.extend_from_slice(&final_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf.extend_from_slice(&xid.to_be_bytes());
        buf
    }

    /// Builds a binary pgoutput Commit message for testing.
    fn build_commit_message(commit_lsn: u64, end_lsn: u64, commit_ts_us: i64) -> Vec<u8> {
        let mut buf = vec![b'C'];
        buf.push(0); // flags
        buf.extend_from_slice(&commit_lsn.to_be_bytes());
        buf.extend_from_slice(&end_lsn.to_be_bytes());
        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
        buf
    }

    /// Builds a binary pgoutput Insert message for testing.
    fn build_insert_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'I'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.push(b'N'); // new-tuple marker
        Self::push_tuple_data(&mut buf, values);
        buf
    }

    /// Builds a binary pgoutput Delete message for testing.
    fn build_delete_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
        let mut buf = vec![b'D'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.push(b'K'); // key identity
        Self::push_tuple_data(&mut buf, values);
        buf
    }

    /// Builds a binary pgoutput Truncate message for testing.
    fn build_truncate_message(relation_ids: &[u32], options: u8) -> Vec<u8> {
        let mut buf = vec![b'T'];
        buf.extend_from_slice(&(relation_ids.len() as i32).to_be_bytes());
        buf.push(options);
        for id in relation_ids {
            buf.extend_from_slice(&id.to_be_bytes());
        }
        buf
    }

    /// Builds a binary pgoutput Update message (with old tuple) for testing.
    fn build_update_message(
        relation_id: u32,
        old_values: &[Option<&str>],
        new_values: &[Option<&str>],
    ) -> Vec<u8> {
        let mut buf = vec![b'U'];
        buf.extend_from_slice(&relation_id.to_be_bytes());
        buf.push(b'O'); // old tuple with 'O' tag (REPLICA IDENTITY FULL)
        Self::push_tuple_data(&mut buf, old_values);
        buf.push(b'N'); // new tuple
        Self::push_tuple_data(&mut buf, new_values);
        buf
    }
}
1045
1046#[cfg(test)]
1047mod tests {
1048    use super::*;
1049    use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};
1050    use arrow_array::cast::AsArray;
1051
    // Fresh source with the default config; starts in the `Created` state.
    fn default_source() -> PostgresCdcSource {
        PostgresCdcSource::new(PostgresCdcConfig::default(), None)
    }
1055
    // Like `default_source`, but with state forced to `Running` so
    // `poll_batch` is callable without opening a real connection.
    fn running_source() -> PostgresCdcSource {
        let mut src = default_source();
        src.state = ConnectorState::Running;
        src
    }
1061
1062    // ── Construction ──
1063
1064    #[test]
1065    fn test_new_source() {
1066        let src = default_source();
1067        assert_eq!(src.state, ConnectorState::Created);
1068        assert!(src.confirmed_flush_lsn.is_zero());
1069        assert_eq!(src.event_buffer.len(), 0);
1070        assert_eq!(src.schema().fields().len(), 6);
1071    }
1072
1073    #[test]
1074    fn test_from_config() {
1075        let mut config = ConnectorConfig::new("postgres-cdc");
1076        config.set("host", "pg.local");
1077        config.set("database", "testdb");
1078        config.set("slot.name", "my_slot");
1079        config.set("publication", "my_pub");
1080
1081        let src = PostgresCdcSource::from_config(&config).unwrap();
1082        assert_eq!(src.config().host, "pg.local");
1083        assert_eq!(src.config().database, "testdb");
1084    }
1085
1086    #[test]
1087    fn test_from_config_invalid() {
1088        let config = ConnectorConfig::new("postgres-cdc");
1089        assert!(PostgresCdcSource::from_config(&config).is_err());
1090    }
1091
1092    // ── Lifecycle ──
1093
1094    // Without postgres-cdc feature, open() must return an error.
1095    #[cfg(not(feature = "postgres-cdc"))]
1096    #[tokio::test]
1097    async fn test_open_fails_without_feature() {
1098        let mut src = default_source();
1099        let config = ConnectorConfig::new("postgres-cdc");
1100        let result = src.open(&config).await;
1101        assert!(result.is_err());
1102        let err = result.unwrap_err().to_string();
1103        assert!(
1104            err.contains("postgres-cdc"),
1105            "error should mention feature flag: {err}"
1106        );
1107    }
1108
1109    #[tokio::test]
1110    async fn test_close() {
1111        let mut src = running_source();
1112        src.inject_event(ChangeEvent {
1113            table: "t".to_string(),
1114            op: CdcOperation::Insert,
1115            lsn: Lsn::ZERO,
1116            ts_ms: 0,
1117            before: None,
1118            after: Some("{}".to_string()),
1119        });
1120
1121        src.close().await.unwrap();
1122        assert_eq!(src.state, ConnectorState::Closed);
1123        assert_eq!(src.event_buffer.len(), 0);
1124    }
1125
1126    // ── Checkpoint / Restore ──
1127
1128    #[test]
1129    fn test_checkpoint() {
1130        let mut src = default_source();
1131        src.confirmed_flush_lsn = "1/ABCD".parse().unwrap();
1132        src.polled_lsn = "1/ABCD".parse().unwrap();
1133        src.write_lsn = "1/ABCE".parse().unwrap();
1134
1135        let cp = src.checkpoint();
1136        assert_eq!(cp.get_offset("lsn"), Some("1/ABCD"));
1137        assert_eq!(cp.get_offset("write_lsn"), Some("1/ABCE"));
1138        assert_eq!(cp.get_metadata("slot_name"), Some("laminar_slot"));
1139    }
1140
1141    #[tokio::test]
1142    async fn test_restore() {
1143        let mut src = default_source();
1144        let mut cp = SourceCheckpoint::new(1);
1145        cp.set_offset("lsn", "2/FF00");
1146        cp.set_offset("write_lsn", "2/FF10");
1147
1148        src.restore(&cp).await.unwrap();
1149        assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
1150        assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
1151    }
1152
1153    #[tokio::test]
1154    async fn test_restore_invalid_lsn() {
1155        let mut src = default_source();
1156        let mut cp = SourceCheckpoint::new(1);
1157        cp.set_offset("lsn", "not_an_lsn");
1158
1159        assert!(src.restore(&cp).await.is_err());
1160    }
1161
1162    // ── Poll (empty) ──
1163
1164    #[tokio::test]
1165    async fn test_poll_empty() {
1166        let mut src = running_source();
1167        let result = src.poll_batch(100).await.unwrap();
1168        assert!(result.is_none());
1169    }
1170
1171    #[tokio::test]
1172    async fn test_poll_not_running() {
1173        let mut src = default_source();
1174        assert!(src.poll_batch(100).await.is_err());
1175    }
1176
1177    // ── WAL message processing: full transaction ──
1178
1179    #[tokio::test]
1180    async fn test_process_insert_transaction() {
1181        let mut src = running_source();
1182
1183        let rel_msg = PostgresCdcSource::build_relation_message(
1184            16384,
1185            "public",
1186            "users",
1187            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1188        );
1189        let begin_msg = PostgresCdcSource::build_begin_message(0x100, 0, 1);
1190        let insert_msg =
1191            PostgresCdcSource::build_insert_message(16384, &[Some("42"), Some("Alice")]);
1192        let commit_msg = PostgresCdcSource::build_commit_message(0x100, 0x200, 0);
1193
1194        src.enqueue_wal_data(rel_msg);
1195        src.enqueue_wal_data(begin_msg);
1196        src.enqueue_wal_data(insert_msg);
1197        src.enqueue_wal_data(commit_msg);
1198
1199        let batch = src.poll_batch(100).await.unwrap().unwrap();
1200        assert_eq!(batch.num_rows(), 1);
1201
1202        let records = &batch.records;
1203        let table_col = records.column(0).as_string::<i32>();
1204        assert_eq!(table_col.value(0), "users");
1205
1206        let op_col = records.column(1).as_string::<i32>();
1207        assert_eq!(op_col.value(0), "I");
1208
1209        let after_col = records.column(5).as_string::<i32>();
1210        let after_json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
1211        assert_eq!(after_json["id"], "42");
1212        assert_eq!(after_json["name"], "Alice");
1213
1214        // before should be null for INSERT
1215        assert!(records.column(4).is_null(0));
1216    }
1217
1218    // ── Multiple events in one transaction ──
1219
1220    #[tokio::test]
1221    async fn test_multi_event_transaction() {
1222        let mut src = running_source();
1223
1224        // Register relation
1225        let rel_msg = PostgresCdcSource::build_relation_message(
1226            16384,
1227            "public",
1228            "users",
1229            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1230        );
1231        src.enqueue_wal_data(rel_msg);
1232
1233        // Transaction with 3 events
1234        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x300, 0, 2));
1235        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1236            16384,
1237            &[Some("1"), Some("Alice")],
1238        ));
1239        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1240            16384,
1241            &[Some("2"), Some("Bob")],
1242        ));
1243        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1244            16384,
1245            &[Some("3"), Some("Charlie")],
1246        ));
1247        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x300, 0x400, 0));
1248
1249        let batch = src.poll_batch(100).await.unwrap().unwrap();
1250        assert_eq!(batch.num_rows(), 3);
1251    }
1252
1253    // ── Events buffered until commit ──
1254
1255    #[tokio::test]
1256    async fn test_events_buffered_until_commit() {
1257        let mut src = running_source();
1258
1259        let rel_msg = PostgresCdcSource::build_relation_message(
1260            16384,
1261            "public",
1262            "users",
1263            &[(1, "id", INT8_OID, -1)],
1264        );
1265        src.enqueue_wal_data(rel_msg);
1266
1267        // Begin + Insert but NO commit
1268        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1269        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1270
1271        // Poll should return nothing (events in txn buffer)
1272        let result = src.poll_batch(100).await.unwrap();
1273        assert!(result.is_none());
1274
1275        // Now commit
1276        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1277
1278        let batch = src.poll_batch(100).await.unwrap().unwrap();
1279        assert_eq!(batch.num_rows(), 1);
1280    }
1281
1282    // ── Update with old tuple ──
1283
1284    #[tokio::test]
1285    async fn test_process_update() {
1286        let mut src = running_source();
1287
1288        let rel_msg = PostgresCdcSource::build_relation_message(
1289            16384,
1290            "public",
1291            "users",
1292            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1293        );
1294        src.enqueue_wal_data(rel_msg);
1295
1296        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1297        src.enqueue_wal_data(PostgresCdcSource::build_update_message(
1298            16384,
1299            &[Some("42"), Some("Alice")],
1300            &[Some("42"), Some("Bob")],
1301        ));
1302        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1303
1304        let batch = src.poll_batch(100).await.unwrap().unwrap();
1305        assert_eq!(batch.num_rows(), 1);
1306
1307        let op_col = batch.records.column(1).as_string::<i32>();
1308        assert_eq!(op_col.value(0), "U");
1309
1310        // Both before and after should be present
1311        assert!(!batch.records.column(4).is_null(0)); // before
1312        assert!(!batch.records.column(5).is_null(0)); // after
1313    }
1314
1315    // ── Delete ──
1316
1317    #[tokio::test]
1318    async fn test_process_delete() {
1319        let mut src = running_source();
1320
1321        let rel_msg = PostgresCdcSource::build_relation_message(
1322            16384,
1323            "public",
1324            "users",
1325            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
1326        );
1327        src.enqueue_wal_data(rel_msg);
1328
1329        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1330        src.enqueue_wal_data(PostgresCdcSource::build_delete_message(
1331            16384,
1332            &[Some("42")],
1333        ));
1334        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1335
1336        let batch = src.poll_batch(100).await.unwrap().unwrap();
1337        let op_col = batch.records.column(1).as_string::<i32>();
1338        assert_eq!(op_col.value(0), "D");
1339
1340        // before present, after null
1341        assert!(!batch.records.column(4).is_null(0));
1342        assert!(batch.records.column(5).is_null(0));
1343    }
1344
1345    // ── Table filtering ──
1346
1347    #[tokio::test]
1348    async fn test_table_exclude_filter() {
1349        let mut config = PostgresCdcConfig::default();
1350        config.table_exclude = vec!["users".to_string()];
1351        let mut src = PostgresCdcSource::new(config, None);
1352        src.state = ConnectorState::Running;
1353
1354        let rel_msg = PostgresCdcSource::build_relation_message(
1355            16384,
1356            "public",
1357            "users",
1358            &[(1, "id", INT8_OID, -1)],
1359        );
1360        src.enqueue_wal_data(rel_msg);
1361
1362        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1363        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1364        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1365
1366        let result = src.poll_batch(100).await.unwrap();
1367        assert!(result.is_none()); // filtered out
1368    }
1369
1370    // ── Max poll records batching ──
1371
1372    #[tokio::test]
1373    async fn test_max_poll_records() {
1374        let mut src = running_source();
1375
1376        // Inject 5 events directly
1377        for i in 0..5 {
1378            src.inject_event(ChangeEvent {
1379                table: "t".to_string(),
1380                op: CdcOperation::Insert,
1381                lsn: Lsn::new(i as u64),
1382                ts_ms: 0,
1383                before: None,
1384                after: Some(format!("{{\"id\":\"{i}\"}}")),
1385            });
1386        }
1387
1388        // Poll only 2
1389        let batch = src.poll_batch(2).await.unwrap().unwrap();
1390        assert_eq!(batch.num_rows(), 2);
1391        assert_eq!(src.buffered_events(), 3);
1392
1393        // Poll remaining
1394        let batch = src.poll_batch(100).await.unwrap().unwrap();
1395        assert_eq!(batch.num_rows(), 3);
1396        assert_eq!(src.buffered_events(), 0);
1397    }
1398
1399    // ── Partition info ──
1400
1401    #[tokio::test]
1402    async fn test_partition_info() {
1403        let mut src = running_source();
1404        src.write_lsn = "1/ABCD".parse().unwrap();
1405
1406        src.inject_event(ChangeEvent {
1407            table: "t".to_string(),
1408            op: CdcOperation::Insert,
1409            lsn: Lsn::ZERO,
1410            ts_ms: 0,
1411            before: None,
1412            after: Some("{}".to_string()),
1413        });
1414
1415        let batch = src.poll_batch(100).await.unwrap().unwrap();
1416        let partition = batch.partition.unwrap();
1417        assert_eq!(partition.id, "laminar_slot");
1418        assert_eq!(partition.offset, "1/ABCD");
1419    }
1420
1421    // ── Health check ──
1422
1423    #[test]
1424    fn test_health_check_healthy() {
1425        let src = running_source();
1426        assert!(src.health_check().is_healthy());
1427    }
1428
1429    #[test]
1430    fn test_health_check_degraded() {
1431        let mut src = running_source();
1432        src.write_lsn = Lsn::new(200_000_000);
1433        src.confirmed_flush_lsn = Lsn::ZERO;
1434        assert!(matches!(src.health_check(), HealthStatus::Degraded(_)));
1435    }
1436
1437    #[test]
1438    fn test_health_check_unknown_when_created() {
1439        let src = default_source();
1440        assert!(matches!(src.health_check(), HealthStatus::Unknown));
1441    }
1442
1443    // ── Metrics ──
1444
1445    #[tokio::test]
1446    async fn test_metrics_after_processing() {
1447        let mut src = running_source();
1448
1449        let rel_msg = PostgresCdcSource::build_relation_message(
1450            16384,
1451            "public",
1452            "users",
1453            &[(1, "id", INT8_OID, -1)],
1454        );
1455        src.enqueue_wal_data(rel_msg);
1456
1457        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1458        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
1459        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("2")]));
1460        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1461
1462        let _ = src.poll_batch(100).await.unwrap();
1463
1464        let metrics = src.metrics();
1465        assert_eq!(metrics.records_total, 2); // 2 inserts
1466    }
1467
1468    // ── Replication lag ──
1469
1470    #[test]
1471    fn test_replication_lag() {
1472        let mut src = default_source();
1473        src.write_lsn = Lsn::new(1000);
1474        src.confirmed_flush_lsn = Lsn::new(500);
1475        assert_eq!(src.replication_lag_bytes(), 500);
1476    }
1477
1478    // ── Unknown relation ID ──
1479
1480    #[tokio::test]
1481    async fn test_unknown_relation_error() {
1482        let mut src = running_source();
1483
1484        // Insert without prior Relation message
1485        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1486        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(99999, &[Some("1")]));
1487
1488        let result = src.poll_batch(100).await;
1489        assert!(result.is_err());
1490    }
1491
1492    // ── Multi-table in one transaction ──
1493
1494    #[tokio::test]
1495    async fn test_multi_table_transaction() {
1496        let mut src = running_source();
1497
1498        // Two relations
1499        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1500            100,
1501            "public",
1502            "users",
1503            &[(1, "id", INT4_OID, -1)],
1504        ));
1505        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1506            200,
1507            "public",
1508            "orders",
1509            &[(1, "order_id", INT4_OID, -1)],
1510        ));
1511
1512        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x500, 0, 5));
1513        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1514        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1515            200,
1516            &[Some("1001")],
1517        ));
1518        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x500, 0x600, 0));
1519
1520        let batch = src.poll_batch(100).await.unwrap().unwrap();
1521        assert_eq!(batch.num_rows(), 2);
1522
1523        let table_col = batch.records.column(0).as_string::<i32>();
1524        assert_eq!(table_col.value(0), "users");
1525        assert_eq!(table_col.value(1), "orders");
1526    }
1527
1528    // ── Relation cache update (schema change) ──
1529
1530    #[tokio::test]
1531    async fn test_schema_change_mid_stream() {
1532        let mut src = running_source();
1533
1534        // Initial schema: 1 column
1535        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1536            100,
1537            "public",
1538            "users",
1539            &[(1, "id", INT4_OID, -1)],
1540        ));
1541
1542        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1543        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1544        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));
1545
1546        let batch1 = src.poll_batch(100).await.unwrap().unwrap();
1547        assert_eq!(batch1.num_rows(), 1);
1548
1549        // Schema changes: add a column
1550        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1551            100,
1552            "public",
1553            "users",
1554            &[(1, "id", INT4_OID, -1), (0, "email", TEXT_OID, -1)],
1555        ));
1556
1557        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x200, 0, 2));
1558        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
1559            100,
1560            &[Some("2"), Some("alice@example.com")],
1561        ));
1562        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x200, 0x300, 0));
1563
1564        let batch2 = src.poll_batch(100).await.unwrap().unwrap();
1565        assert_eq!(batch2.num_rows(), 1);
1566
1567        // Verify the new column appears in JSON
1568        let after_col = batch2.records.column(5).as_string::<i32>();
1569        let json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
1570        assert_eq!(json["email"], "alice@example.com");
1571    }
1572
1573    // ── Write LSN advances on commit ──
1574
1575    #[tokio::test]
1576    async fn test_write_lsn_advances() {
1577        let mut src = running_source();
1578
1579        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1580            100,
1581            "public",
1582            "t",
1583            &[(1, "id", INT4_OID, -1)],
1584        ));
1585
1586        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1587        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
1588        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));
1589
1590        let _ = src.poll_batch(100).await;
1591        assert_eq!(src.write_lsn().as_u64(), 0x500);
1592    }
1593
1594    // ── TRUNCATE returns error ──
1595
1596    #[tokio::test]
1597    async fn test_truncate_returns_error() {
1598        let mut src = running_source();
1599
1600        // Register relation so the error message includes the table name.
1601        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
1602            16384,
1603            "public",
1604            "users",
1605            &[(1, "id", INT8_OID, -1)],
1606        ));
1607
1608        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1609        src.enqueue_wal_data(PostgresCdcSource::build_truncate_message(&[16384], 0));
1610
1611        let result = src.poll_batch(100).await;
1612        assert!(result.is_err());
1613        let err = result.unwrap_err().to_string();
1614        assert!(
1615            err.contains("TRUNCATE"),
1616            "error should mention TRUNCATE: {err}"
1617        );
1618        assert!(
1619            err.contains("users"),
1620            "error should mention table name: {err}"
1621        );
1622    }
1623
1624    #[tokio::test]
1625    async fn test_truncate_unknown_relation_uses_oid() {
1626        let mut src = running_source();
1627
1628        // No relation registered for ID 99999
1629        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
1630        src.enqueue_wal_data(PostgresCdcSource::build_truncate_message(&[99999], 0));
1631
1632        let result = src.poll_batch(100).await;
1633        assert!(result.is_err());
1634        let err = result.unwrap_err().to_string();
1635        assert!(err.contains("oid:99999"), "error should mention oid: {err}");
1636    }
1637
1638    // ── confirmed_flush_lsn not advanced until checkpoint ──
1639
    /// At-least-once contract: polling must never advance `confirmed_flush_lsn`
    /// (which would let the server discard WAL); only a checkpoint may do so.
    /// Verifies that after a poll, `polled_lsn` has advanced to the commit LSN
    /// while `confirmed_flush_lsn` stays at ZERO, and that the checkpoint
    /// offset is taken from `polled_lsn`.
    #[tokio::test]
    async fn test_confirmed_lsn_not_advanced_until_checkpoint() {
        let mut src = running_source();

        // Relation metadata so the insert can be decoded.
        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            100,
            "public",
            "t",
            &[(1, "id", INT4_OID, -1)],
        ));

        // One committed transaction ending at LSN 0x500.
        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));

        // Before poll: confirmed_flush_lsn is ZERO.
        assert!(src.confirmed_flush_lsn().is_zero());

        // After poll: confirmed_flush_lsn must NOT have advanced.
        let _ = src.poll_batch(100).await.unwrap().unwrap();
        assert!(
            src.confirmed_flush_lsn().is_zero(),
            "confirmed_flush_lsn should not advance on poll, got {}",
            src.confirmed_flush_lsn()
        );

        // polled_lsn should have advanced.
        assert_eq!(src.polled_lsn.as_u64(), 0x500);

        // After checkpoint: the checkpoint offset should use polled_lsn.
        // "0/500" is the PostgreSQL textual form of LSN 0x500.
        let cp = src.checkpoint();
        assert_eq!(cp.get_offset("lsn"), Some("0/500"));
    }
1673
1674    // ── Restore sets polled_lsn ──
1675
1676    #[tokio::test]
1677    async fn test_restore_sets_polled_lsn() {
1678        let mut src = default_source();
1679        let mut cp = SourceCheckpoint::new(1);
1680        cp.set_offset("lsn", "2/FF00");
1681        cp.set_offset("write_lsn", "2/FF10");
1682
1683        src.restore(&cp).await.unwrap();
1684        assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
1685        assert_eq!(src.polled_lsn.as_u64(), 0x2_0000_FF00);
1686        assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
1687    }
1688
1689    // ── Backpressure (no event dropping) ──
1690
1691    #[tokio::test]
1692    async fn test_backpressure_does_not_drop_buffered_events() {
1693        let mut src = running_source();
1694        src.config.max_buffered_events = 100;
1695
1696        // Inject 200 events directly into the event buffer.
1697        // With backpressure, existing buffered events are never dropped —
1698        // only channel draining is paused when the buffer exceeds the
1699        // high watermark. Direct-injected events are already in the buffer.
1700        for i in 0..200u64 {
1701            src.inject_event(ChangeEvent {
1702                table: "public.t".to_string(),
1703                op: CdcOperation::Insert,
1704                before: None,
1705                after: Some(format!("{{\"id\": {i}}}")),
1706                ts_ms: i as i64,
1707                lsn: Lsn::new(i),
1708            });
1709        }
1710        assert_eq!(src.event_buffer.len(), 200);
1711
1712        // poll_batch drains events from the buffer — no dropping.
1713        let batch = src.poll_batch(50).await.unwrap().unwrap();
1714        assert_eq!(batch.records.num_rows(), 50);
1715        // 200 - 50 drained = 150 remaining. No events dropped.
1716        assert_eq!(src.event_buffer.len(), 150);
1717        assert_eq!(src.metrics.events_dropped.get(), 0);
1718    }
1719}