1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use arrow_array::RecordBatch;
27use crossfire::{mpsc, AsyncRx};
28use laminar_connectors::checkpoint::SourceCheckpoint;
29use laminar_connectors::connector::{DeliveryGuarantee, SourceBatch};
30use laminar_connectors::error::ConnectorError;
31use laminar_core::checkpoint::{CheckpointBarrier, CheckpointBarrierInjector};
32use rustc_hash::{FxHashMap, FxHashSet};
33
34use super::callback::{BarrierOutcome, PipelineCallback, SourceRegistration};
35use super::config::PipelineConfig;
36use crate::error::DbError;
37
/// Receiving half of the bounded async channel on which source tasks deliver
/// [`SourceMsg`]s to the coordinator.
type SourceMsgRx = AsyncRx<mpsc::Array<SourceMsg>>;
/// Receiving half of the channel carrying `ControlMsg`s; the coordinator
/// drains it with `try_recv` once per cycle and hands each message to the
/// callback.
type ControlMsgRx = AsyncRx<mpsc::Array<super::ControlMsg>>;
42
/// Message sent from a per-source task to the coordinator.
enum SourceMsg {
    /// Records returned by one successful `poll_batch` call.
    Batch {
        /// Index of the producing source (position in the registration list).
        source_idx: usize,
        /// The polled records.
        batch: RecordBatch,
        /// Connector checkpoint captured right after the poll returned.
        checkpoint: SourceCheckpoint,
    },
    /// A checkpoint barrier picked up from the source's barrier injector.
    Barrier {
        /// Index of the source that emitted the barrier.
        source_idx: usize,
        /// The barrier as returned by the injector handle.
        barrier: CheckpointBarrier,
        /// Connector checkpoint captured when the barrier was emitted.
        checkpoint: SourceCheckpoint,
    },
}
67
/// Coordinator-side handle to one spawned source task.
struct SourceHandle {
    /// Source name; used as the lookup key for per-source checkpoints.
    name: Arc<str>,
    /// Notified by the coordinator to ask the task to leave its poll loop.
    shutdown: Arc<tokio::sync::Notify>,
    /// Join handle of the spawned task; awaited at the end of `run`.
    join: tokio::task::JoinHandle<()>,
    /// Injector the coordinator triggers to make the task emit a barrier.
    barrier_injector: CheckpointBarrierInjector,
    /// Watch channel publishing the latest committed `(epoch, checkpoint)`
    /// to the task; starts as `None`. Dropping it lets the task finish.
    epoch_committed_tx: tokio::sync::watch::Sender<Option<(u64, SourceCheckpoint)>>,
}
83
/// Completion notice received on `checkpoint_complete_rx`: the committed
/// epoch plus the per-source checkpoints (keyed by source name) that are
/// broadcast to the source tasks.
type CheckpointCompletion = (u64, rustc_hash::FxHashMap<String, SourceCheckpoint>);
86
/// Drives the streaming pipeline: receives messages from the source tasks,
/// runs processing cycles through a `PipelineCallback`, and coordinates
/// barrier-based checkpoints.
pub struct StreamingCoordinator {
    /// Pipeline settings (batch window, time budgets, checkpoint interval, ...).
    config: PipelineConfig,
    /// Receives `SourceMsg`s from every source task.
    rx: SourceMsgRx,
    /// One handle per spawned source task.
    source_handles: Vec<SourceHandle>,
    /// Source names, indexed by `source_idx`.
    source_names: Vec<Arc<str>>,
    /// Notified from outside to stop the `run` loop.
    shutdown: Arc<tokio::sync::Notify>,
    /// Alignment state of the barrier checkpoint currently in progress.
    pending_barrier: PendingBarrier,
    /// Next checkpoint id to use when the callback does not supply one.
    next_checkpoint_id: u64,
    /// When the last barrier checkpoint finished (or the coordinator was
    /// created); compared against `config.checkpoint_interval`.
    last_checkpoint: Instant,
    /// Flags obtained from connectors' `checkpoint_requested()`; a set flag
    /// asks for a checkpoint outside the regular interval.
    checkpoint_request_flags: Vec<Arc<AtomicBool>>,
    /// Batches collected for the current cycle, keyed by source name.
    source_batches_buf: FxHashMap<Arc<str>, Vec<RecordBatch>>,
    /// Batches that arrived after their source's barrier in the same cycle;
    /// replayed at the start of the next cycle.
    post_barrier_buf: Vec<SourceMsg>,
    /// Unfiltered batches waiting to be passed to `extract_watermark`.
    pending_watermark_batches: Vec<(Arc<str>, RecordBatch)>,
    /// Sources whose barrier has been seen during the current cycle.
    barrier_seen: FxHashSet<usize>,
    /// Per-source checkpoint of the last batch that went through a
    /// successful cycle (initially the restore checkpoint, if any).
    committed_offsets: Vec<Option<SourceCheckpoint>>,
    /// Per-source checkpoint of batches in the current cycle; promoted to
    /// `committed_offsets` on success, cleared otherwise.
    pending_offsets: Vec<Option<SourceCheckpoint>>,
    /// Control messages applied through the callback once per cycle.
    control_rx: ControlMsgRx,
    /// Optional channel announcing checkpoints that completed asynchronously.
    checkpoint_complete_rx:
        Option<crossfire::AsyncRx<crossfire::mpsc::Array<CheckpointCompletion>>>,
    /// Shared counter; no new checkpoint is started while it is at or above
    /// `max_in_flight_epochs`.
    checkpoint_in_flight: Arc<AtomicU64>,
    /// Admission limit for `checkpoint_in_flight` (at least 1 when set via
    /// `with_checkpoint_admission`).
    max_in_flight_epochs: u64,
    /// Shared counter; no new checkpoint is started while it is at or above
    /// `max_staged_bytes`.
    staged_bytes: Arc<AtomicU64>,
    /// Admission limit for `staged_bytes`.
    max_staged_bytes: u64,
}
141
/// Tracks which sources have delivered their barrier for one checkpoint.
struct PendingBarrier {
    /// Id of the checkpoint being aligned.
    checkpoint_id: u64,
    /// Number of sources that must align before the checkpoint is taken.
    sources_total: usize,
    /// Indexes of the sources whose barrier has arrived.
    sources_aligned: FxHashSet<usize>,
    /// Checkpoint captured with each arrived barrier, keyed by source name.
    source_checkpoints: FxHashMap<String, SourceCheckpoint>,
    /// When alignment started; used for the alignment timeout.
    started_at: Instant,
    /// `true` while an alignment is in progress.
    active: bool,
}
151
152impl PendingBarrier {
153 fn new() -> Self {
154 Self {
155 checkpoint_id: 0,
156 sources_total: 0,
157 sources_aligned: FxHashSet::default(),
158 source_checkpoints: FxHashMap::default(),
159 started_at: Instant::now(),
160 active: false,
161 }
162 }
163
164 fn reset(&mut self, checkpoint_id: u64, sources_total: usize) {
165 self.checkpoint_id = checkpoint_id;
166 self.sources_total = sources_total;
167 self.sources_aligned.clear();
168 self.source_checkpoints.clear();
169 self.started_at = Instant::now();
170 self.active = true;
171 }
172}
173
/// How long the `run` loop waits for a message before running a cycle with
/// no new input (so maintenance work still happens on an idle pipeline).
const IDLE_TIMEOUT: Duration = Duration::from_millis(100);
176
177fn warn_staged_cap_throttled(staged_bytes: u64, cap: u64) {
181 static THROTTLE: crate::log_throttle::LogThrottle =
182 crate::log_throttle::LogThrottle::every(Duration::from_secs(10));
183 if THROTTLE.allow() {
184 tracing::warn!(
185 staged_bytes,
186 cap,
187 "checkpoint admission paused: staged-state cap reached"
188 );
189 }
190}
191
/// Reason a source task's `select!` woke up.
enum SourceWake {
    /// Shutdown was requested, or the epoch-commit sender was dropped.
    Shutdown,
    /// The coordinator published a committed epoch and its checkpoint.
    EpochCommitted(u64, SourceCheckpoint),
    /// `poll_batch` finished with this result.
    Polled(Result<Option<SourceBatch>, ConnectorError>),
}
198
199impl StreamingCoordinator {
200 fn broadcast_epoch_committed(
205 &self,
206 epoch: u64,
207 per_source: &FxHashMap<String, SourceCheckpoint>,
208 ) {
209 for handle in &self.source_handles {
210 let cp = per_source
211 .get(handle.name.as_ref())
212 .cloned()
213 .unwrap_or_else(|| SourceCheckpoint::new(epoch));
214 let _ = handle.epoch_committed_tx.send(Some((epoch, cp)));
215 }
216 }
217
    /// Opens every source connector, spawns one polling task per source and
    /// returns a coordinator ready for [`Self::run`].
    ///
    /// # Errors
    ///
    /// Returns `DbError::Config` when
    /// - exactly-once delivery is configured and a source does not support
    ///   replay (`LDB-5031`) or no checkpoint interval is set (`LDB-5032`),
    /// - `config.channel_capacity` is zero (`LDB-0010`), or
    /// - a connector fails to open.
    ///
    /// A failed `restore` is only logged; the source is still started.
    #[allow(clippy::too_many_lines)]
    pub async fn new(
        sources: Vec<SourceRegistration>,
        config: PipelineConfig,
        shutdown: Arc<tokio::sync::Notify>,
        control_rx: ControlMsgRx,
    ) -> Result<Self, DbError> {
        // Exactly-once preconditions: replayable sources and checkpointing.
        if config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            for src in &sources {
                if !src.supports_replay {
                    return Err(DbError::Config(format!(
                        "[LDB-5031] exactly-once requires source '{}' to support replay",
                        src.name
                    )));
                }
            }
            if config.checkpoint_interval.is_none() {
                return Err(DbError::Config(
                    "[LDB-5032] exactly-once requires checkpointing to be enabled".into(),
                ));
            }
        }

        if config.channel_capacity == 0 {
            return Err(DbError::Config(
                "[LDB-0010] channel_capacity must be > 0".into(),
            ));
        }

        // Single bounded channel shared by all source tasks.
        let (tx, rx) = mpsc::bounded_async::<SourceMsg>(config.channel_capacity);

        let mut source_handles = Vec::with_capacity(sources.len());
        let mut source_names = Vec::with_capacity(sources.len());
        let mut checkpoint_request_flags = Vec::new();
        let mut committed_offsets = Vec::with_capacity(sources.len());

        for (idx, src) in sources.into_iter().enumerate() {
            // Not every connector exposes a checkpoint-request flag.
            if let Some(flag) = src.connector.checkpoint_requested() {
                checkpoint_request_flags.push(flag);
            }

            let task_shutdown = Arc::new(tokio::sync::Notify::new());
            let task_shutdown_clone = Arc::clone(&task_shutdown);
            let task_tx = tx.clone();
            let max_poll = config.max_poll_records;
            let poll_interval = config.fallback_poll_interval;
            let src_name = src.name.clone();
            let restore = src.restore_checkpoint.clone();
            let mut connector = src.connector;
            let connector_config = src.config;

            connector
                .open(&connector_config)
                .await
                .map_err(|e| DbError::Config(format!("source '{src_name}' open failed: {e}")))?;

            if let Some(ref cp) = restore {
                // Restore failure is logged, not fatal.
                if let Err(e) = connector.restore(cp).await {
                    tracing::warn!(
                        source = %src_name, error = %e,
                        "source restore failed, starting from beginning"
                    );
                }
            }

            // The restore checkpoint (if any) seeds this source's committed offset.
            committed_offsets.push(src.restore_checkpoint);

            let barrier_injector = CheckpointBarrierInjector::new();
            // The task polls this handle; the coordinator keeps the injector.
            let barrier_handle = barrier_injector.handle();

            let (epoch_committed_tx, mut epoch_committed_rx) =
                tokio::sync::watch::channel::<Option<(u64, SourceCheckpoint)>>(None);

            let join = tokio::spawn(async move {
                // Passed to `barrier_handle.poll`; bumped after every barrier emitted.
                let mut epoch: u64 = 0;

                loop {
                    // `biased` gives priority in declaration order:
                    // shutdown, then epoch-commit notices, then polling.
                    let wake = tokio::select! {
                        biased;
                        () = task_shutdown_clone.notified() => SourceWake::Shutdown,
                        r = epoch_committed_rx.changed() => match r {
                            Ok(()) => {
                                let snapshot = epoch_committed_rx.borrow_and_update().clone();
                                match snapshot {
                                    Some((e, cp)) => SourceWake::EpochCommitted(e, cp),
                                    None => continue,
                                }
                            },
                            // The sender is gone: treat it like a shutdown request.
                            Err(_) => SourceWake::Shutdown,
                        },
                        r = connector.poll_batch(max_poll) => SourceWake::Polled(r),
                    };

                    let poll_result = match wake {
                        SourceWake::Shutdown => break,
                        SourceWake::EpochCommitted(e, cp) => {
                            if let Err(err) = connector.notify_epoch_committed(e, &cp).await {
                                tracing::warn!(
                                    source = %src_name,
                                    error = %err,
                                    epoch = e,
                                    "notify_epoch_committed failed",
                                );
                            }
                            continue;
                        }
                        SourceWake::Polled(r) => r,
                    };

                    match poll_result {
                        Ok(Some(batch)) => {
                            // Capture the checkpoint right after the poll so it
                            // travels with the batch it belongs to.
                            let cp = connector.checkpoint();
                            let msg = SourceMsg::Batch {
                                source_idx: idx,
                                batch: batch.records,
                                checkpoint: cp,
                            };
                            // A send error means the coordinator's receiver is gone.
                            if task_tx.send(msg).await.is_err() {
                                break;
                            }
                        }
                        Ok(None) => {
                            // Nothing available: wait for the fallback interval,
                            // but stay responsive to shutdown.
                            tokio::select! {
                                biased;
                                () = task_shutdown_clone.notified() => break,
                                () = tokio::time::sleep(poll_interval) => {}
                            }
                        }
                        Err(e) if !e.is_transient() => {
                            tracing::error!(source = %src_name, error = %e, "terminal poll error");
                            break;
                        }
                        Err(e) => {
                            tracing::warn!(source = %src_name, error = %e, "poll error (retrying)");
                            tokio::select! {
                                biased;
                                () = task_shutdown_clone.notified() => break,
                                () = tokio::time::sleep(poll_interval) => {}
                            }
                        }
                    }

                    // After each poll outcome, emit a barrier if one was injected.
                    if let Some(barrier) = barrier_handle.poll(epoch) {
                        epoch += 1;
                        let cp = connector.checkpoint();
                        let msg = SourceMsg::Barrier {
                            source_idx: idx,
                            barrier,
                            checkpoint: cp,
                        };
                        if task_tx.send(msg).await.is_err() {
                            break;
                        }
                    }
                }

                // Left the main loop: forward whatever the connector still
                // returns until it reports no data or an error.
                while let Ok(Some(batch)) = connector.poll_batch(max_poll).await {
                    let cp = connector.checkpoint();
                    let msg = SourceMsg::Batch {
                        source_idx: idx,
                        batch: batch.records,
                        checkpoint: cp,
                    };
                    if task_tx.send(msg).await.is_err() {
                        break;
                    }
                }

                // Keep relaying epoch-commit notices until the sender is
                // dropped (`run` drops it right before joining this task).
                while let Ok(()) = epoch_committed_rx.changed().await {
                    let snapshot = epoch_committed_rx.borrow_and_update().clone();
                    if let Some((e, cp)) = snapshot {
                        if let Err(err) = connector.notify_epoch_committed(e, &cp).await {
                            tracing::warn!(
                                source = %src_name,
                                error = %err,
                                epoch = e,
                                "notify_epoch_committed failed",
                            );
                        }
                    }
                }

                if let Err(e) = connector.close().await {
                    tracing::warn!(source = %src_name, error = %e, "source close error");
                }
            });

            let arc_name: Arc<str> = Arc::from(src.name.as_str());
            source_handles.push(SourceHandle {
                name: Arc::clone(&arc_name),
                shutdown: task_shutdown,
                join,
                barrier_injector,
                epoch_committed_tx,
            });
            source_names.push(arc_name);
        }

        Ok(Self {
            config,
            rx,
            source_handles,
            source_names,
            shutdown,
            pending_barrier: PendingBarrier::new(),
            next_checkpoint_id: 1,
            last_checkpoint: Instant::now(),
            checkpoint_request_flags,
            source_batches_buf: FxHashMap::default(),
            post_barrier_buf: Vec::new(),
            pending_watermark_batches: Vec::new(),
            barrier_seen: FxHashSet::default(),
            // One pending slot per source, matching `committed_offsets`.
            pending_offsets: vec![None; committed_offsets.len()],
            committed_offsets,
            control_rx,
            checkpoint_complete_rx: None,
            // Defaults: one epoch in flight, no staged-bytes cap.
            checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
            max_in_flight_epochs: 1,
            staged_bytes: Arc::new(AtomicU64::new(0)),
            max_staged_bytes: u64::MAX,
        })
    }
