// laminar_connectors/cdc/postgres/source.rs
1//! `PostgreSQL` CDC source connector implementation.
2//!
3//! Implements [`SourceConnector`] for streaming logical replication changes
4//! from `PostgreSQL` into `LaminarDB` as Arrow `RecordBatch`es.
5//!
6//! # Architecture
7//!
8//! - **Ring 0**: No CDC code — just SPSC channel pop (~5ns)
9//! - **Ring 1**: WAL consumption, pgoutput parsing, Arrow conversion
10//! - **Ring 2**: Slot management, schema discovery, health checks
11
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::Instant;
15
16use arrow_array::RecordBatch;
17use arrow_schema::SchemaRef;
18use async_trait::async_trait;
19use tokio::sync::Notify;
20
21use crate::checkpoint::SourceCheckpoint;
22use crate::config::{ConnectorConfig, ConnectorState};
23use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
24use crate::error::ConnectorError;
25use crate::health::HealthStatus;
26use crate::metrics::ConnectorMetrics;
27
28use super::changelog::{events_to_record_batch, tuple_to_json, CdcOperation, ChangeEvent};
29use super::config::PostgresCdcConfig;
30use super::decoder::{decode_message, WalMessage};
31use super::lsn::Lsn;
32use super::metrics::CdcMetrics;
33use super::schema::{cdc_envelope_schema, RelationCache, RelationInfo};
/// `PostgreSQL` CDC source connector.
///
/// Streams row-level changes from `PostgreSQL` using logical replication
/// (`pgoutput` plugin). Changes are emitted as Arrow `RecordBatch`es
/// in the CDC envelope format.
///
/// # Envelope Schema
///
/// | Column   | Type   | Nullable | Description                    |
/// |----------|--------|----------|--------------------------------|
/// | `_table` | Utf8   | no       | Schema-qualified table name    |
/// | `_op`    | Utf8   | no       | Operation: I, U, D             |
/// | `_lsn`   | UInt64 | no       | WAL position                   |
/// | `_ts_ms` | Int64  | no       | Commit timestamp (Unix ms)     |
/// | `_before`| Utf8   | yes      | Old row JSON (for U, D)        |
/// | `_after` | Utf8   | yes      | New row JSON (for I, U)        |
pub struct PostgresCdcSource {
    /// Connector configuration.
    config: PostgresCdcConfig,

    /// Current lifecycle state (Created → Initializing → Running → Closed).
    state: ConnectorState,

    /// Output schema (CDC envelope); fixed at construction.
    schema: SchemaRef,

    /// Lock-free metrics.
    metrics: Arc<CdcMetrics>,

    /// Cached relation (table) schemas from Relation messages.
    /// Row events reference tables by relation ID only, so processing a row
    /// event for an ID not seen here is an error.
    relation_cache: RelationCache,

    /// Buffered change events awaiting `poll_batch()`.
    /// Only committed (or out-of-transaction) events land here; events of an
    /// in-flight transaction accumulate on `current_txn` instead.
    event_buffer: VecDeque<ChangeEvent>,

    /// Current transaction state (`Some` between Begin and Commit).
    current_txn: Option<TransactionState>,

    /// Confirmed flush LSN (last acknowledged position).
    confirmed_flush_lsn: Lsn,

    /// Write LSN (latest position received from server).
    write_lsn: Lsn,

    /// Last time a keepalive was received (used by production I/O path via `process_wal_payload`).
    #[allow(dead_code)] // updated in process_wal_payload (feature = "postgres-cdc")
    last_keepalive: Instant,

    /// Pending WAL messages to process (for testing and batch processing).
    /// Fed via `enqueue_wal_data`, drained by `process_pending_messages`.
    pending_messages: VecDeque<Vec<u8>>,

    /// Notification handle signalled when WAL data arrives from the reader task.
    data_ready: Arc<Notify>,

    /// Background connection task handle (feature-gated).
    #[cfg(feature = "postgres-cdc")]
    connection_handle: Option<tokio::task::JoinHandle<()>>,

    /// Channel receiver for WAL events from the background reader task.
    #[cfg(feature = "postgres-cdc")]
    wal_rx: Option<tokio::sync::mpsc::Receiver<WalPayload>>,

