Skip to main content

laminar_db/pipeline/
source_adapter.rs

//! Source I/O thread adapter for thread-per-core mode.
//!
//! Bridges an async [`SourceConnector`] to a core thread's SPSC inbox.
//! Each source runs on a dedicated `std::thread` with a single-threaded
//! tokio runtime, pushing [`CoreMessage::Event`] into the target core's
//! inbox after converting `SourceBatch` → [`Event`].
7#![allow(clippy::disallowed_types)] // cold path
8
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::Arc;
11use std::thread::{self, JoinHandle};
12use std::time::Duration;
13
14use arrow_array::RecordBatch;
15use laminar_connectors::checkpoint::SourceCheckpoint;
16use laminar_connectors::config::ConnectorConfig;
17use laminar_connectors::connector::{DeliveryGuarantee, SourceConnector};
18use laminar_core::checkpoint::{BarrierPollHandle, CheckpointBarrierInjector};
19use laminar_core::operator::Event;
20use laminar_core::tpc::{CoreMessage, SpscQueue};
21
/// Per-source metrics (lock-free atomic reads from any thread).
///
/// Every counter starts at zero: `Default` is derived, and
/// `AtomicU64::default()` is `AtomicU64::new(0)`.
#[derive(Debug, Default)]
pub struct SourceIoMetrics {
    /// Total batches received from the connector.
    pub batches: AtomicU64,
    /// Total records received.
    pub records: AtomicU64,
    /// Total errors during poll.
    pub errors: AtomicU64,
    /// Last poll duration in nanoseconds.
    pub last_poll_ns: AtomicU64,
}
45
46/// Bridges an async `SourceConnector` to a core thread's SPSC inbox.
47///
48/// Runs a dedicated `std::thread` with a single-threaded tokio runtime.
49/// Polls the connector and pushes `CoreMessage::Event` into the target
50/// core's inbox. Barrier detection is handled inline after each poll.
51pub struct SourceIoThread {
52    thread: Option<JoinHandle<Option<Box<dyn SourceConnector>>>>,
53    shutdown: Arc<AtomicBool>,
54    /// Set to `true` once the source has successfully opened and restored.
55    /// Remains `false` if open/restore fails (thread exits early).
56    started: Arc<AtomicBool>,
57    /// Barrier injector for this source (coordinator uses this to trigger barriers).
58    pub injector: CheckpointBarrierInjector,
59    /// Lock-free metrics.
60    pub metrics: Arc<SourceIoMetrics>,
61    /// Watch receiver for checkpoint snapshots captured at barrier points.
62    pub checkpoint_rx: tokio::sync::watch::Receiver<SourceCheckpoint>,
63}
64
65impl SourceIoThread {
66    /// Spawn a new source I/O thread.
67    ///
68    /// The thread opens the connector, polls for batches, and pushes events
69    /// into the target core's inbox. Barrier detection happens inline.
70    ///
71    /// The `core_thread` handle is used to `unpark()` the core after each
72    /// inbox push, waking it from `park_timeout()` sleep.
73    ///
74    /// # Errors
75    ///
76    /// Returns `std::io::Error` if the OS thread cannot be spawned.
77    #[allow(clippy::too_many_arguments)]
78    pub fn spawn(
79        source_idx: usize,
80        name: String,
81        connector: Box<dyn SourceConnector>,
82        config: ConnectorConfig,
83        target_inbox: Arc<SpscQueue<CoreMessage>>,
84        max_poll_records: usize,
85        fallback_poll_interval: Duration,
86        core_thread: thread::Thread,
87        restore_checkpoint: Option<SourceCheckpoint>,
88        delivery_guarantee: DeliveryGuarantee,
89    ) -> std::io::Result<Self> {
90        let shutdown = Arc::new(AtomicBool::new(false));
91        let started = Arc::new(AtomicBool::new(false));
92        let injector = CheckpointBarrierInjector::new();
93        let barrier_handle = injector.handle();
94        let metrics = Arc::new(SourceIoMetrics::default());
95        let (cp_tx, cp_rx) = tokio::sync::watch::channel(SourceCheckpoint::new(0));
96
97        let shutdown_clone = Arc::clone(&shutdown);
98        let started_clone = Arc::clone(&started);
99        let metrics_clone = Arc::clone(&metrics);
100
101        let thread = thread::Builder::new()
102            .name(format!("laminar-src-{name}"))
103            .spawn(move || {
104                source_io_main(
105                    source_idx,
106                    name,
107                    connector,
108                    config,
109                    target_inbox,
110                    max_poll_records,
111                    fallback_poll_interval,
112                    shutdown_clone,
113                    barrier_handle,
114                    metrics_clone,
115                    cp_tx,
116                    core_thread,
117                    restore_checkpoint,
118                    delivery_guarantee,
119                    started_clone,
120                )
121            })?;
122
123        Ok(Self {
124            thread: Some(thread),
125            shutdown,
126            started,
127            injector,
128            metrics,
129            checkpoint_rx: cp_rx,
130        })
131    }
132
133    /// Returns true if the source successfully opened and restored.
134    #[must_use]
135    pub fn has_started(&self) -> bool {
136        self.started.load(Ordering::Acquire)
137    }
138
139    /// Returns true if the source thread has exited without starting.
140    /// This indicates a fatal startup failure (open or restore failed).
141    /// Returns false if the thread is still running or started successfully.
142    #[must_use]
143    pub fn has_failed(&self) -> bool {
144        !self.has_started()
145            && self
146                .thread
147                .as_ref()
148                .is_some_and(std::thread::JoinHandle::is_finished)
149    }
150
151    /// Signal shutdown and join the thread, returning the connector for cleanup.
152    pub fn shutdown_and_join(&mut self) -> Option<Box<dyn SourceConnector>> {
153        self.shutdown.store(true, Ordering::Release);
154        self.thread.take().and_then(|h| h.join().ok()).flatten()
155    }
156}
157
158impl Drop for SourceIoThread {
159    fn drop(&mut self) {
160        self.shutdown.store(true, Ordering::Release);
161        if let Some(handle) = self.thread.take() {
162            let _ = handle.join();
163        }
164    }
165}
166
167/// Extract the maximum timestamp from a `RecordBatch`.
168///
169/// Scans for a column named `event_time` or `timestamp` (i64 or
170/// `TimestampMillisecond`) and returns the MAX value across all rows.
171/// Using MAX ensures watermark correctness for out-of-order batches.
172/// If not found, falls back to wall-clock time.
173fn extract_timestamp(batch: &RecordBatch) -> i64 {
174    use arrow::compute::kernels::aggregate::max as arrow_max;
175    use arrow_array::Array;
176
177    for name in &["event_time", "timestamp"] {
178        if let Ok(col_idx) = batch.schema().index_of(name) {
179            let col = batch.column(col_idx);
180            // Try i64 array — use max across all rows
181            if let Some(arr) = col.as_any().downcast_ref::<arrow_array::Int64Array>() {
182                if !arr.is_empty() {
183                    if let Some(max_val) = arrow_max(arr) {
184                        return max_val;
185                    }
186                }
187            }
188            // Try TimestampMillisecond array — use max across all rows
189            if let Some(arr) = col
190                .as_any()
191                .downcast_ref::<arrow_array::TimestampMillisecondArray>()
192            {
193                if !arr.is_empty() {
194                    if let Some(max_val) = arrow_max(arr) {
195                        return max_val;
196                    }
197                }
198            }
199        }
200    }
201
202    // Fallback: wall-clock time in milliseconds
203    #[allow(clippy::cast_possible_truncation)]
204    std::time::SystemTime::now()
205        .duration_since(std::time::UNIX_EPOCH)
206        .map_or(0, |d| d.as_millis() as i64)
207}
208
209/// Main loop for the source I/O thread.
210#[allow(
211    clippy::too_many_arguments,
212    clippy::needless_pass_by_value,
213    clippy::too_many_lines
214)]
215fn source_io_main(
216    source_idx: usize,
217    name: String,
218    mut connector: Box<dyn SourceConnector>,
219    config: ConnectorConfig,
220    target_inbox: Arc<SpscQueue<CoreMessage>>,
221    max_poll_records: usize,
222    fallback_poll_interval: Duration,
223    shutdown: Arc<AtomicBool>,
224    barrier_handle: BarrierPollHandle,
225    metrics: Arc<SourceIoMetrics>,
226    cp_tx: tokio::sync::watch::Sender<SourceCheckpoint>,
227    core_thread: thread::Thread,
228    restore_checkpoint: Option<SourceCheckpoint>,
229    delivery_guarantee: DeliveryGuarantee,
230    started: Arc<AtomicBool>,
231) -> Option<Box<dyn SourceConnector>> {
232    let rt = match tokio::runtime::Builder::new_current_thread()
233        .enable_all()
234        .build()
235    {
236        Ok(rt) => rt,
237        Err(e) => {
238            tracing::error!("Source '{name}': failed to create tokio runtime: {e}");
239            return Some(connector);
240        }
241    };
242
243    rt.block_on(async {
244        // Open the connector
245        if let Err(e) = connector.open(&config).await {
246            tracing::error!("Source '{name}': failed to open connector: {e}");
247            return None;
248        }
249
250        // Restore checkpoint offsets AFTER open but BEFORE first poll.
251        // For Kafka, this calls consumer.assign() to seek to the checkpoint
252        // position. The consumer must still be owned by the source (not yet
253        // moved to a background reader task) for assign() to take effect.
254        if let Some(ref checkpoint) = restore_checkpoint {
255            match connector.restore(checkpoint).await {
256                Ok(()) => {
257                    tracing::info!(
258                        source = %name,
259                        "restored source from checkpoint"
260                    );
261                }
262                Err(e) => {
263                    if matches!(delivery_guarantee, DeliveryGuarantee::ExactlyOnce) {
264                        tracing::error!(
265                            source = %name,
266                            error = %e,
267                            "[LDB-5030] source checkpoint restore failed under exactly-once \
268                             — cannot guarantee delivery semantics"
269                        );
270                        return None;
271                    }
272                    tracing::warn!(
273                        source = %name,
274                        error = %e,
275                        "source checkpoint restore failed, starting from group offsets"
276                    );
277                }
278            }
279        }
280
281        started.store(true, Ordering::Release);
282        let notify = connector.data_ready_notify();
283        let mut epoch: u64 = 0;
284
285        loop {
286            if shutdown.load(Ordering::Acquire) {
287                break;
288            }
289
290            // Wait for data or timeout
291            if let Some(ref notifier) = notify {
292                tokio::select! {
293                    biased;
294                    () = notifier.notified() => {}
295                    () = tokio::time::sleep(fallback_poll_interval) => {}
296                }
297            } else {
298                tokio::time::sleep(fallback_poll_interval).await;
299            }
300
301            if shutdown.load(Ordering::Acquire) {
302                break;
303            }
304
305            // Poll for a batch
306            let poll_start = std::time::Instant::now();
307            match connector.poll_batch(max_poll_records).await {
308                Ok(Some(batch)) => {
309                    let num_rows = batch.records.num_rows();
310                    metrics.batches.fetch_add(1, Ordering::Relaxed);
311                    #[allow(clippy::cast_possible_truncation)]
312                    metrics
313                        .records
314                        .fetch_add(num_rows as u64, Ordering::Relaxed);
315                    #[allow(clippy::cast_possible_truncation)]
316                    metrics
317                        .last_poll_ns
318                        .store(poll_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
319
320                    // Convert SourceBatch → Event
321                    let timestamp = extract_timestamp(&batch.records);
322                    let event = Event::new(timestamp, batch.records);
323
324                    // Push to core inbox with tiered backoff under backpressure
325                    let mut msg = CoreMessage::Event { source_idx, event };
326                    let mut backoff = 0u32;
327                    loop {
328                        match target_inbox.push(msg) {
329                            Ok(()) => {
330                                // Wake the core thread from park_timeout() sleep
331                                core_thread.unpark();
332                                break;
333                            }
334                            Err(returned) => {
335                                if shutdown.load(Ordering::Acquire) {
336                                    break;
337                                }
338                                msg = returned;
339                                backoff += 1;
340                                match backoff {
341                                    0..=63 => std::hint::spin_loop(),
342                                    64..=255 => std::thread::yield_now(),
343                                    _ => std::thread::park_timeout(
344                                        std::time::Duration::from_micros(100),
345                                    ),
346                                }
347                            }
348                        }
349                    }
350                }
351                Ok(None) => {
352                    // No data available
353                    #[allow(clippy::cast_possible_truncation)]
354                    metrics
355                        .last_poll_ns
356                        .store(poll_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
357                }
358                Err(e) => {
359                    metrics.errors.fetch_add(1, Ordering::Relaxed);
360                    tracing::warn!("Source '{name}': poll error: {e}");
361                }
362            }
363
364            // Publish current offsets for timer-based checkpoints.
365            // watch::send is a lock-free atomic pointer swap (~5ns).
366            let _ = cp_tx.send(connector.checkpoint());
367
368            // Check for pending barrier
369            if let Some(barrier) = barrier_handle.poll(epoch) {
370                epoch += 1;
371                // Capture checkpoint at barrier point
372                let _ = cp_tx.send(connector.checkpoint());
373                // Forward barrier to core with same backpressure as events.
374                // A dropped barrier breaks Chandy-Lamport alignment — the
375                // coordinator would wait 30 s then abandon the checkpoint.
376                let mut bmsg = CoreMessage::Barrier {
377                    source_idx,
378                    barrier,
379                };
380                let mut barrier_backoff = 0u32;
381                loop {
382                    match target_inbox.push(bmsg) {
383                        Ok(()) => {
384                            core_thread.unpark();
385                            break;
386                        }
387                        Err(returned) => {
388                            if shutdown.load(Ordering::Acquire) {
389                                break;
390                            }
391                            bmsg = returned;
392                            barrier_backoff += 1;
393                            match barrier_backoff {
394                                0..=63 => std::hint::spin_loop(),
395                                64..=255 => std::thread::yield_now(),
396                                _ => {
397                                    std::thread::park_timeout(std::time::Duration::from_micros(
398                                        100,
399                                    ));
400                                }
401                            }
402                        }
403                    }
404                }
405            }
406        }
407
408        // Close connector
409        if let Err(e) = connector.close().await {
410            tracing::warn!("Source '{name}': close error: {e}");
411        }
412
413        Some(connector)
414    })
415}
416
/// Unit tests for timestamp extraction and metric defaults.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, TimestampMillisecondArray};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use std::sync::Arc;

    /// Current wall-clock time in milliseconds since the Unix epoch.
    #[allow(clippy::cast_possible_truncation)]
    fn now_millis() -> i64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64
    }

    #[test]
    fn test_extract_timestamp_event_time() {
        let schema = Schema::new(vec![Field::new("event_time", DataType::Int64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int64Array::from(vec![42_000]))],
        )
        .unwrap();
        assert_eq!(extract_timestamp(&batch), 42_000);
    }

    #[test]
    fn test_extract_timestamp_column() {
        let schema = Schema::new(vec![Field::new("timestamp", DataType::Int64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int64Array::from(vec![99_000]))],
        )
        .unwrap();
        assert_eq!(extract_timestamp(&batch), 99_000);
    }

    #[test]
    fn test_extract_timestamp_millis() {
        let schema = Schema::new(vec![Field::new(
            "event_time",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(TimestampMillisecondArray::from(vec![123_456]))],
        )
        .unwrap();
        assert_eq!(extract_timestamp(&batch), 123_456);
    }

    /// Out-of-order rows: the result is the maximum, not the first or last row.
    #[test]
    fn test_extract_timestamp_uses_max_across_rows() {
        let schema = Schema::new(vec![Field::new("event_time", DataType::Int64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int64Array::from(vec![10, 50, 30]))],
        )
        .unwrap();
        assert_eq!(extract_timestamp(&batch), 50);
    }

    /// Same MAX contract for millisecond timestamp columns.
    #[test]
    fn test_extract_timestamp_millis_uses_max_across_rows() {
        let schema = Schema::new(vec![Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(TimestampMillisecondArray::from(vec![
                300, 100, 200,
            ]))],
        )
        .unwrap();
        assert_eq!(extract_timestamp(&batch), 300);
    }

    /// `event_time` is consulted before `timestamp` when both are present.
    #[test]
    fn test_extract_timestamp_prefers_event_time() {
        let schema = Schema::new(vec![
            Field::new("timestamp", DataType::Int64, false),
            Field::new("event_time", DataType::Int64, false),
        ]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(Int64Array::from(vec![9])),
                Arc::new(Int64Array::from(vec![5])),
            ],
        )
        .unwrap();
        assert_eq!(extract_timestamp(&batch), 5);
    }

    #[test]
    fn test_extract_timestamp_fallback() {
        let schema = Schema::new(vec![Field::new("value", DataType::Int64, false)]);
        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1]))])
                .unwrap();
        let ts = extract_timestamp(&batch);
        // Should be approximately current time in millis
        assert!((ts - now_millis()).abs() < 1000);
    }

    /// A matching column with zero rows has no maximum, so wall-clock is used.
    #[test]
    fn test_extract_timestamp_empty_column_falls_back() {
        let schema = Schema::new(vec![Field::new("event_time", DataType::Int64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int64Array::from(Vec::<i64>::new()))],
        )
        .unwrap();
        let ts = extract_timestamp(&batch);
        assert!((ts - now_millis()).abs() < 1000);
    }

    #[test]
    fn test_source_io_metrics_default() {
        let metrics = SourceIoMetrics::default();
        assert_eq!(metrics.batches.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.records.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.errors.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.last_poll_ns.load(Ordering::Relaxed), 0);
    }
}