473
474 pub(crate) fn with_checkpoint_admission(
478 mut self,
479 in_flight: Arc<AtomicU64>,
480 max_in_flight_epochs: u64,
481 staged_bytes: Arc<AtomicU64>,
482 max_staged_bytes: u64,
483 ) -> Self {
484 self.checkpoint_in_flight = in_flight;
485 self.max_in_flight_epochs = max_in_flight_epochs.max(1);
486 self.staged_bytes = staged_bytes;
487 self.max_staged_bytes = max_staged_bytes;
488 self
489 }
490
491 pub(crate) fn with_checkpoint_complete_rx(
492 mut self,
493 rx: crossfire::AsyncRx<crossfire::mpsc::Array<CheckpointCompletion>>,
494 ) -> Self {
495 self.checkpoint_complete_rx = Some(rx);
496 self
497 }
498
    /// Runs the coordinator until shutdown is notified or the source channel
    /// closes, then drains, takes a final checkpoint and joins the tasks.
    ///
    /// Each loop iteration is one cycle:
    /// 1. wait for a message, a checkpoint completion, or the idle timeout;
    /// 2. replay batches deferred behind a barrier, then drain more messages
    ///    within the drain budget (skipped when the callback is backpressured);
    /// 3. extract watermarks and execute the cycle through the callback,
    ///    committing or discarding the cycle's source offsets;
    /// 4. handle barriers, maybe start a checkpoint, poll tables and apply
    ///    control messages, subject to the time budgets;
    /// 5. cancel a barrier alignment that has exceeded its timeout.
    #[allow(clippy::too_many_lines)]
    pub async fn run<C: PipelineCallback>(mut self, mut callback: C) {
        // Upper bound on messages pulled by `try_recv` in one cycle.
        const MAX_DRAIN_PER_CYCLE: usize = 10_000;

        // Hand the callback a clone of every source's barrier injector.
        let injectors = self
            .source_handles
            .iter()
            .map(|h| (h.name.clone(), h.barrier_injector.clone()))
            .collect();
        callback.set_barrier_injectors(injectors);

        let batch_window = self.config.batch_window;
        let mut barriers_buf: Vec<(usize, CheckpointBarrier, SourceCheckpoint)> = Vec::new();

        loop {
            let msg = tokio::select! {
                // Priority order: shutdown, checkpoint completions, data, idle tick.
                biased;
                () = self.shutdown.notified() => break,
                Some((epoch, fan_out)) = async {
                    // Without a completion channel this branch never resolves.
                    if let Some(ref mut rx) = self.checkpoint_complete_rx {
                        rx.recv().await.ok()
                    } else {
                        futures::future::pending::<Option<CheckpointCompletion>>().await
                    }
                } => {
                    self.broadcast_epoch_committed(epoch, &fan_out);
                    callback.publish_barrier(epoch, epoch);
                    continue;
                }
                msg = self.rx.recv() => {
                    match msg {
                        Ok(m) => {
                            // Optionally wait so more messages can queue up
                            // before the drain below.
                            if !batch_window.is_zero() {
                                tokio::time::sleep(batch_window).await;
                            }
                            Some(m)
                        }
                        // Receive error: stop the loop.
                        Err(_) => break,
                    }
                }
                () = tokio::time::sleep(IDLE_TIMEOUT) => None,
            };

            // Reset per-cycle state.
            self.source_batches_buf.clear();
            self.barrier_seen.clear();
            self.discard_pending_offsets();
            barriers_buf.clear();
            let mut cycle_events: u64 = 0;
            let cycle_start = Instant::now();

            // Batches deferred behind a barrier in the previous cycle go
            // first, ahead of anything newly received.
            let deferred = std::mem::take(&mut self.post_barrier_buf);
            for deferred_msg in deferred {
                self.process_msg(
                    deferred_msg,
                    &mut callback,
                    &mut barriers_buf,
                    &mut cycle_events,
                );
            }

            let had_data = msg.is_some();
            if let Some(first_msg) = msg {
                self.process_msg(
                    first_msg,
                    &mut callback,
                    &mut barriers_buf,
                    &mut cycle_events,
                );
            }

            let mut drain_count = 0;
            let drain_budget_ns = self.config.drain_budget_ns;
            // Backpressure is only consulted when this cycle received data.
            let backpressured = had_data && callback.is_backpressured();
            if backpressured {
                tracing::debug!("operator graph backpressured — skipping drain");
            }
            // Opportunistic drain, bounded by message count and elapsed time.
            #[allow(clippy::cast_possible_truncation)]
            while !backpressured
                && drain_count < MAX_DRAIN_PER_CYCLE
                && (cycle_start.elapsed().as_nanos() as u64) < drain_budget_ns
            {
                match self.rx.try_recv() {
                    Ok(msg) => {
                        self.process_msg(msg, &mut callback, &mut barriers_buf, &mut cycle_events);
                        drain_count += 1;
                    }
                    Err(_) => break,
                }
            }

            // Watermarks are extracted from the unfiltered batches.
            for (name, batch) in self.pending_watermark_batches.drain(..) {
                callback.extract_watermark(&name, &batch);
            }

            callback.tick_idle_watermark();

            if !self.source_batches_buf.is_empty() || callback.has_deferred_input() {
                let wm = callback.current_watermark();
                match callback.execute_cycle(&self.source_batches_buf, wm).await {
                    Ok(results) => {
                        // Offsets become committed only after a successful cycle.
                        self.commit_pending_offsets();
                        callback.update_mv_stores(&results);
                        callback.push_to_streams(&results);
                        callback.write_to_sinks(&results).await;
                    }
                    Err(e) => {
                        self.discard_pending_offsets();
                        tracing::warn!(error = %e, "[LDB-3020] SQL cycle error");
                    }
                }
                #[allow(clippy::cast_possible_truncation)]
                let elapsed_ns = cycle_start.elapsed().as_nanos() as u64;
                callback.record_cycle(cycle_events, 0, elapsed_ns);

                if elapsed_ns >= self.config.cycle_budget_ns {
                    tracing::debug!(
                        elapsed_ms = elapsed_ns / 1_000_000,
                        budget_ms = self.config.cycle_budget_ns / 1_000_000,
                        "cycle budget exceeded — proceeding to maintenance"
                    );
                }
            }

            // Time spent on the data path, measured before maintenance starts.
            #[allow(clippy::cast_possible_truncation)]
            let cycle_elapsed_ns = cycle_start.elapsed().as_nanos() as u64;

            let bg_start = Instant::now();
            let bg_budget = self.config.background_budget_ns;

            // Barriers are always handled, regardless of the budget.
            for (source_idx, barrier, cp) in &barriers_buf {
                self.handle_barrier(*source_idx, barrier, cp, &mut callback)
                    .await;
            }

            // Starting a new checkpoint is skipped once the background
            // budget has been used up.
            #[allow(clippy::cast_possible_truncation)]
            if (bg_start.elapsed().as_nanos() as u64) < bg_budget {
                self.maybe_checkpoint(&mut callback).await;
            }

            // Table polling needs both budgets to have headroom.
            #[allow(clippy::cast_possible_truncation)]
            let bg_elapsed = bg_start.elapsed().as_nanos() as u64;
            if cycle_elapsed_ns < self.config.cycle_budget_ns && bg_elapsed < bg_budget {
                callback.poll_tables().await;
            } else {
                tracing::debug!("skipping poll_tables (budget exhausted)");
            }

            // Apply every control message queued so far.
            while let Ok(msg) = self.control_rx.try_recv() {
                callback.apply_control(msg);
            }

            if self.pending_barrier.active
                && self.pending_barrier.started_at.elapsed() > self.config.barrier_alignment_timeout
            {
                tracing::warn!(
                    checkpoint_id = self.pending_barrier.checkpoint_id,
                    "Barrier alignment timeout — cancelling checkpoint"
                );
                self.pending_barrier.active = false;
            }
        }

        // ---- Shutdown: ask every source task to stop polling. ----
        for handle in &self.source_handles {
            handle.shutdown.notify_one();
        }

        // First drain pass: process everything already queued or deferred.
        self.source_batches_buf.clear();
        self.barrier_seen.clear();
        self.discard_pending_offsets();
        let mut drain_barriers: Vec<(usize, CheckpointBarrier, SourceCheckpoint)> = Vec::new();
        let mut drain_events: u64 = 0;

        loop {
            // Processing may defer more batches, so repeat until a pass
            // finds nothing deferred and nothing in the channel.
            let deferred = std::mem::take(&mut self.post_barrier_buf);
            let mut got_any = !deferred.is_empty();
            for msg in deferred {
                self.process_msg(msg, &mut callback, &mut drain_barriers, &mut drain_events);
            }
            while let Ok(msg) = self.rx.try_recv() {
                got_any = true;
                self.process_msg(msg, &mut callback, &mut drain_barriers, &mut drain_events);
            }
            if !got_any {
                break;
            }
        }

        for (name, batch) in self.pending_watermark_batches.drain(..) {
            callback.extract_watermark(&name, &batch);
        }
        callback.tick_idle_watermark();
        if !self.source_batches_buf.is_empty() || callback.has_deferred_input() {
            let wm = callback.current_watermark();
            match callback.execute_cycle(&self.source_batches_buf, wm).await {
                Ok(results) => {
                    self.commit_pending_offsets();
                    callback.update_mv_stores(&results);
                    callback.push_to_streams(&results);
                    callback.write_to_sinks(&results).await;
                }
                Err(e) => {
                    self.discard_pending_offsets();
                    tracing::warn!(error = %e, "[LDB-3020] SQL cycle error during shutdown drain");
                }
            }
        }

        // Second drain pass: pick up messages that arrived while the first
        // pass was executing its cycle.
        self.source_batches_buf.clear();
        self.barrier_seen.clear();
        self.discard_pending_offsets();
        drain_barriers.clear();
        while let Ok(msg) = self.rx.try_recv() {
            self.process_msg(msg, &mut callback, &mut drain_barriers, &mut drain_events);
        }
        for (name, batch) in self.pending_watermark_batches.drain(..) {
            callback.extract_watermark(&name, &batch);
        }
        callback.tick_idle_watermark();
        if !self.source_batches_buf.is_empty() || callback.has_deferred_input() {
            let wm = callback.current_watermark();
            match callback.execute_cycle(&self.source_batches_buf, wm).await {
                Ok(results) => {
                    self.commit_pending_offsets();
                    callback.update_mv_stores(&results);
                    callback.push_to_streams(&results);
                    callback.write_to_sinks(&results).await;
                }
                Err(e) => {
                    self.discard_pending_offsets();
                    tracing::warn!(error = %e, "[LDB-3020] SQL cycle error during final drain");
                }
            }
        }

        // Final forced checkpoint, only when checkpointing is configured.
        let checkpoint_enabled = self.config.checkpoint_interval.is_some();
        if checkpoint_enabled {
            // Committed offsets keyed by source name; sources without a
            // committed offset are left out.
            let source_offsets: FxHashMap<String, SourceCheckpoint> = self
                .committed_offsets
                .iter()
                .enumerate()
                .filter_map(|(idx, cp)| {
                    cp.as_ref().and_then(|c| {
                        self.source_names
                            .get(idx)
                            .map(|name| (name.to_string(), c.clone()))
                    })
                })
                .collect();
            if let Some(epoch) = callback
                .maybe_checkpoint(true, source_offsets.clone())
                .await
            {
                tracing::info!(epoch, "final checkpoint completed before shutdown");
                self.broadcast_epoch_committed(epoch, &source_offsets);
            }
        }

        for handle in std::mem::take(&mut self.source_handles) {
            let SourceHandle {
                name,
                shutdown: _,
                join,
                barrier_injector: _,
                epoch_committed_tx,
            } = handle;
            // Dropping the sender ends the task's epoch-commit wait loop,
            // so the join below can complete.
            drop(epoch_committed_tx);
            if let Err(e) = join.await {
                tracing::warn!(source = %name, error = ?e, "source task panicked");
            }
        }
    }