    /// Background WAL reader task handle.
    #[cfg(feature = "postgres-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Shutdown signal for the background reader task.
    #[cfg(feature = "postgres-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
}
104
/// In-progress transaction state.
///
/// Created on a Begin message and consumed on Commit, at which point the
/// accumulated `events` are flushed into the source's output buffer
/// (see `process_wal_message`). Dropping it without a Commit discards the
/// events, so partial transactions are never emitted.
#[derive(Debug, Clone)]
struct TransactionState {
    /// Transaction ID (underscore-prefixed: retained for context, not read).
    _xid: u32,
    /// Final LSN of the transaction (from the Begin message); stamped on
    /// every event produced inside this transaction.
    final_lsn: Lsn,
    /// Commit timestamp in milliseconds.
    commit_ts_ms: i64,
    /// Change events accumulated in this transaction.
    events: Vec<ChangeEvent>,
}
117
/// WAL event payload sent from the background reader task to [`PostgresCdcSource::poll_batch`].
///
/// `commit_ts_us` fields carry `PostgreSQL`-epoch microseconds; they are
/// converted to Unix milliseconds in `process_wal_payload` via
/// `pg_timestamp_to_unix_ms`.
#[allow(dead_code)] // constructed + consumed only with feature = "postgres-cdc"
enum WalPayload {
    /// Transaction start (pre-parsed by the replication client).
    Begin {
        /// Final LSN of the transaction.
        final_lsn: u64,
        /// Commit timestamp (`PostgreSQL` epoch, microseconds).
        commit_ts_us: i64,
        /// Transaction ID.
        xid: u32,
    },
    /// Transaction end (pre-parsed by the replication client).
    Commit {
        /// End LSN of the transaction (maps to `CommitMessage::end_lsn`).
        end_lsn: u64,
        /// Commit timestamp (`PostgreSQL` epoch, microseconds).
        commit_ts_us: i64,
        /// LSN of the commit record (maps to `CommitMessage::commit_lsn`).
        lsn: u64,
    },
    /// Raw pgoutput bytes (Relation / Insert / Update / Delete / ...).
    XLogData {
        /// Server's reported WAL-end position for this message.
        wal_end: u64,
        /// Raw pgoutput message bytes, decoded via `decode_message`.
        data: Vec<u8>,
    },
    /// Server keepalive carrying only WAL progress.
    KeepAlive {
        /// Server's reported WAL-end position.
        wal_end: u64,
    },
}
139
impl PostgresCdcSource {
    /// Creates a new `PostgreSQL` CDC source with the given configuration.
    ///
    /// The source starts in `ConnectorState::Created`; no I/O happens until
    /// `open()` is called.
    #[must_use]
    pub fn new(config: PostgresCdcConfig) -> Self {
        Self {
            config,
            state: ConnectorState::Created,
            schema: cdc_envelope_schema(),
            metrics: Arc::new(CdcMetrics::new()),
            relation_cache: RelationCache::new(),
            event_buffer: VecDeque::new(),
            current_txn: None,
            confirmed_flush_lsn: Lsn::ZERO,
            write_lsn: Lsn::ZERO,
            last_keepalive: Instant::now(),
            pending_messages: VecDeque::new(),
            data_ready: Arc::new(Notify::new()),
            #[cfg(feature = "postgres-cdc")]
            connection_handle: None,
            #[cfg(feature = "postgres-cdc")]
            wal_rx: None,
            #[cfg(feature = "postgres-cdc")]
            reader_handle: None,
            #[cfg(feature = "postgres-cdc")]
            reader_shutdown: None,
        }
    }

