laminar_connectors/lookup/
cdc_adapter.rs1use arrow_array::RecordBatch;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::reference::ReferenceTableSource;
19
/// Lifecycle phase of a [`CdcTableSource`].
///
/// The derives make the state cheap to copy, comparable with `==`, and
/// printable in diagnostics and test failure messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    /// Initial phase assigned by `CdcTableSource::new`; the connector has not
    /// been opened by this adapter yet.
    Init,
    /// The connector is open and `poll_snapshot` forwards its batches.
    Snapshot,
    /// A snapshot poll produced no rows; `poll_changes` now forwards batches.
    Changes,
    /// `close` has been called; both poll methods return `Ok(None)`.
    Closed,
}
31
32pub struct CdcTableSource {
39 connector: Box<dyn SourceConnector>,
40 config: ConnectorConfig,
41 phase: Phase,
42 max_batch_size: usize,
43}
44
45impl CdcTableSource {
46 #[must_use]
50 pub fn new(
51 connector: Box<dyn SourceConnector>,
52 config: ConnectorConfig,
53 max_batch_size: usize,
54 ) -> Self {
55 Self {
56 connector,
57 config,
58 phase: Phase::Init,
59 max_batch_size,
60 }
61 }
62
63 async fn ensure_open(&mut self) -> Result<(), ConnectorError> {
65 if matches!(self.phase, Phase::Init) {
66 self.connector.open(&self.config).await?;
67 self.phase = Phase::Snapshot;
68 }
69 Ok(())
70 }
71
72 async fn poll_inner(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
74 match self.connector.poll_batch(self.max_batch_size).await? {
75 Some(SourceBatch { records, .. }) if records.num_rows() > 0 => Ok(Some(records)),
76 _ => Ok(None),
77 }
78 }
79}
80
81#[async_trait::async_trait]
82impl ReferenceTableSource for CdcTableSource {
83 async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
84 self.ensure_open().await?;
85
86 if !matches!(self.phase, Phase::Snapshot) {
87 return Ok(None);
88 }
89
90 if let Some(batch) = self.poll_inner().await? {
91 return Ok(Some(batch));
92 }
93 self.phase = Phase::Changes;
94 Ok(None)
95 }
96
97 fn is_snapshot_complete(&self) -> bool {
98 matches!(self.phase, Phase::Changes | Phase::Closed)
99 }
100
101 async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
102 if !matches!(self.phase, Phase::Changes) {
103 return Ok(None);
104 }
105 self.poll_inner().await
106 }
107
108 fn checkpoint(&self) -> SourceCheckpoint {
109 self.connector.checkpoint()
110 }
111
112 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
113 self.connector.restore(checkpoint).await
114 }
115
116 async fn close(&mut self) -> Result<(), ConnectorError> {
117 self.phase = Phase::Closed;
118 self.connector.close().await
119 }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int32Array;
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Single non-nullable `id: Int32` column used by every test batch.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]))
    }

    /// Builds a batch whose `id` column holds `values` (may be empty).
    fn test_batch(values: &[i32]) -> RecordBatch {
        RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(Int32Array::from(values.to_vec()))],
        )
        .unwrap()
    }

    /// Open/close flags shared between a mock connector and the test body.
    ///
    /// The connector is moved into a `Box<dyn SourceConnector>`, so the test
    /// keeps a clone of these flags to observe lifecycle calls afterwards.
    #[derive(Clone, Default)]
    struct MockFlags {
        opened: Arc<AtomicBool>,
        closed: Arc<AtomicBool>,
    }

    impl MockFlags {
        fn opened(&self) -> bool {
            self.opened.load(Ordering::SeqCst)
        }

        fn closed(&self) -> bool {
            self.closed.load(Ordering::SeqCst)
        }
    }

    /// Connector that replays a fixed queue of batches, then returns `None`.
    struct MockCdcConnector {
        schema: SchemaRef,
        batches: VecDeque<RecordBatch>,
        flags: MockFlags,
    }

    impl MockCdcConnector {
        fn new(batches: Vec<RecordBatch>) -> Self {
            Self {
                schema: test_schema(),
                batches: VecDeque::from(batches),
                flags: MockFlags::default(),
            }
        }
    }

    #[async_trait::async_trait]
    impl SourceConnector for MockCdcConnector {
        async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
            self.flags.opened.store(true, Ordering::SeqCst);
            Ok(())
        }

        async fn poll_batch(
            &mut self,
            _max_records: usize,
        ) -> Result<Option<SourceBatch>, ConnectorError> {
            Ok(self.batches.pop_front().map(SourceBatch::new))
        }

        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn checkpoint(&self) -> SourceCheckpoint {
            SourceCheckpoint::new(0)
        }

        async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn close(&mut self) -> Result<(), ConnectorError> {
            self.flags.closed.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Wraps a mock connector replaying `batches` and returns the source
    /// together with the connector's lifecycle flags.
    fn source_with(batches: Vec<RecordBatch>) -> (CdcTableSource, MockFlags) {
        let connector = MockCdcConnector::new(batches);
        let flags = connector.flags.clone();
        let source =
            CdcTableSource::new(Box::new(connector), ConnectorConfig::new("mock-cdc"), 1024);
        (source, flags)
    }

    #[tokio::test]
    async fn snapshot_then_changes() {
        let (mut src, flags) = source_with(vec![test_batch(&[1, 2]), test_batch(&[3])]);
        assert!(!flags.opened());

        // The first snapshot poll opens the connector lazily.
        let b1 = src.poll_snapshot().await.unwrap().unwrap();
        assert!(flags.opened());
        assert_eq!(b1.num_rows(), 2);
        assert!(!src.is_snapshot_complete());

        let b2 = src.poll_snapshot().await.unwrap().unwrap();
        assert_eq!(b2.num_rows(), 1);

        // An exhausted connector ends the snapshot.
        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.is_snapshot_complete());

        // No further data is queued, so there are no changes either.
        assert!(src.poll_changes().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn changes_are_delivered_after_snapshot() {
        // The empty batch is treated as "no rows" and ends the snapshot; the
        // batch queued behind it must then surface through `poll_changes`.
        let (mut src, _flags) =
            source_with(vec![test_batch(&[1, 2]), test_batch(&[]), test_batch(&[3])]);

        let snapshot = src.poll_snapshot().await.unwrap().unwrap();
        assert_eq!(snapshot.num_rows(), 2);

        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.is_snapshot_complete());

        // Snapshot polling never consumes change data.
        assert!(src.poll_snapshot().await.unwrap().is_none());

        let change = src.poll_changes().await.unwrap().unwrap();
        assert_eq!(change.num_rows(), 1);
        assert!(src.poll_changes().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn empty_source_completes_snapshot_immediately() {
        let (mut src, _flags) = source_with(vec![]);

        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.is_snapshot_complete());
    }

    #[tokio::test]
    async fn poll_changes_before_snapshot_returns_none() {
        let (mut src, flags) = source_with(vec![test_batch(&[1])]);

        assert!(src.poll_changes().await.unwrap().is_none());
        // Polling changes early neither opens the connector nor skips the snapshot.
        assert!(!flags.opened());
        assert!(!src.is_snapshot_complete());
    }

    #[tokio::test]
    async fn checkpoint_delegates_to_connector() {
        let (src, _flags) = source_with(vec![]);
        let cp = src.checkpoint();
        assert_eq!(cp.epoch(), 0);
    }

    #[tokio::test]
    async fn close_transitions_to_closed() {
        let (mut src, flags) = source_with(vec![]);
        src.close().await.unwrap();
        assert!(flags.closed());
        assert!(src.is_snapshot_complete());

        // A closed source yields nothing and does not open the connector.
        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.poll_changes().await.unwrap().is_none());
        assert!(!flags.opened());
    }
}