828
829 fn process_msg(
835 &mut self,
836 msg: SourceMsg,
837 callback: &mut impl PipelineCallback,
838 barriers: &mut Vec<(usize, CheckpointBarrier, SourceCheckpoint)>,
839 cycle_events: &mut u64,
840 ) {
841 match msg {
842 SourceMsg::Batch {
843 source_idx,
844 batch,
845 checkpoint,
846 } => {
847 if self.barrier_seen.contains(&source_idx) {
850 self.post_barrier_buf.push(SourceMsg::Batch {
851 source_idx,
852 batch,
853 checkpoint,
854 });
855 return;
856 }
857
858 if source_idx < self.pending_offsets.len() {
860 self.pending_offsets[source_idx] = Some(checkpoint);
861 }
862
863 if let Some(name) = self.source_names.get(source_idx) {
864 #[allow(clippy::cast_possible_truncation)]
865 {
866 *cycle_events += batch.num_rows() as u64;
867 }
868 if let Some(filtered) = callback.filter_late_rows(name, &batch) {
873 self.source_batches_buf
874 .entry(Arc::clone(name))
875 .or_default()
876 .push(filtered);
877 }
878 self.pending_watermark_batches
879 .push((Arc::clone(name), batch));
880 }
881 }
882 SourceMsg::Barrier {
883 source_idx,
884 barrier,
885 checkpoint,
886 } => {
887 tracing::debug!(
888 source_idx,
889 checkpoint_id = barrier.checkpoint_id,
890 "coordinator received source barrier"
891 );
892 self.barrier_seen.insert(source_idx);
893 barriers.push((source_idx, barrier, checkpoint));
894 }
895 }
896 }
897
898 fn commit_pending_offsets(&mut self) {
901 for (i, pending) in self.pending_offsets.iter_mut().enumerate() {
902 if let Some(cp) = pending.take() {
903 self.committed_offsets[i] = Some(cp);
904 }
905 }
906 }
907
908 fn discard_pending_offsets(&mut self) {
910 for slot in &mut self.pending_offsets {
911 *slot = None;
912 }
913 }
914
915 async fn handle_barrier(
917 &mut self,
918 source_idx: usize,
919 barrier: &CheckpointBarrier,
920 barrier_checkpoint: &SourceCheckpoint,
921 callback: &mut impl PipelineCallback,
922 ) {
923 if !self.pending_barrier.active && !callback.is_leader() {
924 self.pending_barrier
925 .reset(barrier.checkpoint_id, self.source_handles.len());
926 }
927
928 if !self.pending_barrier.active
929 || barrier.checkpoint_id != self.pending_barrier.checkpoint_id
930 {
931 return;
932 }
933
934 if let Some(name) = self.source_names.get(source_idx) {
937 self.pending_barrier
938 .source_checkpoints
939 .insert(name.to_string(), barrier_checkpoint.clone());
940 }
941
942 self.pending_barrier.sources_aligned.insert(source_idx);
943
944 if self.pending_barrier.sources_aligned.len() >= self.pending_barrier.sources_total {
946 let checkpoints = std::mem::take(&mut self.pending_barrier.source_checkpoints);
947 let fan_out = checkpoints.clone();
952 let checkpoint_id = self.pending_barrier.checkpoint_id;
953 match callback
954 .checkpoint_with_barrier(checkpoints, checkpoint_id)
955 .await
956 {
957 BarrierOutcome::Committed(epoch) => {
958 self.broadcast_epoch_committed(epoch, &fan_out);
959 callback.publish_barrier(epoch, checkpoint_id);
961 }
962 BarrierOutcome::Async => {
963 self.pending_barrier.active = false;
964 self.last_checkpoint = Instant::now();
965 }
966 BarrierOutcome::Skipped(reason) => {
967 tracing::debug!(checkpoint_id, reason = %reason, "barrier checkpoint skipped");
968 }
969 BarrierOutcome::Failed => {
970 tracing::warn!(
971 checkpoint_id,
972 "barrier checkpoint failed, will retry on next interval"
973 );
974 }
975 }
976 self.pending_barrier.active = false;
977 self.last_checkpoint = Instant::now();
978 }
979 }
980
981 async fn maybe_checkpoint(&mut self, callback: &mut impl PipelineCallback) {
987 if self.pending_barrier.active {
988 return; }
990 if self.checkpoint_in_flight.load(Ordering::Acquire) >= self.max_in_flight_epochs {
991 return; }
993 if self.staged_bytes.load(Ordering::Acquire) >= self.max_staged_bytes {
994 warn_staged_cap_throttled(
997 self.staged_bytes.load(Ordering::Acquire),
998 self.max_staged_bytes,
999 );
1000 return;
1001 }
1002
1003 let offsets = FxHashMap::default();
1011 if let Some(epoch) = callback.maybe_checkpoint(false, offsets).await {
1012 self.broadcast_epoch_committed(epoch, &FxHashMap::default());
1014 }
1015
1016 let should_checkpoint = callback.is_leader()
1017 && (self
1018 .config
1019 .checkpoint_interval
1020 .is_some_and(|interval| self.last_checkpoint.elapsed() >= interval)
1021 || self
1022 .checkpoint_request_flags
1023 .iter()
1024 .any(|f| f.swap(false, Ordering::AcqRel)));
1025
1026 if !should_checkpoint {
1027 return;
1028 }
1029
1030 if self.source_handles.is_empty() {
1031 let offsets = FxHashMap::default();
1033 if let Some(epoch) = callback.maybe_checkpoint(false, offsets).await {
1034 self.last_checkpoint = Instant::now();
1035 self.broadcast_epoch_committed(epoch, &FxHashMap::default());
1036 }
1037 return;
1038 }
1039
1040 let checkpoint_id = if let Some(external_id) = callback.next_checkpoint_id() {
1042 self.next_checkpoint_id = external_id + 1;
1043 external_id
1044 } else {
1045 let id = self.next_checkpoint_id;
1046 self.next_checkpoint_id += 1;
1047 id
1048 };
1049 self.pending_barrier
1050 .reset(checkpoint_id, self.source_handles.len());
1051
1052 for handle in &self.source_handles {
1053 handle.barrier_injector.trigger(checkpoint_id, 0);
1054 }
1055 }
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060 use super::*;
1061 use arrow::array::Int64Array;
1062 use arrow::datatypes::{DataType, Field, Schema};
1063 use std::sync::Arc;
1064
    /// Minimal `PipelineCallback` used by the coordinator tests.
    struct MockCallback {
        /// Number of `execute_cycle` calls made so far.
        cycle_count: u32,
        /// The result map returned by each `execute_cycle` call, in order.
        results: Vec<FxHashMap<Arc<str>, Vec<RecordBatch>>>,
        /// Running watermark: the total row count passed to `extract_watermark`.
        watermark: i64,
        /// When present, set to `true` on a forced `maybe_checkpoint` call.
        force_checkpoint_flag: Option<Arc<std::sync::atomic::AtomicBool>>,
    }
