1use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9
10use arrow_array::{Int64Array, RecordBatch, StringArray};
11use arrow_schema::{DataType, Field, Schema, SchemaRef};
12use async_trait::async_trait;
13use parking_lot::Mutex;
14
15use crate::checkpoint::SourceCheckpoint;
16use crate::config::{ConnectorConfig, ConnectorInfo};
17use crate::connector::{
18 SinkConnector, SinkConnectorCapabilities, SourceBatch, SourceConnector, WriteResult,
19};
20use crate::error::ConnectorError;
21use crate::health::HealthStatus;
22use crate::metrics::ConnectorMetrics;
23use crate::registry::ConnectorRegistry;
24
25#[must_use]
27pub fn mock_schema() -> SchemaRef {
28 Arc::new(Schema::new(vec![
29 Field::new("id", DataType::Int64, false),
30 Field::new("value", DataType::Utf8, false),
31 ]))
32}
33
34#[must_use]
40pub fn mock_batch(n: usize) -> RecordBatch {
41 #[allow(clippy::cast_possible_wrap)]
42 let ids: Vec<i64> = (0..n as i64).collect();
43 let values: Vec<String> = (0..n).map(|i| format!("value_{i}")).collect();
44 let value_refs: Vec<&str> = values.iter().map(String::as_str).collect();
45
46 RecordBatch::try_new(
47 mock_schema(),
48 vec![
49 Arc::new(Int64Array::from(ids)),
50 Arc::new(StringArray::from(value_refs)),
51 ],
52 )
53 .unwrap()
54}
55
/// Mock [`SourceConnector`] that emits a fixed number of [`mock_batch`]es.
///
/// Every epoch passed to `notify_epoch_committed` is recorded, and the list
/// can be inspected through [`MockSourceConnector::committed_epochs_handle`].
#[derive(Debug)]
pub struct MockSourceConnector {
    /// Schema reported by `schema()`; always [`mock_schema`].
    schema: SchemaRef,
    /// Number of batches `poll_batch` will still return before yielding `None`.
    batches_remaining: AtomicU64,
    /// Rows in each emitted batch.
    batch_size: usize,
    /// Running total of rows emitted. Also reported as the `"records"`
    /// checkpoint offset and as `records_total` in metrics.
    records_produced: AtomicU64,
    /// Set by `open` and cleared by `close`; drives `health_check`.
    is_open: std::sync::atomic::AtomicBool,
    /// Epochs received via `notify_epoch_committed`, in arrival order.
    /// Wrapped in `Arc` so `committed_epochs_handle` can hand out a shared view.
    committed_epochs: Arc<Mutex<Vec<u64>>>,
}
68
69impl MockSourceConnector {
70 #[must_use]
72 pub fn new() -> Self {
73 Self {
74 schema: mock_schema(),
75 batches_remaining: AtomicU64::new(10),
76 batch_size: 5,
77 records_produced: AtomicU64::new(0),
78 is_open: std::sync::atomic::AtomicBool::new(false),
79 committed_epochs: Arc::new(Mutex::new(Vec::new())),
80 }
81 }
82
83 #[must_use]
85 pub fn with_batches(count: u64, batch_size: usize) -> Self {
86 Self {
87 schema: mock_schema(),
88 batches_remaining: AtomicU64::new(count),
89 batch_size,
90 records_produced: AtomicU64::new(0),
91 is_open: std::sync::atomic::AtomicBool::new(false),
92 committed_epochs: Arc::new(Mutex::new(Vec::new())),
93 }
94 }
95
96 #[must_use]
98 pub fn records_produced(&self) -> u64 {
99 self.records_produced.load(Ordering::Relaxed)
100 }
101
102 #[must_use]
107 pub fn committed_epochs_handle(&self) -> Arc<Mutex<Vec<u64>>> {
108 Arc::clone(&self.committed_epochs)
109 }
110}
111
112impl Default for MockSourceConnector {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118#[async_trait]
119impl SourceConnector for MockSourceConnector {
120 async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
121 self.is_open
122 .store(true, std::sync::atomic::Ordering::Relaxed);
123 Ok(())
124 }
125
126 async fn poll_batch(
127 &mut self,
128 _max_records: usize,
129 ) -> Result<Option<SourceBatch>, ConnectorError> {
130 let remaining = self.batches_remaining.load(Ordering::Relaxed);
131 if remaining == 0 {
132 return Ok(None);
133 }
134 self.batches_remaining.fetch_sub(1, Ordering::Relaxed);
135
136 let batch = mock_batch(self.batch_size);
137 self.records_produced
138 .fetch_add(self.batch_size as u64, Ordering::Relaxed);
139 Ok(Some(SourceBatch::new(batch)))
140 }
141
142 fn schema(&self) -> SchemaRef {
143 self.schema.clone()
144 }
145
146 fn checkpoint(&self) -> SourceCheckpoint {
147 let mut cp = SourceCheckpoint::new(0);
148 cp.set_offset(
149 "records",
150 self.records_produced.load(Ordering::Relaxed).to_string(),
151 );
152 cp
153 }
154
155 async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
156 Ok(())
157 }
158
159 fn health_check(&self) -> HealthStatus {
160 if self.is_open.load(std::sync::atomic::Ordering::Relaxed) {
161 HealthStatus::Healthy
162 } else {
163 HealthStatus::Unknown
164 }
165 }
166
167 fn metrics(&self) -> ConnectorMetrics {
168 ConnectorMetrics {
169 records_total: self.records_produced.load(Ordering::Relaxed),
170 ..Default::default()
171 }
172 }
173
174 async fn close(&mut self) -> Result<(), ConnectorError> {
175 self.is_open
176 .store(false, std::sync::atomic::Ordering::Relaxed);
177 Ok(())
178 }
179
180 async fn notify_epoch_committed(&mut self, epoch: u64) -> Result<(), ConnectorError> {
181 self.committed_epochs.lock().push(epoch);
182 Ok(())
183 }
184}
185
/// Mock [`SinkConnector`] that keeps every written batch in memory.
///
/// `capabilities` advertises exactly-once, idempotent and two-phase-commit
/// support, but `commit_epoch` and `rollback_epoch` are no-ops.
#[derive(Debug)]
pub struct MockSinkConnector {
    /// Schema reported by `schema()`; always [`mock_schema`].
    schema: SchemaRef,
    /// Clones of every batch passed to `write_batch`, in write order.
    written: Arc<Mutex<Vec<RecordBatch>>>,
    /// Running total of rows across all written batches.
    records_written: AtomicU64,
    /// Set by `open` and cleared by `close`; drives `health_check`.
    is_open: std::sync::atomic::AtomicBool,
    /// Last epoch passed to `begin_epoch`.
    // NOTE(review): written by `begin_epoch` but never read in this file.
    current_epoch: AtomicU64,
}
197
198impl MockSinkConnector {
199 #[must_use]
201 pub fn new() -> Self {
202 Self {
203 schema: mock_schema(),
204 written: Arc::new(Mutex::new(Vec::new())),
205 records_written: AtomicU64::new(0),
206 is_open: std::sync::atomic::AtomicBool::new(false),
207 current_epoch: AtomicU64::new(0),
208 }
209 }
210
211 #[must_use]
213 pub fn batch_count(&self) -> usize {
214 self.written.lock().len()
215 }
216
217 #[must_use]
219 pub fn records_written(&self) -> u64 {
220 self.records_written.load(Ordering::Relaxed)
221 }
222
223 #[must_use]
225 pub fn written_batches(&self) -> Vec<RecordBatch> {
226 self.written.lock().clone()
227 }
228}
229
230impl Default for MockSinkConnector {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236#[async_trait]
237impl SinkConnector for MockSinkConnector {
238 async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
239 self.is_open
240 .store(true, std::sync::atomic::Ordering::Relaxed);
241 Ok(())
242 }
243
244 async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
245 let num_rows = batch.num_rows();
246 let bytes = batch.get_array_memory_size() as u64;
247 self.written.lock().push(batch.clone());
248 self.records_written
249 .fetch_add(num_rows as u64, Ordering::Relaxed);
250 Ok(WriteResult::new(num_rows, bytes))
251 }
252
253 fn schema(&self) -> SchemaRef {
254 self.schema.clone()
255 }
256
257 async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
258 self.current_epoch.store(epoch, Ordering::Relaxed);
259 Ok(())
260 }
261
262 async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
263 Ok(())
264 }
265
266 async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
267 Ok(())
269 }
270
271 fn health_check(&self) -> HealthStatus {
272 if self.is_open.load(std::sync::atomic::Ordering::Relaxed) {
273 HealthStatus::Healthy
274 } else {
275 HealthStatus::Unknown
276 }
277 }
278
279 fn metrics(&self) -> ConnectorMetrics {
280 ConnectorMetrics {
281 records_total: self.records_written.load(Ordering::Relaxed),
282 ..Default::default()
283 }
284 }
285
286 fn capabilities(&self) -> SinkConnectorCapabilities {
287 SinkConnectorCapabilities::new(Duration::from_secs(60))
288 .with_exactly_once()
289 .with_idempotent()
290 .with_two_phase_commit()
291 }
292
293 async fn close(&mut self) -> Result<(), ConnectorError> {
294 self.is_open
295 .store(false, std::sync::atomic::Ordering::Relaxed);
296 Ok(())
297 }
298}
299
300pub fn register_mock_source(registry: &ConnectorRegistry) {
302 registry.register_source(
303 "mock",
304 ConnectorInfo {
305 name: "mock".to_string(),
306 display_name: "Mock Source".to_string(),
307 version: "0.1.0".to_string(),
308 is_source: true,
309 is_sink: false,
310 config_keys: vec![],
311 },
312 Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new())),
313 );
314}
315
316pub fn register_mock_sink(registry: &ConnectorRegistry) {
318 registry.register_sink(
319 "mock",
320 ConnectorInfo {
321 name: "mock".to_string(),
322 display_name: "Mock Sink".to_string(),
323 version: "0.1.0".to_string(),
324 is_source: false,
325 is_sink: true,
326 config_keys: vec![],
327 },
328 Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSinkConnector::new())),
329 );
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mock_batch() {
        let batch = mock_batch(10);
        assert_eq!(batch.num_rows(), 10);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_mock_batch_empty() {
        // A zero-row batch must still be valid and carry the mock schema.
        let batch = mock_batch(0);
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), mock_schema());
    }

    #[tokio::test]
    async fn test_mock_source_connector() {
        let mut source = MockSourceConnector::with_batches(3, 5);
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        assert!(source.health_check().is_healthy());

        let first = source
            .poll_batch(100)
            .await
            .unwrap()
            .expect("first poll should yield a batch");
        assert_eq!(first.num_rows(), 5);

        assert!(source.poll_batch(100).await.unwrap().is_some());
        assert!(source.poll_batch(100).await.unwrap().is_some());
        // The budget of three batches is now exhausted.
        assert!(source.poll_batch(100).await.unwrap().is_none());

        assert_eq!(source.records_produced(), 15);

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("records"), Some("15"));

        source.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_mock_source_records_committed_epochs() {
        let mut source = MockSourceConnector::new();
        let handle = source.committed_epochs_handle();

        source.notify_epoch_committed(1).await.unwrap();
        source.notify_epoch_committed(2).await.unwrap();

        assert_eq!(*handle.lock(), vec![1, 2]);
    }

    #[tokio::test]
    async fn test_mock_sink_connector() {
        let mut sink = MockSinkConnector::new();
        sink.open(&ConnectorConfig::new("mock")).await.unwrap();

        assert!(sink.health_check().is_healthy());

        let batch = mock_batch(10);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 10);

        assert_eq!(sink.batch_count(), 1);
        assert_eq!(sink.records_written(), 10);

        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&mock_batch(5)).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        assert_eq!(sink.records_written(), 15);
        assert_eq!(sink.batch_count(), 2);

        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_mock_source_restore() {
        let mut source = MockSourceConnector::new();
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        let cp = SourceCheckpoint::new(5);
        assert!(source.restore(&cp).await.is_ok());
    }

    #[test]
    fn test_register_helpers() {
        let registry = ConnectorRegistry::new();
        register_mock_source(&registry);
        register_mock_sink(&registry);

        assert!(registry.source_info("mock").is_some());
        assert!(registry.sink_info("mock").is_some());
    }
}