laminar_connectors/
reference.rs1#[cfg(any(test, feature = "testing"))]
14use std::collections::VecDeque;
15
16use arrow_array::RecordBatch;
17
18use crate::checkpoint::SourceCheckpoint;
19use crate::error::ConnectorError;
20
21#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum RefreshMode {
24 SnapshotOnly,
26 SnapshotPlusCdc,
28 Manual,
30}
31
32#[async_trait::async_trait]
44pub trait ReferenceTableSource: Send {
45 async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError>;
54
55 fn is_snapshot_complete(&self) -> bool;
57
58 async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError>;
67
68 fn checkpoint(&self) -> SourceCheckpoint;
70
71 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError>;
77
78 async fn close(&mut self) -> Result<(), ConnectorError>;
84}
85
86#[cfg(any(test, feature = "testing"))]
93pub struct MockReferenceTableSource {
94 pub snapshot_batches: VecDeque<RecordBatch>,
96 pub change_batches: VecDeque<RecordBatch>,
98 pub snapshot_complete: bool,
100 pub restored: bool,
102 pub closed: bool,
104 pub mock_checkpoint: SourceCheckpoint,
106}
107
108#[cfg(any(test, feature = "testing"))]
109impl MockReferenceTableSource {
110 #[must_use]
112 pub fn new(snapshot_batches: Vec<RecordBatch>, change_batches: Vec<RecordBatch>) -> Self {
113 Self {
114 snapshot_batches: VecDeque::from(snapshot_batches),
115 change_batches: VecDeque::from(change_batches),
116 snapshot_complete: false,
117 restored: false,
118 closed: false,
119 mock_checkpoint: SourceCheckpoint::new(0),
120 }
121 }
122
123 #[must_use]
125 pub fn empty() -> Self {
126 Self::new(vec![], vec![])
127 }
128}
129
130#[cfg(any(test, feature = "testing"))]
131#[async_trait::async_trait]
132impl ReferenceTableSource for MockReferenceTableSource {
133 async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
134 if let Some(batch) = self.snapshot_batches.pop_front() {
135 Ok(Some(batch))
136 } else {
137 self.snapshot_complete = true;
138 Ok(None)
139 }
140 }
141
142 fn is_snapshot_complete(&self) -> bool {
143 self.snapshot_complete
144 }
145
146 async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
147 Ok(self.change_batches.pop_front())
148 }
149
150 fn checkpoint(&self) -> SourceCheckpoint {
151 self.mock_checkpoint.clone()
152 }
153
154 async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
155 self.restored = true;
156 Ok(())
157 }
158
159 async fn close(&mut self) -> Result<(), ConnectorError> {
160 self.closed = true;
161 Ok(())
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168 use arrow_array::Int32Array;
169 use arrow_schema::{DataType, Field, Schema};
170 use std::sync::Arc;
171
172 fn test_batch(values: &[i32]) -> RecordBatch {
173 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
174 RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values.to_vec()))]).unwrap()
175 }
176
177 #[tokio::test]
178 async fn test_mock_snapshot_exhaustion() {
179 let mut src =
180 MockReferenceTableSource::new(vec![test_batch(&[1, 2]), test_batch(&[3])], vec![]);
181
182 assert!(!src.is_snapshot_complete());
183
184 let b1 = src.poll_snapshot().await.unwrap().unwrap();
185 assert_eq!(b1.num_rows(), 2);
186 assert!(!src.is_snapshot_complete());
187
188 let b2 = src.poll_snapshot().await.unwrap().unwrap();
189 assert_eq!(b2.num_rows(), 1);
190 assert!(!src.is_snapshot_complete());
191
192 let none = src.poll_snapshot().await.unwrap();
193 assert!(none.is_none());
194 assert!(src.is_snapshot_complete());
195
196 assert!(src.poll_snapshot().await.unwrap().is_none());
198 }
199
200 #[tokio::test]
201 async fn test_mock_change_polling() {
202 let mut src =
203 MockReferenceTableSource::new(vec![], vec![test_batch(&[10]), test_batch(&[20, 30])]);
204
205 assert!(src.poll_snapshot().await.unwrap().is_none());
207
208 let c1 = src.poll_changes().await.unwrap().unwrap();
209 assert_eq!(c1.num_rows(), 1);
210
211 let c2 = src.poll_changes().await.unwrap().unwrap();
212 assert_eq!(c2.num_rows(), 2);
213
214 assert!(src.poll_changes().await.unwrap().is_none());
215 }
216
217 #[tokio::test]
218 async fn test_mock_checkpoint_round_trip() {
219 let mut cp = SourceCheckpoint::new(5);
220 cp.set_offset("lsn", "0/ABCD");
221
222 let mut src = MockReferenceTableSource::empty();
223 src.mock_checkpoint = cp.clone();
224
225 let returned = src.checkpoint();
226 assert_eq!(returned.epoch(), 5);
227 assert_eq!(returned.get_offset("lsn"), Some("0/ABCD"));
228 }
229
230 #[tokio::test]
231 async fn test_mock_restore_sets_flag() {
232 let mut src = MockReferenceTableSource::empty();
233 assert!(!src.restored);
234
235 let cp = SourceCheckpoint::new(1);
236 src.restore(&cp).await.unwrap();
237 assert!(src.restored);
238 }
239
240 #[tokio::test]
241 async fn test_mock_close_idempotent() {
242 let mut src = MockReferenceTableSource::empty();
243 assert!(!src.closed);
244
245 src.close().await.unwrap();
246 assert!(src.closed);
247
248 src.close().await.unwrap();
250 assert!(src.closed);
251 }
252
253 #[tokio::test]
254 async fn test_trait_compliance_with_mock() {
255 let mut src: Box<dyn ReferenceTableSource> = Box::new(MockReferenceTableSource::new(
257 vec![test_batch(&[1])],
258 vec![test_batch(&[2])],
259 ));
260
261 let batch = src.poll_snapshot().await.unwrap().unwrap();
263 assert_eq!(batch.num_rows(), 1);
264 assert!(src.poll_snapshot().await.unwrap().is_none());
265 assert!(src.is_snapshot_complete());
266
267 let change = src.poll_changes().await.unwrap().unwrap();
269 assert_eq!(change.num_rows(), 1);
270 assert!(src.poll_changes().await.unwrap().is_none());
271
272 let _cp = src.checkpoint();
274
275 let cp = SourceCheckpoint::new(0);
277 src.restore(&cp).await.unwrap();
278
279 src.close().await.unwrap();
281 }
282}