1073
1074 impl MockCallback {
1075 fn new() -> Self {
1076 Self {
1077 cycle_count: 0,
1078 results: Vec::new(),
1079 watermark: 0,
1080 force_checkpoint_flag: None,
1081 }
1082 }
1083 }
1084
1085 impl PipelineCallback for MockCallback {
1086 async fn execute_cycle(
1087 &mut self,
1088 source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
1089 _watermark: i64,
1090 ) -> Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String> {
1091 self.cycle_count += 1;
1092 let results: FxHashMap<Arc<str>, Vec<RecordBatch>> = source_batches
1094 .iter()
1095 .map(|(k, v)| (k.clone(), v.clone()))
1096 .collect();
1097 self.results.push(results.clone());
1098 Ok(results)
1099 }
1100
1101 fn push_to_streams(&self, _results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {}
1102 async fn write_to_sinks(&mut self, _results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {}
1103
1104 fn extract_watermark(&mut self, _source_name: &str, batch: &RecordBatch) {
1105 #[allow(clippy::cast_possible_wrap)]
1107 {
1108 self.watermark += batch.num_rows() as i64;
1109 }
1110 }
1111
1112 fn filter_late_rows(&self, _source_name: &str, batch: &RecordBatch) -> Option<RecordBatch> {
1113 Some(batch.clone())
1114 }
1115
1116 fn current_watermark(&self) -> i64 {
1117 self.watermark
1118 }
1119
1120 async fn maybe_checkpoint(
1121 &mut self,
1122 force: bool,
1123 _source_offsets: FxHashMap<String, SourceCheckpoint>,
1124 ) -> Option<u64> {
1125 if force {
1126 if let Some(ref flag) = self.force_checkpoint_flag {
1127 flag.store(true, std::sync::atomic::Ordering::SeqCst);
1128 }
1129 Some(1)
1130 } else {
1131 None
1132 }
1133 }
1134
1135 async fn checkpoint_with_barrier(
1136 &mut self,
1137 _source_checkpoints: FxHashMap<String, SourceCheckpoint>,
1138 _checkpoint_id: u64,
1139 ) -> BarrierOutcome {
1140 BarrierOutcome::Committed(1)
1141 }
1142
1143 fn record_cycle(&self, _events: u64, _batches: u64, _elapsed_ns: u64) {}
1144 async fn poll_tables(&mut self) {}
1145 fn apply_control(&mut self, _msg: crate::pipeline::ControlMsg) {}
1146 }
1147
    /// Smoke test: a coordinator built directly (no source tasks) processes
    /// one batch fed through the channel and returns from `run` after
    /// shutdown is notified. The test has no assertions; it passes when
    /// `run` completes without panicking.
    #[tokio::test]
    async fn test_coordinator_direct_channel() {
        let shutdown = Arc::new(tokio::sync::Notify::new());
        let (tx, rx) = mpsc::bounded_async::<SourceMsg>(64);

        // The control sender is kept alive but never used.
        let (_control_tx, control_rx) = mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
        // Built by hand to bypass `new`, which would open real connectors.
        let coordinator = StreamingCoordinator {
            config: PipelineConfig {
                batch_window: Duration::ZERO,
                max_poll_records: 1000,
                channel_capacity: 64,
                fallback_poll_interval: Duration::from_millis(10),
                // Checkpointing disabled: no final checkpoint on shutdown.
                checkpoint_interval: None,
                delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
                barrier_alignment_timeout: Duration::from_secs(30),
                cycle_budget_ns: 10_000_000,
                drain_budget_ns: 1_000_000,
                query_budget_ns: 8_000_000,
                background_budget_ns: 5_000_000,
                max_input_buf_batches: 256,
                max_input_buf_bytes: None,
                backpressure_policy: crate::config::BackpressurePolicy::Backpressure,
            },
            rx,
            source_handles: Vec::new(),
            source_names: vec![Arc::from("test_source")],
            shutdown: Arc::clone(&shutdown),
            pending_barrier: PendingBarrier::new(),
            next_checkpoint_id: 1,
            last_checkpoint: Instant::now(),
            checkpoint_request_flags: Vec::new(),
            source_batches_buf: FxHashMap::default(),
            post_barrier_buf: Vec::new(),
            pending_watermark_batches: Vec::new(),
            barrier_seen: FxHashSet::default(),
            committed_offsets: vec![None],
            pending_offsets: vec![None],
            control_rx,
            checkpoint_complete_rx: None,
            checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
            max_in_flight_epochs: 1,
            staged_bytes: Arc::new(AtomicU64::new(0)),
            max_staged_bytes: u64::MAX,
        };

        let callback = MockCallback::new();

        // Queue a single three-row batch for source 0 before starting.
        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();
        tx.send(SourceMsg::Batch {
            source_idx: 0,
            batch,
            checkpoint: SourceCheckpoint::new(1),
        })
        .await
        .unwrap();

        // Request shutdown shortly after the coordinator starts running.
        let shutdown_clone = Arc::clone(&shutdown);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            shutdown_clone.notify_one();
        });

        coordinator.run(callback).await;

    }