    /// Creates a new source from a generic [`ConnectorConfig`].
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the configuration is invalid.
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let pg_config = PostgresCdcConfig::from_config(config)?;
        Ok(Self::new(pg_config))
    }

    /// Returns a reference to the CDC configuration.
    #[must_use]
    pub fn config(&self) -> &PostgresCdcConfig {
        &self.config
    }

    /// Returns the current confirmed flush LSN.
    #[must_use]
    pub fn confirmed_flush_lsn(&self) -> Lsn {
        self.confirmed_flush_lsn
    }

    /// Returns the current write LSN.
    #[must_use]
    pub fn write_lsn(&self) -> Lsn {
        self.write_lsn
    }

    /// Returns the current replication lag in bytes
    /// (distance between the server's write LSN and our confirmed flush LSN).
    #[must_use]
    pub fn replication_lag_bytes(&self) -> u64 {
        self.write_lsn.diff(self.confirmed_flush_lsn)
    }

    /// Returns a reference to the relation cache.
    #[must_use]
    pub fn relation_cache(&self) -> &RelationCache {
        &self.relation_cache
    }

    /// Returns the number of buffered events.
    #[must_use]
    pub fn buffered_events(&self) -> usize {
        self.event_buffer.len()
    }

    /// Enqueues raw WAL message bytes for processing.
    ///
    /// Used by the replication stream handler to feed data into the source,
    /// and by tests to inject synthetic messages.
    pub fn enqueue_wal_data(&mut self, data: Vec<u8>) {
        self.pending_messages.push_back(data);
    }

    /// Processes all pending WAL messages, converting them to change events.
    ///
    /// Byte counts are recorded per message before decoding, so a message
    /// that fails to decode still contributes to the bytes metric.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ReadError` if decoding fails.
    pub fn process_pending_messages(&mut self) -> Result<(), ConnectorError> {
        while let Some(data) = self.pending_messages.pop_front() {
            self.metrics.record_bytes(data.len() as u64);
            let msg = decode_message(&data)
                .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
            self.process_wal_message(msg)?;
        }
        Ok(())
    }

    /// Processes a single decoded WAL message.
    ///
    /// Begin/Commit bracket a transaction: row events accumulate on
    /// `current_txn` and only reach the output buffer at Commit, so a
    /// transaction is emitted atomically or not at all.
    fn process_wal_message(&mut self, msg: WalMessage) -> Result<(), ConnectorError> {
        match msg {
            WalMessage::Begin(begin) => {
                // Open a transaction scope; subsequent row events are routed
                // onto it by `push_event`.
                self.current_txn = Some(TransactionState {
                    _xid: begin.xid,
                    final_lsn: begin.final_lsn,
                    commit_ts_ms: begin.commit_ts_ms,
                    events: Vec::new(),
                });
            }
            WalMessage::Commit(commit) => {
                // A Commit without a matching Begin is silently ignored.
                if let Some(txn) = self.current_txn.take() {
                    // Flush transaction events to the output buffer
                    for event in txn.events {
                        self.event_buffer.push_back(event);
                    }
                    self.write_lsn = commit.end_lsn;
                    self.metrics.record_transaction();
                    self.metrics
                        .set_replication_lag_bytes(self.replication_lag_bytes());
                }
            }
            WalMessage::Relation(rel) => {
                // Cache the table schema; later row events reference it by
                // relation ID only.
                let info = RelationInfo {
                    relation_id: rel.relation_id,
                    namespace: rel.namespace,
                    name: rel.name,
                    replica_identity: rel.replica_identity as char,
                    columns: rel.columns,
                };
                self.relation_cache.insert(info);
            }
            WalMessage::Insert(ins) => {
                self.process_insert(ins.relation_id, &ins.new_tuple)?;
            }
            WalMessage::Update(upd) => {
                self.process_update(upd.relation_id, upd.old_tuple.as_ref(), &upd.new_tuple)?;
            }
            WalMessage::Delete(del) => {
                self.process_delete(del.relation_id, &del.old_tuple)?;
            }
            WalMessage::Truncate(_) | WalMessage::Origin(_) | WalMessage::Type(_) => {
                // Truncate, Origin, and Type messages are noted but don't
                // produce change events in the current implementation.
            }
        }
        Ok(())
    }

    /// Converts an Insert row event into a [`ChangeEvent`] (after-image only).
    fn process_insert(
        &mut self,
        relation_id: u32,
        new_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.get_relation(relation_id)?;
        let table = relation.full_name();

        // Table filtering happens before any JSON conversion work.
        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let after_json = tuple_to_json(new_tuple, &relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Insert,
            lsn,
            ts_ms,
            before: None,
            after: Some(after_json),
        };

        self.push_event(event);
        self.metrics.record_insert();
        Ok(())
    }

    /// Converts an Update row event into a [`ChangeEvent`].
    ///
    /// The before-image is only present when the server sent an old tuple
    /// (depends on the table's replica identity setting).
    fn process_update(
        &mut self,
        relation_id: u32,
        old_tuple: Option<&super::decoder::TupleData>,
        new_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.get_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let before_json = old_tuple.map(|t| tuple_to_json(t, &relation));
        let after_json = tuple_to_json(new_tuple, &relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Update,
            lsn,
            ts_ms,
            before: before_json,
            after: Some(after_json),
        };

        self.push_event(event);
        self.metrics.record_update();
        Ok(())
    }

    /// Converts a Delete row event into a [`ChangeEvent`] (before-image only).
    fn process_delete(
        &mut self,
        relation_id: u32,
        old_tuple: &super::decoder::TupleData,
    ) -> Result<(), ConnectorError> {
        let relation = self.get_relation(relation_id)?;
        let table = relation.full_name();

        if !self.config.should_include_table(&table) {
            return Ok(());
        }

        let before_json = tuple_to_json(old_tuple, &relation);
        let (lsn, ts_ms) = self.current_txn_context();

        let event = ChangeEvent {
            table,
            op: CdcOperation::Delete,
            lsn,
            ts_ms,
            before: Some(before_json),
            after: None,
        };

        self.push_event(event);
        self.metrics.record_delete();
        Ok(())
    }

    /// Looks up a cached relation by ID.
    ///
    /// Returns a clone so `self` can be borrowed mutably afterwards; errors
    /// if no Relation message has been received for this ID yet.
    fn get_relation(&self, relation_id: u32) -> Result<RelationInfo, ConnectorError> {
        self.relation_cache
            .get(relation_id)
            .cloned()
            .ok_or_else(|| {
                ConnectorError::ReadError(format!(
                    "unknown relation ID {relation_id} (no Relation message received yet)"
                ))
            })
    }

    /// Returns the (LSN, commit timestamp) to stamp on the next event.
    ///
    /// Inside a transaction this is the Begin message's final LSN and commit
    /// time; outside one it falls back to the latest write LSN with a zero
    /// timestamp.
    fn current_txn_context(&self) -> (Lsn, i64) {
        match &self.current_txn {
            Some(txn) => (txn.final_lsn, txn.commit_ts_ms),
            None => (self.write_lsn, 0),
        }
    }

    /// Routes an event either into the open transaction (flushed at Commit)
    /// or straight into the output buffer when no transaction is open.
    fn push_event(&mut self, event: ChangeEvent) {
        if let Some(txn) = &mut self.current_txn {
            txn.events.push(event);
        } else {
            self.event_buffer.push_back(event);
        }
    }

    /// Processes a [`WalPayload`] received from the background reader task.
    ///
    /// Begin/Commit arrive pre-parsed from `pgwire-replication` as separate
    /// payload variants, so their raw bytes inside `XLogData` are skipped to
    /// avoid processing the same transaction boundary twice.
    #[cfg(feature = "postgres-cdc")]
    fn process_wal_payload(&mut self, payload: WalPayload) -> Result<(), ConnectorError> {
        use super::decoder::pg_timestamp_to_unix_ms;

        match payload {
            WalPayload::Begin {
                final_lsn,
                commit_ts_us,
                xid,
            } => {
                let begin = super::decoder::BeginMessage {
                    final_lsn: Lsn::new(final_lsn),
                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
                    xid,
                };
                self.process_wal_message(WalMessage::Begin(begin))
            }
            WalPayload::Commit {
                end_lsn,
                commit_ts_us,
                lsn,
            } => {
                let commit = super::decoder::CommitMessage {
                    flags: 0,
                    commit_lsn: Lsn::new(lsn),
                    end_lsn: Lsn::new(end_lsn),
                    commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
                };
                self.process_wal_message(WalMessage::Commit(commit))
            }
            WalPayload::XLogData { wal_end, data } => {
                // Skip raw Begin/Commit bytes to avoid double-processing
                // (pgwire-replication delivers these as separate events).
                if !data.is_empty() && (data[0] == b'B' || data[0] == b'C') {
                    self.write_lsn = Lsn::new(wal_end);
                    return Ok(());
                }
                let msg = decode_message(&data)
                    .map_err(|e| ConnectorError::ReadError(format!("pgoutput decode: {e}")))?;
                self.process_wal_message(msg)?;
                // Advance write position only after successful processing.
                self.write_lsn = Lsn::new(wal_end);
                Ok(())
            }
            WalPayload::KeepAlive { wal_end } => {
                self.write_lsn = Lsn::new(wal_end);
                self.last_keepalive = Instant::now();
                Ok(())
            }
        }
    }

    /// Drains up to `max` events from the buffer and converts to a `RecordBatch`.
    ///
    /// Returns `Ok(None)` when the buffer is empty so callers can distinguish
    /// "no data yet" from an actual batch.
    fn drain_events(&mut self, max: usize) -> Result<Option<RecordBatch>, ConnectorError> {
        if self.event_buffer.is_empty() {
            return Ok(None);
        }

        let count = max.min(self.event_buffer.len());
        let events: Vec<ChangeEvent> = self.event_buffer.drain(..count).collect();

        let batch = events_to_record_batch(&events)
            .map_err(|e| ConnectorError::Internal(format!("Arrow batch build: {e}")))?;

        self.metrics.record_batch();
        Ok(Some(batch))
    }
}
470
#[async_trait]
#[allow(clippy::too_many_lines)]
impl SourceConnector for PostgresCdcSource {
    /// Opens the connector: re-parses runtime config, resolves the start
    /// LSN, ensures the replication slot, and (with `postgres-cdc`) spawns
    /// the background WAL reader task.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // If config has properties, re-parse (supports runtime config via SQL WITH).
        if !config.properties().is_empty() {
            self.config = PostgresCdcConfig::from_config(config)?;
        }

