Skip to main content

laminar_connectors/
reference.rs

1//! Reference table source trait and refresh modes.
2//!
3//! A [`ReferenceTableSource`](crate::reference::ReferenceTableSource) populates a reference/dimension table from an
4//! external connector. The source produces an initial snapshot (one or more
5//! `RecordBatch`es) followed by an optional stream of incremental changes.
6//!
7//! [`RefreshMode`](crate::reference::RefreshMode) controls how and when the table is refreshed:
8//!
9//! - `SnapshotOnly` — load once at startup, never update.
10//! - `SnapshotPlusCdc` — load at startup, then apply CDC changes.
11//! - `Manual` — no automatic loading; the user triggers refreshes.
12
13#[cfg(any(test, feature = "testing"))]
14use std::collections::VecDeque;
15
16use arrow_array::RecordBatch;
17
18use crate::checkpoint::SourceCheckpoint;
19use crate::error::ConnectorError;
20
/// Strategy used to keep a reference table up to date after it has been
/// populated for the first time.
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
pub enum RefreshMode {
    /// Populate the table a single time at startup; it is never updated afterwards.
    SnapshotOnly,
    /// Populate the table at startup, then keep applying incremental CDC changes.
    SnapshotPlusCdc,
    /// Nothing is loaded automatically; refreshes happen only when the user asks.
    Manual,
}
31
/// A source that populates a reference/dimension table.
///
/// The lifecycle is:
/// 1. Call [`poll_snapshot`](Self::poll_snapshot) repeatedly until it returns
///    `Ok(None)` (snapshot complete).
/// 2. Optionally call [`poll_changes`](Self::poll_changes) in a loop to receive
///    incremental updates (CDC mode, see [`RefreshMode::SnapshotPlusCdc`]).
/// 3. Call [`close`](Self::close) when the table is no longer needed.
///
/// Checkpoint/restore support allows resuming from a saved position across
/// restarts. [`checkpoint`](Self::checkpoint) captures the current position as
/// an owned [`SourceCheckpoint`], which can later be handed to
/// [`restore`](Self::restore).
///
/// The trait is declared through `#[async_trait]` and is usable as a trait
/// object (`Box<dyn ReferenceTableSource>`). The `Send` supertrait means an
/// implementor can be moved across threads.
#[async_trait::async_trait]
pub trait ReferenceTableSource: Send {
    /// Polls for the next batch of snapshot data.
    ///
    /// Returns `Ok(Some(batch))` while snapshot data is available.
    /// Returns `Ok(None)` when the snapshot is complete.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` on read failure.
    async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError>;

    /// Returns `true` once all snapshot batches have been delivered.
    ///
    /// An implementation may only report completion after
    /// [`poll_snapshot`](Self::poll_snapshot) has returned `Ok(None)`. The mock
    /// in this module behaves this way: the flag stays `false` right after the
    /// last batch is handed out. Treat the `Ok(None)` return as the primary
    /// end-of-snapshot signal.
    fn is_snapshot_complete(&self) -> bool;

    /// Polls for the next batch of incremental changes (CDC).
    ///
    /// Returns `Ok(Some(batch))` when change data is available, and `Ok(None)`
    /// when no changes are pending at the time of the call. `Ok(None)` means
    /// "nothing right now"; the lifecycle above calls this method in a loop.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` on read failure.
    async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError>;

    /// Creates a checkpoint of the current source position.
    ///
    /// The returned value is owned by the caller. It is the input expected by
    /// [`restore`](Self::restore).
    fn checkpoint(&self) -> SourceCheckpoint;

    /// Restores the source position from a checkpoint.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the checkpoint is invalid or restore fails.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError>;