1222
    /// With checkpointing enabled, shutting the coordinator down must call
    /// the callback's `maybe_checkpoint` with `force = true`.
    #[tokio::test]
    async fn test_final_checkpoint_on_shutdown() {
        let shutdown = Arc::new(tokio::sync::Notify::new());
        let (tx, rx) = mpsc::bounded_async::<SourceMsg>(64);
        let (_control_tx, control_rx) = mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);

        // Built by hand to bypass `new`, which would open real connectors.
        let coordinator = StreamingCoordinator {
            config: PipelineConfig {
                batch_window: Duration::ZERO,
                max_poll_records: 1000,
                channel_capacity: 64,
                fallback_poll_interval: Duration::from_millis(10),
                // Any `Some` interval enables the final checkpoint; 60 s is
                // long enough that no periodic checkpoint fires in the test.
                checkpoint_interval: Some(Duration::from_secs(60)),
                delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
                barrier_alignment_timeout: Duration::from_secs(30),
                cycle_budget_ns: 10_000_000,
                drain_budget_ns: 1_000_000,
                query_budget_ns: 8_000_000,
                background_budget_ns: 5_000_000,
                max_input_buf_batches: 256,
                max_input_buf_bytes: None,
                backpressure_policy: crate::config::BackpressurePolicy::Backpressure,
            },
            rx,
            source_handles: Vec::new(),
            source_names: vec![Arc::from("test_source")],
            shutdown: Arc::clone(&shutdown),
            pending_barrier: PendingBarrier::new(),
            next_checkpoint_id: 1,
            last_checkpoint: Instant::now(),
            checkpoint_request_flags: Vec::new(),
            source_batches_buf: FxHashMap::default(),
            post_barrier_buf: Vec::new(),
            pending_watermark_batches: Vec::new(),
            barrier_seen: FxHashSet::default(),
            committed_offsets: vec![None],
            pending_offsets: vec![None],
            control_rx,
            checkpoint_complete_rx: None,
            checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
            max_in_flight_epochs: 1,
            staged_bytes: Arc::new(AtomicU64::new(0)),
            max_staged_bytes: u64::MAX,
        };

        // The mock raises this flag when it sees a forced checkpoint.
        let force_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let mut callback = MockCallback::new();
        callback.force_checkpoint_flag = Some(Arc::clone(&force_flag));

        // Queue one single-row batch for source 0 before starting.
        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
        tx.send(SourceMsg::Batch {
            source_idx: 0,
            batch,
            checkpoint: SourceCheckpoint::new(1),
        })
        .await
        .unwrap();

        // Request shutdown shortly after the coordinator starts running.
        let shutdown_clone = Arc::clone(&shutdown);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            shutdown_clone.notify_one();
        });

        coordinator.run(callback).await;

        assert!(
            force_flag.load(std::sync::atomic::Ordering::SeqCst),
            "final checkpoint with force=true should have been called"
        );
    }
