1use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9
10use arrow_array::{Int64Array, RecordBatch, StringArray};
11use arrow_schema::{DataType, Field, Schema, SchemaRef};
12use async_trait::async_trait;
13use parking_lot::Mutex;
14
15use crate::checkpoint::SourceCheckpoint;
16use crate::config::{ConnectorConfig, ConnectorInfo};
17use crate::connector::{
18 SinkConnector, SinkConnectorCapabilities, SourceBatch, SourceConnector, WriteResult,
19};
20use crate::error::ConnectorError;
21use crate::registry::ConnectorRegistry;
22
23#[must_use]
25pub fn mock_schema() -> SchemaRef {
26 Arc::new(Schema::new(vec![
27 Field::new("id", DataType::Int64, false),
28 Field::new("value", DataType::Utf8, false),
29 ]))
30}
31
32#[must_use]
38pub fn mock_batch(n: usize) -> RecordBatch {
39 #[allow(clippy::cast_possible_wrap)]
40 let ids: Vec<i64> = (0..n as i64).collect();
41 let values: Vec<String> = (0..n).map(|i| format!("value_{i}")).collect();
42 let value_refs: Vec<&str> = values.iter().map(String::as_str).collect();
43
44 RecordBatch::try_new(
45 mock_schema(),
46 vec![
47 Arc::new(Int64Array::from(ids)),
48 Arc::new(StringArray::from(value_refs)),
49 ],
50 )
51 .unwrap()
52}
53
54#[derive(Debug)]
58pub struct MockSourceConnector {
59 schema: SchemaRef,
60 batches_remaining: AtomicU64,
61 batch_size: usize,
62 records_produced: AtomicU64,
63 is_open: std::sync::atomic::AtomicBool,
64 committed_epochs: Arc<Mutex<Vec<u64>>>,
65}
66
67impl MockSourceConnector {
68 #[must_use]
70 pub fn new() -> Self {
71 Self {
72 schema: mock_schema(),
73 batches_remaining: AtomicU64::new(10),
74 batch_size: 5,
75 records_produced: AtomicU64::new(0),
76 is_open: std::sync::atomic::AtomicBool::new(false),
77 committed_epochs: Arc::new(Mutex::new(Vec::new())),
78 }
79 }
80
81 #[must_use]
83 pub fn with_batches(count: u64, batch_size: usize) -> Self {
84 Self {
85 schema: mock_schema(),
86 batches_remaining: AtomicU64::new(count),
87 batch_size,
88 records_produced: AtomicU64::new(0),
89 is_open: std::sync::atomic::AtomicBool::new(false),
90 committed_epochs: Arc::new(Mutex::new(Vec::new())),
91 }
92 }
93
94 #[must_use]
96 pub fn records_produced(&self) -> u64 {
97 self.records_produced.load(Ordering::Relaxed)
98 }
99
100 #[must_use]
105 pub fn committed_epochs_handle(&self) -> Arc<Mutex<Vec<u64>>> {
106 Arc::clone(&self.committed_epochs)
107 }
108}
109
110impl Default for MockSourceConnector {
111 fn default() -> Self {
112 Self::new()
113 }
114}
115
116#[async_trait]
117impl SourceConnector for MockSourceConnector {
118 async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
119 self.is_open
120 .store(true, std::sync::atomic::Ordering::Relaxed);
121 Ok(())
122 }
123
124 async fn poll_batch(
125 &mut self,
126 _max_records: usize,
127 ) -> Result<Option<SourceBatch>, ConnectorError> {
128 let remaining = self.batches_remaining.load(Ordering::Relaxed);
129 if remaining == 0 {
130 return Ok(None);
131 }
132 self.batches_remaining.fetch_sub(1, Ordering::Relaxed);
133
134 let batch = mock_batch(self.batch_size);
135 self.records_produced
136 .fetch_add(self.batch_size as u64, Ordering::Relaxed);
137 Ok(Some(SourceBatch::new(batch)))
138 }
139
140 fn schema(&self) -> SchemaRef {
141 self.schema.clone()
142 }
143
144 fn checkpoint(&self) -> SourceCheckpoint {
145 let mut cp = SourceCheckpoint::new(0);
146 cp.set_offset(
147 "records",
148 self.records_produced.load(Ordering::Relaxed).to_string(),
149 );
150 cp
151 }
152
153 async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
154 Ok(())
155 }
156
157 async fn close(&mut self) -> Result<(), ConnectorError> {
158 self.is_open
159 .store(false, std::sync::atomic::Ordering::Relaxed);
160 Ok(())
161 }
162
163 async fn notify_epoch_committed(
164 &mut self,
165 epoch: u64,
166 _checkpoint: &SourceCheckpoint,
167 ) -> Result<(), ConnectorError> {
168 self.committed_epochs.lock().push(epoch);
169 Ok(())
170 }
171}
172
173#[derive(Debug)]
177pub struct MockSinkConnector {
178 schema: SchemaRef,
179 written: Arc<Mutex<Vec<RecordBatch>>>,
180 records_written: AtomicU64,
181 is_open: std::sync::atomic::AtomicBool,
182 current_epoch: AtomicU64,
183}
184
185impl MockSinkConnector {
186 #[must_use]
188 pub fn new() -> Self {
189 Self {
190 schema: mock_schema(),
191 written: Arc::new(Mutex::new(Vec::new())),
192 records_written: AtomicU64::new(0),
193 is_open: std::sync::atomic::AtomicBool::new(false),
194 current_epoch: AtomicU64::new(0),
195 }
196 }
197
198 #[must_use]
200 pub fn batch_count(&self) -> usize {
201 self.written.lock().len()
202 }
203
204 #[must_use]
206 pub fn records_written(&self) -> u64 {
207 self.records_written.load(Ordering::Relaxed)
208 }
209
210 #[must_use]
212 pub fn written_batches(&self) -> Vec<RecordBatch> {
213 self.written.lock().clone()
214 }
215}
216
217impl Default for MockSinkConnector {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223#[async_trait]
224impl SinkConnector for MockSinkConnector {
225 async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
226 self.is_open
227 .store(true, std::sync::atomic::Ordering::Relaxed);
228 Ok(())
229 }
230
231 async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
232 let num_rows = batch.num_rows();
233 let bytes = batch.get_array_memory_size() as u64;
234 self.written.lock().push(batch.clone());
235 self.records_written
236 .fetch_add(num_rows as u64, Ordering::Relaxed);
237 Ok(WriteResult::new(num_rows, bytes))
238 }
239
240 fn schema(&self) -> SchemaRef {
241 self.schema.clone()
242 }
243
244 async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
245 self.current_epoch.store(epoch, Ordering::Relaxed);
246 Ok(())
247 }
248
249 async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
250 Ok(())
251 }
252
253 async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
254 Ok(())
256 }
257
258 fn capabilities(&self) -> SinkConnectorCapabilities {
259 SinkConnectorCapabilities::new(Duration::from_secs(60))
260 .with_exactly_once()
261 .with_idempotent()
262 .with_two_phase_commit()
263 }
264
265 async fn close(&mut self) -> Result<(), ConnectorError> {
266 self.is_open
267 .store(false, std::sync::atomic::Ordering::Relaxed);
268 Ok(())
269 }
270}
271
272pub fn register_mock_source(registry: &ConnectorRegistry) {
274 registry.register_source(
275 "mock",
276 ConnectorInfo {
277 name: "mock".to_string(),
278 display_name: "Mock Source".to_string(),
279 version: "0.1.0".to_string(),
280 is_source: true,
281 is_sink: false,
282 config_keys: vec![],
283 },
284 Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new())),
285 );
286}
287
288pub fn register_mock_sink(registry: &ConnectorRegistry) {
290 registry.register_sink(
291 "mock",
292 ConnectorInfo {
293 name: "mock".to_string(),
294 display_name: "Mock Sink".to_string(),
295 version: "0.1.0".to_string(),
296 is_source: false,
297 is_sink: true,
298 config_keys: vec![],
299 },
300 Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSinkConnector::new())),
301 );
302}
303
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mock_batch() {
        let batch = mock_batch(10);
        assert_eq!(batch.num_rows(), 10);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_mock_batch_empty_matches_schema() {
        let batch = mock_batch(0);
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), mock_schema());
    }

    #[tokio::test]
    async fn test_mock_source_connector() {
        let mut source = MockSourceConnector::with_batches(3, 5);
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        let b1 = source.poll_batch(100).await.unwrap();
        assert!(b1.is_some());
        assert_eq!(b1.unwrap().num_rows(), 5);

        let b2 = source.poll_batch(100).await.unwrap();
        assert!(b2.is_some());

        let b3 = source.poll_batch(100).await.unwrap();
        assert!(b3.is_some());

        let b4 = source.poll_batch(100).await.unwrap();
        assert!(b4.is_none());

        assert_eq!(source.records_produced(), 15);

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("records"), Some("15"));

        source.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_mock_source_records_committed_epochs() {
        let mut source = MockSourceConnector::new();
        let committed = source.committed_epochs_handle();
        let cp = source.checkpoint();

        source.notify_epoch_committed(1, &cp).await.unwrap();
        source.notify_epoch_committed(2, &cp).await.unwrap();

        assert_eq!(*committed.lock(), vec![1, 2]);
    }

    #[tokio::test]
    async fn test_mock_sink_connector() {
        let mut sink = MockSinkConnector::new();
        sink.open(&ConnectorConfig::new("mock")).await.unwrap();

        let batch = mock_batch(10);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 10);

        assert_eq!(sink.batch_count(), 1);
        assert_eq!(sink.records_written(), 10);

        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&mock_batch(5)).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        assert_eq!(sink.records_written(), 15);
        assert_eq!(sink.batch_count(), 2);

        let written = sink.written_batches();
        assert_eq!(written.len(), 2);
        assert_eq!(written[1].num_rows(), 5);

        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_mock_source_restore() {
        let mut source = MockSourceConnector::new();
        source.open(&ConnectorConfig::new("mock")).await.unwrap();

        let cp = SourceCheckpoint::new(5);
        assert!(source.restore(&cp).await.is_ok());
    }

    #[test]
    fn test_register_helpers() {
        let registry = ConnectorRegistry::new();
        register_mock_source(&registry);
        register_mock_sink(&registry);

        assert!(registry.source_info("mock").is_some());
        assert!(registry.sink_info("mock").is_some());
    }
}