    /// Closes the source and releases resources.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if shutdown fails.
    async fn close(&mut self) -> Result<(), ConnectorError>;
}
85
86// ── Mock Implementation ──
87
/// Test double for [`ReferenceTableSource`].
///
/// Serves pre-loaded snapshot and change batches from FIFO queues. It records
/// which lifecycle methods were invoked (`snapshot_complete`, `restored`,
/// `closed`) so tests can assert on them afterwards.
#[cfg(any(test, feature = "testing"))]
pub struct MockReferenceTableSource {
    /// Snapshot batches still to be handed out, front of the queue first.
    pub snapshot_batches: VecDeque<RecordBatch>,
    /// Change batches still to be handed out (after the snapshot), front first.
    pub change_batches: VecDeque<RecordBatch>,
    /// Value reported by [`checkpoint`](ReferenceTableSource::checkpoint).
    pub mock_checkpoint: SourceCheckpoint,
    /// Flips to `true` the first time
    /// [`poll_snapshot`](ReferenceTableSource::poll_snapshot) finds the snapshot
    /// queue empty, i.e. when it returns `Ok(None)`. Handing out the last batch
    /// does not set it by itself.
    pub snapshot_complete: bool,
    /// Flips to `true` once [`restore`](ReferenceTableSource::restore) runs.
    pub restored: bool,
    /// Flips to `true` once [`close`](ReferenceTableSource::close) runs.
    pub closed: bool,
}
107
#[cfg(any(test, feature = "testing"))]
impl MockReferenceTableSource {
    /// Creates a mock with the given snapshot and change batches.
    ///
    /// `snapshot_batches` are served in order by
    /// [`poll_snapshot`](ReferenceTableSource::poll_snapshot), and
    /// `change_batches` in order by
    /// [`poll_changes`](ReferenceTableSource::poll_changes). All lifecycle flags
    /// start as `false`. The reported checkpoint is `SourceCheckpoint::new(0)`
    /// until `mock_checkpoint` is overwritten.
    #[must_use]
    pub fn new(snapshot_batches: Vec<RecordBatch>, change_batches: Vec<RecordBatch>) -> Self {
        Self {
            snapshot_batches: VecDeque::from(snapshot_batches),
            change_batches: VecDeque::from(change_batches),
            snapshot_complete: false,
            restored: false,
            closed: false,
            mock_checkpoint: SourceCheckpoint::new(0),
        }
    }

    /// Creates a mock with no snapshot or change data.
    ///
    /// Its first `poll_snapshot` call returns `Ok(None)`, and every
    /// `poll_changes` call returns `Ok(None)`. This is also what
    /// [`Default::default`] produces.
    #[must_use]
    pub fn empty() -> Self {
        Self::new(Vec::new(), Vec::new())
    }
}