1298
1299 #[tokio::test]
1302 #[allow(clippy::too_many_lines, clippy::similar_names)]
1303 async fn test_barrier_excludes_post_barrier_data() {
1304 let shutdown = Arc::new(tokio::sync::Notify::new());
1305 let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
1306
1307 let (_control_tx2, control_rx2) = mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1308 let mut coordinator = StreamingCoordinator {
1309 config: PipelineConfig {
1310 batch_window: Duration::ZERO,
1311 max_poll_records: 1000,
1312 channel_capacity: 64,
1313 fallback_poll_interval: Duration::from_millis(10),
1314 checkpoint_interval: None,
1315 delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
1316 barrier_alignment_timeout: Duration::from_secs(30),
1317 cycle_budget_ns: 10_000_000,
1318 drain_budget_ns: 1_000_000,
1319 query_budget_ns: 8_000_000,
1320 background_budget_ns: 5_000_000,
1321 max_input_buf_batches: 256,
1322 max_input_buf_bytes: None,
1323 backpressure_policy: crate::config::BackpressurePolicy::Backpressure,
1324 },
1325 rx: mpsc::bounded_async::<SourceMsg>(64).1, source_handles: Vec::new(),
1327 source_names: vec![Arc::from("s0"), Arc::from("s1")],
1328 shutdown: Arc::clone(&shutdown),
1329 pending_barrier: PendingBarrier::new(),
1330 next_checkpoint_id: 1,
1331 last_checkpoint: Instant::now(),
1332 checkpoint_request_flags: Vec::new(),
1333 source_batches_buf: FxHashMap::default(),
1334 post_barrier_buf: Vec::new(),
1335 pending_watermark_batches: Vec::new(),
1336 barrier_seen: FxHashSet::default(),
1337 committed_offsets: vec![None, None],
1338 pending_offsets: vec![None, None],
1339 control_rx: control_rx2,
1340 checkpoint_complete_rx: None,
1341 checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
1342 max_in_flight_epochs: 1,
1343 staged_bytes: Arc::new(AtomicU64::new(0)),
1344 max_staged_bytes: u64::MAX,
1345 };
1346
1347 let mut callback = MockCallback::new();
1348 let mut barriers = Vec::new();
1349 let mut cycle_events: u64 = 0;
1350
1351 let batch_1 = RecordBatch::try_new(
1353 Arc::clone(&schema),
1354 vec![Arc::new(Int64Array::from(vec![1]))],
1355 )
1356 .unwrap();
1357 let batch_2 = RecordBatch::try_new(
1358 Arc::clone(&schema),
1359 vec![Arc::new(Int64Array::from(vec![2]))],
1360 )
1361 .unwrap();
1362 let barrier = CheckpointBarrier::new(1, 0);
1363
1364 coordinator.process_msg(
1365 SourceMsg::Batch {
1366 source_idx: 0,
1367 batch: batch_1,
1368 checkpoint: SourceCheckpoint::new(10),
1369 },
1370 &mut callback,
1371 &mut barriers,
1372 &mut cycle_events,
1373 );
1374 coordinator.process_msg(
1375 SourceMsg::Barrier {
1376 source_idx: 0,
1377 barrier,
1378 checkpoint: SourceCheckpoint::new(10),
1379 },
1380 &mut callback,
1381 &mut barriers,
1382 &mut cycle_events,
1383 );
1384 coordinator.process_msg(
1385 SourceMsg::Batch {
1386 source_idx: 0,
1387 batch: batch_2,
1388 checkpoint: SourceCheckpoint::new(20),
1389 },
1390 &mut callback,
1391 &mut barriers,
1392 &mut cycle_events,
1393 );
1394
1395 let batch_s1 = RecordBatch::try_new(
1397 Arc::clone(&schema),
1398 vec![Arc::new(Int64Array::from(vec![1]))],
1399 )
1400 .unwrap();
1401 coordinator.process_msg(
1402 SourceMsg::Batch {
1403 source_idx: 1,
1404 batch: batch_s1,
1405 checkpoint: SourceCheckpoint::new(5),
1406 },
1407 &mut callback,
1408 &mut barriers,
1409 &mut cycle_events,
1410 );
1411 coordinator.process_msg(
1412 SourceMsg::Barrier {
1413 source_idx: 1,
1414 barrier,
1415 checkpoint: SourceCheckpoint::new(5),
1416 },
1417 &mut callback,
1418 &mut barriers,
1419 &mut cycle_events,
1420 );
1421
1422 let s0_batches = coordinator.source_batches_buf.get("s0").unwrap();
1425 assert_eq!(
1426 s0_batches.len(),
1427 1,
1428 "s0 should have exactly 1 pre-barrier batch"
1429 );
1430 let s0_col = s0_batches[0]
1431 .column(0)
1432 .as_any()
1433 .downcast_ref::<Int64Array>()
1434 .unwrap();
1435 assert_eq!(s0_col.value(0), 1, "s0 batch should contain ts=1");
1436
1437 let s1_batches = coordinator.source_batches_buf.get("s1").unwrap();
1438 assert_eq!(s1_batches.len(), 1, "s1 should have exactly 1 batch");
1439
1440 assert_eq!(
1442 coordinator.post_barrier_buf.len(),
1443 1,
1444 "post_barrier_buf should have 1 deferred batch"
1445 );
1446
1447 assert_eq!(
1449 coordinator.pending_offsets[0].as_ref().unwrap().epoch(),
1450 10,
1451 "s0 pending offset should be the pre-barrier batch"
1452 );
1453 assert_eq!(
1454 coordinator.pending_offsets[1].as_ref().unwrap().epoch(),
1455 5,
1456 "s1 pending offset should be epoch 5"
1457 );
1458 assert!(
1460 coordinator.committed_offsets[0].is_none(),
1461 "s0 committed offset should be None before execute_cycle"
1462 );
1463 assert!(
1464 coordinator.committed_offsets[1].is_none(),
1465 "s1 committed offset should be None before execute_cycle"
1466 );
1467
1468 coordinator.commit_pending_offsets();
1470 assert_eq!(
1471 coordinator.committed_offsets[0].as_ref().unwrap().epoch(),
1472 10,
1473 "s0 committed after cycle"
1474 );
1475 assert_eq!(
1476 coordinator.committed_offsets[1].as_ref().unwrap().epoch(),
1477 5,
1478 "s1 committed after cycle"
1479 );
1480
1481 assert_eq!(barriers.len(), 2, "should have barriers from both sources");
1483 }
1484
1485 #[allow(clippy::disallowed_types)] struct BackpressuredCallback {
1487 inner: MockCallback,
1488 cycle_count: Arc<std::sync::atomic::AtomicU32>,
1489 events_per_cycle: Arc<std::sync::Mutex<Vec<u64>>>,
1490 }
1491
1492 impl BackpressuredCallback {
1493 #[allow(clippy::disallowed_types)]
1494 fn new(
1495 cycle_count: Arc<std::sync::atomic::AtomicU32>,
1496 events_per_cycle: Arc<std::sync::Mutex<Vec<u64>>>,
1497 ) -> Self {
1498 Self {
1499 inner: MockCallback::new(),
1500 cycle_count,
1501 events_per_cycle,
1502 }
1503 }
1504 }
1505
1506 impl PipelineCallback for BackpressuredCallback {
1507 async fn execute_cycle(
1508 &mut self,
1509 source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
1510 watermark: i64,
1511 ) -> Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String> {
1512 self.cycle_count
1513 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1514 let total: u64 = source_batches
1515 .values()
1516 .flat_map(|bs| bs.iter())
1517 .map(|b| b.num_rows() as u64)
1518 .sum();
1519 self.events_per_cycle.lock().unwrap().push(total);
1520 self.inner.execute_cycle(source_batches, watermark).await
1521 }
1522
1523 fn push_to_streams(&self, r: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {
1524 self.inner.push_to_streams(r);
1525 }
1526 async fn write_to_sinks(&mut self, r: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {
1527 self.inner.write_to_sinks(r).await;
1528 }
1529 fn extract_watermark(&mut self, s: &str, b: &RecordBatch) {
1530 self.inner.extract_watermark(s, b);
1531 }
1532 fn filter_late_rows(&self, s: &str, b: &RecordBatch) -> Option<RecordBatch> {
1533 self.inner.filter_late_rows(s, b)
1534 }
1535 fn current_watermark(&self) -> i64 {
1536 self.inner.current_watermark()
1537 }
1538 async fn maybe_checkpoint(
1539 &mut self,
1540 force: bool,
1541 offsets: FxHashMap<String, SourceCheckpoint>,
1542 ) -> Option<u64> {
1543 self.inner.maybe_checkpoint(force, offsets).await
1544 }
1545 async fn checkpoint_with_barrier(
1546 &mut self,
1547 cp: FxHashMap<String, SourceCheckpoint>,
1548 checkpoint_id: u64,
1549 ) -> BarrierOutcome {
1550 self.inner.checkpoint_with_barrier(cp, checkpoint_id).await
1551 }
1552 fn record_cycle(&self, e: u64, b: u64, ns: u64) {
1553 self.inner.record_cycle(e, b, ns);
1554 }
1555 async fn poll_tables(&mut self) {
1556 self.inner.poll_tables().await;
1557 }
1558 fn apply_control(&mut self, msg: crate::pipeline::ControlMsg) {
1559 self.inner.apply_control(msg);
1560 }
1561
1562 fn is_backpressured(&self) -> bool {
1563 true }
1565 }
1566
1567 #[tokio::test]
1572 async fn test_drain_skip_under_backpressure() {
1573 let shutdown = Arc::new(tokio::sync::Notify::new());
1574 let (tx, rx) = mpsc::bounded_async::<SourceMsg>(64);
1575 let (_control_tx, control_rx) = mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1576
1577 let coordinator = StreamingCoordinator {
1578 config: PipelineConfig {
1579 batch_window: Duration::ZERO,
1580 max_poll_records: 1000,
1581 channel_capacity: 64,
1582 fallback_poll_interval: Duration::from_millis(10),
1583 checkpoint_interval: None,
1584 delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
1585 barrier_alignment_timeout: Duration::from_secs(30),
1586 cycle_budget_ns: 10_000_000,
1587 drain_budget_ns: 1_000_000,
1588 query_budget_ns: 8_000_000,
1589 background_budget_ns: 5_000_000,
1590 max_input_buf_batches: 256,
1591 max_input_buf_bytes: None,
1592 backpressure_policy: crate::config::BackpressurePolicy::Backpressure,
1593 },
1594 rx,
1595 source_handles: Vec::new(),
1596 source_names: vec![Arc::from("src")],
1597 shutdown: Arc::clone(&shutdown),
1598 pending_barrier: PendingBarrier::new(),
1599 next_checkpoint_id: 1,
1600 last_checkpoint: Instant::now(),
1601 checkpoint_request_flags: Vec::new(),
1602 source_batches_buf: FxHashMap::default(),
1603 post_barrier_buf: Vec::new(),
1604 pending_watermark_batches: Vec::new(),
1605 barrier_seen: FxHashSet::default(),
1606 committed_offsets: vec![None],
1607 pending_offsets: vec![None],
1608 control_rx,
1609 checkpoint_complete_rx: None,
1610 checkpoint_in_flight: Arc::new(AtomicU64::new(0)),
1611 max_in_flight_epochs: 1,
1612 staged_bytes: Arc::new(AtomicU64::new(0)),
1613 max_staged_bytes: u64::MAX,
1614 };
1615
1616 let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
1617
1618 for i in 0..5 {
1620 let batch = RecordBatch::try_new(
1621 Arc::clone(&schema),
1622 vec![Arc::new(Int64Array::from(vec![i]))],
1623 )
1624 .unwrap();
1625 tx.send(SourceMsg::Batch {
1626 source_idx: 0,
1627 batch,
1628 checkpoint: SourceCheckpoint::new(u64::try_from(i).unwrap()),
1629 })
1630 .await
1631 .unwrap();
1632 }
1633
1634 let shutdown_clone = Arc::clone(&shutdown);
1635 tokio::spawn(async move {
1636 tokio::time::sleep(Duration::from_millis(300)).await;
1637 shutdown_clone.notify_one();
1638 });
1639
1640 let cycle_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
1641 #[allow(clippy::disallowed_types)]
1642 let events_per_cycle = Arc::new(std::sync::Mutex::new(Vec::new()));
1643 let callback =
1644 BackpressuredCallback::new(Arc::clone(&cycle_count), Arc::clone(&events_per_cycle));
1645 coordinator.run(callback).await;
1646
1647 let cycles = cycle_count.load(std::sync::atomic::Ordering::SeqCst);
1648 let epc = events_per_cycle.lock().unwrap();
1649 let total: u64 = epc.iter().sum();
1650
1651 assert_eq!(total, 5, "all events must be processed, got {total}");
1653 assert!(
1657 cycles >= 5,
1658 "expected >=5 cycles (1 event each), got {cycles} cycles with events/cycle: {epc:?}"
1659 );
1660 for (i, &events) in epc.iter().enumerate() {
1662 assert!(
1663 events <= 1,
1664 "cycle {i} saw {events} events, expected <=1 under backpressure"
1665 );
1666 }
1667 }
1668}