Skip to main content

laminar_connectors/
testing.rs

1//! Testing utilities for connector implementations.
2//!
3//! Provides mock connectors and helper functions for testing
4//! the connector SDK and concrete connector implementations.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9
10use arrow_array::{Int64Array, RecordBatch, StringArray};
11use arrow_schema::{DataType, Field, Schema, SchemaRef};
12use async_trait::async_trait;
13use parking_lot::Mutex;
14
15use crate::checkpoint::SourceCheckpoint;
16use crate::config::{ConnectorConfig, ConnectorInfo};
17use crate::connector::{
18    SinkConnector, SinkConnectorCapabilities, SourceBatch, SourceConnector, WriteResult,
19};
20use crate::error::ConnectorError;
21use crate::health::HealthStatus;
22use crate::metrics::ConnectorMetrics;
23use crate::registry::ConnectorRegistry;
24
25/// Creates a test schema with `id` (Int64) and `value` (Utf8) columns.
26#[must_use]
27pub fn mock_schema() -> SchemaRef {
28    Arc::new(Schema::new(vec![
29        Field::new("id", DataType::Int64, false),
30        Field::new("value", DataType::Utf8, false),
31    ]))
32}
33
34/// Creates a test `RecordBatch` with `n` rows.
35///
36/// # Panics
37///
38/// Panics if the batch cannot be created (should not happen with valid inputs).
39#[must_use]
40pub fn mock_batch(n: usize) -> RecordBatch {
41    #[allow(clippy::cast_possible_wrap)]
42    let ids: Vec<i64> = (0..n as i64).collect();
43    let values: Vec<String> = (0..n).map(|i| format!("value_{i}")).collect();
44    let value_refs: Vec<&str> = values.iter().map(String::as_str).collect();
45
46    RecordBatch::try_new(
47        mock_schema(),
48        vec![
49            Arc::new(Int64Array::from(ids)),
50            Arc::new(StringArray::from(value_refs)),
51        ],
52    )
53    .unwrap()
54}
55
56/// Mock source connector for testing.
57///
58/// Returns a configurable number of batches, then returns `None`.
59#[derive(Debug)]
60pub struct MockSourceConnector {
61    schema: SchemaRef,
62    batches_remaining: AtomicU64,
63    batch_size: usize,
64    records_produced: AtomicU64,
65    is_open: std::sync::atomic::AtomicBool,
66    committed_epochs: Arc<Mutex<Vec<u64>>>,
67}
68
69impl MockSourceConnector {
70    /// Creates a new mock source that produces 10 batches of 5 records.
71    #[must_use]
72    pub fn new() -> Self {
73        Self {
74            schema: mock_schema(),
75            batches_remaining: AtomicU64::new(10),
76            batch_size: 5,
77            records_produced: AtomicU64::new(0),
78            is_open: std::sync::atomic::AtomicBool::new(false),
79            committed_epochs: Arc::new(Mutex::new(Vec::new())),
80        }
81    }
82
83    /// Creates a mock source with custom batch count and size.
84    #[must_use]
85    pub fn with_batches(count: u64, batch_size: usize) -> Self {
86        Self {
87            schema: mock_schema(),
88            batches_remaining: AtomicU64::new(count),
89            batch_size,
90            records_produced: AtomicU64::new(0),
91            is_open: std::sync::atomic::AtomicBool::new(false),
92            committed_epochs: Arc::new(Mutex::new(Vec::new())),
93        }
94    }
95
96    /// Returns the total records produced.
97    #[must_use]
98    pub fn records_produced(&self) -> u64 {
99        self.records_produced.load(Ordering::Relaxed)
100    }
101
102    /// Returns a handle for inspecting epochs reported to
103    /// `notify_epoch_committed` from outside the connector (the
104    /// connector itself is moved into a background task when run via
105    /// the pipeline).
106    #[must_use]
107    pub fn committed_epochs_handle(&self) -> Arc<Mutex<Vec<u64>>> {
108        Arc::clone(&self.committed_epochs)
109    }
110}
111
112impl Default for MockSourceConnector {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
118#[async_trait]
119impl SourceConnector for MockSourceConnector {
120    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
121        self.is_open
122            .store(true, std::sync::atomic::Ordering::Relaxed);
123        Ok(())
124    }
125
126    async fn poll_batch(
127        &mut self,
128        _max_records: usize,
129    ) -> Result<Option<SourceBatch>, ConnectorError> {
130        let remaining = self.batches_remaining.load(Ordering::Relaxed);
131        if remaining == 0 {
132            return Ok(None);
133        }
134        self.batches_remaining.fetch_sub(1, Ordering::Relaxed);
135
136        let batch = mock_batch(self.batch_size);
137        self.records_produced
138            .fetch_add(self.batch_size as u64, Ordering::Relaxed);
139        Ok(Some(SourceBatch::new(batch)))
140    }
141
142    fn schema(&self) -> SchemaRef {
143        self.schema.clone()
144    }
145
146    fn checkpoint(&self) -> SourceCheckpoint {
147        let mut cp = SourceCheckpoint::new(0);
148        cp.set_offset(
149            "records",
150            self.records_produced.load(Ordering::Relaxed).to_string(),
151        );
152        cp
153    }
154
155    async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
156        Ok(())
157    }
158
159    fn health_check(&self) -> HealthStatus {
160        if self.is_open.load(std::sync::atomic::Ordering::Relaxed) {
161            HealthStatus::Healthy
162        } else {
163            HealthStatus::Unknown
164        }
165    }
166
167    fn metrics(&self) -> ConnectorMetrics {
168        ConnectorMetrics {
169            records_total: self.records_produced.load(Ordering::Relaxed),
170            ..Default::default()
171        }
172    }
173
174    async fn close(&mut self) -> Result<(), ConnectorError> {
175        self.is_open
176            .store(false, std::sync::atomic::Ordering::Relaxed);
177        Ok(())
178    }
179
180    async fn notify_epoch_committed(&mut self, epoch: u64) -> Result<(), ConnectorError> {
181        self.committed_epochs.lock().push(epoch);
182        Ok(())
183    }
184}
185
186/// Mock sink connector for testing.
187///
188/// Stores all written batches in memory for inspection.
189#[derive(Debug)]
190pub struct MockSinkConnector {
191    schema: SchemaRef,
192    written: Arc<Mutex<Vec<RecordBatch>>>,
193    records_written: AtomicU64,
194    is_open: std::sync::atomic::AtomicBool,
195    current_epoch: AtomicU64,
196}
197
198impl MockSinkConnector {
199    /// Creates a new mock sink.
200    #[must_use]
201    pub fn new() -> Self {
202        Self {
203            schema: mock_schema(),
204            written: Arc::new(Mutex::new(Vec::new())),
205            records_written: AtomicU64::new(0),
206            is_open: std::sync::atomic::AtomicBool::new(false),
207            current_epoch: AtomicU64::new(0),
208        }
209    }
210
211    /// Returns the number of batches written.
212    #[must_use]
213    pub fn batch_count(&self) -> usize {
214        self.written.lock().len()
215    }
216
217    /// Returns the total number of records written.
218    #[must_use]
219    pub fn records_written(&self) -> u64 {
220        self.records_written.load(Ordering::Relaxed)
221    }
222
223    /// Returns a clone of all written batches.
224    #[must_use]
225    pub fn written_batches(&self) -> Vec<RecordBatch> {
226        self.written.lock().clone()
227    }
228}
229
230impl Default for MockSinkConnector {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236#[async_trait]
237impl SinkConnector for MockSinkConnector {
238    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
239        self.is_open
240            .store(true, std::sync::atomic::Ordering::Relaxed);
241        Ok(())
242    }
243
244    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
245        let num_rows = batch.num_rows();
246        let bytes = batch.get_array_memory_size() as u64;
247        self.written.lock().push(batch.clone());
248        self.records_written
249            .fetch_add(num_rows as u64, Ordering::Relaxed);
250        Ok(WriteResult::new(num_rows, bytes))
251    }
252
253    fn schema(&self) -> SchemaRef {
254        self.schema.clone()
255    }
256
257    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
258        self.current_epoch.store(epoch, Ordering::Relaxed);
259        Ok(())
260    }
261
262    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
263        Ok(())
264    }
265
266    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
267        // In a real implementation, this would discard buffered data
268        Ok(())
269    }
270
271    fn health_check(&self) -> HealthStatus {
272        if self.is_open.load(std::sync::atomic::Ordering::Relaxed) {
273            HealthStatus::Healthy
274        } else {
275            HealthStatus::Unknown
276        }
277    }
278
279    fn metrics(&self) -> ConnectorMetrics {
280        ConnectorMetrics {
281            records_total: self.records_written.load(Ordering::Relaxed),
282            ..Default::default()
283        }
284    }
285
286    fn capabilities(&self) -> SinkConnectorCapabilities {
287        SinkConnectorCapabilities::new(Duration::from_secs(60))
288            .with_exactly_once()
289            .with_idempotent()
290            .with_two_phase_commit()
291    }
292
293    async fn close(&mut self) -> Result<(), ConnectorError> {
294        self.is_open
295            .store(false, std::sync::atomic::Ordering::Relaxed);
296        Ok(())
297    }
298}
299
300/// Registers a mock source connector with the registry.
301pub fn register_mock_source(registry: &ConnectorRegistry) {
302    registry.register_source(
303        "mock",
304        ConnectorInfo {
305            name: "mock".to_string(),
306            display_name: "Mock Source".to_string(),
307            version: "0.1.0".to_string(),
308            is_source: true,
309            is_sink: false,
310            config_keys: vec![],
311        },
312        Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new())),
313    );
314}
315
316/// Registers a mock sink connector with the registry.
317pub fn register_mock_sink(registry: &ConnectorRegistry) {
318    registry.register_sink(
319        "mock",
320        ConnectorInfo {
321            name: "mock".to_string(),
322            display_name: "Mock Sink".to_string(),
323            version: "0.1.0".to_string(),
324            is_source: false,
325            is_sink: true,
326            config_keys: vec![],
327        },
328        Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSinkConnector::new())),
329    );
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mock_batch() {
        let batch = mock_batch(10);
        assert_eq!(batch.num_rows(), 10);
        assert_eq!(batch.num_columns(), 2);
    }

    #[tokio::test]
    async fn test_mock_source_connector() {
        let mut source = MockSourceConnector::with_batches(3, 5);
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        assert!(source.health_check().is_healthy());

        let b1 = source.poll_batch(100).await.unwrap();
        assert!(b1.is_some());
        assert_eq!(b1.unwrap().num_rows(), 5);

        let b2 = source.poll_batch(100).await.unwrap();
        assert!(b2.is_some());

        let b3 = source.poll_batch(100).await.unwrap();
        assert!(b3.is_some());

        let b4 = source.poll_batch(100).await.unwrap();
        assert!(b4.is_none());

        assert_eq!(source.records_produced(), 15);

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("records"), Some("15"));

        source.close().await.unwrap();
    }

    /// The default source yields exactly 10 batches of 5 rows, then stops.
    #[tokio::test]
    async fn test_mock_source_default_budget() {
        let mut source = MockSourceConnector::default();
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        let mut batches = 0;
        while let Some(batch) = source.poll_batch(100).await.unwrap() {
            assert_eq!(batch.num_rows(), 5);
            batches += 1;
        }

        assert_eq!(batches, 10);
        assert_eq!(source.records_produced(), 50);
        // Once exhausted, the source keeps returning `None`.
        assert!(source.poll_batch(100).await.unwrap().is_none());
    }

    /// Committed epochs are visible through a handle taken beforehand.
    #[tokio::test]
    async fn test_mock_source_records_committed_epochs() {
        let mut source = MockSourceConnector::new();
        let handle = source.committed_epochs_handle();

        source.notify_epoch_committed(1).await.unwrap();
        source.notify_epoch_committed(2).await.unwrap();

        assert_eq!(handle.lock().clone(), vec![1, 2]);
    }

    /// Health is `Unknown` before open, healthy while open, `Unknown` after close.
    #[tokio::test]
    async fn test_mock_source_health_follows_open_close() {
        let mut source = MockSourceConnector::new();
        assert!(matches!(source.health_check(), HealthStatus::Unknown));

        source.open(&ConnectorConfig::new("mock")).await.unwrap();
        assert!(source.health_check().is_healthy());

        source.close().await.unwrap();
        assert!(matches!(source.health_check(), HealthStatus::Unknown));
    }

    #[tokio::test]
    async fn test_mock_sink_connector() {
        let mut sink = MockSinkConnector::new();
        sink.open(&ConnectorConfig::new("mock")).await.unwrap();

        assert!(sink.health_check().is_healthy());

        let batch = mock_batch(10);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 10);

        assert_eq!(sink.batch_count(), 1);
        assert_eq!(sink.records_written(), 10);

        // Test epoch management
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&mock_batch(5)).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        assert_eq!(sink.records_written(), 15);
        assert_eq!(sink.batch_count(), 2);

        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);

        sink.close().await.unwrap();
    }

    /// Health is healthy while open and `Unknown` after close.
    #[tokio::test]
    async fn test_mock_sink_health_follows_open_close() {
        let mut sink = MockSinkConnector::new();
        assert!(matches!(sink.health_check(), HealthStatus::Unknown));

        sink.open(&ConnectorConfig::new("mock")).await.unwrap();
        assert!(sink.health_check().is_healthy());

        sink.close().await.unwrap();
        assert!(matches!(sink.health_check(), HealthStatus::Unknown));
    }

    #[tokio::test]
    async fn test_mock_source_restore() {
        let mut source = MockSourceConnector::new();
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        let cp = SourceCheckpoint::new(5);
        assert!(source.restore(&cp).await.is_ok());
    }

    #[test]
    fn test_register_helpers() {
        let registry = ConnectorRegistry::new();
        register_mock_source(&registry);
        register_mock_sink(&registry);

        assert!(registry.source_info("mock").is_some());
        assert!(registry.sink_info("mock").is_some());
    }
}