        // Set start LSN if configured
        if let Some(lsn) = self.config.start_lsn {
            self.confirmed_flush_lsn = lsn;
            self.write_lsn = lsn;
        }

        // Without postgres-cdc feature, open() must fail loudly to prevent
        // silent data loss (poll_batch would return Ok(None) forever).
        #[cfg(not(feature = "postgres-cdc"))]
        {
            return Err(ConnectorError::ConfigurationError(
                "PostgreSQL CDC source requires the `postgres-cdc` feature flag. \
                 Rebuild with `--features postgres-cdc` to enable."
                    .to_string(),
            ));
        }

        #[cfg(all(feature = "postgres-cdc", not(test)))]
        {
            use super::postgres_io;

            // 1. Connect control-plane for slot management
            let (client, handle) = postgres_io::connect(&self.config).await?;
            self.connection_handle = Some(handle);

            // 2. Ensure replication slot exists
            let slot_lsn = postgres_io::ensure_replication_slot(
                &client,
                &self.config.slot_name,
                &self.config.output_plugin,
            )
            .await?;

            // Use slot's confirmed_flush_lsn if no explicit start LSN
            if self.config.start_lsn.is_none() {
                if let Some(lsn) = slot_lsn {
                    self.confirmed_flush_lsn = lsn;
                    self.write_lsn = lsn;
                }
            }

            // 3. Build pgwire-replication config and start WAL streaming
            let mut repl_config = postgres_io::build_replication_config(&self.config);
            // If we resolved a slot LSN, override start_lsn so we resume correctly
            if self.confirmed_flush_lsn != Lsn::ZERO {
                repl_config.start_lsn =
                    pgwire_replication::Lsn::from_u64(self.confirmed_flush_lsn.as_u64());
            }

            let repl_client = pgwire_replication::ReplicationClient::connect(repl_config)
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("pgwire-replication connect: {e}"))
                })?;

            // Spawn background reader task for event-driven wake-up.
            let (wal_tx, wal_rx) = tokio::sync::mpsc::channel(4096);
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
            let data_ready = Arc::clone(&self.data_ready);

            let reader_handle = tokio::spawn(async move {
                let mut repl_client = repl_client;
                loop {
                    tokio::select! {
                        // `biased` polls the shutdown branch first so close()
                        // is honoured even under a continuous stream of WAL.
                        biased;
                        _ = shutdown_rx.changed() => break,
                        event = repl_client.recv() => {
                            match event {
                                Ok(Some(event)) => {
                                    // Map pgwire-replication events into the
                                    // crate-internal WalPayload representation.
                                    let payload = match &event {
                                        pgwire_replication::ReplicationEvent::Begin {
                                            final_lsn,
                                            xid,
                                            commit_time_micros,
                                        } => Some(WalPayload::Begin {
                                            final_lsn: final_lsn.as_u64(),
                                            commit_ts_us: *commit_time_micros,
                                            xid: *xid,
                                        }),
                                        pgwire_replication::ReplicationEvent::Commit {
                                            end_lsn,
                                            commit_time_micros,
                                            lsn,
                                        } => {
                                            // Report progress to the server at
                                            // each commit boundary.
                                            repl_client.update_applied_lsn(*end_lsn);
                                            Some(WalPayload::Commit {
                                                end_lsn: end_lsn.as_u64(),
                                                commit_ts_us: *commit_time_micros,
                                                lsn: lsn.as_u64(),
                                            })
                                        }
                                        pgwire_replication::ReplicationEvent::XLogData {
                                            wal_end,
                                            data,
                                            ..
                                        } => Some(WalPayload::XLogData {
                                            wal_end: wal_end.as_u64(),
                                            data: data.to_vec(),
                                        }),
                                        pgwire_replication::ReplicationEvent::KeepAlive {
                                            wal_end,
                                            ..
                                        } => Some(WalPayload::KeepAlive {
                                            wal_end: wal_end.as_u64(),
                                        }),
                                        _ => None,
                                    };
                                    if let Some(p) = payload {
                                        // Receiver dropped ⇒ source closed; stop reading.
                                        if wal_tx.send(p).await.is_err() {
                                            break;
                                        }
                                        data_ready.notify_one();
                                    }
                                }
                                Ok(None) => break,
                                Err(e) => {
                                    tracing::warn!(error = %e, "WAL reader error");
                                    break;
                                }
                            }
                        }
                    }
                }
                let _ = repl_client.shutdown().await;
            });

            self.wal_rx = Some(wal_rx);
            self.reader_handle = Some(reader_handle);
            self.reader_shutdown = Some(shutdown_tx);
            self.last_keepalive = Instant::now();
        }

        self.state = ConnectorState::Running;
        Ok(())
    }

    /// Drains queued WAL payloads (and any test-injected messages), then
    /// returns up to `max_records` buffered events as one envelope batch.
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".to_string(),
                actual: self.state.to_string(),
            });
        }

        // Drain WAL events from background reader task.
        // `take()` avoids borrowing `self.wal_rx` and `self` mutably at the
        // same time; the receiver is restored after the drain loop.
        #[cfg(feature = "postgres-cdc")]
        if let Some(mut rx) = self.wal_rx.take() {
            while self.event_buffer.len() < max_records {
                match rx.try_recv() {
                    Ok(payload) => self.process_wal_payload(payload)?,
                    // Empty or disconnected — either way, stop draining.
                    Err(_) => break,
                }
            }
            self.wal_rx = Some(rx);
        }

        // Process any pending WAL messages (test injection path)
        self.process_pending_messages()?;

        // Drain buffered events into a RecordBatch.
        // Watermark advancement: the batch contains `_ts_ms` (commit timestamp)
        // which downstream pipeline watermark extractors should use. The LSN
        // in PartitionInfo tracks replication progress for offset management.
        // TODO: extract max(_ts_ms) and expose as source-level watermark for
        // windowed aggregations that depend on CDC event time.
        match self.drain_events(max_records)? {
            Some(batch) => {
                let lsn_str = self.write_lsn.to_string();
                let partition = PartitionInfo::new(&self.config.slot_name, lsn_str);
                Ok(Some(SourceBatch::with_partition(batch, partition)))
            }
            None => Ok(None),
        }
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Captures replication position: `lsn` (restart point) and `write_lsn`
    /// (latest received), plus slot/publication metadata for validation.
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("lsn", self.confirmed_flush_lsn.to_string());
        cp.set_offset("write_lsn", self.write_lsn.to_string());
        cp.set_metadata("slot_name", &self.config.slot_name);
        cp.set_metadata("publication", &self.config.publication);
        cp
    }

    /// Restores replication position from a checkpoint.
    ///
    /// A malformed `lsn` is a hard error; a malformed `write_lsn` is
    /// silently ignored (best-effort restore of the secondary position).
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        if let Some(lsn_str) = checkpoint.get_offset("lsn") {
            let lsn: Lsn = lsn_str.parse().map_err(|e| {
                ConnectorError::CheckpointError(format!("invalid LSN in checkpoint: {e}"))
            })?;
            self.confirmed_flush_lsn = lsn;
            self.metrics.set_confirmed_flush_lsn(lsn.as_u64());
        }
        if let Some(write_lsn_str) = checkpoint.get_offset("write_lsn") {
            if let Ok(lsn) = write_lsn_str.parse::<Lsn>() {
                self.write_lsn = lsn;
            }
        }
        Ok(())
    }

    /// Reports health from lifecycle state plus replication lag.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => {
                let lag = self.replication_lag_bytes();
                if lag > 100_000_000 {
                    // > 100MB lag
                    HealthStatus::Degraded(format!("replication lag: {lag} bytes"))
                } else {
                    HealthStatus::Healthy
                }
            }
            ConnectorState::Failed => HealthStatus::Unhealthy("connector failed".to_string()),
            _ => HealthStatus::Unknown,
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    /// Handle signalled by the background reader whenever a WAL payload is
    /// enqueued, letting callers await data instead of busy-polling.
    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        Some(Arc::clone(&self.data_ready))
    }

    /// Shuts down background tasks and clears all buffered state.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        // Signal reader task to shut down (it calls repl_client.shutdown() internally).
        #[cfg(feature = "postgres-cdc")]
        {
            if let Some(tx) = self.reader_shutdown.take() {
                let _ = tx.send(true);
            }
            // Bounded wait so a hung reader cannot block close() forever.
            if let Some(handle) = self.reader_handle.take() {
                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
            }
            self.wal_rx = None;
        }

        // Abort the background control-plane connection task.
        #[cfg(feature = "postgres-cdc")]
        if let Some(handle) = self.connection_handle.take() {
            handle.abort();
        }

        self.state = ConnectorState::Closed;
        self.event_buffer.clear();
        self.pending_messages.clear();
        Ok(())
    }
}
738
739// ── Test helpers ──
740
741#[cfg(test)]
742impl PostgresCdcSource {
    /// Injects a pre-built change event directly into the event buffer.
    ///
    /// Bypasses WAL decoding entirely, so tests can exercise
    /// `drain_events` / `poll_batch` without building binary pgoutput bytes.
    fn inject_event(&mut self, event: ChangeEvent) {
        self.event_buffer.push_back(event);
    }
747
748    /// Builds a binary pgoutput Relation message for testing.
749    fn build_relation_message(
750        relation_id: u32,
751        namespace: &str,
752        name: &str,
753        columns: &[(u8, &str, u32, i32)], // (flags, name, oid, modifier)
754    ) -> Vec<u8> {
755        let mut buf = vec![b'R'];
756        buf.extend_from_slice(&relation_id.to_be_bytes());
757        buf.extend_from_slice(namespace.as_bytes());
758        buf.push(0);
759        buf.extend_from_slice(name.as_bytes());
760        buf.push(0);
761        buf.push(b'd'); // replica identity = default
762        buf.extend_from_slice(&(columns.len() as i16).to_be_bytes());
763        for (flags, col_name, oid, modifier) in columns {
764            buf.push(*flags);
765            buf.extend_from_slice(col_name.as_bytes());
766            buf.push(0);
767            buf.extend_from_slice(&oid.to_be_bytes());
768            buf.extend_from_slice(&modifier.to_be_bytes());
769        }
770        buf
771    }
772
773    /// Builds a binary pgoutput Begin message for testing.
774    fn build_begin_message(final_lsn: u64, commit_ts_us: i64, xid: u32) -> Vec<u8> {
775        let mut buf = vec![b'B'];
776        buf.extend_from_slice(&final_lsn.to_be_bytes());
777        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
778        buf.extend_from_slice(&xid.to_be_bytes());
779        buf
780    }
781
782    /// Builds a binary pgoutput Commit message for testing.
783    fn build_commit_message(commit_lsn: u64, end_lsn: u64, commit_ts_us: i64) -> Vec<u8> {
784        let mut buf = vec![b'C'];
785        buf.push(0); // flags
786        buf.extend_from_slice(&commit_lsn.to_be_bytes());
787        buf.extend_from_slice(&end_lsn.to_be_bytes());
788        buf.extend_from_slice(&commit_ts_us.to_be_bytes());
789        buf
790    }
791
792    /// Builds a binary pgoutput Insert message for testing.
793    fn build_insert_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
794        let mut buf = vec![b'I'];
795        buf.extend_from_slice(&relation_id.to_be_bytes());
796        buf.push(b'N');
797        buf.extend_from_slice(&(values.len() as i16).to_be_bytes());
798        for val in values {
799            match val {
800                Some(s) => {
801                    buf.push(b't');
802                    buf.extend_from_slice(&(s.len() as i32).to_be_bytes());
803                    buf.extend_from_slice(s.as_bytes());
804                }
805                None => buf.push(b'n'),
806            }
807        }
808        buf
809    }
810
811    /// Builds a binary pgoutput Delete message for testing.
812    fn build_delete_message(relation_id: u32, values: &[Option<&str>]) -> Vec<u8> {
813        let mut buf = vec![b'D'];
814        buf.extend_from_slice(&relation_id.to_be_bytes());
815        buf.push(b'K'); // key identity
816        buf.extend_from_slice(&(values.len() as i16).to_be_bytes());
817        for val in values {
818            match val {
819                Some(s) => {
820                    buf.push(b't');
821                    buf.extend_from_slice(&(s.len() as i32).to_be_bytes());
822                    buf.extend_from_slice(s.as_bytes());
823                }
824                None => buf.push(b'n'),
825            }
826        }
827        buf
828    }
829
830    /// Builds a binary pgoutput Update message (with old tuple) for testing.
831    fn build_update_message(
832        relation_id: u32,
833        old_values: &[Option<&str>],
834        new_values: &[Option<&str>],
835    ) -> Vec<u8> {
836        let mut buf = vec![b'U'];
837        buf.extend_from_slice(&relation_id.to_be_bytes());
838        // Old tuple with 'O' tag (REPLICA IDENTITY FULL)
839        buf.push(b'O');
840        buf.extend_from_slice(&(old_values.len() as i16).to_be_bytes());
841        for val in old_values {
842            match val {
843                Some(s) => {
844                    buf.push(b't');
845                    buf.extend_from_slice(&(s.len() as i32).to_be_bytes());
846                    buf.extend_from_slice(s.as_bytes());
847                }
848                None => buf.push(b'n'),
849            }
850        }
851        // New tuple
852        buf.push(b'N');
853        buf.extend_from_slice(&(new_values.len() as i16).to_be_bytes());
854        for val in new_values {
855            match val {
856                Some(s) => {
857                    buf.push(b't');
858                    buf.extend_from_slice(&(s.len() as i32).to_be_bytes());
859                    buf.extend_from_slice(s.as_bytes());
860                }
861                None => buf.push(b'n'),
862            }
863        }
864        buf
865    }
866}
867
#[cfg(test)]
mod tests {
    // These tests drive the connector entirely in-process: binary pgoutput
    // frames are built with the `build_*` helpers above and fed in through
    // `enqueue_wal_data` / `inject_event` — no live PostgreSQL is involved.
    use super::*;
    use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};
    use arrow_array::cast::AsArray;

    /// Source built from the default config, left in `Created` state.
    fn default_source() -> PostgresCdcSource {
        PostgresCdcSource::new(PostgresCdcConfig::default())
    }

    /// Source forced into `Running` state so `poll_batch` accepts calls
    /// (polling a non-running source errors — see `test_poll_not_running`).
    fn running_source() -> PostgresCdcSource {
        let mut src = default_source();
        src.state = ConnectorState::Running;
        src
    }

    // ── Construction ──

    #[test]
    fn test_new_source() {
        let src = default_source();
        assert_eq!(src.state, ConnectorState::Created);
        assert!(src.confirmed_flush_lsn.is_zero());
        assert_eq!(src.event_buffer.len(), 0);
        // 6 envelope columns (the 4 documented in the module header plus
        // before/after — see cdc_envelope_schema).
        assert_eq!(src.schema().fields().len(), 6);
    }

    #[test]
    fn test_from_config() {
        let mut config = ConnectorConfig::new("postgres-cdc");
        config.set("host", "pg.local");
        config.set("database", "testdb");
        config.set("slot.name", "my_slot");
        config.set("publication", "my_pub");

        let src = PostgresCdcSource::from_config(&config).unwrap();
        assert_eq!(src.config().host, "pg.local");
        assert_eq!(src.config().database, "testdb");
    }

    #[test]
    fn test_from_config_invalid() {
        // An empty config must be rejected (required keys missing).
        let config = ConnectorConfig::new("postgres-cdc");
        assert!(PostgresCdcSource::from_config(&config).is_err());
    }

    // ── Lifecycle ──

    // Without postgres-cdc feature, open() must return an error.
    #[cfg(not(feature = "postgres-cdc"))]
    #[tokio::test]
    async fn test_open_fails_without_feature() {
        let mut src = default_source();
        let config = ConnectorConfig::new("postgres-cdc");
        let result = src.open(&config).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("postgres-cdc"),
            "error should mention feature flag: {err}"
        );
    }

    #[tokio::test]
    async fn test_close() {
        let mut src = running_source();
        src.inject_event(ChangeEvent {
            table: "t".to_string(),
            op: CdcOperation::Insert,
            lsn: Lsn::ZERO,
            ts_ms: 0,
            before: None,
            after: Some("{}".to_string()),
        });

        // close() must transition state and drop any buffered events.
        src.close().await.unwrap();
        assert_eq!(src.state, ConnectorState::Closed);
        assert_eq!(src.event_buffer.len(), 0);
    }

    // ── Checkpoint / Restore ──

    #[test]
    fn test_checkpoint() {
        let mut src = default_source();
        src.confirmed_flush_lsn = "1/ABCD".parse().unwrap();
        src.write_lsn = "1/ABCE".parse().unwrap();

        let cp = src.checkpoint();
        assert_eq!(cp.get_offset("lsn"), Some("1/ABCD"));
        assert_eq!(cp.get_offset("write_lsn"), Some("1/ABCE"));
        // "laminar_slot" is the slot name from the default config.
        assert_eq!(cp.get_metadata("slot_name"), Some("laminar_slot"));
    }

    #[tokio::test]
    async fn test_restore() {
        let mut src = default_source();
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset("lsn", "2/FF00");
        cp.set_offset("write_lsn", "2/FF10");

        src.restore(&cp).await.unwrap();
        // "X/Y" LSN text maps to (X << 32) | Y.
        assert_eq!(src.confirmed_flush_lsn.as_u64(), 0x2_0000_FF00);
        assert_eq!(src.write_lsn.as_u64(), 0x2_0000_FF10);
    }

    #[tokio::test]
    async fn test_restore_invalid_lsn() {
        let mut src = default_source();
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset("lsn", "not_an_lsn");

        assert!(src.restore(&cp).await.is_err());
    }

    // ── Poll (empty) ──

    #[tokio::test]
    async fn test_poll_empty() {
        let mut src = running_source();
        let result = src.poll_batch(100).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_poll_not_running() {
        // Polling a source still in `Created` state is an error.
        let mut src = default_source();
        assert!(src.poll_batch(100).await.is_err());
    }

    // ── WAL message processing: full transaction ──

    #[tokio::test]
    async fn test_process_insert_transaction() {
        let mut src = running_source();

        // 16384: arbitrary relation OID; the Insert below references it.
        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
        );
        let begin_msg = PostgresCdcSource::build_begin_message(0x100, 0, 1);
        let insert_msg =
            PostgresCdcSource::build_insert_message(16384, &[Some("42"), Some("Alice")]);
        let commit_msg = PostgresCdcSource::build_commit_message(0x100, 0x200, 0);

        src.enqueue_wal_data(rel_msg);
        src.enqueue_wal_data(begin_msg);
        src.enqueue_wal_data(insert_msg);
        src.enqueue_wal_data(commit_msg);

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        // Envelope column layout: 0 = _table, 1 = _op, 4 = before, 5 = after.
        let records = &batch.records;
        let table_col = records.column(0).as_string::<i32>();
        assert_eq!(table_col.value(0), "users");

        let op_col = records.column(1).as_string::<i32>();
        assert_eq!(op_col.value(0), "I");

        let after_col = records.column(5).as_string::<i32>();
        let after_json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
        assert_eq!(after_json["id"], "42");
        assert_eq!(after_json["name"], "Alice");

        // before should be null for INSERT
        assert!(records.column(4).is_null(0));
    }

    // ── Multiple events in one transaction ──

    #[tokio::test]
    async fn test_multi_event_transaction() {
        let mut src = running_source();

        // Register relation
        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        // Transaction with 3 events
        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x300, 0, 2));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            16384,
            &[Some("1"), Some("Alice")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            16384,
            &[Some("2"), Some("Bob")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            16384,
            &[Some("3"), Some("Charlie")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x300, 0x400, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
    }

    // ── Events buffered until commit ──

    #[tokio::test]
    async fn test_events_buffered_until_commit() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        // Begin + Insert but NO commit
        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));

        // Poll should return nothing (events in txn buffer)
        let result = src.poll_batch(100).await.unwrap();
        assert!(result.is_none());

        // Now commit
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);
    }

    // ── Update with old tuple ──

    #[tokio::test]
    async fn test_process_update() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_update_message(
            16384,
            &[Some("42"), Some("Alice")],
            &[Some("42"), Some("Bob")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let op_col = batch.records.column(1).as_string::<i32>();
        assert_eq!(op_col.value(0), "U");

        // Both before and after should be present
        assert!(!batch.records.column(4).is_null(0)); // before
        assert!(!batch.records.column(5).is_null(0)); // after
    }

    // ── Delete ──

    #[tokio::test]
    async fn test_process_delete() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1), (0, "name", TEXT_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_delete_message(
            16384,
            &[Some("42")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        let op_col = batch.records.column(1).as_string::<i32>();
        assert_eq!(op_col.value(0), "D");

        // before present, after null
        assert!(!batch.records.column(4).is_null(0));
        assert!(batch.records.column(5).is_null(0));
    }

    // ── Table filtering ──

    #[tokio::test]
    async fn test_table_exclude_filter() {
        let mut config = PostgresCdcConfig::default();
        config.table_exclude = vec!["users".to_string()];
        let mut src = PostgresCdcSource::new(config);
        src.state = ConnectorState::Running;

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let result = src.poll_batch(100).await.unwrap();
        assert!(result.is_none()); // filtered out
    }

    // ── Max poll records batching ──

    #[tokio::test]
    async fn test_max_poll_records() {
        let mut src = running_source();

        // Inject 5 events directly
        for i in 0..5 {
            src.inject_event(ChangeEvent {
                table: "t".to_string(),
                op: CdcOperation::Insert,
                lsn: Lsn::new(i as u64),
                ts_ms: 0,
                before: None,
                after: Some(format!("{{\"id\":\"{i}\"}}")),
            });
        }

        // Poll only 2
        let batch = src.poll_batch(2).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(src.buffered_events(), 3);

        // Poll remaining
        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(src.buffered_events(), 0);
    }

    // ── Partition info ──

    #[tokio::test]
    async fn test_partition_info() {
        let mut src = running_source();
        src.write_lsn = "1/ABCD".parse().unwrap();

        src.inject_event(ChangeEvent {
            table: "t".to_string(),
            op: CdcOperation::Insert,
            lsn: Lsn::ZERO,
            ts_ms: 0,
            before: None,
            after: Some("{}".to_string()),
        });

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        let partition = batch.partition.unwrap();
        // Partition id is the slot name; offset is the current write LSN.
        assert_eq!(partition.id, "laminar_slot");
        assert_eq!(partition.offset, "1/ABCD");
    }

    // ── Health check ──

    #[test]
    fn test_health_check_healthy() {
        let src = running_source();
        assert!(src.health_check().is_healthy());
    }

    #[test]
    fn test_health_check_degraded() {
        let mut src = running_source();
        // ~200 MB of unconfirmed WAL — presumably beyond the lag threshold
        // used by health_check() (threshold not visible here; confirm there).
        src.write_lsn = Lsn::new(200_000_000);
        src.confirmed_flush_lsn = Lsn::ZERO;
        assert!(matches!(src.health_check(), HealthStatus::Degraded(_)));
    }

    #[test]
    fn test_health_check_unknown_when_created() {
        let src = default_source();
        assert!(matches!(src.health_check(), HealthStatus::Unknown));
    }

    // ── Metrics ──

    #[tokio::test]
    async fn test_metrics_after_processing() {
        let mut src = running_source();

        let rel_msg = PostgresCdcSource::build_relation_message(
            16384,
            "public",
            "users",
            &[(1, "id", INT8_OID, -1)],
        );
        src.enqueue_wal_data(rel_msg);

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(16384, &[Some("2")]));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let _ = src.poll_batch(100).await.unwrap();

        let metrics = src.metrics();
        assert_eq!(metrics.records_total, 2); // 2 inserts
    }

    // ── Replication lag ──

    #[test]
    fn test_replication_lag() {
        // Lag = write_lsn - confirmed_flush_lsn, in bytes.
        let mut src = default_source();
        src.write_lsn = Lsn::new(1000);
        src.confirmed_flush_lsn = Lsn::new(500);
        assert_eq!(src.replication_lag_bytes(), 500);
    }

    // ── Unknown relation ID ──

    #[tokio::test]
    async fn test_unknown_relation_error() {
        let mut src = running_source();

        // Insert without prior Relation message
        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(99999, &[Some("1")]));

        let result = src.poll_batch(100).await;
        assert!(result.is_err());
    }

    // ── Multi-table in one transaction ──

    #[tokio::test]
    async fn test_multi_table_transaction() {
        let mut src = running_source();

        // Two relations
        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            100,
            "public",
            "users",
            &[(1, "id", INT4_OID, -1)],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            200,
            "public",
            "orders",
            &[(1, "order_id", INT4_OID, -1)],
        ));

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x500, 0, 5));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            200,
            &[Some("1001")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x500, 0x600, 0));

        let batch = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);

        // Rows must come out in WAL order: users first, then orders.
        let table_col = batch.records.column(0).as_string::<i32>();
        assert_eq!(table_col.value(0), "users");
        assert_eq!(table_col.value(1), "orders");
    }

    // ── Relation cache update (schema change) ──

    #[tokio::test]
    async fn test_schema_change_mid_stream() {
        let mut src = running_source();

        // Initial schema: 1 column
        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            100,
            "public",
            "users",
            &[(1, "id", INT4_OID, -1)],
        ));

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x200, 0));

        let batch1 = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 1);

        // Schema changes: add a column
        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            100,
            "public",
            "users",
            &[(1, "id", INT4_OID, -1), (0, "email", TEXT_OID, -1)],
        ));

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x200, 0, 2));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(
            100,
            &[Some("2"), Some("alice@example.com")],
        ));
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x200, 0x300, 0));

        let batch2 = src.poll_batch(100).await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 1);

        // Verify the new column appears in JSON
        let after_col = batch2.records.column(5).as_string::<i32>();
        let json: serde_json::Value = serde_json::from_str(after_col.value(0)).unwrap();
        assert_eq!(json["email"], "alice@example.com");
    }

    // ── Write LSN advances on commit ──

    #[tokio::test]
    async fn test_write_lsn_advances() {
        let mut src = running_source();

        src.enqueue_wal_data(PostgresCdcSource::build_relation_message(
            100,
            "public",
            "t",
            &[(1, "id", INT4_OID, -1)],
        ));

        src.enqueue_wal_data(PostgresCdcSource::build_begin_message(0x100, 0, 1));
        src.enqueue_wal_data(PostgresCdcSource::build_insert_message(100, &[Some("1")]));
        // Commit's end LSN (0x500) should become the new write LSN.
        src.enqueue_wal_data(PostgresCdcSource::build_commit_message(0x100, 0x500, 0));

        let _ = src.poll_batch(100).await;
        assert_eq!(src.write_lsn().as_u64(), 0x500);
    }
}