Skip to main content

laminar_connectors/lookup/
cdc_adapter.rs

//! CDC-to-reference-table adapter.
//!
//! Wraps any `SourceConnector` (for example Postgres CDC or MySQL CDC) as a
//! `ReferenceTableSource` so CDC streams can populate lookup tables.
//!
//! The adapter has two phases:
//! 1. **Snapshot**: each `poll_snapshot()` call forwards one `poll_batch()`.
//!    The first poll that yields no rows (`None` or a zero-row batch) ends the
//!    snapshot, so the initial burst of data is treated as a consistent snapshot.
//! 2. **Changes**: subsequent `poll_changes()` calls keep polling the same
//!    connector for incremental CDC events.
11
12use arrow_array::RecordBatch;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::reference::ReferenceTableSource;
19
/// Lifecycle phase of the CDC adapter.
///
/// Transitions only move forward: `Init` → `Snapshot` when the connector is
/// opened, `Snapshot` → `Changes` when the snapshot burst ends, and any phase
/// → `Closed` on `close()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    /// Not yet opened.
    Init,
    /// Delivering initial snapshot batches via `poll_snapshot()`.
    Snapshot,
    /// Snapshot complete; delivering incremental CDC via `poll_changes()`.
    Changes,
    /// Closed.
    Closed,
}
31
32/// Adapts a [`SourceConnector`] into a [`ReferenceTableSource`].
33///
34/// The underlying connector is opened lazily on the first `poll_snapshot()`
35/// call. Snapshot completion is detected when `poll_batch()` returns `None`
36/// for the first time after delivering at least one batch (or immediately
37/// if the source has no data).
38pub struct CdcTableSource {
39    connector: Box<dyn SourceConnector>,
40    config: ConnectorConfig,
41    phase: Phase,
42    max_batch_size: usize,
43}
44
45impl CdcTableSource {
46    /// Creates a new adapter wrapping the given source connector.
47    ///
48    /// `max_batch_size` is the hint passed to `poll_batch()`.
49    #[must_use]
50    pub fn new(
51        connector: Box<dyn SourceConnector>,
52        config: ConnectorConfig,
53        max_batch_size: usize,
54    ) -> Self {
55        Self {
56            connector,
57            config,
58            phase: Phase::Init,
59            max_batch_size,
60        }
61    }
62
63    /// Ensures the connector is opened before polling.
64    async fn ensure_open(&mut self) -> Result<(), ConnectorError> {
65        if matches!(self.phase, Phase::Init) {
66            self.connector.open(&self.config).await?;
67            self.phase = Phase::Snapshot;
68        }
69        Ok(())
70    }
71
72    /// Polls the underlying connector for the next batch.
73    async fn poll_inner(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
74        match self.connector.poll_batch(self.max_batch_size).await? {
75            Some(SourceBatch { records, .. }) if records.num_rows() > 0 => Ok(Some(records)),
76            _ => Ok(None),
77        }
78    }
79}
80
81#[async_trait::async_trait]
82impl ReferenceTableSource for CdcTableSource {
83    async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
84        self.ensure_open().await?;
85
86        if !matches!(self.phase, Phase::Snapshot) {
87            return Ok(None);
88        }
89
90        if let Some(batch) = self.poll_inner().await? {
91            return Ok(Some(batch));
92        }
93        self.phase = Phase::Changes;
94        Ok(None)
95    }
96
97    fn is_snapshot_complete(&self) -> bool {
98        matches!(self.phase, Phase::Changes | Phase::Closed)
99    }
100
101    async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
102        if !matches!(self.phase, Phase::Changes) {
103            return Ok(None);
104        }
105        self.poll_inner().await
106    }
107
108    fn checkpoint(&self) -> SourceCheckpoint {
109        self.connector.checkpoint()
110    }
111
112    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
113        self.connector.restore(checkpoint).await
114    }
115
116    async fn close(&mut self) -> Result<(), ConnectorError> {
117        self.phase = Phase::Closed;
118        self.connector.close().await
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int32Array;
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]))
    }

    fn test_batch(values: &[i32]) -> RecordBatch {
        RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(Int32Array::from(values.to_vec()))],
        )
        .unwrap()
    }

    /// Builds an adapter around `connector` with the shared test settings.
    fn new_source(connector: MockCdcConnector) -> CdcTableSource {
        CdcTableSource::new(Box::new(connector), ConnectorConfig::new("mock-cdc"), 1024)
    }

    /// Minimal mock connector for testing the adapter.
    ///
    /// `opened` and `closed` are shared flags. They let tests observe lifecycle
    /// calls after the connector has been moved into the adapter; plain `bool`
    /// fields were write-only and tripped the `dead_code` lint.
    struct MockCdcConnector {
        schema: SchemaRef,
        batches: VecDeque<RecordBatch>,
        opened: Arc<AtomicBool>,
        closed: Arc<AtomicBool>,
    }

    impl MockCdcConnector {
        fn new(batches: Vec<RecordBatch>) -> Self {
            Self {
                schema: test_schema(),
                batches: VecDeque::from(batches),
                opened: Arc::new(AtomicBool::new(false)),
                closed: Arc::new(AtomicBool::new(false)),
            }
        }

        /// Returns handles to the `(opened, closed)` flags.
        fn lifecycle_flags(&self) -> (Arc<AtomicBool>, Arc<AtomicBool>) {
            (Arc::clone(&self.opened), Arc::clone(&self.closed))
        }
    }

    #[async_trait::async_trait]
    impl SourceConnector for MockCdcConnector {
        async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
            self.opened.store(true, Ordering::SeqCst);
            Ok(())
        }

        async fn poll_batch(
            &mut self,
            _max_records: usize,
        ) -> Result<Option<SourceBatch>, ConnectorError> {
            Ok(self.batches.pop_front().map(SourceBatch::new))
        }

        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn checkpoint(&self) -> SourceCheckpoint {
            SourceCheckpoint::new(0)
        }

        async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn close(&mut self) -> Result<(), ConnectorError> {
            self.closed.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    #[tokio::test]
    async fn snapshot_then_changes() {
        let connector = MockCdcConnector::new(vec![
            test_batch(&[1, 2]),
            test_batch(&[3]),
            // Queue exhausted afterwards: the next poll yields None and ends the snapshot.
        ]);
        let mut src = new_source(connector);

        // Snapshot phase
        let b1 = src.poll_snapshot().await.unwrap().unwrap();
        assert_eq!(b1.num_rows(), 2);
        assert!(!src.is_snapshot_complete());

        let b2 = src.poll_snapshot().await.unwrap().unwrap();
        assert_eq!(b2.num_rows(), 1);

        // No more data → snapshot complete
        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.is_snapshot_complete());

        // Changes phase returns None (no more batches in mock)
        assert!(src.poll_changes().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn changes_delivered_after_snapshot_ends() {
        let connector = MockCdcConnector::new(vec![
            test_batch(&[1, 2]),
            // A zero-row batch is treated like `None` and ends the snapshot.
            test_batch(&[]),
            test_batch(&[3, 4, 5]),
        ]);
        let mut src = new_source(connector);

        assert_eq!(src.poll_snapshot().await.unwrap().unwrap().num_rows(), 2);
        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.is_snapshot_complete());

        let change = src.poll_changes().await.unwrap().unwrap();
        assert_eq!(change.num_rows(), 3);
        assert!(src.poll_changes().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn empty_source_completes_snapshot_immediately() {
        let mut src = new_source(MockCdcConnector::new(vec![]));

        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.is_snapshot_complete());
    }

    #[tokio::test]
    async fn connector_opened_lazily_on_first_snapshot_poll() {
        let connector = MockCdcConnector::new(vec![test_batch(&[1])]);
        let (opened, _closed) = connector.lifecycle_flags();
        let mut src = new_source(connector);

        assert!(!opened.load(Ordering::SeqCst));
        src.poll_snapshot().await.unwrap();
        assert!(opened.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn poll_changes_before_snapshot_returns_none() {
        let connector = MockCdcConnector::new(vec![test_batch(&[1])]);
        let (opened, _closed) = connector.lifecycle_flags();
        let mut src = new_source(connector);

        // Calling poll_changes before the snapshot has run neither yields data
        // nor opens the connector.
        assert!(src.poll_changes().await.unwrap().is_none());
        assert!(!src.is_snapshot_complete());
        assert!(!opened.load(Ordering::SeqCst));
    }

    #[test]
    fn checkpoint_delegates_to_connector() {
        let src = new_source(MockCdcConnector::new(vec![]));
        let cp = src.checkpoint();
        assert_eq!(cp.epoch(), 0);
    }

    #[tokio::test]
    async fn close_transitions_to_closed() {
        let connector = MockCdcConnector::new(vec![]);
        let (_opened, closed) = connector.lifecycle_flags();
        let mut src = new_source(connector);

        src.close().await.unwrap();
        assert!(closed.load(Ordering::SeqCst));
        assert!(src.is_snapshot_complete());
        assert!(src.poll_snapshot().await.unwrap().is_none());
    }
}