Skip to main content

laminar_connectors/
testing.rs

1//! Testing utilities for connector implementations.
2//!
3//! Provides mock connectors and helper functions for testing
4//! the connector SDK and concrete connector implementations.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9
10use arrow_array::{Int64Array, RecordBatch, StringArray};
11use arrow_schema::{DataType, Field, Schema, SchemaRef};
12use async_trait::async_trait;
13use parking_lot::Mutex;
14
15use crate::checkpoint::SourceCheckpoint;
16use crate::config::{ConnectorConfig, ConnectorInfo};
17use crate::connector::{
18    SinkConnector, SinkConnectorCapabilities, SourceBatch, SourceConnector, WriteResult,
19};
20use crate::error::ConnectorError;
21use crate::registry::ConnectorRegistry;
22
23/// Creates a test schema with `id` (Int64) and `value` (Utf8) columns.
24#[must_use]
25pub fn mock_schema() -> SchemaRef {
26    Arc::new(Schema::new(vec![
27        Field::new("id", DataType::Int64, false),
28        Field::new("value", DataType::Utf8, false),
29    ]))
30}
31
32/// Creates a test `RecordBatch` with `n` rows.
33///
34/// # Panics
35///
36/// Panics if the batch cannot be created (should not happen with valid inputs).
37#[must_use]
38pub fn mock_batch(n: usize) -> RecordBatch {
39    #[allow(clippy::cast_possible_wrap)]
40    let ids: Vec<i64> = (0..n as i64).collect();
41    let values: Vec<String> = (0..n).map(|i| format!("value_{i}")).collect();
42    let value_refs: Vec<&str> = values.iter().map(String::as_str).collect();
43
44    RecordBatch::try_new(
45        mock_schema(),
46        vec![
47            Arc::new(Int64Array::from(ids)),
48            Arc::new(StringArray::from(value_refs)),
49        ],
50    )
51    .unwrap()
52}
53
54/// Mock source connector for testing.
55///
56/// Returns a configurable number of batches, then returns `None`.
57#[derive(Debug)]
58pub struct MockSourceConnector {
59    schema: SchemaRef,
60    batches_remaining: AtomicU64,
61    batch_size: usize,
62    records_produced: AtomicU64,
63    is_open: std::sync::atomic::AtomicBool,
64    committed_epochs: Arc<Mutex<Vec<u64>>>,
65}
66
67impl MockSourceConnector {
68    /// Creates a new mock source that produces 10 batches of 5 records.
69    #[must_use]
70    pub fn new() -> Self {
71        Self {
72            schema: mock_schema(),
73            batches_remaining: AtomicU64::new(10),
74            batch_size: 5,
75            records_produced: AtomicU64::new(0),
76            is_open: std::sync::atomic::AtomicBool::new(false),
77            committed_epochs: Arc::new(Mutex::new(Vec::new())),
78        }
79    }
80
81    /// Creates a mock source with custom batch count and size.
82    #[must_use]
83    pub fn with_batches(count: u64, batch_size: usize) -> Self {
84        Self {
85            schema: mock_schema(),
86            batches_remaining: AtomicU64::new(count),
87            batch_size,
88            records_produced: AtomicU64::new(0),
89            is_open: std::sync::atomic::AtomicBool::new(false),
90            committed_epochs: Arc::new(Mutex::new(Vec::new())),
91        }
92    }
93
94    /// Returns the total records produced.
95    #[must_use]
96    pub fn records_produced(&self) -> u64 {
97        self.records_produced.load(Ordering::Relaxed)
98    }
99
100    /// Returns a handle for inspecting epochs reported to
101    /// `notify_epoch_committed` from outside the connector (the
102    /// connector itself is moved into a background task when run via
103    /// the pipeline).
104    #[must_use]
105    pub fn committed_epochs_handle(&self) -> Arc<Mutex<Vec<u64>>> {
106        Arc::clone(&self.committed_epochs)
107    }
108}
109
110impl Default for MockSourceConnector {
111    fn default() -> Self {
112        Self::new()
113    }
114}
115
116#[async_trait]
117impl SourceConnector for MockSourceConnector {
118    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
119        self.is_open
120            .store(true, std::sync::atomic::Ordering::Relaxed);
121        Ok(())
122    }
123
124    async fn poll_batch(
125        &mut self,
126        _max_records: usize,
127    ) -> Result<Option<SourceBatch>, ConnectorError> {
128        let remaining = self.batches_remaining.load(Ordering::Relaxed);
129        if remaining == 0 {
130            return Ok(None);
131        }
132        self.batches_remaining.fetch_sub(1, Ordering::Relaxed);
133
134        let batch = mock_batch(self.batch_size);
135        self.records_produced
136            .fetch_add(self.batch_size as u64, Ordering::Relaxed);
137        Ok(Some(SourceBatch::new(batch)))
138    }
139
140    fn schema(&self) -> SchemaRef {
141        self.schema.clone()
142    }
143
144    fn checkpoint(&self) -> SourceCheckpoint {
145        let mut cp = SourceCheckpoint::new(0);
146        cp.set_offset(
147            "records",
148            self.records_produced.load(Ordering::Relaxed).to_string(),
149        );
150        cp
151    }
152
153    async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
154        Ok(())
155    }
156
157    async fn close(&mut self) -> Result<(), ConnectorError> {
158        self.is_open
159            .store(false, std::sync::atomic::Ordering::Relaxed);
160        Ok(())
161    }
162
163    async fn notify_epoch_committed(
164        &mut self,
165        epoch: u64,
166        _checkpoint: &SourceCheckpoint,
167    ) -> Result<(), ConnectorError> {
168        self.committed_epochs.lock().push(epoch);
169        Ok(())
170    }
171}
172
173/// Mock sink connector for testing.
174///
175/// Stores all written batches in memory for inspection.
176#[derive(Debug)]
177pub struct MockSinkConnector {
178    schema: SchemaRef,
179    written: Arc<Mutex<Vec<RecordBatch>>>,
180    records_written: AtomicU64,
181    is_open: std::sync::atomic::AtomicBool,
182    current_epoch: AtomicU64,
183}
184
185impl MockSinkConnector {
186    /// Creates a new mock sink.
187    #[must_use]
188    pub fn new() -> Self {
189        Self {
190            schema: mock_schema(),
191            written: Arc::new(Mutex::new(Vec::new())),
192            records_written: AtomicU64::new(0),
193            is_open: std::sync::atomic::AtomicBool::new(false),
194            current_epoch: AtomicU64::new(0),
195        }
196    }
197
198    /// Returns the number of batches written.
199    #[must_use]
200    pub fn batch_count(&self) -> usize {
201        self.written.lock().len()
202    }
203
204    /// Returns the total number of records written.
205    #[must_use]
206    pub fn records_written(&self) -> u64 {
207        self.records_written.load(Ordering::Relaxed)
208    }
209
210    /// Returns a clone of all written batches.
211    #[must_use]
212    pub fn written_batches(&self) -> Vec<RecordBatch> {
213        self.written.lock().clone()
214    }
215}
216
217impl Default for MockSinkConnector {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223#[async_trait]
224impl SinkConnector for MockSinkConnector {
225    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
226        self.is_open
227            .store(true, std::sync::atomic::Ordering::Relaxed);
228        Ok(())
229    }
230
231    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
232        let num_rows = batch.num_rows();
233        let bytes = batch.get_array_memory_size() as u64;
234        self.written.lock().push(batch.clone());
235        self.records_written
236            .fetch_add(num_rows as u64, Ordering::Relaxed);
237        Ok(WriteResult::new(num_rows, bytes))
238    }
239
240    fn schema(&self) -> SchemaRef {
241        self.schema.clone()
242    }
243
244    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
245        self.current_epoch.store(epoch, Ordering::Relaxed);
246        Ok(())
247    }
248
249    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
250        Ok(())
251    }
252
253    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
254        // In a real implementation, this would discard buffered data
255        Ok(())
256    }
257
258    fn capabilities(&self) -> SinkConnectorCapabilities {
259        SinkConnectorCapabilities::new(Duration::from_secs(60))
260            .with_exactly_once()
261            .with_idempotent()
262            .with_two_phase_commit()
263    }
264
265    async fn close(&mut self) -> Result<(), ConnectorError> {
266        self.is_open
267            .store(false, std::sync::atomic::Ordering::Relaxed);
268        Ok(())
269    }
270}
271
272/// Registers a mock source connector with the registry.
273pub fn register_mock_source(registry: &ConnectorRegistry) {
274    registry.register_source(
275        "mock",
276        ConnectorInfo {
277            name: "mock".to_string(),
278            display_name: "Mock Source".to_string(),
279            version: "0.1.0".to_string(),
280            is_source: true,
281            is_sink: false,
282            config_keys: vec![],
283        },
284        Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new())),
285    );
286}
287
288/// Registers a mock sink connector with the registry.
289pub fn register_mock_sink(registry: &ConnectorRegistry) {
290    registry.register_sink(
291        "mock",
292        ConnectorInfo {
293            name: "mock".to_string(),
294            display_name: "Mock Sink".to_string(),
295            version: "0.1.0".to_string(),
296            is_source: false,
297            is_sink: true,
298            config_keys: vec![],
299        },
300        Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSinkConnector::new())),
301    );
302}
303
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mock_batch() {
        let batch = mock_batch(10);
        assert_eq!(batch.num_rows(), 10);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.schema(), mock_schema());
    }

    #[test]
    fn test_mock_batch_empty() {
        let batch = mock_batch(0);
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_defaults_start_empty() {
        assert_eq!(MockSourceConnector::default().records_produced(), 0);

        let sink = MockSinkConnector::default();
        assert_eq!(sink.batch_count(), 0);
        assert_eq!(sink.records_written(), 0);
    }

    #[tokio::test]
    async fn test_mock_source_connector() {
        let mut source = MockSourceConnector::with_batches(3, 5);
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        let b1 = source
            .poll_batch(100)
            .await
            .unwrap()
            .expect("first batch should be produced");
        assert_eq!(b1.num_rows(), 5);

        assert!(source.poll_batch(100).await.unwrap().is_some());
        assert!(source.poll_batch(100).await.unwrap().is_some());

        // All three configured batches have been emitted.
        assert!(source.poll_batch(100).await.unwrap().is_none());

        assert_eq!(source.records_produced(), 15);

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("records"), Some("15"));

        source.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_mock_source_zero_batches() {
        let mut source = MockSourceConnector::with_batches(0, 5);
        assert!(source.poll_batch(100).await.unwrap().is_none());
        assert_eq!(source.records_produced(), 0);
    }

    #[tokio::test]
    async fn test_mock_source_records_committed_epochs() {
        let mut source = MockSourceConnector::new();
        let epochs = source.committed_epochs_handle();
        let cp = source.checkpoint();

        source.notify_epoch_committed(1, &cp).await.unwrap();
        source.notify_epoch_committed(2, &cp).await.unwrap();

        assert_eq!(*epochs.lock(), vec![1, 2]);
    }

    #[tokio::test]
    async fn test_mock_sink_connector() {
        let mut sink = MockSinkConnector::new();
        sink.open(&ConnectorConfig::new("mock")).await.unwrap();

        let batch = mock_batch(10);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 10);

        assert_eq!(sink.batch_count(), 1);
        assert_eq!(sink.records_written(), 10);

        // Test epoch management
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&mock_batch(5)).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        assert_eq!(sink.records_written(), 15);
        assert_eq!(sink.batch_count(), 2);

        let written = sink.written_batches();
        assert_eq!(written.len(), 2);
        assert_eq!(written[0].num_rows(), 10);
        assert_eq!(written[1].num_rows(), 5);

        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_mock_source_restore() {
        let mut source = MockSourceConnector::new();
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        let cp = SourceCheckpoint::new(5);
        assert!(source.restore(&cp).await.is_ok());
    }

    #[test]
    fn test_register_helpers() {
        let registry = ConnectorRegistry::new();
        register_mock_source(&registry);
        register_mock_sink(&registry);

        assert!(registry.source_info("mock").is_some());
        assert!(registry.sink_info("mock").is_some());
    }
}