Skip to main content

laminar_db/pipeline/
tpc_coordinator.rs

1//! TPC-mode pipeline coordinator.
2//!
3//! Runs as a tokio task. Drains core outboxes from [`TpcRuntime`], runs SQL
4//! cycles via [`PipelineCallback`], and handles checkpoint barriers.
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use arrow_array::RecordBatch;
10use laminar_connectors::checkpoint::SourceCheckpoint;
11use laminar_core::checkpoint::CheckpointBarrier;
12use laminar_core::operator::Output;
13use laminar_core::tpc::TaggedOutput;
14use laminar_core::tpc::TpcConfig;
15use rustc_hash::{FxHashMap, FxHashSet};
16
17use super::callback::{PipelineCallback, SourceRegistration};
18use super::config::PipelineConfig;
19use super::tpc_runtime::TpcRuntime;
20use crate::error::DbError;
21
/// Tracks in-flight checkpoint barrier alignment.
///
/// Reused across checkpoints (see [`PendingBarrier::reset`]) so the hash
/// set/map allocations are amortized rather than rebuilt per checkpoint.
struct PendingBarrier {
    /// ID of the checkpoint currently being aligned.
    checkpoint_id: u64,
    /// Number of sources that must deliver a barrier before alignment completes.
    sources_total: usize,
    /// Indices of sources whose barrier has already arrived this round.
    sources_aligned: FxHashSet<usize>,
    /// Per-source checkpoint state captured at barrier time, keyed by source name.
    source_checkpoints: FxHashMap<String, SourceCheckpoint>,
    /// When alignment started — compared against the alignment timeout.
    started_at: Instant,
    /// Whether this barrier is currently active (tracking alignment).
    active: bool,
}
32
33impl PendingBarrier {
34    fn new() -> Self {
35        Self {
36            checkpoint_id: 0,
37            sources_total: 0,
38            sources_aligned: FxHashSet::default(),
39            source_checkpoints: FxHashMap::default(),
40            started_at: Instant::now(),
41            active: false,
42        }
43    }
44
45    fn reset(&mut self, checkpoint_id: u64, sources_total: usize) {
46        self.checkpoint_id = checkpoint_id;
47        self.sources_total = sources_total;
48        self.sources_aligned.clear();
49        self.source_checkpoints.clear();
50        self.started_at = Instant::now();
51        self.active = true;
52    }
53}
54
/// Fallback timeout for idle periods. Ensures `maybe_inject_checkpoint`
/// is called even when no data flows — without it, the coordinator loop
/// would only wake on shutdown or a data notification.
const IDLE_FALLBACK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
58
/// Thread-per-core pipeline coordinator.
///
/// Drains core outboxes (SPSC queues) from CPU-pinned core threads,
/// runs SQL cycles via [`PipelineCallback`], and handles checkpoint barriers.
pub struct TpcPipelineCoordinator {
    /// Pipeline-wide configuration (batch window, checkpoint interval, etc.).
    config: PipelineConfig,
    /// Owns the per-core source threads and their outboxes.
    runtime: TpcRuntime,
    /// Signaled externally to request coordinator shutdown.
    shutdown: Arc<tokio::sync::Notify>,
    /// Lock-free flag set by core threads when outputs are available.
    has_new_data: Arc<std::sync::atomic::AtomicBool>,
    /// Signaled on the false→true transition of `has_new_data`.
    data_notify: Arc<tokio::sync::Notify>,
    /// Pre-built source names indexed by `source_idx` (avoids clone per event).
    source_name_cache: Vec<String>,
    /// Pre-allocated drain buffer (reused each cycle, zero alloc).
    drain_buffer: Vec<TaggedOutput>,
    /// Pre-allocated source batches buffer (cleared per cycle, not dropped).
    source_batches_buf: FxHashMap<String, Vec<RecordBatch>>,
    /// Pre-allocated barriers buffer (cleared per cycle, not dropped).
    barriers_buf: Vec<(usize, CheckpointBarrier)>,
    /// Pre-allocated barrier alignment tracking (reused across checkpoints).
    pending_barrier: PendingBarrier,
    /// Counter for late events (arrived after watermark).
    late_events: u64,
    /// Next checkpoint ID.
    next_checkpoint_id: u64,
    /// Last checkpoint time.
    last_checkpoint: Instant,
    /// Consecutive SQL cycle errors (reset on success).
    consecutive_sql_errors: u32,
    /// Source-initiated checkpoint request flags (e.g., Kafka rebalance).
    /// Polled each cycle; when any flag is set, a forced checkpoint is triggered.
    checkpoint_request_flags: Vec<Arc<std::sync::atomic::AtomicBool>>,
}
93
94impl TpcPipelineCoordinator {
95    /// Create a new TPC pipeline coordinator.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the TPC runtime cannot be initialized.
100    pub fn new(
101        sources: Vec<SourceRegistration>,
102        config: PipelineConfig,
103        tpc_config: &TpcConfig,
104        shutdown: Arc<tokio::sync::Notify>,
105    ) -> Result<Self, DbError> {
106        let mut runtime =
107            TpcRuntime::new(tpc_config).map_err(|e| DbError::Config(e.to_string()))?;
108
109        // Validate delivery guarantee constraints before spawning threads.
110        if config.delivery_guarantee
111            == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce
112        {
113            for src in &sources {
114                if !src.supports_replay {
115                    return Err(DbError::Config(format!(
116                        "[LDB-5031] exactly-once requires source '{}' to support replay, \
117                         but supports_replay=false",
118                        src.name
119                    )));
120                }
121            }
122            if config.checkpoint_interval.is_none() {
123                return Err(DbError::Config(
124                    "[LDB-5032] exactly-once requires checkpointing to be enabled \
125                     (checkpoint_interval must be set)"
126                        .into(),
127                ));
128            }
129        }
130
131        // Capture checkpoint_requested flags before connectors are moved.
132        let mut checkpoint_request_flags = Vec::new();
133        for src in &sources {
134            if let Some(flag) = src.connector.checkpoint_requested() {
135                checkpoint_request_flags.push(flag);
136            }
137        }
138
139        for (idx, src) in sources.into_iter().enumerate() {
140            runtime
141                .attach_source(
142                    idx,
143                    src.name,
144                    src.connector,
145                    src.config,
146                    &config,
147                    src.restore_checkpoint,
148                )
149                .map_err(|e| DbError::Config(format!("failed to spawn source thread: {e}")))?;
150        }
151
152        let source_name_cache: Vec<String> =
153            runtime.source_names().iter().map(String::clone).collect();
154        let (has_new_data, data_notify) = runtime.output_signal();
155
156        Ok(Self {
157            config,
158            runtime,
159            shutdown,
160            has_new_data,
161            data_notify,
162            source_name_cache,
163            drain_buffer: Vec::with_capacity(4096),
164            source_batches_buf: FxHashMap::default(),
165            barriers_buf: Vec::new(),
166            pending_barrier: PendingBarrier::new(),
167            late_events: 0,
168            next_checkpoint_id: 1,
169            last_checkpoint: Instant::now(),
170            consecutive_sql_errors: 0,
171            checkpoint_request_flags,
172        })
173    }
174
175    /// Run the coordinator loop.
176    ///
177    /// Drains core outboxes, converts tagged outputs to source batches,
178    /// executes SQL cycles via the callback, and handles checkpoints.
179    #[allow(clippy::too_many_lines)]
180    pub async fn run(mut self, mut callback: Box<dyn PipelineCallback>) {
181        let batch_window = self.config.batch_window;
182        let barrier_timeout = self.config.barrier_alignment_timeout;
183        let startup_time = Instant::now();
184        let mut source_health_checked = false;
185
186        loop {
187            // Phase 1: Wait for data, shutdown, or fallback timeout.
188            // Core threads call notify_one() after pushing to the outbox,
189            // waking the coordinator immediately instead of polling on a timer.
190            tokio::select! {
191                biased;
192                () = self.shutdown.notified() => break,
193                () = self.data_notify.notified() => {
194                    // Data available. If batch_window > 0, coalesce: sleep
195                    // briefly to let more data accumulate before executing
196                    // the SQL cycle. This amortizes DataFusion overhead for
197                    // high-throughput sources (e.g., Kafka at 500K events/sec).
198                    if !batch_window.is_zero() {
199                        tokio::time::sleep(batch_window).await;
200                    }
201                }
202                // Fallback: wake periodically for checkpoint injection
203                // even when no data flows.
204                () = tokio::time::sleep(IDLE_FALLBACK_TIMEOUT) => {}
205            }
206
207            // Clear the flag so the next core output triggers a new wake.
208            self.has_new_data
209                .store(false, std::sync::atomic::Ordering::Release);
210
211            // Phase 2: Drain all core outboxes
212            self.drain_buffer.clear();
213            self.runtime.poll_all_outputs(&mut self.drain_buffer);
214            if self.drain_buffer.is_empty() {
215                // Deferred source health check — run once after sources have
216                // had time to open and restore (~1s after coordinator start).
217                if !source_health_checked
218                    && startup_time.elapsed() > std::time::Duration::from_secs(1)
219                {
220                    source_health_checked = true;
221                    let failed = self.runtime.failed_sources();
222                    if !failed.is_empty() {
223                        tracing::error!(
224                            sources = ?failed,
225                            "[LDB-5033] source threads failed to start"
226                        );
227                        if self.config.delivery_guarantee
228                            == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce
229                        {
230                            tracing::error!("aborting: exactly-once requires all sources to start");
231                            break;
232                        }
233                    }
234                }
235
236                self.maybe_inject_checkpoint(&mut *callback).await;
237                callback.poll_tables().await;
238                // Check barrier timeout even when idle — otherwise an
239                // in-flight barrier from a stalled source never expires.
240                if self.pending_barrier.active
241                    && self.pending_barrier.started_at.elapsed() > barrier_timeout
242                {
243                    let missing: Vec<&str> = self
244                        .source_name_cache
245                        .iter()
246                        .enumerate()
247                        .filter(|(i, _)| !self.pending_barrier.sources_aligned.contains(i))
248                        .map(|(_, name)| name.as_str())
249                        .collect();
250                    tracing::warn!(
251                        checkpoint_id = self.pending_barrier.checkpoint_id,
252                        aligned = self.pending_barrier.sources_aligned.len(),
253                        total = self.pending_barrier.sources_total,
254                        missing_sources = ?missing,
255                        "Barrier alignment timeout — cancelling checkpoint"
256                    );
257                    self.pending_barrier.active = false;
258                }
259                continue;
260            }
261
262            // Phase 3: Convert TaggedOutput → FxHashMap<String, Vec<RecordBatch>>
263            // Collect barriers separately to avoid borrow conflicts.
264            // Reuse pre-allocated buffers (cleared, not dropped).
265            self.source_batches_buf.clear();
266            self.barriers_buf.clear();
267
268            let mut cycle_events: u64 = 0;
269            let mut cycle_batches: u64 = 0;
270            let cycle_start = Instant::now();
271
272            for tagged in self.drain_buffer.drain(..) {
273                match tagged.output {
274                    Output::Event(event) => {
275                        if let Some(name) = self.source_name_cache.get(tagged.source_idx) {
276                            callback.extract_watermark(name, &event.data);
277                            cycle_events += event.data.num_rows() as u64;
278                            if let Some(filtered) = callback.filter_late_rows(name, &event.data) {
279                                if let Some(vec) = self.source_batches_buf.get_mut(name.as_str()) {
280                                    vec.push(filtered);
281                                } else {
282                                    self.source_batches_buf.insert(name.clone(), vec![filtered]);
283                                }
284                                cycle_batches += 1;
285                            }
286                        }
287                    }
288                    Output::Barrier(barrier) => {
289                        self.barriers_buf.push((tagged.source_idx, barrier));
290                    }
291                    Output::Watermark(_ts) => {
292                        // Watermark progression is already tracked via
293                        // extract_watermark() on each Event batch
294                    }
295                    Output::CheckpointComplete(data) => {
296                        tracing::debug!(
297                            checkpoint_id = data.checkpoint_id,
298                            operators = data.operator_states.len(),
299                            "core checkpoint complete (operator states not yet persisted — \
300                             cores currently run no operators)"
301                        );
302                    }
303                    Output::LateEvent(_event) => {
304                        self.late_events += 1;
305                        tracing::trace!(total_late = self.late_events, "late event past watermark");
306                    }
307                    Output::SideOutput(_) | Output::Changelog(_) => {
308                        tracing::warn!(
309                            source_idx = tagged.source_idx,
310                            "SideOutput/Changelog leaked past DAG boundary — dropped"
311                        );
312                    }
313                }
314            }
315
316            // Process barriers after drain is complete.
317            // Swap with empty to avoid borrow conflict with self.handle_barrier.
318            let mut barriers = std::mem::take(&mut self.barriers_buf);
319            for (source_idx, barrier) in barriers.drain(..) {
320                self.handle_barrier(source_idx, &barrier, &mut *callback)
321                    .await;
322            }
323            // Restore capacity for next cycle.
324            self.barriers_buf = barriers;
325
326            // Phase 4: SQL cycle
327            if !self.source_batches_buf.is_empty() {
328                let wm = callback.current_watermark();
329                match callback.execute_cycle(&self.source_batches_buf, wm).await {
330                    Ok(results) => {
331                        self.consecutive_sql_errors = 0;
332                        callback.push_to_streams(&results);
333                        callback.write_to_sinks(&results).await;
334                    }
335                    Err(e) => {
336                        self.consecutive_sql_errors += 1;
337                        tracing::warn!(
338                            error = %e,
339                            consecutive = self.consecutive_sql_errors,
340                            "[LDB-3020] SQL cycle error"
341                        );
342                        if self.consecutive_sql_errors >= 100 {
343                            tracing::error!(
344                                "[LDB-3021] {} consecutive SQL errors — shutting down pipeline",
345                                self.consecutive_sql_errors
346                            );
347                            break;
348                        }
349                    }
350                }
351                #[allow(clippy::cast_possible_truncation)]
352                // Cycle duration will never exceed u64::MAX nanoseconds (~584 years)
353                let elapsed_ns = cycle_start.elapsed().as_nanos() as u64;
354                callback.record_cycle(cycle_events, cycle_batches, elapsed_ns);
355            }
356
357            // Phase 5: Periodic checkpoint injection
358            self.maybe_inject_checkpoint(&mut *callback).await;
359
360            // Phase 5b: Poll reference table CDC updates
361            callback.poll_tables().await;
362
363            // Phase 6: Barrier timeout check
364            if self.pending_barrier.active
365                && self.pending_barrier.started_at.elapsed() > barrier_timeout
366            {
367                let missing: Vec<&str> = self
368                    .source_name_cache
369                    .iter()
370                    .enumerate()
371                    .filter(|(i, _)| !self.pending_barrier.sources_aligned.contains(i))
372                    .map(|(_, name)| name.as_str())
373                    .collect();
374                tracing::warn!(
375                    checkpoint_id = self.pending_barrier.checkpoint_id,
376                    aligned = self.pending_barrier.sources_aligned.len(),
377                    total = self.pending_barrier.sources_total,
378                    missing_sources = ?missing,
379                    "Barrier alignment timeout — cancelling checkpoint"
380                );
381                self.pending_barrier.active = false;
382            }
383        }
384
385        // Shutdown: drain remaining outputs
386        self.drain_buffer.clear();
387        self.runtime.poll_all_outputs(&mut self.drain_buffer);
388
389        // Close sources
390        let connectors = self.runtime.shutdown();
391        for (_name, mut connector) in connectors {
392            if let Err(e) = connector.close().await {
393                tracing::warn!(error = %e, "Error closing connector on shutdown");
394            }
395        }
396    }
397
398    /// Handle a barrier from a source. Track alignment and trigger checkpoint
399    /// when all sources have aligned.
400    async fn handle_barrier(
401        &mut self,
402        source_idx: usize,
403        barrier: &CheckpointBarrier,
404        callback: &mut dyn PipelineCallback,
405    ) {
406        if !self.pending_barrier.active {
407            self.pending_barrier
408                .reset(barrier.checkpoint_id, self.runtime.num_sources());
409        }
410
411        if self.pending_barrier.checkpoint_id != barrier.checkpoint_id {
412            tracing::debug!(
413                expected = self.pending_barrier.checkpoint_id,
414                actual = barrier.checkpoint_id,
415                source_idx,
416                "ignoring barrier with mismatched checkpoint ID"
417            );
418            return;
419        }
420
421        self.pending_barrier.sources_aligned.insert(source_idx);
422
423        // Capture this source's checkpoint
424        if let Some(name) = self.source_name_cache.get(source_idx) {
425            let cp = self.runtime.source_checkpoint(source_idx);
426            self.pending_barrier
427                .source_checkpoints
428                .insert(name.clone(), cp);
429        }
430
431        // Check if all sources are aligned
432        if self.pending_barrier.sources_aligned.len() >= self.pending_barrier.sources_total {
433            // drain() preserves the map's allocated capacity for reuse.
434            let source_checkpoints: FxHashMap<String, SourceCheckpoint> =
435                self.pending_barrier.source_checkpoints.drain().collect();
436            self.pending_barrier.active = false;
437
438            let success = callback.checkpoint_with_barrier(source_checkpoints).await;
439            if !success {
440                if self.config.delivery_guarantee
441                    == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce
442                {
443                    tracing::error!("[LDB-6011] barrier checkpoint failed under exactly-once");
444                } else {
445                    tracing::warn!("checkpoint with barrier failed");
446                }
447            }
448        }
449    }
450
451    /// Inject checkpoint barriers if the interval has elapsed or a source
452    /// has requested an immediate checkpoint (e.g., Kafka partition revocation).
453    async fn maybe_inject_checkpoint(&mut self, callback: &mut dyn PipelineCallback) {
454        if self.pending_barrier.active {
455            return; // Already waiting for alignment
456        }
457
458        // Check source-initiated checkpoint requests (e.g., Kafka rebalance).
459        let source_requested = self.checkpoint_request_flags.iter().any(|flag| {
460            flag.compare_exchange(
461                true,
462                false,
463                std::sync::atomic::Ordering::AcqRel,
464                std::sync::atomic::Ordering::Relaxed,
465            )
466            .is_ok()
467        });
468
469        let is_exactly_once = self.config.delivery_guarantee
470            == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce;
471
472        let Some(interval) = self.config.checkpoint_interval else {
473            if source_requested && !is_exactly_once {
474                // Manual-only mode but a source needs a checkpoint. Force one.
475                // Skipped under exactly-once — use barriers instead.
476                let offsets = self.runtime.snapshot_all_source_checkpoints();
477                let _ = callback.maybe_checkpoint(true, offsets).await;
478            }
479            return;
480        };
481
482        if !source_requested && self.last_checkpoint.elapsed() < interval {
483            return;
484        }
485
486        self.last_checkpoint = Instant::now();
487        let checkpoint_id = self.next_checkpoint_id;
488        self.next_checkpoint_id += 1;
489
490        // Inject barriers into all sources
491        for idx in 0..self.runtime.num_sources() {
492            self.runtime
493                .injector(idx)
494                .trigger(checkpoint_id, laminar_core::checkpoint::flags::NONE);
495        }
496
497        // Timer-based checkpoint alongside barriers (at-least-once only).
498        if !is_exactly_once {
499            let offsets = self.runtime.snapshot_all_source_checkpoints();
500            let _ = callback.maybe_checkpoint(false, offsets).await;
501        }
502    }
503}
504
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pending_barrier_creation_and_reset() {
        let mut barrier = PendingBarrier::new();
        // A freshly constructed tracker starts idle.
        assert!(!barrier.active);

        // Arming records the checkpoint id and expected source count.
        barrier.reset(1, 3);
        assert_eq!(barrier.checkpoint_id, 1);
        assert_eq!(barrier.sources_total, 3);
        assert!(barrier.active);
        assert!(barrier.sources_aligned.is_empty());

        // Re-arming for a new round wipes prior alignment state.
        barrier.sources_aligned.insert(0);
        barrier.reset(2, 5);
        assert!(barrier.sources_aligned.is_empty());
        assert_eq!(barrier.checkpoint_id, 2);
    }
}
525}