// laminar_db/pipeline/streaming_coordinator.rs

//! Simplified pipeline coordinator.
//!
//! Sources push directly to the coordinator via `crossfire::mpsc`.
//! The coordinator runs on a dedicated single-threaded tokio runtime
//! (`laminar-compute` thread), isolating CPU-bound event processing
//! from IO tasks (Kafka poll, S3 checkpoint writes, HTTP) on the main
//! work-stealing runtime. SQL execution is delegated to [`PipelineCallback`].
//!
//! # Architecture
//!
//! ```text
//! Source task (main tokio runtime)
//!   │  connector.poll_batch().await
//!   │
//!   └──── MAsyncTx ────► StreamingCoordinator (dedicated compute thread)
//!                              │  callback.execute_cycle()
//!                              │  callback.write_to_sinks()
//!                              ▼
//!                            Sinks
//! ```
21
22use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use arrow_array::RecordBatch;
27use crossfire::{mpsc, AsyncRx};
28use laminar_connectors::checkpoint::SourceCheckpoint;
29use laminar_connectors::connector::{DeliveryGuarantee, SourceBatch};
30use laminar_connectors::error::ConnectorError;
31use laminar_core::checkpoint::{CheckpointBarrier, CheckpointBarrierInjector};
32use rustc_hash::{FxHashMap, FxHashSet};
33
34use super::callback::{BarrierOutcome, PipelineCallback, SourceRegistration};
35use super::config::PipelineConfig;
36use crate::error::DbError;
37
38/// Single-consumer async receiver for source → coordinator batches.
39type SourceMsgRx = AsyncRx<mpsc::Array<SourceMsg>>;
40/// Single-consumer async receiver for live DDL control messages.
41type ControlMsgRx = AsyncRx<mpsc::Array<super::ControlMsg>>;
42
43/// Message sent from a source task to the coordinator.
44///
45/// Each variant carries the [`SourceCheckpoint`] captured at the point the
46/// message was produced.  Co-locating the offset with the data guarantees
47/// the coordinator never checkpoints an offset for data it has not yet
48/// processed (eliminates the offset-before-batch race).
49enum SourceMsg {
50    /// A batch of records from a source.
51    Batch {
52        source_idx: usize,
53        batch: RecordBatch,
54        /// Offset snapshot captured *after* `poll_batch` drained the reader
55        /// channel.  Committed to `committed_offsets` only when the batch
56        /// is actually processed (not when deferred to `post_barrier_buf`).
57        checkpoint: SourceCheckpoint,
58    },
59    /// Checkpoint barrier injected at the source.
60    Barrier {
61        source_idx: usize,
62        barrier: CheckpointBarrier,
63        /// Offset snapshot captured at barrier injection (consistent cut).
64        checkpoint: SourceCheckpoint,
65    },
66}
67
68/// Handle to a running source I/O task.
69struct SourceHandle {
70    name: Arc<str>,
71    shutdown: Arc<tokio::sync::Notify>,
72    join: tokio::task::JoinHandle<()>,
73    /// Injector for Chandy-Lamport checkpoint barriers.
74    barrier_injector: CheckpointBarrierInjector,
75    /// Latest committed `(epoch, persisted-checkpoint)`. The checkpoint is
76    /// the per-source `SourceCheckpoint` that was actually written to the
77    /// manifest for `epoch` — sources rebuild downstream offset state
78    /// (Kafka group offsets, NATS acks, etc.) from this exact value
79    /// rather than from `self.offsets`, which may have advanced past
80    /// the durability cut. Empty for timer-driven commits.
81    epoch_committed_tx: tokio::sync::watch::Sender<Option<(u64, SourceCheckpoint)>>,
82}
83
84/// `(epoch, per-source fan-out)` sent back when a background checkpoint completes.
85type CheckpointCompletion = (u64, rustc_hash::FxHashMap<String, SourceCheckpoint>);
86
87/// Simplified pipeline coordinator — single tokio task, no core threads.
88pub struct StreamingCoordinator {
89    config: PipelineConfig,
90    /// Receives all source messages (batches + barriers).
91    rx: SourceMsgRx,
92    /// Handles to source tasks (for shutdown + checkpoint queries).
93    source_handles: Vec<SourceHandle>,
94    /// Source name cache indexed by `source_idx`.
95    source_names: Vec<Arc<str>>,
96    /// Shutdown signal.
97    shutdown: Arc<tokio::sync::Notify>,
98    /// Pending barrier alignment.
99    pending_barrier: PendingBarrier,
100    /// Next checkpoint ID for barrier injection.
101    next_checkpoint_id: u64,
102    /// Last checkpoint time.
103    last_checkpoint: Instant,
104    /// Source-initiated checkpoint request flags.
105    checkpoint_request_flags: Vec<Arc<AtomicBool>>,
106    /// Pre-allocated source batches buffer (cleared per cycle).
107    source_batches_buf: FxHashMap<Arc<str>, Vec<RecordBatch>>,
108    /// Batches received after a barrier from the same source in the same
109    /// drain cycle. These belong to the NEXT checkpoint epoch and must not
110    /// be included in the current checkpoint state. Bounded in practice by
111    /// `channel_capacity` (max messages available per drain cycle).
112    post_barrier_buf: Vec<SourceMsg>,
113    pending_watermark_batches: Vec<(Arc<str>, RecordBatch)>,
114    /// Source indices that have delivered a barrier during the current drain
115    /// cycle. Any subsequent batch from these sources goes to
116    /// `post_barrier_buf`.
117    barrier_seen: FxHashSet<usize>,
118    /// Latest durably-processed source offset per source index.  Merged
119    /// from `pending_offsets` only after a successful `execute_cycle`.
120    committed_offsets: Vec<Option<SourceCheckpoint>>,
121    /// Offsets staged by `process_msg`.  Merged into `committed_offsets`
122    /// after a successful `execute_cycle`, discarded on failure.
123    pending_offsets: Vec<Option<SourceCheckpoint>>,
124    /// Control channel for live DDL (add/drop stream) from `LaminarDB`.
125    control_rx: ControlMsgRx,
126    checkpoint_complete_rx:
127        Option<crossfire::AsyncRx<crossfire::mpsc::Array<CheckpointCompletion>>>,
128    /// Epochs between admission and restorable (durable tails still
129    /// running). Shared with the callback; gated against
130    /// `max_in_flight_epochs`.
131    checkpoint_in_flight: Arc<AtomicU64>,
132    /// Barrier admission cap on `checkpoint_in_flight`. Exactly-once
133    /// pipelines are wired with 1.
134    max_in_flight_epochs: u64,
135    /// Captured-state bytes held by in-flight epochs (shared with the
136    /// callback).
137    staged_bytes: Arc<AtomicU64>,
138    /// Barrier admission cap on `staged_bytes`.
139    max_staged_bytes: u64,
140}
141
142/// Tracks in-flight checkpoint barrier alignment.
143struct PendingBarrier {
144    checkpoint_id: u64,
145    sources_total: usize,
146    sources_aligned: FxHashSet<usize>,
147    source_checkpoints: FxHashMap<String, SourceCheckpoint>,
148    started_at: Instant,
149    active: bool,
150}
151
152impl PendingBarrier {
153    fn new() -> Self {
154        Self {
155            checkpoint_id: 0,
156            sources_total: 0,
157            sources_aligned: FxHashSet::default(),
158            source_checkpoints: FxHashMap::default(),
159            started_at: Instant::now(),
160            active: false,
161        }
162    }
163
164    fn reset(&mut self, checkpoint_id: u64, sources_total: usize) {
165        self.checkpoint_id = checkpoint_id;
166        self.sources_total = sources_total;
167        self.sources_aligned.clear();
168        self.source_checkpoints.clear();
169        self.started_at = Instant::now();
170        self.active = true;
171    }
172}
173
/// Fallback wake-up interval: the longest the coordinator waits before
/// running a cycle when no message, completion or shutdown arrives.
const IDLE_TIMEOUT: Duration = Duration::from_millis(100);
176
177/// Throttled (~once/10s) WARN while barrier admission is paused at the
178/// staged-state cap — this check runs every coordinator tick, so an
179/// unthrottled warn would spam under a sustained upload backlog.
180fn warn_staged_cap_throttled(staged_bytes: u64, cap: u64) {
181    static THROTTLE: crate::log_throttle::LogThrottle =
182        crate::log_throttle::LogThrottle::every(Duration::from_secs(10));
183    if THROTTLE.allow() {
184        tracing::warn!(
185            staged_bytes,
186            cap,
187            "checkpoint admission paused: staged-state cap reached"
188        );
189    }
190}
191
192/// What woke a source task's select loop.
193enum SourceWake {
194    Shutdown,
195    EpochCommitted(u64, SourceCheckpoint),
196    Polled(Result<Option<SourceBatch>, ConnectorError>),
197}
198
199impl StreamingCoordinator {
200    /// Fan out a committed epoch to every source so they can ack. Each
201    /// source receives the per-source checkpoint that was persisted into
202    /// the manifest for `epoch`, or an empty checkpoint when no per-source
203    /// state was captured (timer-driven commits).
204    fn broadcast_epoch_committed(
205        &self,
206        epoch: u64,
207        per_source: &FxHashMap<String, SourceCheckpoint>,
208    ) {
209        for handle in &self.source_handles {
210            let cp = per_source
211                .get(handle.name.as_ref())
212                .cloned()
213                .unwrap_or_else(|| SourceCheckpoint::new(epoch));
214            let _ = handle.epoch_committed_tx.send(Some((epoch, cp)));
215        }
216    }
217
218    /// Create a new streaming coordinator.
219    ///
220    /// Spawns a tokio task for each source that polls the connector and
221    /// sends batches/barriers to the coordinator via mpsc.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if delivery guarantee constraints are violated
226    /// or if any source connector fails to open.
227    #[allow(clippy::too_many_lines)]
228    pub async fn new(
229        sources: Vec<SourceRegistration>,
230        config: PipelineConfig,
231        shutdown: Arc<tokio::sync::Notify>,
232        control_rx: ControlMsgRx,
233    ) -> Result<Self, DbError> {
234        // Validate delivery guarantee constraints.
235        if config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
236            for src in &sources {
237                if !src.supports_replay {
238                    return Err(DbError::Config(format!(
239                        "[LDB-5031] exactly-once requires source '{}' to support replay",
240                        src.name
241                    )));
242                }
243            }
244            if config.checkpoint_interval.is_none() {
245                return Err(DbError::Config(
246                    "[LDB-5032] exactly-once requires checkpointing to be enabled".into(),
247                ));
248            }
249        }
250
251        if config.channel_capacity == 0 {
252            return Err(DbError::Config(
253                "[LDB-0010] channel_capacity must be > 0".into(),
254            ));
255        }
256
257        // Channel for all source messages.
258        let (tx, rx) = mpsc::bounded_async::<SourceMsg>(config.channel_capacity);
259
260        let mut source_handles = Vec::with_capacity(sources.len());
261        let mut source_names = Vec::with_capacity(sources.len());
262        let mut checkpoint_request_flags = Vec::new();
263        let mut committed_offsets = Vec::with_capacity(sources.len());
264
265        for (idx, src) in sources.into_iter().enumerate() {
266            if let Some(flag) = src.connector.checkpoint_requested() {
267                checkpoint_request_flags.push(flag);
268            }
269
270            let task_shutdown = Arc::new(tokio::sync::Notify::new());
271            let task_shutdown_clone = Arc::clone(&task_shutdown);
272            let task_tx = tx.clone();
273            let max_poll = config.max_poll_records;
274            let poll_interval = config.fallback_poll_interval;
275            let src_name = src.name.clone();
276            let restore = src.restore_checkpoint.clone();
277            let mut connector = src.connector;
278            let connector_config = src.config;
279
280            // Open connector eagerly so startup fails fast on bad config.
281            connector
282                .open(&connector_config)
283                .await
284                .map_err(|e| DbError::Config(format!("source '{src_name}' open failed: {e}")))?;
285
286            // Restore checkpoint if available.
287            if let Some(ref cp) = restore {
288                if let Err(e) = connector.restore(cp).await {
289                    tracing::warn!(
290                        source = %src_name, error = %e,
291                        "source restore failed, starting from beginning"
292                    );
293                }
294            }
295
296            // Seed committed_offsets with the restore checkpoint so a
297            // shutdown before any data still checkpoints the restore position.
298            committed_offsets.push(src.restore_checkpoint);
299
300            // Barrier injection: coordinator triggers → source polls → sends SourceMsg::Barrier
301            let barrier_injector = CheckpointBarrierInjector::new();
302            let barrier_handle = barrier_injector.handle();
303
304            let (epoch_committed_tx, mut epoch_committed_rx) =
305                tokio::sync::watch::channel::<Option<(u64, SourceCheckpoint)>>(None);
306
307            let join = tokio::spawn(async move {
308                let mut epoch: u64 = 0;
309
310                // Ack a fresh commit before polling more — keeps
311                // max_ack_pending headroom for the broker.
312                loop {
313                    let wake = tokio::select! {
314                        biased;
315                        () = task_shutdown_clone.notified() => SourceWake::Shutdown,
316                        r = epoch_committed_rx.changed() => match r {
317                            Ok(()) => {
318                                let snapshot = epoch_committed_rx.borrow_and_update().clone();
319                                match snapshot {
320                                    Some((e, cp)) => SourceWake::EpochCommitted(e, cp),
321                                    None => continue,
322                                }
323                            },
324                            Err(_) => SourceWake::Shutdown,
325                        },
326                        r = connector.poll_batch(max_poll) => SourceWake::Polled(r),
327                    };
328
329                    let poll_result = match wake {
330                        SourceWake::Shutdown => break,
331                        SourceWake::EpochCommitted(e, cp) => {
332                            if let Err(err) = connector.notify_epoch_committed(e, &cp).await {
333                                tracing::warn!(
334                                    source = %src_name,
335                                    error = %err,
336                                    epoch = e,
337                                    "notify_epoch_committed failed",
338                                );
339                            }
340                            continue;
341                        }
342                        SourceWake::Polled(r) => r,
343                    };
344
345                    match poll_result {
346                        Ok(Some(batch)) => {
347                            // Offset travels with the batch — no separate
348                            // watch channel.  The coordinator commits the
349                            // offset only after processing the batch.
350                            let cp = connector.checkpoint();
351                            let msg = SourceMsg::Batch {
352                                source_idx: idx,
353                                batch: batch.records,
354                                checkpoint: cp,
355                            };
356                            if task_tx.send(msg).await.is_err() {
357                                break; // Coordinator dropped
358                            }
359                        }
360                        Ok(None) => {
361                            // No data — sleep briefly (cancellable).
362                            tokio::select! {
363                                biased;
364                                () = task_shutdown_clone.notified() => break,
365                                () = tokio::time::sleep(poll_interval) => {}
366                            }
367                        }
368                        Err(e) if !e.is_transient() => {
369                            tracing::error!(source = %src_name, error = %e, "terminal poll error");
370                            break;
371                        }
372                        Err(e) => {
373                            tracing::warn!(source = %src_name, error = %e, "poll error (retrying)");
374                            tokio::select! {
375                                biased;
376                                () = task_shutdown_clone.notified() => break,
377                                () = tokio::time::sleep(poll_interval) => {}
378                            }
379                        }
380                    }
381
382                    // Poll for pending checkpoint barrier.
383                    if let Some(barrier) = barrier_handle.poll(epoch) {
384                        epoch += 1;
385                        let cp = connector.checkpoint();
386                        let msg = SourceMsg::Barrier {
387                            source_idx: idx,
388                            barrier,
389                            checkpoint: cp,
390                        };
391                        if task_tx.send(msg).await.is_err() {
392                            break;
393                        }
394                    }
395                }
396
397                // Drain remaining data from the connector's internal
398                // buffer before closing — close() drops the buffer and
399                // anything not drained here is lost.
400                while let Ok(Some(batch)) = connector.poll_batch(max_poll).await {
401                    let cp = connector.checkpoint();
402                    let msg = SourceMsg::Batch {
403                        source_idx: idx,
404                        batch: batch.records,
405                        checkpoint: cp,
406                    };
407                    if task_tx.send(msg).await.is_err() {
408                        break;
409                    }
410                }
411
412                // Drain any post-shutdown EpochCommitted broadcasts before
413                // close — the coordinator's final checkpoint fires after
414                // signalling shutdown, and sources need to ack it so
415                // external state (Kafka group offsets, NATS msg acks) for
416                // the last durable epoch is advanced.  Loop until the
417                // sender is dropped (which happens when the coordinator
418                // joins this task).
419                while let Ok(()) = epoch_committed_rx.changed().await {
420                    let snapshot = epoch_committed_rx.borrow_and_update().clone();
421                    if let Some((e, cp)) = snapshot {
422                        if let Err(err) = connector.notify_epoch_committed(e, &cp).await {
423                            tracing::warn!(
424                                source = %src_name,
425                                error = %err,
426                                epoch = e,
427                                "notify_epoch_committed failed",
428                            );
429                        }
430                    }
431                }
432
433                if let Err(e) = connector.close().await {
434                    tracing::warn!(source = %src_name, error = %e, "source close error");
435                }
436            });
437
438            let arc_name: Arc<str> = Arc::from(src.name.as_str());
439            source_handles.push(SourceHandle {
440                name: Arc::clone(&arc_name),
441                shutdown: task_shutdown,
442                join,
443                barrier_injector,
444                epoch_committed_tx,
445            });
446            source_names.push(arc_name);
447        }
448
449        Ok(Self {
450            config,
451            rx,
452            source_handles,
453            source_names,
454            shutdown,
455            pending_barrier: PendingBarrier::new(),
456            next_checkpoint_id: 1,
457            last_checkpoint: Instant::now(),
458            checkpoint_request_flags,
459            source_batches_buf: FxHashMap::default(),
460            post_barrier_buf: Vec::new(),
461            pending_watermark_batches: Vec::new(),
462            barrier_seen: FxHashSet::default(),
463            pending_offsets: vec![None; committed_offsets.len()],
464            committed_offsets,
465            control_rx,
466            checkpoint_complete_rx: None,
467            checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
468            max_in_flight_epochs: 1,
469            staged_bytes: Arc::new(AtomicU64::new(0)),
470            max_staged_bytes: u64::MAX,
471        })
472    }
473
474    /// Share the callback's admission state so the coordinator admits a
475    /// new barrier only while in-flight epochs and staged bytes are
476    /// under their caps.
477    pub(crate) fn with_checkpoint_admission(
478        mut self,
479        in_flight: Arc<AtomicU64>,
480        max_in_flight_epochs: u64,
481        staged_bytes: Arc<AtomicU64>,
482        max_staged_bytes: u64,
483    ) -> Self {
484        self.checkpoint_in_flight = in_flight;
485        self.max_in_flight_epochs = max_in_flight_epochs.max(1);
486        self.staged_bytes = staged_bytes;
487        self.max_staged_bytes = max_staged_bytes;
488        self
489    }
490
491    pub(crate) fn with_checkpoint_complete_rx(
492        mut self,
493        rx: crossfire::AsyncRx<crossfire::mpsc::Array<CheckpointCompletion>>,
494    ) -> Self {
495        self.checkpoint_complete_rx = Some(rx);
496        self
497    }
498
499    /// Run the coordinator loop.
500    ///
501    /// Receives batches from sources via mpsc, executes SQL cycles via
502    /// the callback, and handles checkpoint barriers. Returns when
503    /// shutdown is signaled.
504    ///
505    /// Cycle priority ordering:
506    /// 1. Shutdown signal (biased select — checked first)
507    /// 2. Event drain + SQL execution (up to `MAX_DRAIN_PER_CYCLE` messages)
508    /// 3. Barrier alignment (after SQL so checkpoint state includes processed data)
509    /// 4. Periodic checkpoint (if interval elapsed)
510    /// 5. Table source polling (idle maintenance)
511    /// 6. Barrier timeout check
512    #[allow(clippy::too_many_lines)]
513    pub async fn run<C: PipelineCallback>(mut self, mut callback: C) {
514        /// Maximum messages to drain per cycle before yielding for maintenance work.
515        const MAX_DRAIN_PER_CYCLE: usize = 10_000;
516
517        let injectors = self
518            .source_handles
519            .iter()
520            .map(|h| (h.name.clone(), h.barrier_injector.clone()))
521            .collect();
522        callback.set_barrier_injectors(injectors);
523
524        let batch_window = self.config.batch_window;
525        let mut barriers_buf: Vec<(usize, CheckpointBarrier, SourceCheckpoint)> = Vec::new();
526
527        loop {
528            // Step: Wait for data, shutdown, or idle timeout.
529            let msg = tokio::select! {
530                biased;
531                () = self.shutdown.notified() => break,
532                // A background persist finished (in-flight guard ensures epoch order).
533                Some((epoch, fan_out)) = async {
534                    if let Some(ref mut rx) = self.checkpoint_complete_rx {
535                        rx.recv().await.ok()
536                    } else {
537                        futures::future::pending::<Option<CheckpointCompletion>>().await
538                    }
539                } => {
540                    self.broadcast_epoch_committed(epoch, &fan_out);
541                    callback.publish_barrier(epoch, epoch);
542                    continue;
543                }
544                msg = self.rx.recv() => {
545                    match msg {
546                        Ok(m) => {
547                            // If batch_window > 0, coalesce: sleep briefly
548                            // to let more data accumulate.
549                            if !batch_window.is_zero() {
550                                tokio::time::sleep(batch_window).await;
551                            }
552                            Some(m)
553                        }
554                        Err(_) => break, // All senders dropped
555                    }
556                }
557                () = tokio::time::sleep(IDLE_TIMEOUT) => None,
558            };
559
560            self.source_batches_buf.clear();
561            self.barrier_seen.clear();
562            self.discard_pending_offsets();
563            barriers_buf.clear();
564            let mut cycle_events: u64 = 0;
565            let cycle_start = Instant::now();
566
567            // First, drain any post-barrier messages deferred from the
568            // previous cycle — these are pre-next-barrier data that should
569            // be processed in this cycle.
570            let deferred = std::mem::take(&mut self.post_barrier_buf);
571            for deferred_msg in deferred {
572                self.process_msg(
573                    deferred_msg,
574                    &mut callback,
575                    &mut barriers_buf,
576                    &mut cycle_events,
577                );
578            }
579
580            let had_data = msg.is_some();
581            if let Some(first_msg) = msg {
582                self.process_msg(
583                    first_msg,
584                    &mut callback,
585                    &mut barriers_buf,
586                    &mut cycle_events,
587                );
588            }
589
590            // Drain any additional buffered messages (batch coalescing).
591            // Terminates on count limit, time budget, or backpressure.
592            // Only check backpressure on active wakeups to avoid bumping
593            // the counter on idle timeouts.
594            let mut drain_count = 0;
595            let drain_budget_ns = self.config.drain_budget_ns;
596            let backpressured = had_data && callback.is_backpressured();
597            if backpressured {
598                tracing::debug!("operator graph backpressured — skipping drain");
599            }
600            #[allow(clippy::cast_possible_truncation)]
601            while !backpressured
602                && drain_count < MAX_DRAIN_PER_CYCLE
603                && (cycle_start.elapsed().as_nanos() as u64) < drain_budget_ns
604            {
605                match self.rx.try_recv() {
606                    Ok(msg) => {
607                        self.process_msg(msg, &mut callback, &mut barriers_buf, &mut cycle_events);
608                        drain_count += 1;
609                    }
610                    Err(_) => break,
611                }
612            }
613
614            for (name, batch) in self.pending_watermark_batches.drain(..) {
615                callback.extract_watermark(&name, &batch);
616            }
617
618            // Demote watermarked sources idle past their timeout so a quiet
619            // input doesn't pin the combined watermark (active sources keep
620            // driving the cycle below, which then closes pending windows).
621            callback.tick_idle_watermark();
622
623            // Step: Execute SQL cycle. Also runs on idle wakeups when
624            // operators have deferred input from a prior budget-exceeded
625            // cycle — otherwise that data is stuck forever once the source
626            // goes idle.
627            if !self.source_batches_buf.is_empty() || callback.has_deferred_input() {
628                let wm = callback.current_watermark();
629                match callback.execute_cycle(&self.source_batches_buf, wm).await {
630                    Ok(results) => {
631                        self.commit_pending_offsets();
632                        callback.update_mv_stores(&results);
633                        callback.push_to_streams(&results);
634                        callback.write_to_sinks(&results).await;
635                    }
636                    Err(e) => {
637                        self.discard_pending_offsets();
638                        tracing::warn!(error = %e, "[LDB-3020] SQL cycle error");
639                    }
640                }
641                #[allow(clippy::cast_possible_truncation)]
642                let elapsed_ns = cycle_start.elapsed().as_nanos() as u64;
643                callback.record_cycle(cycle_events, 0, elapsed_ns);
644
645                if elapsed_ns >= self.config.cycle_budget_ns {
646                    tracing::debug!(
647                        elapsed_ms = elapsed_ns / 1_000_000,
648                        budget_ms = self.config.cycle_budget_ns / 1_000_000,
649                        "cycle budget exceeded — proceeding to maintenance"
650                    );
651                }
652            }
653
654            // Hoist elapsed_ns so the background phase can use it.
655            #[allow(clippy::cast_possible_truncation)]
656            let cycle_elapsed_ns = cycle_start.elapsed().as_nanos() as u64;
657
658            let bg_start = Instant::now();
659            let bg_budget = self.config.background_budget_ns;
660
661            // Step: Handle barriers — always process (cheap, O(num_sources)
662            // hash lookups, must not be deferred for correctness).
663            for (source_idx, barrier, cp) in &barriers_buf {
664                self.handle_barrier(*source_idx, barrier, cp, &mut callback)
665                    .await;
666            }
667
668            // Step: Periodic checkpoint — skip if background budget already
669            // exhausted (checkpoint is expensive I/O).
670            #[allow(clippy::cast_possible_truncation)]
671            if (bg_start.elapsed().as_nanos() as u64) < bg_budget {
672                self.maybe_checkpoint(&mut callback).await;
673            }
674
675            // Step: Poll table sources — skip if cycle OR background budget
676            // exceeded (lowest priority background work).
677            #[allow(clippy::cast_possible_truncation)]
678            let bg_elapsed = bg_start.elapsed().as_nanos() as u64;
679            if cycle_elapsed_ns < self.config.cycle_budget_ns && bg_elapsed < bg_budget {
680                callback.poll_tables().await;
681            } else {
682                tracing::debug!("skipping poll_tables (budget exhausted)");
683            }
684
685            // Step: Drain control messages (add/drop stream DDL).
686            // Processed AFTER checkpoint so newly added queries don't have
687            // inconsistent state in the checkpoint.
688            while let Ok(msg) = self.control_rx.try_recv() {
689                callback.apply_control(msg);
690            }
691
692            // Step: Barrier timeout check.
693            if self.pending_barrier.active
694                && self.pending_barrier.started_at.elapsed() > self.config.barrier_alignment_timeout
695            {
696                tracing::warn!(
697                    checkpoint_id = self.pending_barrier.checkpoint_id,
698                    "Barrier alignment timeout — cancelling checkpoint"
699                );
700                self.pending_barrier.active = false;
701            }
702        }
703
704        // Signal source tasks to stop.
705        for handle in &self.source_handles {
706            handle.shutdown.notify_one();
707        }
708
709        // Drain rx BEFORE joining source tasks. Source tasks may be
710        // blocked on task_tx.send() (channel full because the pipeline
711        // is slower than the source). Draining first frees channel
712        // slots so those source tasks can unblock, see the shutdown
713        // signal, and exit. Joining before draining deadlocks.
714        self.source_batches_buf.clear();
715        self.barrier_seen.clear();
716        self.discard_pending_offsets();
717        let mut drain_barriers: Vec<(usize, CheckpointBarrier, SourceCheckpoint)> = Vec::new();
718        let mut drain_events: u64 = 0;
719
720        loop {
721            let deferred = std::mem::take(&mut self.post_barrier_buf);
722            let mut got_any = !deferred.is_empty();
723            for msg in deferred {
724                self.process_msg(msg, &mut callback, &mut drain_barriers, &mut drain_events);
725            }
726            while let Ok(msg) = self.rx.try_recv() {
727                got_any = true;
728                self.process_msg(msg, &mut callback, &mut drain_barriers, &mut drain_events);
729            }
730            if !got_any {
731                break;
732            }
733        }
734
735        for (name, batch) in self.pending_watermark_batches.drain(..) {
736            callback.extract_watermark(&name, &batch);
737        }
738        callback.tick_idle_watermark();
739        if !self.source_batches_buf.is_empty() || callback.has_deferred_input() {
740            let wm = callback.current_watermark();
741            match callback.execute_cycle(&self.source_batches_buf, wm).await {
742                Ok(results) => {
743                    self.commit_pending_offsets();
744                    callback.update_mv_stores(&results);
745                    callback.push_to_streams(&results);
746                    callback.write_to_sinks(&results).await;
747                }
748                Err(e) => {
749                    self.discard_pending_offsets();
750                    tracing::warn!(error = %e, "[LDB-3020] SQL cycle error during shutdown drain");
751                }
752            }
753        }
754
755        // Second drain: pick up any messages source tasks sent between
756        // the first drain and finishing their data-drain phase.
757        self.source_batches_buf.clear();
758        self.barrier_seen.clear();
759        self.discard_pending_offsets();
760        drain_barriers.clear();
761        while let Ok(msg) = self.rx.try_recv() {
762            self.process_msg(msg, &mut callback, &mut drain_barriers, &mut drain_events);
763        }
764        for (name, batch) in self.pending_watermark_batches.drain(..) {
765            callback.extract_watermark(&name, &batch);
766        }
767        callback.tick_idle_watermark();
768        if !self.source_batches_buf.is_empty() || callback.has_deferred_input() {
769            let wm = callback.current_watermark();
770            match callback.execute_cycle(&self.source_batches_buf, wm).await {
771                Ok(results) => {
772                    self.commit_pending_offsets();
773                    callback.update_mv_stores(&results);
774                    callback.push_to_streams(&results);
775                    callback.write_to_sinks(&results).await;
776                }
777                Err(e) => {
778                    self.discard_pending_offsets();
779                    tracing::warn!(error = %e, "[LDB-3020] SQL cycle error during final drain");
780                }
781            }
782        }
783
784        // Final checkpoint uses committed_offsets — only reflects data
785        // that successfully passed through execute_cycle. Run BEFORE
786        // dropping source senders so sources receive the EpochCommitted
787        // and ack to their external systems (Kafka group offsets, etc.).
788        let checkpoint_enabled = self.config.checkpoint_interval.is_some();
789        if checkpoint_enabled {
790            let source_offsets: FxHashMap<String, SourceCheckpoint> = self
791                .committed_offsets
792                .iter()
793                .enumerate()
794                .filter_map(|(idx, cp)| {
795                    cp.as_ref().and_then(|c| {
796                        self.source_names
797                            .get(idx)
798                            .map(|name| (name.to_string(), c.clone()))
799                    })
800                })
801                .collect();
802            if let Some(epoch) = callback
803                .maybe_checkpoint(true, source_offsets.clone())
804                .await
805            {
806                tracing::info!(epoch, "final checkpoint completed before shutdown");
807                self.broadcast_epoch_committed(epoch, &source_offsets);
808            }
809        }
810
811        // Drop epoch_committed_tx senders before awaiting join: source
812        // tasks block on epoch_committed_rx.changed() in their drain
813        // phase and only exit once the sender is dropped.
814        for handle in std::mem::take(&mut self.source_handles) {
815            let SourceHandle {
816                name,
817                shutdown: _,
818                join,
819                barrier_injector: _,
820                epoch_committed_tx,
821            } = handle;
822            drop(epoch_committed_tx);
823            if let Err(e) = join.await {
824                tracing::warn!(source = %name, error = ?e, "source task panicked");
825            }
826        }
827    }
828
829    /// Process a single source message.
830    ///
831    /// When a barrier is seen from a source, subsequent batches from that
832    /// source are diverted to `post_barrier_buf` to ensure they are not
833    /// included in the current checkpoint state.
834    fn process_msg(
835        &mut self,
836        msg: SourceMsg,
837        callback: &mut impl PipelineCallback,
838        barriers: &mut Vec<(usize, CheckpointBarrier, SourceCheckpoint)>,
839        cycle_events: &mut u64,
840    ) {
841        match msg {
842            SourceMsg::Batch {
843                source_idx,
844                batch,
845                checkpoint,
846            } => {
847                // If this source already delivered a barrier in this drain
848                // cycle, this batch is post-barrier data — defer it.
849                if self.barrier_seen.contains(&source_idx) {
850                    self.post_barrier_buf.push(SourceMsg::Batch {
851                        source_idx,
852                        batch,
853                        checkpoint,
854                    });
855                    return;
856                }
857
858                // Stage offset (committed after successful execute_cycle).
859                if source_idx < self.pending_offsets.len() {
860                    self.pending_offsets[source_idx] = Some(checkpoint);
861                }
862
863                if let Some(name) = self.source_names.get(source_idx) {
864                    #[allow(clippy::cast_possible_truncation)]
865                    {
866                        *cycle_events += batch.num_rows() as u64;
867                    }
868                    // Filter with the PRE-DRAIN watermark. Watermark extraction
869                    // is deferred to after all batches in this drain cycle are
870                    // filtered — otherwise batch N's watermark advance causes
871                    // batch N+1's slightly-older rows to be dropped as "late."
872                    if let Some(filtered) = callback.filter_late_rows(name, &batch) {
873                        self.source_batches_buf
874                            .entry(Arc::clone(name))
875                            .or_default()
876                            .push(filtered);
877                    }
878                    self.pending_watermark_batches
879                        .push((Arc::clone(name), batch));
880                }
881            }
882            SourceMsg::Barrier {
883                source_idx,
884                barrier,
885                checkpoint,
886            } => {
887                tracing::debug!(
888                    source_idx,
889                    checkpoint_id = barrier.checkpoint_id,
890                    "coordinator received source barrier"
891                );
892                self.barrier_seen.insert(source_idx);
893                barriers.push((source_idx, barrier, checkpoint));
894            }
895        }
896    }
897
898    /// Merge staged offsets into `committed_offsets`.  Called after a
899    /// successful `execute_cycle`.
900    fn commit_pending_offsets(&mut self) {
901        for (i, pending) in self.pending_offsets.iter_mut().enumerate() {
902            if let Some(cp) = pending.take() {
903                self.committed_offsets[i] = Some(cp);
904            }
905        }
906    }
907
908    /// Discard staged offsets.  Called when `execute_cycle` fails.
909    fn discard_pending_offsets(&mut self) {
910        for slot in &mut self.pending_offsets {
911            *slot = None;
912        }
913    }
914
915    /// Handle a barrier from a source.
916    async fn handle_barrier(
917        &mut self,
918        source_idx: usize,
919        barrier: &CheckpointBarrier,
920        barrier_checkpoint: &SourceCheckpoint,
921        callback: &mut impl PipelineCallback,
922    ) {
923        if !self.pending_barrier.active && !callback.is_leader() {
924            self.pending_barrier
925                .reset(barrier.checkpoint_id, self.source_handles.len());
926        }
927
928        if !self.pending_barrier.active
929            || barrier.checkpoint_id != self.pending_barrier.checkpoint_id
930        {
931            return;
932        }
933
934        // Use the checkpoint that traveled atomically with the barrier
935        // message — no watch-channel race.
936        if let Some(name) = self.source_names.get(source_idx) {
937            self.pending_barrier
938                .source_checkpoints
939                .insert(name.to_string(), barrier_checkpoint.clone());
940        }
941
942        self.pending_barrier.sources_aligned.insert(source_idx);
943
944        // Check if all sources aligned.
945        if self.pending_barrier.sources_aligned.len() >= self.pending_barrier.sources_total {
946            let checkpoints = std::mem::take(&mut self.pending_barrier.source_checkpoints);
947            // Clone for fan-out — `checkpoint_with_barrier` consumes the
948            // map. The fan-out passes each source the exact `SourceCheckpoint`
949            // that was persisted, so external offset state (Kafka group
950            // offsets, ack tokens) advances only with the durable manifest.
951            let fan_out = checkpoints.clone();
952            let checkpoint_id = self.pending_barrier.checkpoint_id;
953            match callback
954                .checkpoint_with_barrier(checkpoints, checkpoint_id)
955                .await
956            {
957                BarrierOutcome::Committed(epoch) => {
958                    self.broadcast_epoch_committed(epoch, &fan_out);
959                    // Wire barrier = durable epoch.
960                    callback.publish_barrier(epoch, checkpoint_id);
961                }
962                BarrierOutcome::Async => {
963                    self.pending_barrier.active = false;
964                    self.last_checkpoint = Instant::now();
965                }
966                BarrierOutcome::Skipped(reason) => {
967                    tracing::debug!(checkpoint_id, reason = %reason, "barrier checkpoint skipped");
968                }
969                BarrierOutcome::Failed => {
970                    tracing::warn!(
971                        checkpoint_id,
972                        "barrier checkpoint failed, will retry on next interval"
973                    );
974                }
975            }
976            self.pending_barrier.active = false;
977            self.last_checkpoint = Instant::now();
978        }
979    }
980
981    /// Check if a periodic checkpoint should be triggered.
982    ///
983    /// When barriers are supported (sources present), injects barriers for
984    /// Chandy-Lamport consistent snapshots. When no sources are present,
985    /// falls back to timer-based offset-only checkpoints.
986    async fn maybe_checkpoint(&mut self, callback: &mut impl PipelineCallback) {
987        if self.pending_barrier.active {
988            return; // Already tracking a barrier.
989        }
990        if self.checkpoint_in_flight.load(Ordering::Acquire) >= self.max_in_flight_epochs {
991            return; // The in-flight epoch backlog is at its cap.
992        }
993        if self.staged_bytes.load(Ordering::Acquire) >= self.max_staged_bytes {
994            // Cadence degrades to upload speed rather than buffering
995            // unbounded captured state.
996            warn_staged_cap_throttled(
997                self.staged_bytes.load(Ordering::Acquire),
998                self.max_staged_bytes,
999            );
1000            return;
1001        }
1002
1003        // Always give the callback a chance to run cluster-follower
1004        // polling. `ConnectorPipelineCallback::maybe_checkpoint` routes
1005        // to `maybe_follower_checkpoint` on non-leader nodes before
1006        // checking the timer, so a follower with no data-path events
1007        // still picks up the leader's PREPARE announcements from gossip.
1008        // Leader-side nodes short-circuit here (no checkpoint_interval
1009        // configured in the tests that drive `db.checkpoint()` manually).
1010        let offsets = FxHashMap::default();
1011        if let Some(epoch) = callback.maybe_checkpoint(false, offsets).await {
1012            // Timer-driven follower path — no per-source state captured.
1013            self.broadcast_epoch_committed(epoch, &FxHashMap::default());
1014        }
1015
1016        let should_checkpoint = callback.is_leader()
1017            && (self
1018                .config
1019                .checkpoint_interval
1020                .is_some_and(|interval| self.last_checkpoint.elapsed() >= interval)
1021                || self
1022                    .checkpoint_request_flags
1023                    .iter()
1024                    .any(|f| f.swap(false, Ordering::AcqRel)));
1025
1026        if !should_checkpoint {
1027            return;
1028        }
1029
1030        if self.source_handles.is_empty() {
1031            // No sources — timer-based checkpoint only.
1032            let offsets = FxHashMap::default();
1033            if let Some(epoch) = callback.maybe_checkpoint(false, offsets).await {
1034                self.last_checkpoint = Instant::now();
1035                self.broadcast_epoch_committed(epoch, &FxHashMap::default());
1036            }
1037            return;
1038        }
1039
1040        // Inject barriers into all source tasks for Chandy-Lamport alignment.
1041        let checkpoint_id = if let Some(external_id) = callback.next_checkpoint_id() {
1042            self.next_checkpoint_id = external_id + 1;
1043            external_id
1044        } else {
1045            let id = self.next_checkpoint_id;
1046            self.next_checkpoint_id += 1;
1047            id
1048        };
1049        self.pending_barrier
1050            .reset(checkpoint_id, self.source_handles.len());
1051
1052        for handle in &self.source_handles {
1053            handle.barrier_injector.trigger(checkpoint_id, 0);
1054        }
1055    }
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060    use super::*;
1061    use arrow::array::Int64Array;
1062    use arrow::datatypes::{DataType, Field, Schema};
1063    use std::sync::Arc;
1064
1065    /// Minimal mock callback for testing the coordinator loop.
1066    struct MockCallback {
1067        cycle_count: u32,
1068        results: Vec<FxHashMap<Arc<str>, Vec<RecordBatch>>>,
1069        watermark: i64,
1070        /// Optional shared flag set when `maybe_checkpoint(force=true)` fires.
1071        force_checkpoint_flag: Option<Arc<std::sync::atomic::AtomicBool>>,
1072    }
1073
1074    impl MockCallback {
1075        fn new() -> Self {
1076            Self {
1077                cycle_count: 0,
1078                results: Vec::new(),
1079                watermark: 0,
1080                force_checkpoint_flag: None,
1081            }
1082        }
1083    }
1084
1085    impl PipelineCallback for MockCallback {
1086        async fn execute_cycle(
1087            &mut self,
1088            source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
1089            _watermark: i64,
1090        ) -> Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String> {
1091            self.cycle_count += 1;
1092            // Pass through source batches as results.
1093            let results: FxHashMap<Arc<str>, Vec<RecordBatch>> = source_batches
1094                .iter()
1095                .map(|(k, v)| (k.clone(), v.clone()))
1096                .collect();
1097            self.results.push(results.clone());
1098            Ok(results)
1099        }
1100
1101        fn push_to_streams(&self, _results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {}
1102        async fn write_to_sinks(&mut self, _results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {}
1103
1104        fn extract_watermark(&mut self, _source_name: &str, batch: &RecordBatch) {
1105            // Use row count as a simple watermark proxy.
1106            #[allow(clippy::cast_possible_wrap)]
1107            {
1108                self.watermark += batch.num_rows() as i64;
1109            }
1110        }
1111
1112        fn filter_late_rows(&self, _source_name: &str, batch: &RecordBatch) -> Option<RecordBatch> {
1113            Some(batch.clone())
1114        }
1115
1116        fn current_watermark(&self) -> i64 {
1117            self.watermark
1118        }
1119
1120        async fn maybe_checkpoint(
1121            &mut self,
1122            force: bool,
1123            _source_offsets: FxHashMap<String, SourceCheckpoint>,
1124        ) -> Option<u64> {
1125            if force {
1126                if let Some(ref flag) = self.force_checkpoint_flag {
1127                    flag.store(true, std::sync::atomic::Ordering::SeqCst);
1128                }
1129                Some(1)
1130            } else {
1131                None
1132            }
1133        }
1134
1135        async fn checkpoint_with_barrier(
1136            &mut self,
1137            _source_checkpoints: FxHashMap<String, SourceCheckpoint>,
1138            _checkpoint_id: u64,
1139        ) -> BarrierOutcome {
1140            BarrierOutcome::Committed(1)
1141        }
1142
1143        fn record_cycle(&self, _events: u64, _batches: u64, _elapsed_ns: u64) {}
1144        async fn poll_tables(&mut self) {}
1145        fn apply_control(&mut self, _msg: crate::pipeline::ControlMsg) {}
1146    }
1147
1148    /// Test that the coordinator processes messages via direct mpsc channel.
1149    #[tokio::test]
1150    async fn test_coordinator_direct_channel() {
1151        let shutdown = Arc::new(tokio::sync::Notify::new());
1152        let (tx, rx) = mpsc::bounded_async::<SourceMsg>(64);
1153
1154        // Create coordinator directly (bypassing source spawning).
1155        let (_control_tx, control_rx) = mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1156        let coordinator = StreamingCoordinator {
1157            config: PipelineConfig {
1158                batch_window: Duration::ZERO,
1159                max_poll_records: 1000,
1160                channel_capacity: 64,
1161                fallback_poll_interval: Duration::from_millis(10),
1162                checkpoint_interval: None,
1163                delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
1164                barrier_alignment_timeout: Duration::from_secs(30),
1165                cycle_budget_ns: 10_000_000,
1166                drain_budget_ns: 1_000_000,
1167                query_budget_ns: 8_000_000,
1168                background_budget_ns: 5_000_000,
1169                max_input_buf_batches: 256,
1170                max_input_buf_bytes: None,
1171                backpressure_policy: crate::config::BackpressurePolicy::Backpressure,
1172            },
1173            rx,
1174            source_handles: Vec::new(),
1175            source_names: vec![Arc::from("test_source")],
1176            shutdown: Arc::clone(&shutdown),
1177            pending_barrier: PendingBarrier::new(),
1178            next_checkpoint_id: 1,
1179            last_checkpoint: Instant::now(),
1180            checkpoint_request_flags: Vec::new(),
1181            source_batches_buf: FxHashMap::default(),
1182            post_barrier_buf: Vec::new(),
1183            pending_watermark_batches: Vec::new(),
1184            barrier_seen: FxHashSet::default(),
1185            committed_offsets: vec![None],
1186            pending_offsets: vec![None],
1187            control_rx,
1188            checkpoint_complete_rx: None,
1189            checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
1190            max_in_flight_epochs: 1,
1191            staged_bytes: Arc::new(AtomicU64::new(0)),
1192            max_staged_bytes: u64::MAX,
1193        };
1194
1195        let callback = MockCallback::new();
1196
1197        // Send a batch.
1198        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
1199        let batch =
1200            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();
1201        tx.send(SourceMsg::Batch {
1202            source_idx: 0,
1203            batch,
1204            checkpoint: SourceCheckpoint::new(1),
1205        })
1206        .await
1207        .unwrap();
1208
1209        // Signal shutdown after a brief delay.
1210        let shutdown_clone = Arc::clone(&shutdown);
1211        tokio::spawn(async move {
1212            tokio::time::sleep(Duration::from_millis(50)).await;
1213            shutdown_clone.notify_one();
1214        });
1215
1216        // Run coordinator — it should process the batch and exit on shutdown.
1217        coordinator.run(callback).await;
1218
1219        // The callback was consumed by run(), so we can't inspect it directly.
1220        // But the test proves: no panics, no deadlocks, clean shutdown.
1221    }
1222
1223    /// Test that a final checkpoint is triggered on shutdown when checkpointing
1224    /// is enabled.
1225    #[tokio::test]
1226    async fn test_final_checkpoint_on_shutdown() {
1227        let shutdown = Arc::new(tokio::sync::Notify::new());
1228        let (tx, rx) = mpsc::bounded_async::<SourceMsg>(64);
1229        let (_control_tx, control_rx) = mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1230
1231        let coordinator = StreamingCoordinator {
1232            config: PipelineConfig {
1233                batch_window: Duration::ZERO,
1234                max_poll_records: 1000,
1235                channel_capacity: 64,
1236                fallback_poll_interval: Duration::from_millis(10),
1237                checkpoint_interval: Some(Duration::from_secs(60)),
1238                delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
1239                barrier_alignment_timeout: Duration::from_secs(30),
1240                cycle_budget_ns: 10_000_000,
1241                drain_budget_ns: 1_000_000,
1242                query_budget_ns: 8_000_000,
1243                background_budget_ns: 5_000_000,
1244                max_input_buf_batches: 256,
1245                max_input_buf_bytes: None,
1246                backpressure_policy: crate::config::BackpressurePolicy::Backpressure,
1247            },
1248            rx,
1249            source_handles: Vec::new(),
1250            source_names: vec![Arc::from("test_source")],
1251            shutdown: Arc::clone(&shutdown),
1252            pending_barrier: PendingBarrier::new(),
1253            next_checkpoint_id: 1,
1254            last_checkpoint: Instant::now(),
1255            checkpoint_request_flags: Vec::new(),
1256            source_batches_buf: FxHashMap::default(),
1257            post_barrier_buf: Vec::new(),
1258            pending_watermark_batches: Vec::new(),
1259            barrier_seen: FxHashSet::default(),
1260            committed_offsets: vec![None],
1261            pending_offsets: vec![None],
1262            control_rx,
1263            checkpoint_complete_rx: None,
1264            checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
1265            max_in_flight_epochs: 1,
1266            staged_bytes: Arc::new(AtomicU64::new(0)),
1267            max_staged_bytes: u64::MAX,
1268        };
1269
1270        let force_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
1271        let mut callback = MockCallback::new();
1272        callback.force_checkpoint_flag = Some(Arc::clone(&force_flag));
1273
1274        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
1275        let batch =
1276            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
1277        tx.send(SourceMsg::Batch {
1278            source_idx: 0,
1279            batch,
1280            checkpoint: SourceCheckpoint::new(1),
1281        })
1282        .await
1283        .unwrap();
1284
1285        let shutdown_clone = Arc::clone(&shutdown);
1286        tokio::spawn(async move {
1287            tokio::time::sleep(Duration::from_millis(50)).await;
1288            shutdown_clone.notify_one();
1289        });
1290
1291        coordinator.run(callback).await;
1292
1293        assert!(
1294            force_flag.load(std::sync::atomic::Ordering::SeqCst),
1295            "final checkpoint with force=true should have been called"
1296        );
1297    }
1298
1299    /// Test that post-barrier batches are excluded from the current cycle's
1300    /// `source_batches_buf` and deferred to the next cycle.
1301    #[tokio::test]
1302    #[allow(clippy::too_many_lines, clippy::similar_names)]
1303    async fn test_barrier_excludes_post_barrier_data() {
1304        let shutdown = Arc::new(tokio::sync::Notify::new());
1305        let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
1306
1307        let (_control_tx2, control_rx2) = mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1308        let mut coordinator = StreamingCoordinator {
1309            config: PipelineConfig {
1310                batch_window: Duration::ZERO,
1311                max_poll_records: 1000,
1312                channel_capacity: 64,
1313                fallback_poll_interval: Duration::from_millis(10),
1314                checkpoint_interval: None,
1315                delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
1316                barrier_alignment_timeout: Duration::from_secs(30),
1317                cycle_budget_ns: 10_000_000,
1318                drain_budget_ns: 1_000_000,
1319                query_budget_ns: 8_000_000,
1320                background_budget_ns: 5_000_000,
1321                max_input_buf_batches: 256,
1322                max_input_buf_bytes: None,
1323                backpressure_policy: crate::config::BackpressurePolicy::Backpressure,
1324            },
1325            rx: mpsc::bounded_async::<SourceMsg>(64).1, // dummy, not used
1326            source_handles: Vec::new(),
1327            source_names: vec![Arc::from("s0"), Arc::from("s1")],
1328            shutdown: Arc::clone(&shutdown),
1329            pending_barrier: PendingBarrier::new(),
1330            next_checkpoint_id: 1,
1331            last_checkpoint: Instant::now(),
1332            checkpoint_request_flags: Vec::new(),
1333            source_batches_buf: FxHashMap::default(),
1334            post_barrier_buf: Vec::new(),
1335            pending_watermark_batches: Vec::new(),
1336            barrier_seen: FxHashSet::default(),
1337            committed_offsets: vec![None, None],
1338            pending_offsets: vec![None, None],
1339            control_rx: control_rx2,
1340            checkpoint_complete_rx: None,
1341            checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
1342            max_in_flight_epochs: 1,
1343            staged_bytes: Arc::new(AtomicU64::new(0)),
1344            max_staged_bytes: u64::MAX,
1345        };
1346
1347        let mut callback = MockCallback::new();
1348        let mut barriers = Vec::new();
1349        let mut cycle_events: u64 = 0;
1350
1351        // Source 0: batch(ts=1), barrier, batch(ts=2)
1352        let batch_1 = RecordBatch::try_new(
1353            Arc::clone(&schema),
1354            vec![Arc::new(Int64Array::from(vec![1]))],
1355        )
1356        .unwrap();
1357        let batch_2 = RecordBatch::try_new(
1358            Arc::clone(&schema),
1359            vec![Arc::new(Int64Array::from(vec![2]))],
1360        )
1361        .unwrap();
1362        let barrier = CheckpointBarrier::new(1, 0);
1363
1364        coordinator.process_msg(
1365            SourceMsg::Batch {
1366                source_idx: 0,
1367                batch: batch_1,
1368                checkpoint: SourceCheckpoint::new(10),
1369            },
1370            &mut callback,
1371            &mut barriers,
1372            &mut cycle_events,
1373        );
1374        coordinator.process_msg(
1375            SourceMsg::Barrier {
1376                source_idx: 0,
1377                barrier,
1378                checkpoint: SourceCheckpoint::new(10),
1379            },
1380            &mut callback,
1381            &mut barriers,
1382            &mut cycle_events,
1383        );
1384        coordinator.process_msg(
1385            SourceMsg::Batch {
1386                source_idx: 0,
1387                batch: batch_2,
1388                checkpoint: SourceCheckpoint::new(20),
1389            },
1390            &mut callback,
1391            &mut barriers,
1392            &mut cycle_events,
1393        );
1394
1395        // Source 1: batch(ts=1), barrier
1396        let batch_s1 = RecordBatch::try_new(
1397            Arc::clone(&schema),
1398            vec![Arc::new(Int64Array::from(vec![1]))],
1399        )
1400        .unwrap();
1401        coordinator.process_msg(
1402            SourceMsg::Batch {
1403                source_idx: 1,
1404                batch: batch_s1,
1405                checkpoint: SourceCheckpoint::new(5),
1406            },
1407            &mut callback,
1408            &mut barriers,
1409            &mut cycle_events,
1410        );
1411        coordinator.process_msg(
1412            SourceMsg::Barrier {
1413                source_idx: 1,
1414                barrier,
1415                checkpoint: SourceCheckpoint::new(5),
1416            },
1417            &mut callback,
1418            &mut barriers,
1419            &mut cycle_events,
1420        );
1421
1422        // Verify: source_batches_buf should have ts=1 from both sources,
1423        // but NOT ts=2 from source 0 (that's post-barrier).
1424        let s0_batches = coordinator.source_batches_buf.get("s0").unwrap();
1425        assert_eq!(
1426            s0_batches.len(),
1427            1,
1428            "s0 should have exactly 1 pre-barrier batch"
1429        );
1430        let s0_col = s0_batches[0]
1431            .column(0)
1432            .as_any()
1433            .downcast_ref::<Int64Array>()
1434            .unwrap();
1435        assert_eq!(s0_col.value(0), 1, "s0 batch should contain ts=1");
1436
1437        let s1_batches = coordinator.source_batches_buf.get("s1").unwrap();
1438        assert_eq!(s1_batches.len(), 1, "s1 should have exactly 1 batch");
1439
1440        // Post-barrier buf should contain the ts=2 batch.
1441        assert_eq!(
1442            coordinator.post_barrier_buf.len(),
1443            1,
1444            "post_barrier_buf should have 1 deferred batch"
1445        );
1446
1447        // pending_offsets: pre-barrier only (post-barrier deferred, not staged).
1448        assert_eq!(
1449            coordinator.pending_offsets[0].as_ref().unwrap().epoch(),
1450            10,
1451            "s0 pending offset should be the pre-barrier batch"
1452        );
1453        assert_eq!(
1454            coordinator.pending_offsets[1].as_ref().unwrap().epoch(),
1455            5,
1456            "s1 pending offset should be epoch 5"
1457        );
1458        // committed_offsets must still be None — no execute_cycle has run.
1459        assert!(
1460            coordinator.committed_offsets[0].is_none(),
1461            "s0 committed offset should be None before execute_cycle"
1462        );
1463        assert!(
1464            coordinator.committed_offsets[1].is_none(),
1465            "s1 committed offset should be None before execute_cycle"
1466        );
1467
1468        // Simulate successful cycle → commit.
1469        coordinator.commit_pending_offsets();
1470        assert_eq!(
1471            coordinator.committed_offsets[0].as_ref().unwrap().epoch(),
1472            10,
1473            "s0 committed after cycle"
1474        );
1475        assert_eq!(
1476            coordinator.committed_offsets[1].as_ref().unwrap().epoch(),
1477            5,
1478            "s1 committed after cycle"
1479        );
1480
1481        // Barriers should have both sources.
1482        assert_eq!(barriers.len(), 2, "should have barriers from both sources");
1483    }
1484
1485    #[allow(clippy::disallowed_types)] // test-only: std::sync::Mutex is fine here
1486    struct BackpressuredCallback {
1487        inner: MockCallback,
1488        cycle_count: Arc<std::sync::atomic::AtomicU32>,
1489        events_per_cycle: Arc<std::sync::Mutex<Vec<u64>>>,
1490    }
1491
1492    impl BackpressuredCallback {
1493        #[allow(clippy::disallowed_types)]
1494        fn new(
1495            cycle_count: Arc<std::sync::atomic::AtomicU32>,
1496            events_per_cycle: Arc<std::sync::Mutex<Vec<u64>>>,
1497        ) -> Self {
1498            Self {
1499                inner: MockCallback::new(),
1500                cycle_count,
1501                events_per_cycle,
1502            }
1503        }
1504    }
1505
1506    impl PipelineCallback for BackpressuredCallback {
1507        async fn execute_cycle(
1508            &mut self,
1509            source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
1510            watermark: i64,
1511        ) -> Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String> {
1512            self.cycle_count
1513                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1514            let total: u64 = source_batches
1515                .values()
1516                .flat_map(|bs| bs.iter())
1517                .map(|b| b.num_rows() as u64)
1518                .sum();
1519            self.events_per_cycle.lock().unwrap().push(total);
1520            self.inner.execute_cycle(source_batches, watermark).await
1521        }
1522
1523        fn push_to_streams(&self, r: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {
1524            self.inner.push_to_streams(r);
1525        }
1526        async fn write_to_sinks(&mut self, r: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {
1527            self.inner.write_to_sinks(r).await;
1528        }
1529        fn extract_watermark(&mut self, s: &str, b: &RecordBatch) {
1530            self.inner.extract_watermark(s, b);
1531        }
1532        fn filter_late_rows(&self, s: &str, b: &RecordBatch) -> Option<RecordBatch> {
1533            self.inner.filter_late_rows(s, b)
1534        }
1535        fn current_watermark(&self) -> i64 {
1536            self.inner.current_watermark()
1537        }
1538        async fn maybe_checkpoint(
1539            &mut self,
1540            force: bool,
1541            offsets: FxHashMap<String, SourceCheckpoint>,
1542        ) -> Option<u64> {
1543            self.inner.maybe_checkpoint(force, offsets).await
1544        }
1545        async fn checkpoint_with_barrier(
1546            &mut self,
1547            cp: FxHashMap<String, SourceCheckpoint>,
1548            checkpoint_id: u64,
1549        ) -> BarrierOutcome {
1550            self.inner.checkpoint_with_barrier(cp, checkpoint_id).await
1551        }
1552        fn record_cycle(&self, e: u64, b: u64, ns: u64) {
1553            self.inner.record_cycle(e, b, ns);
1554        }
1555        async fn poll_tables(&mut self) {
1556            self.inner.poll_tables().await;
1557        }
1558        fn apply_control(&mut self, msg: crate::pipeline::ControlMsg) {
1559            self.inner.apply_control(msg);
1560        }
1561
1562        fn is_backpressured(&self) -> bool {
1563            true // Always backpressured — drain loop should never fire.
1564        }
1565    }
1566
1567    /// With `is_backpressured() == true`, the coordinator processes only
1568    /// the first wakeup message per cycle (no drain coalescing). With 5
1569    /// messages pre-loaded and `batch_window=0`, each cycle should see
1570    /// exactly 1 event, spread across multiple cycles.
1571    #[tokio::test]
1572    async fn test_drain_skip_under_backpressure() {
1573        let shutdown = Arc::new(tokio::sync::Notify::new());
1574        let (tx, rx) = mpsc::bounded_async::<SourceMsg>(64);
1575        let (_control_tx, control_rx) = mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1576
1577        let coordinator = StreamingCoordinator {
1578            config: PipelineConfig {
1579                batch_window: Duration::ZERO,
1580                max_poll_records: 1000,
1581                channel_capacity: 64,
1582                fallback_poll_interval: Duration::from_millis(10),
1583                checkpoint_interval: None,
1584                delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
1585                barrier_alignment_timeout: Duration::from_secs(30),
1586                cycle_budget_ns: 10_000_000,
1587                drain_budget_ns: 1_000_000,
1588                query_budget_ns: 8_000_000,
1589                background_budget_ns: 5_000_000,
1590                max_input_buf_batches: 256,
1591                max_input_buf_bytes: None,
1592                backpressure_policy: crate::config::BackpressurePolicy::Backpressure,
1593            },
1594            rx,
1595            source_handles: Vec::new(),
1596            source_names: vec![Arc::from("src")],
1597            shutdown: Arc::clone(&shutdown),
1598            pending_barrier: PendingBarrier::new(),
1599            next_checkpoint_id: 1,
1600            last_checkpoint: Instant::now(),
1601            checkpoint_request_flags: Vec::new(),
1602            source_batches_buf: FxHashMap::default(),
1603            post_barrier_buf: Vec::new(),
1604            pending_watermark_batches: Vec::new(),
1605            barrier_seen: FxHashSet::default(),
1606            committed_offsets: vec![None],
1607            pending_offsets: vec![None],
1608            control_rx,
1609            checkpoint_complete_rx: None,
1610            checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
1611            max_in_flight_epochs: 1,
1612            staged_bytes: Arc::new(AtomicU64::new(0)),
1613            max_staged_bytes: u64::MAX,
1614        };
1615
1616        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
1617
1618        // Pre-load 5 batches (1 row each).
1619        for i in 0..5 {
1620            let batch = RecordBatch::try_new(
1621                Arc::clone(&schema),
1622                vec![Arc::new(Int64Array::from(vec![i]))],
1623            )
1624            .unwrap();
1625            tx.send(SourceMsg::Batch {
1626                source_idx: 0,
1627                batch,
1628                checkpoint: SourceCheckpoint::new(u64::try_from(i).unwrap()),
1629            })
1630            .await
1631            .unwrap();
1632        }
1633
1634        let shutdown_clone = Arc::clone(&shutdown);
1635        tokio::spawn(async move {
1636            tokio::time::sleep(Duration::from_millis(300)).await;
1637            shutdown_clone.notify_one();
1638        });
1639
1640        let cycle_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
1641        #[allow(clippy::disallowed_types)]
1642        let events_per_cycle = Arc::new(std::sync::Mutex::new(Vec::new()));
1643        let callback =
1644            BackpressuredCallback::new(Arc::clone(&cycle_count), Arc::clone(&events_per_cycle));
1645        coordinator.run(callback).await;
1646
1647        let cycles = cycle_count.load(std::sync::atomic::Ordering::SeqCst);
1648        let epc = events_per_cycle.lock().unwrap();
1649        let total: u64 = epc.iter().sum();
1650
1651        // All 5 events must be processed (no data loss).
1652        assert_eq!(total, 5, "all events must be processed, got {total}");
1653        // Under backpressure each cycle gets only the wakeup message (1
1654        // event), so we need at least 5 cycles for 5 messages. Without
1655        // backpressure, cycle 1 would drain all 5 in one shot.
1656        assert!(
1657            cycles >= 5,
1658            "expected >=5 cycles (1 event each), got {cycles} cycles with events/cycle: {epc:?}"
1659        );
1660        // Each cycle sees at most 1 event (the wakeup message; drain skipped).
1661        for (i, &events) in epc.iter().enumerate() {
1662            assert!(
1663                events <= 1,
1664                "cycle {i} saw {events} events, expected <=1 under backpressure"
1665            );
1666        }
1667    }
1668}