Skip to main content

laminar_connectors/
testing.rs

1//! Testing utilities for connector implementations.
2//!
3//! Provides mock connectors and helper functions for testing
4//! the connector SDK and concrete connector implementations.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
9use arrow_array::{Int64Array, RecordBatch, StringArray};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use async_trait::async_trait;
12use parking_lot::Mutex;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::{ConnectorConfig, ConnectorInfo};
16use crate::connector::{
17    SinkConnector, SinkConnectorCapabilities, SourceBatch, SourceConnector, WriteResult,
18};
19use crate::error::ConnectorError;
20use crate::health::HealthStatus;
21use crate::metrics::ConnectorMetrics;
22use crate::registry::ConnectorRegistry;
23
24/// Creates a test schema with `id` (Int64) and `value` (Utf8) columns.
25#[must_use]
26pub fn mock_schema() -> SchemaRef {
27    Arc::new(Schema::new(vec![
28        Field::new("id", DataType::Int64, false),
29        Field::new("value", DataType::Utf8, false),
30    ]))
31}
32
33/// Creates a test `RecordBatch` with `n` rows.
34///
35/// # Panics
36///
37/// Panics if the batch cannot be created (should not happen with valid inputs).
38#[must_use]
39pub fn mock_batch(n: usize) -> RecordBatch {
40    #[allow(clippy::cast_possible_wrap)]
41    let ids: Vec<i64> = (0..n as i64).collect();
42    let values: Vec<String> = (0..n).map(|i| format!("value_{i}")).collect();
43    let value_refs: Vec<&str> = values.iter().map(String::as_str).collect();
44
45    RecordBatch::try_new(
46        mock_schema(),
47        vec![
48            Arc::new(Int64Array::from(ids)),
49            Arc::new(StringArray::from(value_refs)),
50        ],
51    )
52    .unwrap()
53}
54
/// Mock source connector for testing.
///
/// Returns a configurable number of batches, then returns `None`.
#[derive(Debug)]
pub struct MockSourceConnector {
    /// Schema handed out by `schema()`; initialised from `mock_schema()`.
    schema: SchemaRef,
    /// Number of batches `poll_batch` will still emit before returning `None`.
    batches_remaining: AtomicU64,
    /// Number of rows placed in every emitted batch.
    batch_size: usize,
    /// Running total of rows emitted so far.
    records_produced: AtomicU64,
    /// Set by `open`, cleared by `close`; drives `health_check`.
    is_open: std::sync::atomic::AtomicBool,
}
66
67impl MockSourceConnector {
68    /// Creates a new mock source that produces 10 batches of 5 records.
69    #[must_use]
70    pub fn new() -> Self {
71        Self {
72            schema: mock_schema(),
73            batches_remaining: AtomicU64::new(10),
74            batch_size: 5,
75            records_produced: AtomicU64::new(0),
76            is_open: std::sync::atomic::AtomicBool::new(false),
77        }
78    }
79
80    /// Creates a mock source with custom batch count and size.
81    #[must_use]
82    pub fn with_batches(count: u64, batch_size: usize) -> Self {
83        Self {
84            schema: mock_schema(),
85            batches_remaining: AtomicU64::new(count),
86            batch_size,
87            records_produced: AtomicU64::new(0),
88            is_open: std::sync::atomic::AtomicBool::new(false),
89        }
90    }
91
92    /// Returns the total records produced.
93    #[must_use]
94    pub fn records_produced(&self) -> u64 {
95        self.records_produced.load(Ordering::Relaxed)
96    }
97}
98
99impl Default for MockSourceConnector {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105#[async_trait]
106impl SourceConnector for MockSourceConnector {
107    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
108        self.is_open
109            .store(true, std::sync::atomic::Ordering::Relaxed);
110        Ok(())
111    }
112
113    async fn poll_batch(
114        &mut self,
115        _max_records: usize,
116    ) -> Result<Option<SourceBatch>, ConnectorError> {
117        let remaining = self.batches_remaining.load(Ordering::Relaxed);
118        if remaining == 0 {
119            return Ok(None);
120        }
121        self.batches_remaining.fetch_sub(1, Ordering::Relaxed);
122
123        let batch = mock_batch(self.batch_size);
124        self.records_produced
125            .fetch_add(self.batch_size as u64, Ordering::Relaxed);
126        Ok(Some(SourceBatch::new(batch)))
127    }
128
129    fn schema(&self) -> SchemaRef {
130        self.schema.clone()
131    }
132
133    fn checkpoint(&self) -> SourceCheckpoint {
134        let mut cp = SourceCheckpoint::new(0);
135        cp.set_offset(
136            "records",
137            self.records_produced.load(Ordering::Relaxed).to_string(),
138        );
139        cp
140    }
141
142    async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
143        Ok(())
144    }
145
146    fn health_check(&self) -> HealthStatus {
147        if self.is_open.load(std::sync::atomic::Ordering::Relaxed) {
148            HealthStatus::Healthy
149        } else {
150            HealthStatus::Unknown
151        }
152    }
153
154    fn metrics(&self) -> ConnectorMetrics {
155        ConnectorMetrics {
156            records_total: self.records_produced.load(Ordering::Relaxed),
157            ..Default::default()
158        }
159    }
160
161    async fn close(&mut self) -> Result<(), ConnectorError> {
162        self.is_open
163            .store(false, std::sync::atomic::Ordering::Relaxed);
164        Ok(())
165    }
166}
167
/// Mock sink connector for testing.
///
/// Stores all written batches in memory for inspection.
#[derive(Debug)]
pub struct MockSinkConnector {
    /// Schema handed out by `schema()`; initialised from `mock_schema()`.
    schema: SchemaRef,
    /// Every batch passed to `write_batch`, in arrival order.
    written: Arc<Mutex<Vec<RecordBatch>>>,
    /// Running total of rows across all written batches.
    records_written: AtomicU64,
    /// Set by `open`, cleared by `close`; drives `health_check`.
    is_open: std::sync::atomic::AtomicBool,
    /// Epoch most recently passed to `begin_epoch` (starts at 0).
    current_epoch: AtomicU64,
}
179
180impl MockSinkConnector {
181    /// Creates a new mock sink.
182    #[must_use]
183    pub fn new() -> Self {
184        Self {
185            schema: mock_schema(),
186            written: Arc::new(Mutex::new(Vec::new())),
187            records_written: AtomicU64::new(0),
188            is_open: std::sync::atomic::AtomicBool::new(false),
189            current_epoch: AtomicU64::new(0),
190        }
191    }
192
193    /// Returns the number of batches written.
194    #[must_use]
195    pub fn batch_count(&self) -> usize {
196        self.written.lock().len()
197    }
198
199    /// Returns the total number of records written.
200    #[must_use]
201    pub fn records_written(&self) -> u64 {
202        self.records_written.load(Ordering::Relaxed)
203    }
204
205    /// Returns a clone of all written batches.
206    #[must_use]
207    pub fn written_batches(&self) -> Vec<RecordBatch> {
208        self.written.lock().clone()
209    }
210}
211
212impl Default for MockSinkConnector {
213    fn default() -> Self {
214        Self::new()
215    }
216}
217
218#[async_trait]
219impl SinkConnector for MockSinkConnector {
220    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
221        self.is_open
222            .store(true, std::sync::atomic::Ordering::Relaxed);
223        Ok(())
224    }
225
226    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
227        let num_rows = batch.num_rows();
228        let bytes = batch.get_array_memory_size() as u64;
229        self.written.lock().push(batch.clone());
230        self.records_written
231            .fetch_add(num_rows as u64, Ordering::Relaxed);
232        Ok(WriteResult::new(num_rows, bytes))
233    }
234
235    fn schema(&self) -> SchemaRef {
236        self.schema.clone()
237    }
238
239    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
240        self.current_epoch.store(epoch, Ordering::Relaxed);
241        Ok(())
242    }
243
244    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
245        Ok(())
246    }
247
248    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
249        // In a real implementation, this would discard buffered data
250        Ok(())
251    }
252
253    fn health_check(&self) -> HealthStatus {
254        if self.is_open.load(std::sync::atomic::Ordering::Relaxed) {
255            HealthStatus::Healthy
256        } else {
257            HealthStatus::Unknown
258        }
259    }
260
261    fn metrics(&self) -> ConnectorMetrics {
262        ConnectorMetrics {
263            records_total: self.records_written.load(Ordering::Relaxed),
264            ..Default::default()
265        }
266    }
267
268    fn capabilities(&self) -> SinkConnectorCapabilities {
269        SinkConnectorCapabilities::default()
270            .with_exactly_once()
271            .with_idempotent()
272            .with_two_phase_commit()
273    }
274
275    async fn close(&mut self) -> Result<(), ConnectorError> {
276        self.is_open
277            .store(false, std::sync::atomic::Ordering::Relaxed);
278        Ok(())
279    }
280}
281
282/// Registers a mock source connector with the registry.
283pub fn register_mock_source(registry: &ConnectorRegistry) {
284    registry.register_source(
285        "mock",
286        ConnectorInfo {
287            name: "mock".to_string(),
288            display_name: "Mock Source".to_string(),
289            version: "0.1.0".to_string(),
290            is_source: true,
291            is_sink: false,
292            config_keys: vec![],
293        },
294        Arc::new(|| Box::new(MockSourceConnector::new())),
295    );
296}
297
298/// Registers a mock sink connector with the registry.
299pub fn register_mock_sink(registry: &ConnectorRegistry) {
300    registry.register_sink(
301        "mock",
302        ConnectorInfo {
303            name: "mock".to_string(),
304            display_name: "Mock Sink".to_string(),
305            version: "0.1.0".to_string(),
306            is_source: false,
307            is_sink: true,
308            config_keys: vec![],
309        },
310        Arc::new(|| Box::new(MockSinkConnector::new())),
311    );
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// `mock_batch` yields the requested row count across the two schema columns.
    #[test]
    fn test_mock_batch() {
        let batch = mock_batch(10);
        assert_eq!((batch.num_rows(), batch.num_columns()), (10, 2));
    }

    /// A 3x5 source emits three batches, then `None`, and tracks its record count.
    #[tokio::test]
    async fn test_mock_source_connector() {
        let config = ConnectorConfig::new("mock");
        let mut source = MockSourceConnector::with_batches(3, 5);
        source.open(&config).await.unwrap();

        assert!(source.health_check().is_healthy());

        // First batch: present and carrying the configured number of rows.
        let first = source.poll_batch(100).await.unwrap();
        assert!(first.is_some());
        assert_eq!(first.unwrap().num_rows(), 5);

        // Second and third batches are still available.
        for _ in 0..2 {
            let next = source.poll_batch(100).await.unwrap();
            assert!(next.is_some());
        }

        // The fourth poll finds the source exhausted.
        let exhausted = source.poll_batch(100).await.unwrap();
        assert!(exhausted.is_none());

        assert_eq!(source.records_produced(), 15);

        let checkpoint = source.checkpoint();
        assert_eq!(checkpoint.get_offset("records"), Some("15"));

        source.close().await.unwrap();
    }

    /// The sink records every written batch and advertises its capabilities.
    #[tokio::test]
    async fn test_mock_sink_connector() {
        let config = ConnectorConfig::new("mock");
        let mut sink = MockSinkConnector::new();
        sink.open(&config).await.unwrap();

        assert!(sink.health_check().is_healthy());

        let ten_rows = mock_batch(10);
        let outcome = sink.write_batch(&ten_rows).await.unwrap();
        assert_eq!(outcome.records_written, 10);

        assert_eq!(sink.batch_count(), 1);
        assert_eq!(sink.records_written(), 10);

        // Epoch management: a write between begin and commit is recorded too.
        sink.begin_epoch(1).await.unwrap();
        let five_rows = mock_batch(5);
        sink.write_batch(&five_rows).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        assert_eq!(sink.records_written(), 15);
        assert_eq!(sink.batch_count(), 2);

        let capabilities = sink.capabilities();
        assert!(capabilities.exactly_once);
        assert!(capabilities.idempotent);

        sink.close().await.unwrap();
    }

    /// Restoring from a checkpoint succeeds on an open source.
    #[tokio::test]
    async fn test_mock_source_restore() {
        let config = ConnectorConfig::new("mock");
        let mut source = MockSourceConnector::new();
        source.open(&config).await.unwrap();

        let checkpoint = SourceCheckpoint::new(5);
        let restored = source.restore(&checkpoint).await;
        assert!(restored.is_ok());
    }

    /// Both registration helpers make `"mock"` discoverable in the registry.
    #[test]
    fn test_register_helpers() {
        let registry = ConnectorRegistry::new();
        register_mock_source(&registry);
        register_mock_sink(&registry);

        let source_info = registry.source_info("mock");
        let sink_info = registry.sink_info("mock");
        assert!(source_info.is_some());
        assert!(sink_info.is_some());
    }
}