1use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
9use arrow_array::{Int64Array, RecordBatch, StringArray};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use async_trait::async_trait;
12use parking_lot::Mutex;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::{ConnectorConfig, ConnectorInfo};
16use crate::connector::{
17 SinkConnector, SinkConnectorCapabilities, SourceBatch, SourceConnector, WriteResult,
18};
19use crate::error::ConnectorError;
20use crate::health::HealthStatus;
21use crate::metrics::ConnectorMetrics;
22use crate::registry::ConnectorRegistry;
23
24#[must_use]
26pub fn mock_schema() -> SchemaRef {
27 Arc::new(Schema::new(vec![
28 Field::new("id", DataType::Int64, false),
29 Field::new("value", DataType::Utf8, false),
30 ]))
31}
32
33#[must_use]
39pub fn mock_batch(n: usize) -> RecordBatch {
40 #[allow(clippy::cast_possible_wrap)]
41 let ids: Vec<i64> = (0..n as i64).collect();
42 let values: Vec<String> = (0..n).map(|i| format!("value_{i}")).collect();
43 let value_refs: Vec<&str> = values.iter().map(String::as_str).collect();
44
45 RecordBatch::try_new(
46 mock_schema(),
47 vec![
48 Arc::new(Int64Array::from(ids)),
49 Arc::new(StringArray::from(value_refs)),
50 ],
51 )
52 .unwrap()
53}
54
55#[derive(Debug)]
59pub struct MockSourceConnector {
60 schema: SchemaRef,
61 batches_remaining: AtomicU64,
62 batch_size: usize,
63 records_produced: AtomicU64,
64 is_open: std::sync::atomic::AtomicBool,
65}
66
67impl MockSourceConnector {
68 #[must_use]
70 pub fn new() -> Self {
71 Self {
72 schema: mock_schema(),
73 batches_remaining: AtomicU64::new(10),
74 batch_size: 5,
75 records_produced: AtomicU64::new(0),
76 is_open: std::sync::atomic::AtomicBool::new(false),
77 }
78 }
79
80 #[must_use]
82 pub fn with_batches(count: u64, batch_size: usize) -> Self {
83 Self {
84 schema: mock_schema(),
85 batches_remaining: AtomicU64::new(count),
86 batch_size,
87 records_produced: AtomicU64::new(0),
88 is_open: std::sync::atomic::AtomicBool::new(false),
89 }
90 }
91
92 #[must_use]
94 pub fn records_produced(&self) -> u64 {
95 self.records_produced.load(Ordering::Relaxed)
96 }
97}
98
99impl Default for MockSourceConnector {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105#[async_trait]
106impl SourceConnector for MockSourceConnector {
107 async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
108 self.is_open
109 .store(true, std::sync::atomic::Ordering::Relaxed);
110 Ok(())
111 }
112
113 async fn poll_batch(
114 &mut self,
115 _max_records: usize,
116 ) -> Result<Option<SourceBatch>, ConnectorError> {
117 let remaining = self.batches_remaining.load(Ordering::Relaxed);
118 if remaining == 0 {
119 return Ok(None);
120 }
121 self.batches_remaining.fetch_sub(1, Ordering::Relaxed);
122
123 let batch = mock_batch(self.batch_size);
124 self.records_produced
125 .fetch_add(self.batch_size as u64, Ordering::Relaxed);
126 Ok(Some(SourceBatch::new(batch)))
127 }
128
129 fn schema(&self) -> SchemaRef {
130 self.schema.clone()
131 }
132
133 fn checkpoint(&self) -> SourceCheckpoint {
134 let mut cp = SourceCheckpoint::new(0);
135 cp.set_offset(
136 "records",
137 self.records_produced.load(Ordering::Relaxed).to_string(),
138 );
139 cp
140 }
141
142 async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
143 Ok(())
144 }
145
146 fn health_check(&self) -> HealthStatus {
147 if self.is_open.load(std::sync::atomic::Ordering::Relaxed) {
148 HealthStatus::Healthy
149 } else {
150 HealthStatus::Unknown
151 }
152 }
153
154 fn metrics(&self) -> ConnectorMetrics {
155 ConnectorMetrics {
156 records_total: self.records_produced.load(Ordering::Relaxed),
157 ..Default::default()
158 }
159 }
160
161 async fn close(&mut self) -> Result<(), ConnectorError> {
162 self.is_open
163 .store(false, std::sync::atomic::Ordering::Relaxed);
164 Ok(())
165 }
166}
167
168#[derive(Debug)]
172pub struct MockSinkConnector {
173 schema: SchemaRef,
174 written: Arc<Mutex<Vec<RecordBatch>>>,
175 records_written: AtomicU64,
176 is_open: std::sync::atomic::AtomicBool,
177 current_epoch: AtomicU64,
178}
179
180impl MockSinkConnector {
181 #[must_use]
183 pub fn new() -> Self {
184 Self {
185 schema: mock_schema(),
186 written: Arc::new(Mutex::new(Vec::new())),
187 records_written: AtomicU64::new(0),
188 is_open: std::sync::atomic::AtomicBool::new(false),
189 current_epoch: AtomicU64::new(0),
190 }
191 }
192
193 #[must_use]
195 pub fn batch_count(&self) -> usize {
196 self.written.lock().len()
197 }
198
199 #[must_use]
201 pub fn records_written(&self) -> u64 {
202 self.records_written.load(Ordering::Relaxed)
203 }
204
205 #[must_use]
207 pub fn written_batches(&self) -> Vec<RecordBatch> {
208 self.written.lock().clone()
209 }
210}
211
212impl Default for MockSinkConnector {
213 fn default() -> Self {
214 Self::new()
215 }
216}
217
218#[async_trait]
219impl SinkConnector for MockSinkConnector {
220 async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
221 self.is_open
222 .store(true, std::sync::atomic::Ordering::Relaxed);
223 Ok(())
224 }
225
226 async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
227 let num_rows = batch.num_rows();
228 let bytes = batch.get_array_memory_size() as u64;
229 self.written.lock().push(batch.clone());
230 self.records_written
231 .fetch_add(num_rows as u64, Ordering::Relaxed);
232 Ok(WriteResult::new(num_rows, bytes))
233 }
234
235 fn schema(&self) -> SchemaRef {
236 self.schema.clone()
237 }
238
239 async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
240 self.current_epoch.store(epoch, Ordering::Relaxed);
241 Ok(())
242 }
243
244 async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
245 Ok(())
246 }
247
248 async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
249 Ok(())
251 }
252
253 fn health_check(&self) -> HealthStatus {
254 if self.is_open.load(std::sync::atomic::Ordering::Relaxed) {
255 HealthStatus::Healthy
256 } else {
257 HealthStatus::Unknown
258 }
259 }
260
261 fn metrics(&self) -> ConnectorMetrics {
262 ConnectorMetrics {
263 records_total: self.records_written.load(Ordering::Relaxed),
264 ..Default::default()
265 }
266 }
267
268 fn capabilities(&self) -> SinkConnectorCapabilities {
269 SinkConnectorCapabilities::default()
270 .with_exactly_once()
271 .with_idempotent()
272 .with_two_phase_commit()
273 }
274
275 async fn close(&mut self) -> Result<(), ConnectorError> {
276 self.is_open
277 .store(false, std::sync::atomic::Ordering::Relaxed);
278 Ok(())
279 }
280}
281
282pub fn register_mock_source(registry: &ConnectorRegistry) {
284 registry.register_source(
285 "mock",
286 ConnectorInfo {
287 name: "mock".to_string(),
288 display_name: "Mock Source".to_string(),
289 version: "0.1.0".to_string(),
290 is_source: true,
291 is_sink: false,
292 config_keys: vec![],
293 },
294 Arc::new(|| Box::new(MockSourceConnector::new())),
295 );
296}
297
298pub fn register_mock_sink(registry: &ConnectorRegistry) {
300 registry.register_sink(
301 "mock",
302 ConnectorInfo {
303 name: "mock".to_string(),
304 display_name: "Mock Sink".to_string(),
305 version: "0.1.0".to_string(),
306 is_source: false,
307 is_sink: true,
308 config_keys: vec![],
309 },
310 Arc::new(|| Box::new(MockSinkConnector::new())),
311 );
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// `mock_batch` yields the requested row count and both schema columns.
    #[test]
    fn test_mock_batch() {
        let batch = mock_batch(10);
        assert_eq!(batch.num_rows(), 10);
        assert_eq!(batch.num_columns(), 2);
    }

    /// A source configured for three batches yields exactly three, then
    /// `None`, and reports the produced row total via counter and checkpoint.
    #[tokio::test]
    async fn test_mock_source_connector() {
        let mut source = MockSourceConnector::with_batches(3, 5);
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        assert!(source.health_check().is_healthy());

        let b1 = source.poll_batch(100).await.unwrap();
        assert!(b1.is_some());
        assert_eq!(b1.unwrap().num_rows(), 5);

        let b2 = source.poll_batch(100).await.unwrap();
        assert!(b2.is_some());

        let b3 = source.poll_batch(100).await.unwrap();
        assert!(b3.is_some());

        // The fourth poll exhausts the configured batch budget.
        let b4 = source.poll_batch(100).await.unwrap();
        assert!(b4.is_none());

        assert_eq!(source.records_produced(), 15);

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("records"), Some("15"));

        source.close().await.unwrap();
    }

    /// The sink stores written batches, counts rows across epochs, and
    /// advertises its capabilities.
    #[tokio::test]
    async fn test_mock_sink_connector() {
        let mut sink = MockSinkConnector::new();
        sink.open(&ConnectorConfig::new("mock")).await.unwrap();

        assert!(sink.health_check().is_healthy());

        let batch = mock_batch(10);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 10);

        assert_eq!(sink.batch_count(), 1);
        assert_eq!(sink.records_written(), 10);

        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&mock_batch(5)).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        assert_eq!(sink.records_written(), 15);
        assert_eq!(sink.batch_count(), 2);

        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);

        sink.close().await.unwrap();
    }

    /// `restore` accepts an arbitrary checkpoint without error.
    #[tokio::test]
    async fn test_mock_source_restore() {
        let mut source = MockSourceConnector::new();
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        let cp = SourceCheckpoint::new(5);
        assert!(source.restore(&cp).await.is_ok());
    }

    /// Both registration helpers make a `"mock"` entry visible in the registry.
    #[test]
    fn test_register_helpers() {
        let registry = ConnectorRegistry::new();
        // The helpers take the registry by shared reference.
        register_mock_source(&registry);
        register_mock_sink(&registry);

        assert!(registry.source_info("mock").is_some());
        assert!(registry.sink_info("mock").is_some());
    }
}