1use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Instant;
16
17use arrow_array::RecordBatch;
18use smallvec::SmallVec;
19
20use super::bridge::{BridgeError, RowBatchBridge};
21use super::policy::{BackpressureStrategy, BatchPolicy};
22use super::row::{EventRow, RowSchema};
23use crate::tpc::SpscQueue;
24
/// Errors surfaced while sending messages across the pipeline bridge.
#[derive(Debug, thiserror::Error)]
pub enum PipelineBridgeError {
    /// The queue was full. The payload is the number of events dropped:
    /// 1 when the event was discarded (DropNewest/SpillToDisk paths),
    /// 0 when nothing was lost and the caller should retry (control
    /// messages and the PauseSource strategy).
    #[error("backpressure: dropped {0} event(s)")]
    Backpressure(usize),
    /// Schema/configuration disagreement; also used by
    /// `create_pipeline_bridge` for its capacity validation.
    #[error("schema mismatch: {0}")]
    SchemaMismatch(String),
    /// Failure propagated from the underlying row-to-batch bridge.
    #[error("batch formation error: {0}")]
    BatchFormation(#[from] BridgeError),
}
40
/// Message variants carried over the SPSC queue from producer to consumer.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum BridgeMessage {
    /// A serialized event row plus its metadata.
    Event {
        /// Raw row bytes; stays inline (no heap spill) up to 256 bytes.
        row_data: SmallVec<[u8; 256]>,
        /// Event timestamp carried alongside the row.
        event_time: i64,
        /// Hash of the event key (currently ignored by the consumer's drain).
        key_hash: u64,
    },
    /// Advance the watermark to `timestamp`.
    Watermark {
        timestamp: i64,
    },
    /// Checkpoint barrier for the given epoch.
    CheckpointBarrier {
        epoch: u64,
    },
    /// End-of-stream marker; the consumer flushes any pending batch on it.
    Eof,
}
72
/// Monotonic counters shared (via `Arc`) by the bridge producer and consumer.
///
/// All accesses use relaxed atomics: these are diagnostics, not synchronization.
pub struct BridgeStats {
    /// Events successfully pushed onto the queue.
    pub events_sent: AtomicU64,
    /// Events discarded under backpressure.
    pub events_dropped: AtomicU64,
    /// Watermark messages successfully enqueued.
    pub watermarks_sent: AtomicU64,
    /// Checkpoint barriers successfully enqueued.
    pub checkpoints_sent: AtomicU64,
    /// `RecordBatch`es flushed by the consumer.
    pub batches_flushed: AtomicU64,
    /// Total rows contained in flushed batches.
    pub rows_flushed: AtomicU64,
}
90
91impl BridgeStats {
92 #[must_use]
94 pub fn new() -> Self {
95 Self {
96 events_sent: AtomicU64::new(0),
97 events_dropped: AtomicU64::new(0),
98 watermarks_sent: AtomicU64::new(0),
99 checkpoints_sent: AtomicU64::new(0),
100 batches_flushed: AtomicU64::new(0),
101 rows_flushed: AtomicU64::new(0),
102 }
103 }
104
105 #[must_use]
107 pub fn snapshot(&self) -> BridgeStatsSnapshot {
108 BridgeStatsSnapshot {
109 events_sent: self.events_sent.load(Ordering::Relaxed),
110 events_dropped: self.events_dropped.load(Ordering::Relaxed),
111 watermarks_sent: self.watermarks_sent.load(Ordering::Relaxed),
112 checkpoints_sent: self.checkpoints_sent.load(Ordering::Relaxed),
113 batches_flushed: self.batches_flushed.load(Ordering::Relaxed),
114 rows_flushed: self.rows_flushed.load(Ordering::Relaxed),
115 }
116 }
117}
118
119impl Default for BridgeStats {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125impl std::fmt::Debug for BridgeStats {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 f.debug_struct("BridgeStats")
128 .field("events_sent", &self.events_sent.load(Ordering::Relaxed))
129 .field(
130 "events_dropped",
131 &self.events_dropped.load(Ordering::Relaxed),
132 )
133 .field(
134 "watermarks_sent",
135 &self.watermarks_sent.load(Ordering::Relaxed),
136 )
137 .field(
138 "checkpoints_sent",
139 &self.checkpoints_sent.load(Ordering::Relaxed),
140 )
141 .field(
142 "batches_flushed",
143 &self.batches_flushed.load(Ordering::Relaxed),
144 )
145 .field("rows_flushed", &self.rows_flushed.load(Ordering::Relaxed))
146 .finish()
147 }
148}
149
/// Plain-value copy of [`BridgeStats`] taken at a single point in time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BridgeStatsSnapshot {
    /// Events successfully pushed onto the queue.
    pub events_sent: u64,
    /// Events discarded under backpressure.
    pub events_dropped: u64,
    /// Watermark messages successfully enqueued.
    pub watermarks_sent: u64,
    /// Checkpoint barriers successfully enqueued.
    pub checkpoints_sent: u64,
    /// `RecordBatch`es flushed by the consumer.
    pub batches_flushed: u64,
    /// Total rows contained in flushed batches.
    pub rows_flushed: u64,
}
166
/// Producer half of the bridge: serializes events and control messages onto a
/// single-producer single-consumer queue drained by [`BridgeConsumer`].
pub struct PipelineBridge {
    /// Queue shared with the consumer.
    queue: Arc<SpscQueue<BridgeMessage>>,
    /// Row schema of the events this bridge emits.
    output_schema: Arc<RowSchema>,
    /// Policy applied in `try_send` when the queue is full.
    backpressure_strategy: BackpressureStrategy,
    /// Counters shared with the consumer.
    stats: Arc<BridgeStats>,
}
180
181impl PipelineBridge {
182 pub fn send_event(
192 &self,
193 row: &EventRow<'_>,
194 event_time: i64,
195 key_hash: u64,
196 ) -> Result<(), PipelineBridgeError> {
197 let data = row.data();
198 debug_assert!(
199 data.len() <= 256,
200 "BridgeMessage::Event row_data spilled to heap ({} bytes > 256 inline capacity)",
201 data.len()
202 );
203 let msg = BridgeMessage::Event {
204 row_data: SmallVec::from_slice(data),
205 event_time,
206 key_hash,
207 };
208 self.try_send(msg)
209 }
210
211 pub fn send_watermark(&self, timestamp: i64) -> Result<(), PipelineBridgeError> {
217 let msg = BridgeMessage::Watermark { timestamp };
218 if self.queue.push(msg).is_err() {
219 return Err(PipelineBridgeError::Backpressure(0));
220 }
221 self.stats.watermarks_sent.fetch_add(1, Ordering::Relaxed);
222 Ok(())
223 }
224
225 pub fn send_checkpoint(&self, epoch: u64) -> Result<(), PipelineBridgeError> {
231 let msg = BridgeMessage::CheckpointBarrier { epoch };
232 if self.queue.push(msg).is_err() {
233 return Err(PipelineBridgeError::Backpressure(0));
234 }
235 self.stats.checkpoints_sent.fetch_add(1, Ordering::Relaxed);
236 Ok(())
237 }
238
239 pub fn send_eof(&self) -> Result<(), PipelineBridgeError> {
245 if self.queue.push(BridgeMessage::Eof).is_err() {
246 return Err(PipelineBridgeError::Backpressure(0));
247 }
248 Ok(())
249 }
250
251 #[must_use]
253 pub fn has_capacity(&self) -> bool {
254 !self.queue.is_full()
255 }
256
257 #[must_use]
259 pub fn is_backpressured(&self) -> bool {
260 self.queue.is_full()
261 }
262
263 #[must_use]
265 pub fn stats(&self) -> &Arc<BridgeStats> {
266 &self.stats
267 }
268
269 #[must_use]
271 pub fn output_schema(&self) -> &Arc<RowSchema> {
272 &self.output_schema
273 }
274
275 fn try_send(&self, msg: BridgeMessage) -> Result<(), PipelineBridgeError> {
277 if self.queue.push(msg).is_ok() {
278 self.stats.events_sent.fetch_add(1, Ordering::Relaxed);
279 return Ok(());
280 }
281 match &self.backpressure_strategy {
283 BackpressureStrategy::DropNewest => {
284 self.stats.events_dropped.fetch_add(1, Ordering::Relaxed);
285 Err(PipelineBridgeError::Backpressure(1))
286 }
287 BackpressureStrategy::PauseSource => Err(PipelineBridgeError::Backpressure(0)),
288 BackpressureStrategy::SpillToDisk { .. } => {
289 self.stats.events_dropped.fetch_add(1, Ordering::Relaxed);
291 Err(PipelineBridgeError::Backpressure(1))
292 }
293 }
294 }
295}
296
/// Manual `Debug`: the queue and schema fields are intentionally omitted
/// (`finish_non_exhaustive`); only strategy and stats are shown.
impl std::fmt::Debug for PipelineBridge {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PipelineBridge")
            .field("backpressure_strategy", &self.backpressure_strategy)
            .field("stats", &self.stats)
            .finish_non_exhaustive()
    }
}
305
/// Consumer half of the bridge: pops [`BridgeMessage`]s from the queue,
/// assembles event rows into Arrow batches, and surfaces [`Ring1Action`]s.
pub struct BridgeConsumer {
    /// Queue shared with the producer.
    queue: Arc<SpscQueue<BridgeMessage>>,
    /// Row-to-Arrow batch assembler.
    batch_bridge: RowBatchBridge,
    /// Schema used to reinterpret raw row bytes.
    row_schema: Arc<RowSchema>,
    /// Flush thresholds: max rows, max latency, flush-on-watermark.
    policy: BatchPolicy,
    /// Latest watermark observed via `drain`; starts at `i64::MIN`.
    current_watermark: i64,
    /// When the currently open batch received its first row (`None` if empty).
    batch_start_time: Option<Instant>,
    /// Counters shared with the producer.
    stats: Arc<BridgeStats>,
}
322
323impl BridgeConsumer {
324 pub fn drain(&mut self) -> SmallVec<[Ring1Action; 4]> {
332 let mut actions: SmallVec<[Ring1Action; 4]> = SmallVec::new();
333
334 while let Some(msg) = self.queue.pop() {
335 match msg {
336 BridgeMessage::Event {
337 row_data,
338 event_time: _,
339 key_hash: _,
340 } => {
341 self.append_event_row(&row_data);
342 if self.batch_bridge.row_count() >= self.policy.max_rows {
343 self.flush_batch(&mut actions);
344 }
345 }
346 BridgeMessage::Watermark { timestamp } => {
347 if self.policy.flush_on_watermark && self.batch_bridge.row_count() > 0 {
348 self.flush_batch(&mut actions);
349 }
350 self.current_watermark = timestamp;
351 actions.push(Ring1Action::AdvanceWatermark(timestamp));
352 }
353 BridgeMessage::CheckpointBarrier { epoch } => {
354 if self.batch_bridge.row_count() > 0 {
355 self.flush_batch(&mut actions);
356 }
357 actions.push(Ring1Action::Checkpoint(epoch));
358 }
359 BridgeMessage::Eof => {
360 if self.batch_bridge.row_count() > 0 {
361 self.flush_batch(&mut actions);
362 }
363 actions.push(Ring1Action::Eof);
364 }
365 }
366 }
367
368 actions
369 }
370
371 pub fn check_latency_flush(&mut self) -> Option<Ring1Action> {
376 if self.batch_bridge.row_count() == 0 {
377 return None;
378 }
379 let start = self.batch_start_time?;
380 if start.elapsed() >= self.policy.max_latency {
381 let mut actions: SmallVec<[Ring1Action; 4]> = SmallVec::new();
382 self.flush_batch(&mut actions);
383 actions.into_iter().next()
384 } else {
385 None
386 }
387 }
388
389 #[must_use]
391 pub fn current_watermark(&self) -> i64 {
392 self.current_watermark
393 }
394
395 #[must_use]
397 pub fn pending_rows(&self) -> usize {
398 self.batch_bridge.row_count()
399 }
400
401 #[must_use]
403 pub fn stats(&self) -> &Arc<BridgeStats> {
404 &self.stats
405 }
406
407 fn append_event_row(&mut self, row_data: &[u8]) {
409 let row = EventRow::new(row_data, &self.row_schema);
410 self.batch_bridge
413 .append_row(&row)
414 .expect("BridgeConsumer: batch bridge overflow (capacity < max_rows?)");
415 if self.batch_start_time.is_none() {
416 self.batch_start_time = Some(Instant::now());
417 }
418 }
419
420 fn flush_batch(&mut self, actions: &mut SmallVec<[Ring1Action; 4]>) {
422 let row_count = self.batch_bridge.row_count();
423 if row_count == 0 {
424 return;
425 }
426 let batch = self.batch_bridge.flush();
427 self.batch_start_time = None;
428 self.stats.batches_flushed.fetch_add(1, Ordering::Relaxed);
429 self.stats
430 .rows_flushed
431 .fetch_add(row_count as u64, Ordering::Relaxed);
432 actions.push(Ring1Action::ProcessBatch(batch));
433 }
434}
435
/// Manual `Debug`: queue and batch internals are omitted
/// (`finish_non_exhaustive`); the pending row count is computed on the fly.
impl std::fmt::Debug for BridgeConsumer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BridgeConsumer")
            .field("current_watermark", &self.current_watermark)
            .field("pending_rows", &self.batch_bridge.row_count())
            .field("policy", &self.policy)
            .field("stats", &self.stats)
            .finish_non_exhaustive()
    }
}
446
/// Actions the consumer hands to the downstream ("Ring 1") processing stage.
#[derive(Debug)]
pub enum Ring1Action {
    /// A fully formed Arrow batch ready for processing.
    ProcessBatch(RecordBatch),
    /// Advance the downstream watermark to this timestamp.
    AdvanceWatermark(i64),
    /// Perform a checkpoint for the given epoch.
    Checkpoint(u64),
    /// The stream has ended.
    Eof,
}
461
462pub fn create_pipeline_bridge(
479 schema: Arc<RowSchema>,
480 queue_capacity: usize,
481 batch_capacity: usize,
482 policy: BatchPolicy,
483 strategy: BackpressureStrategy,
484) -> Result<(PipelineBridge, BridgeConsumer), PipelineBridgeError> {
485 if batch_capacity < policy.max_rows {
486 return Err(PipelineBridgeError::SchemaMismatch(format!(
487 "batch_capacity ({batch_capacity}) must be >= policy.max_rows ({})",
488 policy.max_rows
489 )));
490 }
491
492 let arrow_schema = schema.arrow_schema().clone();
493 let batch_bridge = RowBatchBridge::new(arrow_schema, batch_capacity)?;
494 let queue = Arc::new(SpscQueue::new(queue_capacity));
495 let stats = Arc::new(BridgeStats::new());
496
497 let producer = PipelineBridge {
498 queue: Arc::clone(&queue),
499 output_schema: Arc::clone(&schema),
500 backpressure_strategy: strategy,
501 stats: Arc::clone(&stats),
502 };
503
504 let consumer = BridgeConsumer {
505 queue,
506 batch_bridge,
507 row_schema: schema,
508 policy,
509 current_watermark: i64::MIN,
510 batch_start_time: None,
511 stats,
512 };
513
514 Ok((producer, consumer))
515}
516
#[cfg(test)]
#[allow(
    clippy::approx_constant,
    clippy::cast_precision_loss,
    clippy::cast_possible_wrap
)]
mod tests {
    use super::*;
    use crate::compiler::row::MutableEventRow;
    use arrow_array::{Array, Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use bumpalo::Bump;
    use std::sync::Arc;

    // ------------------------------------------------------------- helpers

    /// Builds an Arrow schema from (name, type, nullable) triples.
    fn make_arrow_schema(fields: Vec<(&str, DataType, bool)>) -> arrow_schema::SchemaRef {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
                .collect::<Vec<_>>(),
        ))
    }

    /// Builds the bridge's row-format schema from the same triples.
    fn make_row_schema(fields: Vec<(&str, DataType, bool)>) -> Arc<RowSchema> {
        let arrow = make_arrow_schema(fields);
        Arc::new(RowSchema::from_arrow(&arrow).unwrap())
    }

    /// Bridge with roomy defaults: queue of 64, batch capacity 1024.
    fn default_bridge(schema: Arc<RowSchema>) -> (PipelineBridge, BridgeConsumer) {
        create_pipeline_bridge(
            schema,
            64,
            1024,
            BatchPolicy::default(),
            BackpressureStrategy::DropNewest,
        )
        .unwrap()
    }

    /// Bridge with custom batch size / queue capacity for pressure tests.
    fn small_bridge(
        schema: Arc<RowSchema>,
        max_rows: usize,
        queue_cap: usize,
    ) -> (PipelineBridge, BridgeConsumer) {
        create_pipeline_bridge(
            schema,
            queue_cap,
            max_rows.max(1),
            BatchPolicy::default().with_max_rows(max_rows.max(1)),
            BackpressureStrategy::DropNewest,
        )
        .unwrap()
    }

    /// Sends one (ts: i64, val: f64) event through `producer`.
    fn send_one_event(producer: &PipelineBridge, schema: &RowSchema, ts: i64, val: f64) {
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, schema, 0);
        row.set_i64(0, ts);
        row.set_f64(1, val);
        let row = row.freeze();
        producer.send_event(&row, ts, 0).unwrap();
    }

    // ------------------------------------------------------------- factory

    #[test]
    fn factory_basic_schema() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, consumer) = default_bridge(schema);
        assert!(producer.has_capacity());
        assert_eq!(consumer.pending_rows(), 0);
        assert_eq!(consumer.current_watermark(), i64::MIN);
    }

    #[test]
    fn factory_mixed_types() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("name", DataType::Utf8, true),
            ("flag", DataType::Boolean, false),
        ]);
        let result = create_pipeline_bridge(
            schema,
            32,
            1024,
            BatchPolicy::default(),
            BackpressureStrategy::DropNewest,
        );
        assert!(result.is_ok());
    }

    #[test]
    fn factory_batch_capacity_too_small() {
        let schema = make_row_schema(vec![("x", DataType::Int64, false)]);
        let result = create_pipeline_bridge(
            schema,
            32,
            10, // deliberately smaller than the default policy's max_rows
            BatchPolicy::default(),
            BackpressureStrategy::DropNewest,
        );
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, PipelineBridgeError::SchemaMismatch(_)));
    }

    // ------------------------------------------------------------ producer

    #[test]
    fn send_event_basic() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, _consumer) = default_bridge(Arc::clone(&schema));
        send_one_event(&producer, &schema, 1000, 3.14);

        let snap = producer.stats().snapshot();
        assert_eq!(snap.events_sent, 1);
        assert_eq!(snap.events_dropped, 0);
    }

    #[test]
    fn send_event_row_data_matches() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &schema, 0);
        row.set_i64(0, 42);
        row.set_f64(1, 2.718);
        let row = row.freeze();
        let original_data: Vec<u8> = row.data().to_vec();

        producer.send_event(&row, 42, 0).unwrap();

        // A lone event stays buffered: no batch yet.
        let actions = consumer.drain();
        assert!(actions.is_empty());
        assert_eq!(consumer.pending_rows(), 1);

        // The watermark forces the pending row out as a batch.
        producer.send_watermark(100).unwrap();
        let actions = consumer.drain();

        assert_eq!(actions.len(), 2);
        if let Ring1Action::ProcessBatch(batch) = &actions[0] {
            assert_eq!(batch.num_rows(), 1);
            let col0 = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap();
            assert_eq!(col0.value(0), 42);
            let col1 = batch
                .column(1)
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap();
            assert!((col1.value(0) - 2.718).abs() < f64::EPSILON);
        } else {
            panic!("expected ProcessBatch, got {:?}", actions[0]);
        }

        assert!(!original_data.is_empty());
    }

    #[test]
    fn send_event_preserves_metadata() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        let (producer, _) = default_bridge(Arc::clone(&schema));

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &schema, 0);
        row.set_i64(0, 999);
        let row = row.freeze();
        producer.send_event(&row, 999, 0xDEAD).unwrap();

        let snap = producer.stats().snapshot();
        assert_eq!(snap.events_sent, 1);
    }

    #[test]
    fn send_watermark() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));

        producer.send_watermark(5000).unwrap();
        let actions = consumer.drain();

        assert_eq!(actions.len(), 1);
        assert!(matches!(actions[0], Ring1Action::AdvanceWatermark(5000)));
        assert_eq!(consumer.current_watermark(), 5000);

        let snap = producer.stats().snapshot();
        assert_eq!(snap.watermarks_sent, 1);
    }

    #[test]
    fn send_checkpoint() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));

        producer.send_checkpoint(7).unwrap();
        let actions = consumer.drain();

        assert_eq!(actions.len(), 1);
        assert!(matches!(actions[0], Ring1Action::Checkpoint(7)));

        let snap = producer.stats().snapshot();
        assert_eq!(snap.checkpoints_sent, 1);
    }

    #[test]
    fn send_eof() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));

        producer.send_eof().unwrap();
        let actions = consumer.drain();

        assert_eq!(actions.len(), 1);
        assert!(matches!(actions[0], Ring1Action::Eof));
    }

    #[test]
    fn backpressure_drop() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        // Queue capacity 2 holds a single in-flight message (one slot reserved).
        let (producer, _consumer) = small_bridge(Arc::clone(&schema), 1024, 2);

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &schema, 0);
        row.set_i64(0, 1);
        let row = row.freeze();

        producer.send_event(&row, 1, 0).unwrap();

        // Second send must be rejected and counted as dropped (DropNewest).
        let result = producer.send_event(&row, 2, 0);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            PipelineBridgeError::Backpressure(1)
        ));

        let snap = producer.stats().snapshot();
        assert_eq!(snap.events_dropped, 1);
    }

    #[test]
    fn has_capacity_and_backpressure() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        let (producer, _consumer) = small_bridge(Arc::clone(&schema), 1024, 2);

        assert!(producer.has_capacity());
        assert!(!producer.is_backpressured());

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &schema, 0);
        row.set_i64(0, 1);
        producer.send_event(&row.freeze(), 1, 0).unwrap();

        // One message fills the capacity-2 queue.
        assert!(!producer.has_capacity());
        assert!(producer.is_backpressured());
    }

    // ------------------------------------------------------------- consumer

    #[test]
    fn drain_empty_queue() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        let (_, mut consumer) = default_bridge(schema);
        let actions = consumer.drain();
        assert!(actions.is_empty());
    }

    #[test]
    fn drain_single_event_pending() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
        send_one_event(&producer, &schema, 1000, 1.0);

        // Below max_rows: the row buffers, no action yet.
        let actions = consumer.drain();
        assert!(actions.is_empty());
        assert_eq!(consumer.pending_rows(), 1);
    }

    #[test]
    fn drain_batch_full_flush() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        // max_rows = 3 so three events complete exactly one batch.
        let (producer, mut consumer) = small_bridge(Arc::clone(&schema), 3, 64);

        for i in 0..3 {
            send_one_event(&producer, &schema, i, i as f64);
        }

        let actions = consumer.drain();
        assert_eq!(actions.len(), 1);
        if let Ring1Action::ProcessBatch(batch) = &actions[0] {
            assert_eq!(batch.num_rows(), 3);
        } else {
            panic!("expected ProcessBatch");
        }
        assert_eq!(consumer.pending_rows(), 0);
    }

    #[test]
    fn drain_watermark_flushes_pending() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
        send_one_event(&producer, &schema, 1000, 1.0);
        send_one_event(&producer, &schema, 2000, 2.0);
        producer.send_watermark(3000).unwrap();

        // Flush must precede the watermark advance.
        let actions = consumer.drain();
        assert_eq!(actions.len(), 2);
        assert!(matches!(actions[0], Ring1Action::ProcessBatch(_)));
        assert!(matches!(actions[1], Ring1Action::AdvanceWatermark(3000)));
    }

    #[test]
    fn drain_watermark_no_flush_when_empty() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
        producer.send_watermark(5000).unwrap();

        let actions = consumer.drain();
        assert_eq!(actions.len(), 1);
        assert!(matches!(actions[0], Ring1Action::AdvanceWatermark(5000)));
    }

    #[test]
    fn drain_watermark_no_flush_when_disabled() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = create_pipeline_bridge(
            Arc::clone(&schema),
            64,
            1024,
            BatchPolicy::default().with_flush_on_watermark(false),
            BackpressureStrategy::DropNewest,
        )
        .unwrap();

        send_one_event(&producer, &schema, 1000, 1.0);
        producer.send_watermark(5000).unwrap();

        // With flush_on_watermark disabled the row stays buffered.
        let actions = consumer.drain();
        assert_eq!(actions.len(), 1);
        assert!(matches!(actions[0], Ring1Action::AdvanceWatermark(5000)));
        assert_eq!(consumer.pending_rows(), 1);
    }

    #[test]
    fn drain_checkpoint_flushes() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
        send_one_event(&producer, &schema, 1000, 1.0);
        producer.send_checkpoint(42).unwrap();

        let actions = consumer.drain();
        assert_eq!(actions.len(), 2);
        assert!(matches!(actions[0], Ring1Action::ProcessBatch(_)));
        assert!(matches!(actions[1], Ring1Action::Checkpoint(42)));
    }

    #[test]
    fn drain_eof_flushes() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
        send_one_event(&producer, &schema, 1000, 1.0);
        producer.send_eof().unwrap();

        let actions = consumer.drain();
        assert_eq!(actions.len(), 2);
        assert!(matches!(actions[0], Ring1Action::ProcessBatch(_)));
        assert!(matches!(actions[1], Ring1Action::Eof));
    }

    #[test]
    fn drain_eof_empty() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
        producer.send_eof().unwrap();

        let actions = consumer.drain();
        assert_eq!(actions.len(), 1);
        assert!(matches!(actions[0], Ring1Action::Eof));
    }

    #[test]
    fn drain_interleaved_messages() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));

        send_one_event(&producer, &schema, 100, 1.0);
        send_one_event(&producer, &schema, 200, 2.0);
        producer.send_watermark(300).unwrap();
        send_one_event(&producer, &schema, 400, 3.0);
        producer.send_checkpoint(1).unwrap();
        producer.send_eof().unwrap();

        // batch(2), watermark, batch(1), checkpoint, eof — in queue order.
        let actions = consumer.drain();
        assert_eq!(actions.len(), 5);
        if let Ring1Action::ProcessBatch(b) = &actions[0] {
            assert_eq!(b.num_rows(), 2);
        } else {
            panic!("expected ProcessBatch at 0");
        }
        assert!(matches!(actions[1], Ring1Action::AdvanceWatermark(300)));
        if let Ring1Action::ProcessBatch(b) = &actions[2] {
            assert_eq!(b.num_rows(), 1);
        } else {
            panic!("expected ProcessBatch at 2");
        }
        assert!(matches!(actions[3], Ring1Action::Checkpoint(1)));
        assert!(matches!(actions[4], Ring1Action::Eof));
    }

    #[test]
    fn drain_data_correctness() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));

        for i in 0..5 {
            send_one_event(&producer, &schema, i * 100, i as f64 * 1.1);
        }
        producer.send_watermark(600).unwrap();

        let actions = consumer.drain();
        assert_eq!(actions.len(), 2);
        if let Ring1Action::ProcessBatch(batch) = &actions[0] {
            assert_eq!(batch.num_rows(), 5);
            let ts_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap();
            let val_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap();
            for i in 0..5 {
                assert_eq!(ts_col.value(i), (i as i64) * 100);
                assert!((val_col.value(i) - (i as f64) * 1.1).abs() < 1e-10);
            }
        } else {
            panic!("expected ProcessBatch");
        }
    }

    #[test]
    fn drain_null_fields() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("name", DataType::Utf8, true),
        ]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &schema, 64);
        row.set_i64(0, 1000);
        row.set_null(1, true);
        let row = row.freeze();
        producer.send_event(&row, 1000, 0).unwrap();
        producer.send_watermark(2000).unwrap();

        let actions = consumer.drain();
        assert_eq!(actions.len(), 2);
        if let Ring1Action::ProcessBatch(batch) = &actions[0] {
            let name_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            assert!(name_col.is_null(0));
        } else {
            panic!("expected ProcessBatch");
        }
    }

    // ------------------------------------------------------ latency flushes

    #[test]
    fn latency_flush_none_when_empty() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        let (_, mut consumer) = default_bridge(schema);
        assert!(consumer.check_latency_flush().is_none());
    }

    #[test]
    fn latency_flush_none_when_fresh() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = create_pipeline_bridge(
            Arc::clone(&schema),
            64,
            1024,
            BatchPolicy::default().with_max_latency(std::time::Duration::from_secs(60)),
            BackpressureStrategy::DropNewest,
        )
        .unwrap();

        send_one_event(&producer, &schema, 1000, 1.0);
        consumer.drain();
        // 60s deadline cannot have elapsed yet.
        assert!(consumer.check_latency_flush().is_none());
    }

    #[test]
    fn latency_flush_triggers_after_timeout() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = create_pipeline_bridge(
            Arc::clone(&schema),
            64,
            1024,
            BatchPolicy::default().with_max_latency(std::time::Duration::from_millis(1)),
            BackpressureStrategy::DropNewest,
        )
        .unwrap();

        send_one_event(&producer, &schema, 1000, 1.0);
        consumer.drain();
        assert_eq!(consumer.pending_rows(), 1);

        std::thread::sleep(std::time::Duration::from_millis(5));

        let action = consumer.check_latency_flush();
        assert!(action.is_some());
        assert!(matches!(action.unwrap(), Ring1Action::ProcessBatch(_)));
        assert_eq!(consumer.pending_rows(), 0);
    }

    // ---------------------------------------------------------------- stats

    #[test]
    fn stats_shared_between_producer_consumer() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, consumer) = default_bridge(Arc::clone(&schema));

        // Both halves must point at the same allocation, not copies.
        assert!(Arc::ptr_eq(producer.stats(), consumer.stats()));
    }

    #[test]
    fn stats_snapshot_correctness() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));

        send_one_event(&producer, &schema, 100, 1.0);
        send_one_event(&producer, &schema, 200, 2.0);
        producer.send_watermark(300).unwrap();
        producer.send_checkpoint(1).unwrap();

        consumer.drain();

        let snap = consumer.stats().snapshot();
        assert_eq!(snap.events_sent, 2);
        assert_eq!(snap.events_dropped, 0);
        assert_eq!(snap.watermarks_sent, 1);
        assert_eq!(snap.checkpoints_sent, 1);
        // Watermark flush consumed both events as a single batch.
        assert_eq!(snap.batches_flushed, 1);
        assert_eq!(snap.rows_flushed, 2);
    }

    // ---------------------------------------------------------- concurrency

    #[test]
    fn concurrent_send_drain() {
        let schema = make_row_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, true),
        ]);
        let (producer, mut consumer) = create_pipeline_bridge(
            Arc::clone(&schema),
            1024,
            1024,
            BatchPolicy::default().with_max_rows(1024),
            BackpressureStrategy::DropNewest,
        )
        .unwrap();

        let schema_clone = Arc::clone(&schema);
        let handle = std::thread::spawn(move || {
            for i in 0..100 {
                let arena = Bump::new();
                let mut row = MutableEventRow::new_in(&arena, &schema_clone, 0);
                row.set_i64(0, i);
                row.set_f64(1, i as f64);
                let row = row.freeze();
                let _ = producer.send_event(&row, i, 0);
            }
            producer.send_eof().unwrap();
        });

        let mut total_rows = 0;
        let mut saw_eof = false;
        while !saw_eof {
            let actions = consumer.drain();
            for action in &actions {
                match action {
                    Ring1Action::ProcessBatch(batch) => total_rows += batch.num_rows(),
                    Ring1Action::Eof => saw_eof = true,
                    _ => {}
                }
            }
            if !saw_eof {
                std::thread::yield_now();
            }
        }
        // Eof flushes pending rows, but count any stragglers defensively.
        total_rows += consumer.pending_rows();

        handle.join().unwrap();

        // Every successfully sent event must surface exactly once.
        let snap = consumer.stats().snapshot();
        assert_eq!(total_rows as u64, snap.events_sent);
    }

    #[test]
    fn backpressure_under_load() {
        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
        // Tiny queue: most of the 10 sends must hit backpressure.
        let (producer, mut consumer) = small_bridge(Arc::clone(&schema), 1024, 2);

        let mut sent = 0;
        let mut dropped = 0;
        let arena = Bump::new();
        for i in 0..10 {
            let mut row = MutableEventRow::new_in(&arena, &schema, 0);
            row.set_i64(0, i);
            match producer.send_event(&row.freeze(), i, 0) {
                Ok(()) => sent += 1,
                Err(_) => dropped += 1,
            }
        }

        // Eof may not fit in the full queue; best effort is fine here.
        producer.send_eof().unwrap_or(());

        let actions = consumer.drain();
        let batch_rows: usize = actions
            .iter()
            .filter_map(|a| match a {
                Ring1Action::ProcessBatch(b) => Some(b.num_rows()),
                _ => None,
            })
            .sum();

        assert!(sent >= 1, "at least one event should have been sent");
        assert!(dropped > 0, "some events should have been dropped");
        // Flushed + still-pending rows must account for every accepted event.
        assert_eq!(batch_rows + consumer.pending_rows(), sent);
    }
}