1#![allow(clippy::disallowed_types)] use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread::{self, JoinHandle};
12use std::time::Duration;
13
14use arrow_array::RecordBatch;
15use laminar_connectors::checkpoint::SourceCheckpoint;
16use laminar_connectors::config::ConnectorConfig;
17use laminar_connectors::connector::{DeliveryGuarantee, SourceConnector};
18use laminar_core::checkpoint::{BarrierPollHandle, CheckpointBarrierInjector};
19use laminar_core::operator::Event;
20use laminar_core::tpc::{CoreMessage, SpscQueue};
21
/// Counters published by a source I/O thread.
///
/// All fields are atomics so the I/O thread can update them with relaxed
/// stores while other threads read them concurrently.
#[derive(Debug, Default)]
pub struct SourceIoMetrics {
    /// Number of record batches successfully polled from the connector.
    pub batches: AtomicU64,
    /// Total number of records across all polled batches.
    pub records: AtomicU64,
    /// Number of poll errors reported by the connector.
    pub errors: AtomicU64,
    /// Duration of the most recent poll in nanoseconds.
    pub last_poll_ns: AtomicU64,
}
45
/// Handle to a dedicated OS thread that polls a [`SourceConnector`] and
/// forwards batches to a core thread's inbox.
pub struct SourceIoThread {
    // Join handle for the I/O thread. The thread returns the connector on a
    // clean exit so it can be reused; `None` once joined.
    thread: Option<JoinHandle<Option<Box<dyn SourceConnector>>>>,
    // Set to request that the I/O loop stop.
    shutdown: Arc<AtomicBool>,
    // Set by the I/O thread once its connector has opened (and, if
    // applicable, restored) successfully.
    started: Arc<AtomicBool>,
    /// Injects checkpoint barriers into this source's event stream.
    pub injector: CheckpointBarrierInjector,
    /// Shared counters updated by the I/O thread.
    pub metrics: Arc<SourceIoMetrics>,
    /// Receives the most recent [`SourceCheckpoint`] published by the thread.
    pub checkpoint_rx: tokio::sync::watch::Receiver<SourceCheckpoint>,
}
64
65impl SourceIoThread {
66 #[allow(clippy::too_many_arguments)]
78 pub fn spawn(
79 source_idx: usize,
80 name: String,
81 connector: Box<dyn SourceConnector>,
82 config: ConnectorConfig,
83 target_inbox: Arc<SpscQueue<CoreMessage>>,
84 max_poll_records: usize,
85 fallback_poll_interval: Duration,
86 core_thread: thread::Thread,
87 restore_checkpoint: Option<SourceCheckpoint>,
88 delivery_guarantee: DeliveryGuarantee,
89 ) -> std::io::Result<Self> {
90 let shutdown = Arc::new(AtomicBool::new(false));
91 let started = Arc::new(AtomicBool::new(false));
92 let injector = CheckpointBarrierInjector::new();
93 let barrier_handle = injector.handle();
94 let metrics = Arc::new(SourceIoMetrics::default());
95 let (cp_tx, cp_rx) = tokio::sync::watch::channel(SourceCheckpoint::new(0));
96
97 let shutdown_clone = Arc::clone(&shutdown);
98 let started_clone = Arc::clone(&started);
99 let metrics_clone = Arc::clone(&metrics);
100
101 let thread = thread::Builder::new()
102 .name(format!("laminar-src-{name}"))
103 .spawn(move || {
104 source_io_main(
105 source_idx,
106 name,
107 connector,
108 config,
109 target_inbox,
110 max_poll_records,
111 fallback_poll_interval,
112 shutdown_clone,
113 barrier_handle,
114 metrics_clone,
115 cp_tx,
116 core_thread,
117 restore_checkpoint,
118 delivery_guarantee,
119 started_clone,
120 )
121 })?;
122
123 Ok(Self {
124 thread: Some(thread),
125 shutdown,
126 started,
127 injector,
128 metrics,
129 checkpoint_rx: cp_rx,
130 })
131 }
132
133 #[must_use]
135 pub fn has_started(&self) -> bool {
136 self.started.load(Ordering::Acquire)
137 }
138
139 #[must_use]
143 pub fn has_failed(&self) -> bool {
144 !self.has_started()
145 && self
146 .thread
147 .as_ref()
148 .is_some_and(std::thread::JoinHandle::is_finished)
149 }
150
151 pub fn shutdown_and_join(&mut self) -> Option<Box<dyn SourceConnector>> {
153 self.shutdown.store(true, Ordering::Release);
154 self.thread.take().and_then(|h| h.join().ok()).flatten()
155 }
156}
157
158impl Drop for SourceIoThread {
159 fn drop(&mut self) {
160 self.shutdown.store(true, Ordering::Release);
161 if let Some(handle) = self.thread.take() {
162 let _ = handle.join();
163 }
164 }
165}
166
167fn extract_timestamp(batch: &RecordBatch) -> i64 {
174 use arrow::compute::kernels::aggregate::max as arrow_max;
175 use arrow_array::Array;
176
177 for name in &["event_time", "timestamp"] {
178 if let Ok(col_idx) = batch.schema().index_of(name) {
179 let col = batch.column(col_idx);
180 if let Some(arr) = col.as_any().downcast_ref::<arrow_array::Int64Array>() {
182 if !arr.is_empty() {
183 if let Some(max_val) = arrow_max(arr) {
184 return max_val;
185 }
186 }
187 }
188 if let Some(arr) = col
190 .as_any()
191 .downcast_ref::<arrow_array::TimestampMillisecondArray>()
192 {
193 if !arr.is_empty() {
194 if let Some(max_val) = arrow_max(arr) {
195 return max_val;
196 }
197 }
198 }
199 }
200 }
201
202 #[allow(clippy::cast_possible_truncation)]
204 std::time::SystemTime::now()
205 .duration_since(std::time::UNIX_EPOCH)
206 .map_or(0, |d| d.as_millis() as i64)
207}
208
#[allow(
    clippy::too_many_arguments,
    clippy::needless_pass_by_value,
    clippy::too_many_lines
)]
/// Entry point for a dedicated source I/O thread.
///
/// Opens `connector`, optionally restores it from `restore_checkpoint`, then
/// loops: wait for data (connector notifier or `fallback_poll_interval`),
/// poll a batch, push it to `target_inbox` with escalating backoff, publish
/// checkpoints via `cp_tx`, and inject barriers from `barrier_handle`.
///
/// Returns the connector on exit so the owner may reuse it; `None` means
/// startup failed and the connector was consumed.
fn source_io_main(
    source_idx: usize,
    name: String,
    mut connector: Box<dyn SourceConnector>,
    config: ConnectorConfig,
    target_inbox: Arc<SpscQueue<CoreMessage>>,
    max_poll_records: usize,
    fallback_poll_interval: Duration,
    shutdown: Arc<AtomicBool>,
    barrier_handle: BarrierPollHandle,
    metrics: Arc<SourceIoMetrics>,
    cp_tx: tokio::sync::watch::Sender<SourceCheckpoint>,
    core_thread: thread::Thread,
    restore_checkpoint: Option<SourceCheckpoint>,
    delivery_guarantee: DeliveryGuarantee,
    started: Arc<AtomicBool>,
) -> Option<Box<dyn SourceConnector>> {
    // Each source thread owns a single-threaded tokio runtime for the
    // connector's async API.
    let rt = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(rt) => rt,
        Err(e) => {
            tracing::error!("Source '{name}': failed to create tokio runtime: {e}");
            // The connector was never touched, so hand it back intact.
            return Some(connector);
        }
    };

    rt.block_on(async {
        if let Err(e) = connector.open(&config).await {
            tracing::error!("Source '{name}': failed to open connector: {e}");
            // `started` is never set, so the owner observes this via
            // `SourceIoThread::has_failed`.
            return None;
        }

        if let Some(ref checkpoint) = restore_checkpoint {
            match connector.restore(checkpoint).await {
                Ok(()) => {
                    tracing::info!(
                        source = %name,
                        "restored source from checkpoint"
                    );
                }
                Err(e) => {
                    // Under exactly-once, a failed restore cannot be papered
                    // over: abort startup instead of degrading semantics.
                    if matches!(delivery_guarantee, DeliveryGuarantee::ExactlyOnce) {
                        tracing::error!(
                            source = %name,
                            error = %e,
                            "[LDB-5030] source checkpoint restore failed under exactly-once \
                             — cannot guarantee delivery semantics"
                        );
                        return None;
                    }
                    // Weaker guarantees: warn and continue from whatever
                    // position the connector starts at.
                    tracing::warn!(
                        source = %name,
                        error = %e,
                        "source checkpoint restore failed, starting from group offsets"
                    );
                }
            }
        }

        // Signal successful startup (open + restore) to the owning handle.
        started.store(true, Ordering::Release);
        // Connectors may expose a notifier to wake the loop as soon as data
        // arrives; otherwise we rely on the fallback interval alone.
        let notify = connector.data_ready_notify();
        // Barrier epoch passed to `barrier_handle.poll`, bumped per barrier.
        let mut epoch: u64 = 0;

        loop {
            if shutdown.load(Ordering::Acquire) {
                break;
            }

            if let Some(ref notifier) = notify {
                // Wake on data-ready, but never sleep longer than the
                // fallback interval; `biased` prefers the notifier.
                tokio::select! {
                    biased;
                    () = notifier.notified() => {}
                    () = tokio::time::sleep(fallback_poll_interval) => {}
                }
            } else {
                tokio::time::sleep(fallback_poll_interval).await;
            }

            // Re-check after sleeping so shutdown is not delayed by a poll.
            if shutdown.load(Ordering::Acquire) {
                break;
            }

            let poll_start = std::time::Instant::now();
            match connector.poll_batch(max_poll_records).await {
                Ok(Some(batch)) => {
                    let num_rows = batch.records.num_rows();
                    metrics.batches.fetch_add(1, Ordering::Relaxed);
                    #[allow(clippy::cast_possible_truncation)]
                    metrics
                        .records
                        .fetch_add(num_rows as u64, Ordering::Relaxed);
                    #[allow(clippy::cast_possible_truncation)]
                    metrics
                        .last_poll_ns
                        .store(poll_start.elapsed().as_nanos() as u64, Ordering::Relaxed);

                    let timestamp = extract_timestamp(&batch.records);
                    let event = Event::new(timestamp, batch.records);

                    let mut msg = CoreMessage::Event { source_idx, event };
                    // Push with escalating backoff — spin, then yield, then
                    // park briefly — until the queue accepts or shutdown.
                    let mut backoff = 0u32;
                    loop {
                        match target_inbox.push(msg) {
                            Ok(()) => {
                                // Wake the core thread to drain its inbox.
                                core_thread.unpark();
                                break;
                            }
                            Err(returned) => {
                                if shutdown.load(Ordering::Acquire) {
                                    // Shutting down with a full queue: the
                                    // batch is dropped.
                                    break;
                                }
                                msg = returned;
                                backoff += 1;
                                match backoff {
                                    0..=63 => std::hint::spin_loop(),
                                    64..=255 => std::thread::yield_now(),
                                    _ => std::thread::park_timeout(
                                        std::time::Duration::from_micros(100),
                                    ),
                                }
                            }
                        }
                    }
                }
                Ok(None) => {
                    // No data this round; still record the poll latency.
                    #[allow(clippy::cast_possible_truncation)]
                    metrics
                        .last_poll_ns
                        .store(poll_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
                }
                Err(e) => {
                    metrics.errors.fetch_add(1, Ordering::Relaxed);
                    tracing::warn!("Source '{name}': poll error: {e}");
                }
            }

            // Publish the connector's latest position after every poll; a
            // watch channel retains only the most recent value.
            let _ = cp_tx.send(connector.checkpoint());

            if let Some(barrier) = barrier_handle.poll(epoch) {
                epoch += 1;
                // Publish a fresh checkpoint at the barrier boundary, after
                // any batch polled above.
                let _ = cp_tx.send(connector.checkpoint());
                let mut bmsg = CoreMessage::Barrier {
                    source_idx,
                    barrier,
                };
                // Same escalating backoff strategy as the event push above.
                let mut barrier_backoff = 0u32;
                loop {
                    match target_inbox.push(bmsg) {
                        Ok(()) => {
                            core_thread.unpark();
                            break;
                        }
                        Err(returned) => {
                            if shutdown.load(Ordering::Acquire) {
                                break;
                            }
                            bmsg = returned;
                            barrier_backoff += 1;
                            match barrier_backoff {
                                0..=63 => std::hint::spin_loop(),
                                64..=255 => std::thread::yield_now(),
                                _ => {
                                    std::thread::park_timeout(std::time::Duration::from_micros(
                                        100,
                                    ));
                                }
                            }
                        }
                    }
                }
            }
        }

        // Best-effort close; a failure here does not invalidate the
        // connector handoff below.
        if let Err(e) = connector.close().await {
            tracing::warn!("Source '{name}': close error: {e}");
        }

        // Hand the connector back for potential reuse after a clean stop.
        Some(connector)
    })
}
416
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, TimestampMillisecondArray};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use std::sync::Arc;

    /// Builds a single-row batch with one `Int64` column named `col_name`.
    fn int64_batch(col_name: &str, value: i64) -> RecordBatch {
        let schema = Schema::new(vec![Field::new(col_name, DataType::Int64, false)]);
        RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int64Array::from(vec![value]))],
        )
        .unwrap()
    }

    #[test]
    fn test_extract_timestamp_event_time() {
        let batch = int64_batch("event_time", 42_000);
        assert_eq!(extract_timestamp(&batch), 42_000);
    }

    #[test]
    fn test_extract_timestamp_column() {
        let batch = int64_batch("timestamp", 99_000);
        assert_eq!(extract_timestamp(&batch), 99_000);
    }

    #[test]
    fn test_extract_timestamp_millis() {
        let schema = Schema::new(vec![Field::new(
            "event_time",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(TimestampMillisecondArray::from(vec![123_456]))],
        )
        .unwrap();
        assert_eq!(extract_timestamp(&batch), 123_456);
    }

    #[test]
    fn test_extract_timestamp_fallback() {
        // No recognized timestamp column: expect roughly "now" in millis.
        let batch = int64_batch("value", 1);
        let ts = extract_timestamp(&batch);
        #[allow(clippy::cast_possible_truncation)]
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;
        assert!((ts - now).abs() < 1000);
    }

    #[test]
    fn test_source_io_metrics_default() {
        let metrics = SourceIoMetrics::default();
        assert_eq!(metrics.batches.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.records.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.errors.load(Ordering::Relaxed), 0);
    }
}
484}