// `empty()` is the natural zero-argument constructor, so also expose it via
// `Default`. That lets tests use struct-update syntax, `std::mem::take`, and
// generic `T: Default` helpers with the mock.
#[cfg(any(test, feature = "testing"))]
impl Default for MockReferenceTableSource {
    /// Equivalent to [`MockReferenceTableSource::empty`].
    fn default() -> Self {
        Self::empty()
    }
}
129
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl ReferenceTableSource for MockReferenceTableSource {
    /// Hands out the next queued snapshot batch. Once the queue has run dry it
    /// marks the snapshot as complete and reports `Ok(None)`, including on
    /// every later call.
    async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
        let next = self.snapshot_batches.pop_front();
        if next.is_none() {
            self.snapshot_complete = true;
        }
        Ok(next)
    }

    /// Reports the flag maintained by `poll_snapshot`.
    fn is_snapshot_complete(&self) -> bool {
        self.snapshot_complete
    }

    /// Hands out the next queued change batch, or `Ok(None)` once none remain.
    async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
        let next = self.change_batches.pop_front();
        Ok(next)
    }

    /// Returns a copy of the configured `mock_checkpoint`.
    fn checkpoint(&self) -> SourceCheckpoint {
        SourceCheckpoint::clone(&self.mock_checkpoint)
    }

    /// Ignores the supplied checkpoint and only records that a restore happened.
    async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        self.restored = true;
        Ok(())
    }

    /// Records the close. It always succeeds, so repeated calls are harmless.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.closed = true;
        Ok(())
    }
}
164
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int32Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a single-column batch (`id: Int32`, non-nullable) from `values`.
    fn test_batch(values: &[i32]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values.to_vec()))]).unwrap()
    }

    #[test]
    fn test_refresh_mode_equality() {
        let mode = RefreshMode::SnapshotPlusCdc;
        assert_eq!(mode.clone(), RefreshMode::SnapshotPlusCdc);
        assert_ne!(RefreshMode::SnapshotOnly, RefreshMode::SnapshotPlusCdc);
        assert_ne!(RefreshMode::SnapshotPlusCdc, RefreshMode::Manual);
        assert_ne!(RefreshMode::SnapshotOnly, RefreshMode::Manual);
    }

    #[tokio::test]
    async fn test_mock_snapshot_exhaustion() {
        let mut src =
            MockReferenceTableSource::new(vec![test_batch(&[1, 2]), test_batch(&[3])], vec![]);

        assert!(!src.is_snapshot_complete());

        let b1 = src.poll_snapshot().await.unwrap().unwrap();
        assert_eq!(b1.num_rows(), 2);
        assert!(!src.is_snapshot_complete());

        let b2 = src.poll_snapshot().await.unwrap().unwrap();
        assert_eq!(b2.num_rows(), 1);
        // The mock only reports completion after the terminal `Ok(None)`.
        assert!(!src.is_snapshot_complete());

        let none = src.poll_snapshot().await.unwrap();
        assert!(none.is_none());
        assert!(src.is_snapshot_complete());

        // Subsequent calls also return None and completion is sticky.
        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.is_snapshot_complete());
    }

    #[tokio::test]
    async fn test_mock_change_polling() {
        let mut src =
            MockReferenceTableSource::new(vec![], vec![test_batch(&[10]), test_batch(&[20, 30])]);

        // Exhaust snapshot first
        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.is_snapshot_complete());

        let c1 = src.poll_changes().await.unwrap().unwrap();
        assert_eq!(c1.num_rows(), 1);

        let c2 = src.poll_changes().await.unwrap().unwrap();
        assert_eq!(c2.num_rows(), 2);

        // An empty change queue keeps reporting "nothing pending".
        assert!(src.poll_changes().await.unwrap().is_none());
        assert!(src.poll_changes().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_mock_checkpoint_round_trip() {
        let mut cp = SourceCheckpoint::new(5);
        cp.set_offset("lsn", "0/ABCD");

        let mut src = MockReferenceTableSource::empty();
        src.mock_checkpoint = cp;

        let returned = src.checkpoint();
        assert_eq!(returned.epoch(), 5);
        assert_eq!(returned.get_offset("lsn"), Some("0/ABCD"));

        // Complete the round trip: feed the checkpoint back into `restore`.
        src.restore(&returned).await.unwrap();
        assert!(src.restored);

        // The mock ignores the restored value, so the reported checkpoint is unchanged.
        let after = src.checkpoint();
        assert_eq!(after.epoch(), 5);
        assert_eq!(after.get_offset("lsn"), Some("0/ABCD"));
    }

    #[tokio::test]
    async fn test_mock_restore_sets_flag() {
        let mut src = MockReferenceTableSource::empty();
        assert!(!src.restored);

        let cp = SourceCheckpoint::new(1);
        src.restore(&cp).await.unwrap();
        assert!(src.restored);
    }

    #[tokio::test]
    async fn test_mock_close_idempotent() {
        let mut src = MockReferenceTableSource::empty();
        assert!(!src.closed);

        src.close().await.unwrap();
        assert!(src.closed);

        // Calling close again should succeed
        src.close().await.unwrap();
        assert!(src.closed);
    }

    #[tokio::test]
    async fn test_trait_compliance_with_mock() {
        // Exercise the full lifecycle through trait object
        let mut src: Box<dyn ReferenceTableSource> = Box::new(MockReferenceTableSource::new(
            vec![test_batch(&[1])],
            vec![test_batch(&[2])],
        ));

        // Snapshot
        let batch = src.poll_snapshot().await.unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert!(src.poll_snapshot().await.unwrap().is_none());
        assert!(src.is_snapshot_complete());

        // Changes
        let change = src.poll_changes().await.unwrap().unwrap();
        assert_eq!(change.num_rows(), 1);
        assert!(src.poll_changes().await.unwrap().is_none());

        // Checkpoint round-trip: `new` seeds the mock with epoch 0, and the
        // captured checkpoint is fed straight back into `restore`.
        let cp = src.checkpoint();
        assert_eq!(cp.epoch(), 0);
        src.restore(&cp).await.unwrap();

        // Close
        src.close().await.unwrap();
    }
}