// laminar_core/compiler/pipeline_bridge.rs
//! Ring 0 / Ring 1 pipeline bridge via lock-free SPSC queue.
//!
//! The [`PipelineBridge`] (Ring 0 producer) sends compiled pipeline output as
//! individual [`BridgeMessage`]s through an [`SpscQueue`]. The [`BridgeConsumer`]
//! (Ring 1) accumulates events into Arrow `RecordBatch` according to a
//! [`BatchPolicy`], emitting [`Ring1Action`]s for downstream stateful operators.
//!
//! This explicit, watermark-aware handoff prevents partial-batch emissions tied
//! to arbitrary batch boundaries (Issue #55). Watermarks flush pending rows
//! *before* the watermark is forwarded, ensuring Ring 1 operators see complete
//! event sets.

13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Instant;
16
17use arrow_array::RecordBatch;
18use smallvec::SmallVec;
19
20use super::bridge::{BridgeError, RowBatchBridge};
21use super::policy::{BackpressureStrategy, BatchPolicy};
22use super::row::{EventRow, RowSchema};
23use crate::tpc::SpscQueue;
24
25// ────────────────────────────── Errors ──────────────────────────────
26
/// Errors from the pipeline bridge.
#[derive(Debug, thiserror::Error)]
pub enum PipelineBridgeError {
    /// The SPSC queue is full.
    ///
    /// The payload is the number of events dropped. It is `0` when nothing was
    /// counted as dropped and the caller is expected to pause/retry (e.g. the
    /// `PauseSource` strategy, or control messages like watermarks).
    #[error("backpressure: dropped {0} event(s)")]
    Backpressure(usize),
    /// The output schema is incompatible with the bridge configuration
    /// (also raised at construction when `batch_capacity < policy.max_rows`).
    #[error("schema mismatch: {0}")]
    SchemaMismatch(String),
    /// Batch formation failed in the underlying [`RowBatchBridge`].
    #[error("batch formation error: {0}")]
    BatchFormation(#[from] BridgeError),
}
40
41// ────────────────────────────── Messages ─────────────────────────────
42
/// A message sent from Ring 0 through the SPSC queue to Ring 1.
// The size disparity between `Event` (280 bytes) and other variants is
// intentional: `SmallVec<[u8; 256]>` keeps row data inline to avoid heap
// allocations on the Ring 0 hot path. Boxing would negate the benefit.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum BridgeMessage {
    /// A processed event row with metadata.
    Event {
        /// Serialized row bytes (inline for rows <= 256 bytes; larger rows
        /// spill to the heap — guarded by a `debug_assert` in `send_event`).
        row_data: SmallVec<[u8; 256]>,
        /// Event timestamp (microseconds since epoch).
        event_time: i64,
        /// Pre-computed key hash for partitioned operators.
        key_hash: u64,
    },
    /// Watermark advance notification. The consumer flushes pending rows
    /// before forwarding this when `BatchPolicy::flush_on_watermark` is set.
    Watermark {
        /// The new watermark timestamp.
        timestamp: i64,
    },
    /// Checkpoint barrier — Ring 1 must flush and persist state.
    CheckpointBarrier {
        /// The epoch number for this checkpoint.
        epoch: u64,
    },
    /// End of stream — no more messages will follow.
    Eof,
}
72
73// ────────────────────────────── Stats ────────────────────────────────
74
/// Shared counters between producer and consumer.
///
/// All counters are read and updated with `Ordering::Relaxed`: they are
/// monotonic telemetry, not synchronization points.
pub struct BridgeStats {
    /// Total events sent by the producer.
    pub events_sent: AtomicU64,
    /// Events dropped due to backpressure.
    pub events_dropped: AtomicU64,
    /// Watermark messages sent.
    pub watermarks_sent: AtomicU64,
    /// Checkpoint barriers sent.
    pub checkpoints_sent: AtomicU64,
    /// Batches flushed by the consumer.
    pub batches_flushed: AtomicU64,
    /// Total rows flushed across all batches.
    pub rows_flushed: AtomicU64,
}
90
91impl BridgeStats {
92    /// Creates a new zeroed stats instance.
93    #[must_use]
94    pub fn new() -> Self {
95        Self {
96            events_sent: AtomicU64::new(0),
97            events_dropped: AtomicU64::new(0),
98            watermarks_sent: AtomicU64::new(0),
99            checkpoints_sent: AtomicU64::new(0),
100            batches_flushed: AtomicU64::new(0),
101            rows_flushed: AtomicU64::new(0),
102        }
103    }
104
105    /// Takes a point-in-time snapshot of all counters.
106    #[must_use]
107    pub fn snapshot(&self) -> BridgeStatsSnapshot {
108        BridgeStatsSnapshot {
109            events_sent: self.events_sent.load(Ordering::Relaxed),
110            events_dropped: self.events_dropped.load(Ordering::Relaxed),
111            watermarks_sent: self.watermarks_sent.load(Ordering::Relaxed),
112            checkpoints_sent: self.checkpoints_sent.load(Ordering::Relaxed),
113            batches_flushed: self.batches_flushed.load(Ordering::Relaxed),
114            rows_flushed: self.rows_flushed.load(Ordering::Relaxed),
115        }
116    }
117}
118
119impl Default for BridgeStats {
120    fn default() -> Self {
121        Self::new()
122    }
123}
124
125impl std::fmt::Debug for BridgeStats {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        f.debug_struct("BridgeStats")
128            .field("events_sent", &self.events_sent.load(Ordering::Relaxed))
129            .field(
130                "events_dropped",
131                &self.events_dropped.load(Ordering::Relaxed),
132            )
133            .field(
134                "watermarks_sent",
135                &self.watermarks_sent.load(Ordering::Relaxed),
136            )
137            .field(
138                "checkpoints_sent",
139                &self.checkpoints_sent.load(Ordering::Relaxed),
140            )
141            .field(
142                "batches_flushed",
143                &self.batches_flushed.load(Ordering::Relaxed),
144            )
145            .field("rows_flushed", &self.rows_flushed.load(Ordering::Relaxed))
146            .finish()
147    }
148}
149
/// A point-in-time snapshot of [`BridgeStats`].
///
/// Produced by [`BridgeStats::snapshot`]; plain integers, so it is cheap to
/// copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BridgeStatsSnapshot {
    /// Total events sent by the producer.
    pub events_sent: u64,
    /// Events dropped due to backpressure.
    pub events_dropped: u64,
    /// Watermark messages sent.
    pub watermarks_sent: u64,
    /// Checkpoint barriers sent.
    pub checkpoints_sent: u64,
    /// Batches flushed by the consumer.
    pub batches_flushed: u64,
    /// Total rows flushed across all batches.
    pub rows_flushed: u64,
}
166
167// ────────────────────────────── Producer ─────────────────────────────
168
/// Ring 0 side of the pipeline bridge (producer).
///
/// Sends compiled pipeline output through a lock-free SPSC queue.
/// The `send_*` methods take `&self` because `SpscQueue::push` uses
/// atomic internals and does not require `&mut self`.
pub struct PipelineBridge {
    /// Sending end of the SPSC queue shared with the consumer.
    queue: Arc<SpscQueue<BridgeMessage>>,
    /// Schema of rows crossing the bridge (exposed via `output_schema`).
    output_schema: Arc<RowSchema>,
    /// What to do when the queue is full (see `try_send`).
    backpressure_strategy: BackpressureStrategy,
    /// Counters shared with the consumer.
    stats: Arc<BridgeStats>,
}
180
181impl PipelineBridge {
182    /// Sends a processed event row through the bridge.
183    ///
184    /// Copies `row.data()` into a `SmallVec` (inline for <=256 bytes) so the
185    /// caller's arena can be reset after this call returns.
186    ///
187    /// # Errors
188    ///
189    /// Returns [`PipelineBridgeError::Backpressure`] if the queue is full and
190    /// the strategy is [`BackpressureStrategy::DropNewest`].
191    pub fn send_event(
192        &self,
193        row: &EventRow<'_>,
194        event_time: i64,
195        key_hash: u64,
196    ) -> Result<(), PipelineBridgeError> {
197        let data = row.data();
198        debug_assert!(
199            data.len() <= 256,
200            "BridgeMessage::Event row_data spilled to heap ({} bytes > 256 inline capacity)",
201            data.len()
202        );
203        let msg = BridgeMessage::Event {
204            row_data: SmallVec::from_slice(data),
205            event_time,
206            key_hash,
207        };
208        self.try_send(msg)
209    }
210
211    /// Sends a watermark advance through the bridge.
212    ///
213    /// # Errors
214    ///
215    /// Returns [`PipelineBridgeError::Backpressure`] if the queue is full.
216    pub fn send_watermark(&self, timestamp: i64) -> Result<(), PipelineBridgeError> {
217        let msg = BridgeMessage::Watermark { timestamp };
218        if self.queue.push(msg).is_err() {
219            return Err(PipelineBridgeError::Backpressure(0));
220        }
221        self.stats.watermarks_sent.fetch_add(1, Ordering::Relaxed);
222        Ok(())
223    }
224
225    /// Sends a checkpoint barrier through the bridge.
226    ///
227    /// # Errors
228    ///
229    /// Returns [`PipelineBridgeError::Backpressure`] if the queue is full.
230    pub fn send_checkpoint(&self, epoch: u64) -> Result<(), PipelineBridgeError> {
231        let msg = BridgeMessage::CheckpointBarrier { epoch };
232        if self.queue.push(msg).is_err() {
233            return Err(PipelineBridgeError::Backpressure(0));
234        }
235        self.stats.checkpoints_sent.fetch_add(1, Ordering::Relaxed);
236        Ok(())
237    }
238
239    /// Sends an end-of-stream marker through the bridge.
240    ///
241    /// # Errors
242    ///
243    /// Returns [`PipelineBridgeError::Backpressure`] if the queue is full.
244    pub fn send_eof(&self) -> Result<(), PipelineBridgeError> {
245        if self.queue.push(BridgeMessage::Eof).is_err() {
246            return Err(PipelineBridgeError::Backpressure(0));
247        }
248        Ok(())
249    }
250
251    /// Returns `true` if the SPSC queue has room for at least one more message.
252    #[must_use]
253    pub fn has_capacity(&self) -> bool {
254        !self.queue.is_full()
255    }
256
257    /// Returns `true` if the SPSC queue is full.
258    #[must_use]
259    pub fn is_backpressured(&self) -> bool {
260        self.queue.is_full()
261    }
262
263    /// Returns a reference to the shared statistics.
264    #[must_use]
265    pub fn stats(&self) -> &Arc<BridgeStats> {
266        &self.stats
267    }
268
269    /// Returns the output schema.
270    #[must_use]
271    pub fn output_schema(&self) -> &Arc<RowSchema> {
272        &self.output_schema
273    }
274
275    /// Attempts to send a message, applying backpressure strategy on failure.
276    fn try_send(&self, msg: BridgeMessage) -> Result<(), PipelineBridgeError> {
277        if self.queue.push(msg).is_ok() {
278            self.stats.events_sent.fetch_add(1, Ordering::Relaxed);
279            return Ok(());
280        }
281        // Queue full — apply strategy.
282        match &self.backpressure_strategy {
283            BackpressureStrategy::DropNewest => {
284                self.stats.events_dropped.fetch_add(1, Ordering::Relaxed);
285                Err(PipelineBridgeError::Backpressure(1))
286            }
287            BackpressureStrategy::PauseSource => Err(PipelineBridgeError::Backpressure(0)),
288            BackpressureStrategy::SpillToDisk { .. } => {
289                // Spill-to-disk is not yet implemented; fall back to drop.
290                self.stats.events_dropped.fetch_add(1, Ordering::Relaxed);
291                Err(PipelineBridgeError::Backpressure(1))
292            }
293        }
294    }
295}
296
297impl std::fmt::Debug for PipelineBridge {
298    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299        f.debug_struct("PipelineBridge")
300            .field("backpressure_strategy", &self.backpressure_strategy)
301            .field("stats", &self.stats)
302            .finish_non_exhaustive()
303    }
304}
305
306// ────────────────────────────── Consumer ─────────────────────────────
307
/// Ring 1 side of the pipeline bridge (consumer).
///
/// Drains [`BridgeMessage`]s from the SPSC queue, accumulates events in a
/// [`RowBatchBridge`], and produces [`Ring1Action`]s when flush conditions
/// are met.
pub struct BridgeConsumer {
    /// Receiving end of the SPSC queue shared with the producer.
    queue: Arc<SpscQueue<BridgeMessage>>,
    /// Accumulates event rows into Arrow `RecordBatch`es.
    batch_bridge: RowBatchBridge,
    /// Row schema used to reinterpret raw row bytes from the queue.
    row_schema: Arc<RowSchema>,
    /// Flush policy (max rows, max latency, flush-on-watermark).
    policy: BatchPolicy,
    /// Highest watermark seen from the producer (`i64::MIN` before the first).
    current_watermark: i64,
    /// When the first row of the current batch arrived; `None` while empty.
    batch_start_time: Option<Instant>,
    /// Counters shared with the producer.
    stats: Arc<BridgeStats>,
}
322
323impl BridgeConsumer {
324    /// Drains all available messages from the queue and returns the resulting actions.
325    ///
326    /// Actions are returned in order:
327    /// 1. `ProcessBatch` when flush conditions are met
328    /// 2. `AdvanceWatermark` after any pending rows are flushed
329    /// 3. `Checkpoint` after any pending rows are flushed
330    /// 4. `Eof` after any pending rows are flushed
331    pub fn drain(&mut self) -> SmallVec<[Ring1Action; 4]> {
332        let mut actions: SmallVec<[Ring1Action; 4]> = SmallVec::new();
333
334        while let Some(msg) = self.queue.pop() {
335            match msg {
336                BridgeMessage::Event {
337                    row_data,
338                    event_time: _,
339                    key_hash: _,
340                } => {
341                    self.append_event_row(&row_data);
342                    if self.batch_bridge.row_count() >= self.policy.max_rows {
343                        self.flush_batch(&mut actions);
344                    }
345                }
346                BridgeMessage::Watermark { timestamp } => {
347                    if self.policy.flush_on_watermark && self.batch_bridge.row_count() > 0 {
348                        self.flush_batch(&mut actions);
349                    }
350                    self.current_watermark = timestamp;
351                    actions.push(Ring1Action::AdvanceWatermark(timestamp));
352                }
353                BridgeMessage::CheckpointBarrier { epoch } => {
354                    if self.batch_bridge.row_count() > 0 {
355                        self.flush_batch(&mut actions);
356                    }
357                    actions.push(Ring1Action::Checkpoint(epoch));
358                }
359                BridgeMessage::Eof => {
360                    if self.batch_bridge.row_count() > 0 {
361                        self.flush_batch(&mut actions);
362                    }
363                    actions.push(Ring1Action::Eof);
364                }
365            }
366        }
367
368        actions
369    }
370
371    /// Checks whether the maximum latency has been exceeded and flushes if so.
372    ///
373    /// Returns `Some(Ring1Action::ProcessBatch)` if a latency-triggered flush
374    /// occurred, or `None` otherwise.
375    pub fn check_latency_flush(&mut self) -> Option<Ring1Action> {
376        if self.batch_bridge.row_count() == 0 {
377            return None;
378        }
379        let start = self.batch_start_time?;
380        if start.elapsed() >= self.policy.max_latency {
381            let mut actions: SmallVec<[Ring1Action; 4]> = SmallVec::new();
382            self.flush_batch(&mut actions);
383            actions.into_iter().next()
384        } else {
385            None
386        }
387    }
388
389    /// Returns the current watermark as last seen from the producer.
390    #[must_use]
391    pub fn current_watermark(&self) -> i64 {
392        self.current_watermark
393    }
394
395    /// Returns the number of rows pending in the batch (not yet flushed).
396    #[must_use]
397    pub fn pending_rows(&self) -> usize {
398        self.batch_bridge.row_count()
399    }
400
401    /// Returns a reference to the shared statistics.
402    #[must_use]
403    pub fn stats(&self) -> &Arc<BridgeStats> {
404        &self.stats
405    }
406
407    /// Appends raw event bytes to the internal batch bridge.
408    fn append_event_row(&mut self, row_data: &[u8]) {
409        let row = EventRow::new(row_data, &self.row_schema);
410        // The batch bridge should not be full here — we flush when row_count
411        // reaches max_rows, and max_rows <= bridge capacity.
412        self.batch_bridge
413            .append_row(&row)
414            .expect("BridgeConsumer: batch bridge overflow (capacity < max_rows?)");
415        if self.batch_start_time.is_none() {
416            self.batch_start_time = Some(Instant::now());
417        }
418    }
419
420    /// Flushes the current batch into a `Ring1Action::ProcessBatch`.
421    fn flush_batch(&mut self, actions: &mut SmallVec<[Ring1Action; 4]>) {
422        let row_count = self.batch_bridge.row_count();
423        if row_count == 0 {
424            return;
425        }
426        let batch = self.batch_bridge.flush();
427        self.batch_start_time = None;
428        self.stats.batches_flushed.fetch_add(1, Ordering::Relaxed);
429        self.stats
430            .rows_flushed
431            .fetch_add(row_count as u64, Ordering::Relaxed);
432        actions.push(Ring1Action::ProcessBatch(batch));
433    }
434}
435
436impl std::fmt::Debug for BridgeConsumer {
437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438        f.debug_struct("BridgeConsumer")
439            .field("current_watermark", &self.current_watermark)
440            .field("pending_rows", &self.batch_bridge.row_count())
441            .field("policy", &self.policy)
442            .field("stats", &self.stats)
443            .finish_non_exhaustive()
444    }
445}
446
447// ────────────────────────────── Actions ──────────────────────────────
448
/// An action for Ring 1 produced by the [`BridgeConsumer`].
#[derive(Debug)]
pub enum Ring1Action {
    /// A batch of events ready for Ring 1 operators (windows, joins, etc.).
    ProcessBatch(RecordBatch),
    /// Watermark advance — Ring 1 should fire timers and close windows.
    /// Emitted *after* any pending batch for the same drain pass.
    AdvanceWatermark(i64),
    /// Checkpoint barrier — Ring 1 must flush and persist state at this epoch.
    Checkpoint(u64),
    /// End of stream — no more events or watermarks will arrive.
    Eof,
}
461
462// ────────────────────────────── Factory ──────────────────────────────
463
464/// Creates a paired [`PipelineBridge`] (producer) and [`BridgeConsumer`] (consumer).
465///
466/// # Arguments
467///
468/// * `schema` - The Arrow-derived row schema for events crossing the bridge.
469/// * `queue_capacity` - SPSC queue capacity (rounded up to next power of 2).
470/// * `batch_capacity` - Maximum rows per `RecordBatch` (>= `policy.max_rows`).
471/// * `policy` - Batching policy for the consumer.
472/// * `strategy` - Backpressure strategy for the producer.
473///
474/// # Errors
475///
476/// Returns [`PipelineBridgeError::SchemaMismatch`] if `batch_capacity < policy.max_rows`.
477/// Returns [`PipelineBridgeError::BatchFormation`] if the schema contains unsupported types.
478pub fn create_pipeline_bridge(
479    schema: Arc<RowSchema>,
480    queue_capacity: usize,
481    batch_capacity: usize,
482    policy: BatchPolicy,
483    strategy: BackpressureStrategy,
484) -> Result<(PipelineBridge, BridgeConsumer), PipelineBridgeError> {
485    if batch_capacity < policy.max_rows {
486        return Err(PipelineBridgeError::SchemaMismatch(format!(
487            "batch_capacity ({batch_capacity}) must be >= policy.max_rows ({})",
488            policy.max_rows
489        )));
490    }
491
492    let arrow_schema = schema.arrow_schema().clone();
493    let batch_bridge = RowBatchBridge::new(arrow_schema, batch_capacity)?;
494    let queue = Arc::new(SpscQueue::new(queue_capacity));
495    let stats = Arc::new(BridgeStats::new());
496
497    let producer = PipelineBridge {
498        queue: Arc::clone(&queue),
499        output_schema: Arc::clone(&schema),
500        backpressure_strategy: strategy,
501        stats: Arc::clone(&stats),
502    };
503
504    let consumer = BridgeConsumer {
505        queue,
506        batch_bridge,
507        row_schema: schema,
508        policy,
509        current_watermark: i64::MIN,
510        batch_start_time: None,
511        stats,
512    };
513
514    Ok((producer, consumer))
515}
516
517#[cfg(test)]
518#[allow(
519    clippy::approx_constant,
520    clippy::cast_precision_loss,
521    clippy::cast_possible_wrap
522)]
523mod tests {
524    use super::*;
525    use crate::compiler::row::MutableEventRow;
526    use arrow_array::{Array, Float64Array, Int64Array, StringArray};
527    use arrow_schema::{DataType, Field, Schema};
528    use bumpalo::Bump;
529    use std::sync::Arc;
530
531    // ── Helpers ──────────────────────────────────────────────────────
532
533    fn make_arrow_schema(fields: Vec<(&str, DataType, bool)>) -> arrow_schema::SchemaRef {
534        Arc::new(Schema::new(
535            fields
536                .into_iter()
537                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
538                .collect::<Vec<_>>(),
539        ))
540    }
541
542    fn make_row_schema(fields: Vec<(&str, DataType, bool)>) -> Arc<RowSchema> {
543        let arrow = make_arrow_schema(fields);
544        Arc::new(RowSchema::from_arrow(&arrow).unwrap())
545    }
546
547    fn default_bridge(schema: Arc<RowSchema>) -> (PipelineBridge, BridgeConsumer) {
548        create_pipeline_bridge(
549            schema,
550            64,
551            1024,
552            BatchPolicy::default(),
553            BackpressureStrategy::DropNewest,
554        )
555        .unwrap()
556    }
557
558    fn small_bridge(
559        schema: Arc<RowSchema>,
560        max_rows: usize,
561        queue_cap: usize,
562    ) -> (PipelineBridge, BridgeConsumer) {
563        create_pipeline_bridge(
564            schema,
565            queue_cap,
566            max_rows.max(1),
567            BatchPolicy::default().with_max_rows(max_rows.max(1)),
568            BackpressureStrategy::DropNewest,
569        )
570        .unwrap()
571    }
572
573    fn send_one_event(producer: &PipelineBridge, schema: &RowSchema, ts: i64, val: f64) {
574        let arena = Bump::new();
575        let mut row = MutableEventRow::new_in(&arena, schema, 0);
576        row.set_i64(0, ts);
577        row.set_f64(1, val);
578        let row = row.freeze();
579        producer.send_event(&row, ts, 0).unwrap();
580    }
581
582    // ── Factory tests ───────────────────────────────────────────────
583
584    #[test]
585    fn factory_basic_schema() {
586        let schema = make_row_schema(vec![
587            ("ts", DataType::Int64, false),
588            ("val", DataType::Float64, true),
589        ]);
590        let (producer, consumer) = default_bridge(schema);
591        assert!(producer.has_capacity());
592        assert_eq!(consumer.pending_rows(), 0);
593        assert_eq!(consumer.current_watermark(), i64::MIN);
594    }
595
596    #[test]
597    fn factory_mixed_types() {
598        let schema = make_row_schema(vec![
599            ("ts", DataType::Int64, false),
600            ("name", DataType::Utf8, true),
601            ("flag", DataType::Boolean, false),
602        ]);
603        let result = create_pipeline_bridge(
604            schema,
605            32,
606            1024,
607            BatchPolicy::default(),
608            BackpressureStrategy::DropNewest,
609        );
610        assert!(result.is_ok());
611    }
612
613    #[test]
614    fn factory_batch_capacity_too_small() {
615        let schema = make_row_schema(vec![("x", DataType::Int64, false)]);
616        let result = create_pipeline_bridge(
617            schema,
618            32,
619            10, // < default max_rows of 1024
620            BatchPolicy::default(),
621            BackpressureStrategy::DropNewest,
622        );
623        assert!(result.is_err());
624        let err = result.unwrap_err();
625        assert!(matches!(err, PipelineBridgeError::SchemaMismatch(_)));
626    }
627
628    // ── Producer tests ──────────────────────────────────────────────
629
630    #[test]
631    fn send_event_basic() {
632        let schema = make_row_schema(vec![
633            ("ts", DataType::Int64, false),
634            ("val", DataType::Float64, true),
635        ]);
636        let (producer, _consumer) = default_bridge(Arc::clone(&schema));
637        send_one_event(&producer, &schema, 1000, 3.14);
638
639        let snap = producer.stats().snapshot();
640        assert_eq!(snap.events_sent, 1);
641        assert_eq!(snap.events_dropped, 0);
642    }
643
644    #[test]
645    fn send_event_row_data_matches() {
646        let schema = make_row_schema(vec![
647            ("ts", DataType::Int64, false),
648            ("val", DataType::Float64, true),
649        ]);
650        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
651
652        let arena = Bump::new();
653        let mut row = MutableEventRow::new_in(&arena, &schema, 0);
654        row.set_i64(0, 42);
655        row.set_f64(1, 2.718);
656        let row = row.freeze();
657        let original_data: Vec<u8> = row.data().to_vec();
658
659        producer.send_event(&row, 42, 0).unwrap();
660
661        // Peek at the message in the queue via drain.
662        let actions = consumer.drain();
663        // The event should be pending (not flushed yet because max_rows=1024).
664        assert!(actions.is_empty());
665        assert_eq!(consumer.pending_rows(), 1);
666
667        // Verify the data was preserved by sending a watermark to flush.
668        producer.send_watermark(100).unwrap();
669        let actions = consumer.drain();
670
671        assert_eq!(actions.len(), 2); // ProcessBatch + AdvanceWatermark
672        if let Ring1Action::ProcessBatch(batch) = &actions[0] {
673            assert_eq!(batch.num_rows(), 1);
674            let col0 = batch
675                .column(0)
676                .as_any()
677                .downcast_ref::<Int64Array>()
678                .unwrap();
679            assert_eq!(col0.value(0), 42);
680            let col1 = batch
681                .column(1)
682                .as_any()
683                .downcast_ref::<Float64Array>()
684                .unwrap();
685            assert!((col1.value(0) - 2.718).abs() < f64::EPSILON);
686        } else {
687            panic!("expected ProcessBatch, got {:?}", actions[0]);
688        }
689
690        // Verify original data length matches (schema-level check).
691        assert!(!original_data.is_empty());
692    }
693
694    #[test]
695    fn send_event_preserves_metadata() {
696        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
697        let (producer, _) = default_bridge(Arc::clone(&schema));
698
699        let arena = Bump::new();
700        let mut row = MutableEventRow::new_in(&arena, &schema, 0);
701        row.set_i64(0, 999);
702        let row = row.freeze();
703        producer.send_event(&row, 999, 0xDEAD).unwrap();
704
705        let snap = producer.stats().snapshot();
706        assert_eq!(snap.events_sent, 1);
707    }
708
709    #[test]
710    fn send_watermark() {
711        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
712        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
713
714        producer.send_watermark(5000).unwrap();
715        let actions = consumer.drain();
716
717        assert_eq!(actions.len(), 1);
718        assert!(matches!(actions[0], Ring1Action::AdvanceWatermark(5000)));
719        assert_eq!(consumer.current_watermark(), 5000);
720
721        let snap = producer.stats().snapshot();
722        assert_eq!(snap.watermarks_sent, 1);
723    }
724
725    #[test]
726    fn send_checkpoint() {
727        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
728        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
729
730        producer.send_checkpoint(7).unwrap();
731        let actions = consumer.drain();
732
733        assert_eq!(actions.len(), 1);
734        assert!(matches!(actions[0], Ring1Action::Checkpoint(7)));
735
736        let snap = producer.stats().snapshot();
737        assert_eq!(snap.checkpoints_sent, 1);
738    }
739
740    #[test]
741    fn send_eof() {
742        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
743        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
744
745        producer.send_eof().unwrap();
746        let actions = consumer.drain();
747
748        assert_eq!(actions.len(), 1);
749        assert!(matches!(actions[0], Ring1Action::Eof));
750    }
751
752    #[test]
753    fn backpressure_drop() {
754        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
755        // Queue capacity of 2 (rounded to 2), holding 1 usable slot.
756        let (producer, _consumer) = small_bridge(Arc::clone(&schema), 1024, 2);
757
758        let arena = Bump::new();
759        let mut row = MutableEventRow::new_in(&arena, &schema, 0);
760        row.set_i64(0, 1);
761        let row = row.freeze();
762
763        // Fill the queue.
764        producer.send_event(&row, 1, 0).unwrap();
765
766        // Next push should be backpressured.
767        let result = producer.send_event(&row, 2, 0);
768        assert!(result.is_err());
769        assert!(matches!(
770            result.unwrap_err(),
771            PipelineBridgeError::Backpressure(1)
772        ));
773
774        let snap = producer.stats().snapshot();
775        assert_eq!(snap.events_dropped, 1);
776    }
777
778    #[test]
779    fn has_capacity_and_backpressure() {
780        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
781        let (producer, _consumer) = small_bridge(Arc::clone(&schema), 1024, 2);
782
783        assert!(producer.has_capacity());
784        assert!(!producer.is_backpressured());
785
786        let arena = Bump::new();
787        let mut row = MutableEventRow::new_in(&arena, &schema, 0);
788        row.set_i64(0, 1);
789        producer.send_event(&row.freeze(), 1, 0).unwrap();
790
791        // Queue of capacity 2 has 1 usable slot.
792        assert!(!producer.has_capacity());
793        assert!(producer.is_backpressured());
794    }
795
796    // ── Consumer drain tests ────────────────────────────────────────
797
798    #[test]
799    fn drain_empty_queue() {
800        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
801        let (_, mut consumer) = default_bridge(schema);
802        let actions = consumer.drain();
803        assert!(actions.is_empty());
804    }
805
806    #[test]
807    fn drain_single_event_pending() {
808        let schema = make_row_schema(vec![
809            ("ts", DataType::Int64, false),
810            ("val", DataType::Float64, true),
811        ]);
812        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
813        send_one_event(&producer, &schema, 1000, 1.0);
814
815        let actions = consumer.drain();
816        // With max_rows=1024, one event is not enough to flush.
817        assert!(actions.is_empty());
818        assert_eq!(consumer.pending_rows(), 1);
819    }
820
821    #[test]
822    fn drain_batch_full_flush() {
823        let schema = make_row_schema(vec![
824            ("ts", DataType::Int64, false),
825            ("val", DataType::Float64, true),
826        ]);
827        // max_rows=3, so 3 events should trigger a flush.
828        let (producer, mut consumer) = small_bridge(Arc::clone(&schema), 3, 64);
829
830        for i in 0..3 {
831            send_one_event(&producer, &schema, i, i as f64);
832        }
833
834        let actions = consumer.drain();
835        assert_eq!(actions.len(), 1);
836        if let Ring1Action::ProcessBatch(batch) = &actions[0] {
837            assert_eq!(batch.num_rows(), 3);
838        } else {
839            panic!("expected ProcessBatch");
840        }
841        assert_eq!(consumer.pending_rows(), 0);
842    }
843
844    #[test]
845    fn drain_watermark_flushes_pending() {
846        let schema = make_row_schema(vec![
847            ("ts", DataType::Int64, false),
848            ("val", DataType::Float64, true),
849        ]);
850        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
851        send_one_event(&producer, &schema, 1000, 1.0);
852        send_one_event(&producer, &schema, 2000, 2.0);
853        producer.send_watermark(3000).unwrap();
854
855        let actions = consumer.drain();
856        // Should be: ProcessBatch(2 rows), AdvanceWatermark(3000).
857        assert_eq!(actions.len(), 2);
858        assert!(matches!(actions[0], Ring1Action::ProcessBatch(_)));
859        assert!(matches!(actions[1], Ring1Action::AdvanceWatermark(3000)));
860    }
861
862    #[test]
863    fn drain_watermark_no_flush_when_empty() {
864        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
865        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
866        producer.send_watermark(5000).unwrap();
867
868        let actions = consumer.drain();
869        // Just the watermark, no batch.
870        assert_eq!(actions.len(), 1);
871        assert!(matches!(actions[0], Ring1Action::AdvanceWatermark(5000)));
872    }
873
874    #[test]
875    fn drain_watermark_no_flush_when_disabled() {
876        let schema = make_row_schema(vec![
877            ("ts", DataType::Int64, false),
878            ("val", DataType::Float64, true),
879        ]);
880        let (producer, mut consumer) = create_pipeline_bridge(
881            Arc::clone(&schema),
882            64,
883            1024,
884            BatchPolicy::default().with_flush_on_watermark(false),
885            BackpressureStrategy::DropNewest,
886        )
887        .unwrap();
888
889        send_one_event(&producer, &schema, 1000, 1.0);
890        producer.send_watermark(5000).unwrap();
891
892        let actions = consumer.drain();
893        // Only watermark — no flush because flush_on_watermark=false.
894        assert_eq!(actions.len(), 1);
895        assert!(matches!(actions[0], Ring1Action::AdvanceWatermark(5000)));
896        assert_eq!(consumer.pending_rows(), 1);
897    }
898
899    #[test]
900    fn drain_checkpoint_flushes() {
901        let schema = make_row_schema(vec![
902            ("ts", DataType::Int64, false),
903            ("val", DataType::Float64, true),
904        ]);
905        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
906        send_one_event(&producer, &schema, 1000, 1.0);
907        producer.send_checkpoint(42).unwrap();
908
909        let actions = consumer.drain();
910        assert_eq!(actions.len(), 2);
911        assert!(matches!(actions[0], Ring1Action::ProcessBatch(_)));
912        assert!(matches!(actions[1], Ring1Action::Checkpoint(42)));
913    }
914
915    #[test]
916    fn drain_eof_flushes() {
917        let schema = make_row_schema(vec![
918            ("ts", DataType::Int64, false),
919            ("val", DataType::Float64, true),
920        ]);
921        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
922        send_one_event(&producer, &schema, 1000, 1.0);
923        producer.send_eof().unwrap();
924
925        let actions = consumer.drain();
926        assert_eq!(actions.len(), 2);
927        assert!(matches!(actions[0], Ring1Action::ProcessBatch(_)));
928        assert!(matches!(actions[1], Ring1Action::Eof));
929    }
930
931    #[test]
932    fn drain_eof_empty() {
933        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
934        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
935        producer.send_eof().unwrap();
936
937        let actions = consumer.drain();
938        assert_eq!(actions.len(), 1);
939        assert!(matches!(actions[0], Ring1Action::Eof));
940    }
941
942    #[test]
943    fn drain_interleaved_messages() {
944        let schema = make_row_schema(vec![
945            ("ts", DataType::Int64, false),
946            ("val", DataType::Float64, true),
947        ]);
948        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
949
950        send_one_event(&producer, &schema, 100, 1.0);
951        send_one_event(&producer, &schema, 200, 2.0);
952        producer.send_watermark(300).unwrap();
953        send_one_event(&producer, &schema, 400, 3.0);
954        producer.send_checkpoint(1).unwrap();
955        producer.send_eof().unwrap();
956
957        let actions = consumer.drain();
958        // Expected: Batch(2 rows), Watermark(300), Batch(1 row), Checkpoint(1), Eof
959        assert_eq!(actions.len(), 5);
960        if let Ring1Action::ProcessBatch(b) = &actions[0] {
961            assert_eq!(b.num_rows(), 2);
962        } else {
963            panic!("expected ProcessBatch at 0");
964        }
965        assert!(matches!(actions[1], Ring1Action::AdvanceWatermark(300)));
966        if let Ring1Action::ProcessBatch(b) = &actions[2] {
967            assert_eq!(b.num_rows(), 1);
968        } else {
969            panic!("expected ProcessBatch at 2");
970        }
971        assert!(matches!(actions[3], Ring1Action::Checkpoint(1)));
972        assert!(matches!(actions[4], Ring1Action::Eof));
973    }
974
975    #[test]
976    fn drain_data_correctness() {
977        let schema = make_row_schema(vec![
978            ("ts", DataType::Int64, false),
979            ("val", DataType::Float64, true),
980        ]);
981        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
982
983        for i in 0..5 {
984            send_one_event(&producer, &schema, i * 100, i as f64 * 1.1);
985        }
986        producer.send_watermark(600).unwrap();
987
988        let actions = consumer.drain();
989        assert_eq!(actions.len(), 2);
990        if let Ring1Action::ProcessBatch(batch) = &actions[0] {
991            assert_eq!(batch.num_rows(), 5);
992            let ts_col = batch
993                .column(0)
994                .as_any()
995                .downcast_ref::<Int64Array>()
996                .unwrap();
997            let val_col = batch
998                .column(1)
999                .as_any()
1000                .downcast_ref::<Float64Array>()
1001                .unwrap();
1002            for i in 0..5 {
1003                assert_eq!(ts_col.value(i), (i as i64) * 100);
1004                assert!((val_col.value(i) - (i as f64) * 1.1).abs() < 1e-10);
1005            }
1006        } else {
1007            panic!("expected ProcessBatch");
1008        }
1009    }
1010
1011    #[test]
1012    fn drain_null_fields() {
1013        let schema = make_row_schema(vec![
1014            ("ts", DataType::Int64, false),
1015            ("name", DataType::Utf8, true),
1016        ]);
1017        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
1018
1019        let arena = Bump::new();
1020        let mut row = MutableEventRow::new_in(&arena, &schema, 64);
1021        row.set_i64(0, 1000);
1022        row.set_null(1, true);
1023        let row = row.freeze();
1024        producer.send_event(&row, 1000, 0).unwrap();
1025        producer.send_watermark(2000).unwrap();
1026
1027        let actions = consumer.drain();
1028        assert_eq!(actions.len(), 2);
1029        if let Ring1Action::ProcessBatch(batch) = &actions[0] {
1030            let name_col = batch
1031                .column(1)
1032                .as_any()
1033                .downcast_ref::<StringArray>()
1034                .unwrap();
1035            assert!(name_col.is_null(0));
1036        } else {
1037            panic!("expected ProcessBatch");
1038        }
1039    }
1040
1041    // ── Latency flush tests ─────────────────────────────────────────
1042
1043    #[test]
1044    fn latency_flush_none_when_empty() {
1045        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
1046        let (_, mut consumer) = default_bridge(schema);
1047        assert!(consumer.check_latency_flush().is_none());
1048    }
1049
1050    #[test]
1051    fn latency_flush_none_when_fresh() {
1052        let schema = make_row_schema(vec![
1053            ("ts", DataType::Int64, false),
1054            ("val", DataType::Float64, true),
1055        ]);
1056        let (producer, mut consumer) = create_pipeline_bridge(
1057            Arc::clone(&schema),
1058            64,
1059            1024,
1060            BatchPolicy::default().with_max_latency(std::time::Duration::from_secs(60)),
1061            BackpressureStrategy::DropNewest,
1062        )
1063        .unwrap();
1064
1065        send_one_event(&producer, &schema, 1000, 1.0);
1066        consumer.drain(); // Process the event into pending.
1067        assert!(consumer.check_latency_flush().is_none());
1068    }
1069
1070    #[test]
1071    fn latency_flush_triggers_after_timeout() {
1072        let schema = make_row_schema(vec![
1073            ("ts", DataType::Int64, false),
1074            ("val", DataType::Float64, true),
1075        ]);
1076        let (producer, mut consumer) = create_pipeline_bridge(
1077            Arc::clone(&schema),
1078            64,
1079            1024,
1080            BatchPolicy::default().with_max_latency(std::time::Duration::from_millis(1)),
1081            BackpressureStrategy::DropNewest,
1082        )
1083        .unwrap();
1084
1085        send_one_event(&producer, &schema, 1000, 1.0);
1086        consumer.drain();
1087        assert_eq!(consumer.pending_rows(), 1);
1088
1089        std::thread::sleep(std::time::Duration::from_millis(5));
1090
1091        let action = consumer.check_latency_flush();
1092        assert!(action.is_some());
1093        assert!(matches!(action.unwrap(), Ring1Action::ProcessBatch(_)));
1094        assert_eq!(consumer.pending_rows(), 0);
1095    }
1096
1097    // ── Stats tests ─────────────────────────────────────────────────
1098
1099    #[test]
1100    fn stats_shared_between_producer_consumer() {
1101        let schema = make_row_schema(vec![
1102            ("ts", DataType::Int64, false),
1103            ("val", DataType::Float64, true),
1104        ]);
1105        let (producer, consumer) = default_bridge(Arc::clone(&schema));
1106
1107        // Same Arc.
1108        assert!(Arc::ptr_eq(producer.stats(), consumer.stats()));
1109    }
1110
1111    #[test]
1112    fn stats_snapshot_correctness() {
1113        let schema = make_row_schema(vec![
1114            ("ts", DataType::Int64, false),
1115            ("val", DataType::Float64, true),
1116        ]);
1117        let (producer, mut consumer) = default_bridge(Arc::clone(&schema));
1118
1119        send_one_event(&producer, &schema, 100, 1.0);
1120        send_one_event(&producer, &schema, 200, 2.0);
1121        producer.send_watermark(300).unwrap();
1122        producer.send_checkpoint(1).unwrap();
1123
1124        consumer.drain();
1125
1126        let snap = consumer.stats().snapshot();
1127        assert_eq!(snap.events_sent, 2);
1128        assert_eq!(snap.events_dropped, 0);
1129        assert_eq!(snap.watermarks_sent, 1);
1130        assert_eq!(snap.checkpoints_sent, 1);
1131        // Watermark flushes the 2 pending rows. Checkpoint has nothing to flush.
1132        assert_eq!(snap.batches_flushed, 1);
1133        assert_eq!(snap.rows_flushed, 2);
1134    }
1135
1136    // ── Concurrency tests ───────────────────────────────────────────
1137
1138    #[test]
1139    fn concurrent_send_drain() {
1140        let schema = make_row_schema(vec![
1141            ("ts", DataType::Int64, false),
1142            ("val", DataType::Float64, true),
1143        ]);
1144        let (producer, mut consumer) = create_pipeline_bridge(
1145            Arc::clone(&schema),
1146            1024,
1147            1024,
1148            BatchPolicy::default().with_max_rows(1024),
1149            BackpressureStrategy::DropNewest,
1150        )
1151        .unwrap();
1152
1153        let schema_clone = Arc::clone(&schema);
1154        let handle = std::thread::spawn(move || {
1155            for i in 0..100 {
1156                let arena = Bump::new();
1157                let mut row = MutableEventRow::new_in(&arena, &schema_clone, 0);
1158                row.set_i64(0, i);
1159                row.set_f64(1, i as f64);
1160                let row = row.freeze();
1161                let _ = producer.send_event(&row, i, 0);
1162            }
1163            producer.send_eof().unwrap();
1164        });
1165
1166        let mut total_rows = 0;
1167        let mut saw_eof = false;
1168        while !saw_eof {
1169            let actions = consumer.drain();
1170            for action in &actions {
1171                match action {
1172                    Ring1Action::ProcessBatch(batch) => total_rows += batch.num_rows(),
1173                    Ring1Action::Eof => saw_eof = true,
1174                    _ => {}
1175                }
1176            }
1177            if !saw_eof {
1178                std::thread::yield_now();
1179            }
1180        }
1181        // Flush any remaining pending rows.
1182        if consumer.pending_rows() > 0 {
1183            let arena = Bump::new();
1184            let mut row = MutableEventRow::new_in(&arena, &schema, 0);
1185            row.set_i64(0, 0);
1186            row.set_f64(1, 0.0);
1187            // Already saw EOF, so just count pending.
1188            total_rows += consumer.pending_rows();
1189        }
1190
1191        handle.join().unwrap();
1192
1193        let snap = consumer.stats().snapshot();
1194        // Some events may have been dropped if the queue was full.
1195        assert_eq!(
1196            total_rows as u64 + snap.events_dropped,
1197            snap.events_sent + snap.events_dropped
1198        );
1199    }
1200
1201    #[test]
1202    fn backpressure_under_load() {
1203        let schema = make_row_schema(vec![("ts", DataType::Int64, false)]);
1204        // Tiny queue: only 1 usable slot (capacity 2, one reserved).
1205        let (producer, mut consumer) = small_bridge(Arc::clone(&schema), 1024, 2);
1206
1207        let mut sent = 0;
1208        let mut dropped = 0;
1209        let arena = Bump::new();
1210        for i in 0..10 {
1211            let mut row = MutableEventRow::new_in(&arena, &schema, 0);
1212            row.set_i64(0, i);
1213            match producer.send_event(&row.freeze(), i, 0) {
1214                Ok(()) => sent += 1,
1215                Err(_) => dropped += 1,
1216            }
1217        }
1218
1219        // Drain whatever made it through.
1220        producer.send_eof().unwrap_or(());
1221        let actions = consumer.drain();
1222        let batch_rows: usize = actions
1223            .iter()
1224            .filter_map(|a| match a {
1225                Ring1Action::ProcessBatch(b) => Some(b.num_rows()),
1226                _ => None,
1227            })
1228            .sum();
1229
1230        // At least one event should have been sent, and some dropped.
1231        assert!(sent >= 1, "at least one event should have been sent");
1232        assert!(dropped > 0, "some events should have been dropped");
1233        // pending rows + batch rows = sent
1234        assert_eq!(batch_rows + consumer.pending_rows(), sent);
1235    }
1236}