Skip to main content

laminar_core/operator/
watermark_sort.rs

1//! # Watermark-Bounded Sort Operator
2//!
//! Buffers events between watermark boundaries and emits them as a sorted
//! batch when the watermark advances. Useful for producing ordered output
//! (e.g., sorted Parquet files) from out-of-order streams.
6//!
7//! ## Memory Bounds
8//!
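//! Only holds events newer than the last emitted watermark. How far ahead of
//! that watermark they can run is bounded by the allowed out-of-orderness of
//! the upstream watermark source (this operator has no such setting itself).
//! A `max_buffer_size` safety limit forces an early flush so the buffer
//! cannot grow without bound.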
12//!
13//! ## How It Works
14//!
15//! 1. Events arrive out of order (within bounded disorder)
16//! 2. Events are buffered until watermark advances
17//! 3. On watermark advance, all events with timestamp <= new watermark
18//!    are sorted and emitted as a batch
19//! 4. Late events (timestamp <= last emitted watermark) are dropped
20
21use super::topk::{
22    encode_f64, encode_i64, encode_not_null, encode_null, encode_utf8, TopKSortColumn,
23};
24use super::{
25    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
26};
27use arrow_array::{Array, Float64Array, Int64Array, StringArray, TimestampMicrosecondArray};
28use arrow_schema::DataType;
29
30/// A buffered event with its pre-computed sort key.
31struct SortBufferEntry {
32    /// Memcomparable sort key.
33    sort_key: Vec<u8>,
34    /// The original event.
35    event: Event,
36}
37
38/// Watermark-bounded sort operator.
39///
40/// Buffers events between watermark advances and emits sorted batches.
41pub struct WatermarkBoundedSortOperator {
42    /// Operator identifier for checkpointing.
43    operator_id: String,
44    /// Sort column specifications.
45    sort_columns: Vec<TopKSortColumn>,
46    /// Event buffer (unsorted).
47    buffer: Vec<SortBufferEntry>,
48    /// Maximum buffer size (safety limit).
49    max_buffer_size: usize,
50    /// Last watermark value that triggered emission.
51    last_watermark: i64,
52    /// Number of late events dropped.
53    late_events_dropped: u64,
54    /// Cached column indices for sort columns — resolved on first event.
55    cached_sort_indices: Vec<Option<usize>>,
56}
57
58impl WatermarkBoundedSortOperator {
59    /// Creates a new watermark-bounded sort operator.
60    #[must_use]
61    pub fn new(
62        operator_id: String,
63        sort_columns: Vec<TopKSortColumn>,
64        max_buffer_size: usize,
65    ) -> Self {
66        let num_sort_cols = sort_columns.len();
67        Self {
68            operator_id,
69            sort_columns,
70            buffer: Vec::with_capacity(max_buffer_size),
71            max_buffer_size,
72            last_watermark: i64::MIN,
73            late_events_dropped: 0,
74            cached_sort_indices: vec![None; num_sort_cols],
75        }
76    }
77
78    /// Returns the current buffer size.
79    #[must_use]
80    pub fn buffer_size(&self) -> usize {
81        self.buffer.len()
82    }
83
84    /// Returns true if the buffer is empty.
85    #[must_use]
86    pub fn is_buffer_empty(&self) -> bool {
87        self.buffer.is_empty()
88    }
89
90    /// Returns the last emitted watermark.
91    #[must_use]
92    pub fn last_watermark(&self) -> i64 {
93        self.last_watermark
94    }
95
96    /// Returns the count of late events that were dropped.
97    #[must_use]
98    pub fn late_events_dropped(&self) -> u64 {
99        self.late_events_dropped
100    }
101
102    /// Extracts a memcomparable sort key from an event.
103    ///
104    /// Caches column indices on first call to avoid per-event schema lookups.
105    fn extract_sort_key(&mut self, event: &Event) -> Vec<u8> {
106        let batch = &event.data;
107        let mut key = Vec::new();
108
109        for (i, col_spec) in self.sort_columns.iter().enumerate() {
110            let col_idx = if let Some(idx) = self.cached_sort_indices[i] {
111                idx
112            } else {
113                let Ok(idx) = batch.schema().index_of(&col_spec.column_name) else {
114                    encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
115                    continue;
116                };
117                self.cached_sort_indices[i] = Some(idx);
118                idx
119            };
120
121            let array = batch.column(col_idx);
122
123            if array.is_null(0) {
124                encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
125                continue;
126            }
127
128            match array.data_type() {
129                DataType::Int64 => {
130                    if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
131                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
132                        encode_i64(arr.value(0), col_spec.descending, &mut key);
133                    } else {
134                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
135                    }
136                }
137                DataType::Float64 => {
138                    if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
139                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
140                        encode_f64(arr.value(0), col_spec.descending, &mut key);
141                    } else {
142                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
143                    }
144                }
145                DataType::Utf8 => {
146                    if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
147                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
148                        encode_utf8(arr.value(0), col_spec.descending, &mut key);
149                    } else {
150                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
151                    }
152                }
153                DataType::Timestamp(_, _) => {
154                    if let Some(arr) = array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
155                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
156                        encode_i64(arr.value(0), col_spec.descending, &mut key);
157                    } else {
158                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
159                    }
160                }
161                _ => {
162                    encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
163                }
164            }
165        }
166
167        key
168    }
169
170    /// Processes a watermark advance: sorts and emits all buffered events.
171    fn on_watermark_advance(&mut self, new_watermark: i64) -> OutputVec {
172        if new_watermark <= self.last_watermark {
173            return OutputVec::new();
174        }
175
176        // Sort the buffer
177        self.buffer.sort_by(|a, b| a.sort_key.cmp(&b.sort_key));
178
179        // Emit all buffered events
180        let mut outputs = OutputVec::new();
181        for entry in self.buffer.drain(..) {
182            outputs.push(Output::Event(entry.event));
183        }
184
185        // Emit the watermark
186        outputs.push(Output::Watermark(new_watermark));
187
188        self.last_watermark = new_watermark;
189        outputs
190    }
191}
192
193impl Operator for WatermarkBoundedSortOperator {
194    fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
195        // Drop late events (before or at last watermark)
196        if event.timestamp <= self.last_watermark {
197            self.late_events_dropped += 1;
198            return OutputVec::new();
199        }
200
201        // Buffer the event if under capacity
202        if self.buffer.len() >= self.max_buffer_size {
203            // Safety: force a sort+emit when buffer is full
204            let outputs = self.on_watermark_advance(event.timestamp);
205            // Then buffer the new event
206            let sort_key = self.extract_sort_key(event);
207            self.buffer.push(SortBufferEntry {
208                sort_key,
209                event: event.clone(),
210            });
211            return outputs;
212        }
213
214        let sort_key = self.extract_sort_key(event);
215        self.buffer.push(SortBufferEntry {
216            sort_key,
217            event: event.clone(),
218        });
219
220        OutputVec::new()
221    }
222
223    fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
224        // Timer carries watermark timestamp
225        self.on_watermark_advance(timer.timestamp)
226    }
227
228    fn checkpoint(&self) -> OperatorState {
229        let mut data = Vec::new();
230
231        // Write last watermark
232        data.extend_from_slice(&self.last_watermark.to_le_bytes());
233
234        // Write late events dropped
235        data.extend_from_slice(&self.late_events_dropped.to_le_bytes());
236
237        // Write buffer size
238        let buf_len = self.buffer.len() as u64;
239        data.extend_from_slice(&buf_len.to_le_bytes());
240
241        // Write event timestamps
242        for entry in &self.buffer {
243            data.extend_from_slice(&entry.event.timestamp.to_le_bytes());
244        }
245
246        OperatorState {
247            operator_id: self.operator_id.clone(),
248            data,
249        }
250    }
251
252    #[allow(clippy::cast_possible_truncation)] // Checkpoint wire format uses u64 for counts
253    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
254        if state.data.len() < 24 {
255            return Err(OperatorError::SerializationFailed(
256                "WatermarkSort checkpoint data too short".to_string(),
257            ));
258        }
259
260        let mut offset = 0;
261        self.last_watermark = i64::from_le_bytes(
262            state.data[offset..offset + 8]
263                .try_into()
264                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
265        );
266        offset += 8;
267
268        self.late_events_dropped = u64::from_le_bytes(
269            state.data[offset..offset + 8]
270                .try_into()
271                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
272        );
273        offset += 8;
274
275        let buf_len = u64::from_le_bytes(
276            state.data[offset..offset + 8]
277                .try_into()
278                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
279        ) as usize;
280        offset += 8;
281
282        self.buffer.clear();
283        for _ in 0..buf_len {
284            if offset + 8 > state.data.len() {
285                return Err(OperatorError::SerializationFailed(
286                    "WatermarkSort checkpoint truncated".to_string(),
287                ));
288            }
289            let ts = i64::from_le_bytes(
290                state.data[offset..offset + 8]
291                    .try_into()
292                    .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
293            );
294            offset += 8;
295
296            let batch = arrow_array::RecordBatch::new_empty(std::sync::Arc::new(
297                arrow_schema::Schema::empty(),
298            ));
299            self.buffer.push(SortBufferEntry {
300                sort_key: Vec::new(),
301                event: Event::new(ts, batch),
302            });
303        }
304
305        Ok(())
306    }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use smallvec::smallvec;
    use std::sync::Arc;

    /// Single-row event with a non-nullable `Int64` column named `value`.
    fn make_event(timestamp: i64, value: i64) -> Event {
        let schema = Schema::new(vec![Field::new("value", DataType::Int64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int64Array::from(vec![value]))],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    /// Single-row event with a non-nullable `Float64` column named `price`.
    fn make_event_f64(timestamp: i64, price: f64) -> Event {
        let schema = Schema::new(vec![Field::new("price", DataType::Float64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Float64Array::from(vec![price]))],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn crate::state::StateStore,
        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            timers,
            state,
            watermark_generator: watermark_gen,
            event_time: 0,
            processing_time: 0,
            operator_index: 0,
        }
    }

    /// Owns the services an `OperatorContext` borrows, so each operator call
    /// can get a freshly built context.
    struct Harness {
        timers: TimerService,
        state: InMemoryStore,
        watermarks: BoundedOutOfOrdernessGenerator,
    }

    impl Harness {
        fn new() -> Self {
            Self {
                timers: TimerService::new(),
                state: InMemoryStore::new(),
                watermarks: BoundedOutOfOrdernessGenerator::new(0),
            }
        }

        /// Runs `op.process(event)` with a fresh context.
        fn feed(&mut self, op: &mut WatermarkBoundedSortOperator, event: &Event) -> OutputVec {
            let mut ctx =
                create_test_context(&mut self.timers, &mut self.state, &mut self.watermarks);
            op.process(event, &mut ctx)
        }

        /// Runs `op.on_timer(timer)` with a fresh context.
        fn fire(&mut self, op: &mut WatermarkBoundedSortOperator, timer: Timer) -> OutputVec {
            let mut ctx =
                create_test_context(&mut self.timers, &mut self.state, &mut self.watermarks);
            op.on_timer(timer, &mut ctx)
        }
    }

    /// Operator sorting on a single column.
    fn sorter(
        id: &str,
        column: TopKSortColumn,
        max_buffer_size: usize,
    ) -> WatermarkBoundedSortOperator {
        WatermarkBoundedSortOperator::new(id.to_string(), vec![column], max_buffer_size)
    }

    /// Column-0 `Int64` value of every emitted event, in output order.
    fn emitted_i64(outputs: &[Output]) -> Vec<i64> {
        outputs
            .iter()
            .filter_map(|output| match output {
                Output::Event(e) => Some(
                    e.data
                        .column(0)
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap()
                        .value(0),
                ),
                _ => None,
            })
            .collect()
    }

    /// Column-0 `Float64` value of every emitted event, in output order.
    fn emitted_f64(outputs: &[Output]) -> Vec<f64> {
        outputs
            .iter()
            .filter_map(|output| match output {
                Output::Event(e) => Some(
                    e.data
                        .column(0)
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap()
                        .value(0),
                ),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn test_watermark_sort_basic() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        let mut harness = Harness::new();

        for (ts, value) in [(100, 30), (200, 10), (300, 20)] {
            harness.feed(&mut op, &make_event(ts, value));
        }

        assert_eq!(op.buffer_size(), 3);
    }

    #[test]
    fn test_watermark_sort_out_of_order() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        let mut harness = Harness::new();

        // Values arrive out of order.
        for (ts, value) in [(100, 30), (200, 10), (300, 20)] {
            harness.feed(&mut op, &make_event(ts, value));
        }

        // A timer at 500 acts as the watermark and releases the sorted buffer.
        let timer = Timer {
            key: smallvec![],
            timestamp: 500,
        };
        let outputs = harness.fire(&mut op, timer);

        // 3 events + 1 watermark, values ascending.
        assert_eq!(outputs.len(), 4);
        assert_eq!(emitted_i64(&outputs), vec![10, 20, 30]);
    }

    #[test]
    fn test_watermark_sort_watermark_advance() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        let mut harness = Harness::new();
        harness.feed(&mut op, &make_event(100, 50));

        let outputs = op.on_watermark_advance(200);

        // 1 event + 1 watermark.
        assert_eq!(outputs.len(), 2);
        assert!(op.is_buffer_empty());
        assert_eq!(op.last_watermark(), 200);
    }

    #[test]
    fn test_watermark_sort_late_events() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        let mut harness = Harness::new();

        harness.feed(&mut op, &make_event(100, 10));
        op.on_watermark_advance(200);

        // Timestamp 50 is behind the watermark (200) and must be dropped.
        let outputs = harness.feed(&mut op, &make_event(50, 5));

        assert!(outputs.is_empty());
        assert_eq!(op.late_events_dropped(), 1);
    }

    #[test]
    fn test_watermark_sort_multi_column() {
        let mut op = sorter("test_wms", TopKSortColumn::descending("price"), 100_000);
        let mut harness = Harness::new();

        for (ts, price) in [(100, 100.0), (200, 300.0), (300, 200.0)] {
            harness.feed(&mut op, &make_event_f64(ts, price));
        }

        let outputs = op.on_watermark_advance(500);

        // Descending: 300, 200, 100.
        assert_eq!(emitted_f64(&outputs), vec![300.0, 200.0, 100.0]);
    }

    #[test]
    fn test_watermark_sort_empty_buffer() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);

        // With nothing buffered, only the watermark itself is emitted.
        let outputs = op.on_watermark_advance(100);

        assert_eq!(outputs.len(), 1);
        let Output::Watermark(w) = &outputs[0] else {
            panic!("Expected Watermark output")
        };
        assert_eq!(*w, 100);
    }

    #[test]
    fn test_watermark_sort_buffer_limit() {
        // Very small buffer limit.
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 3);
        let mut harness = Harness::new();

        for i in 1..=3 {
            harness.feed(&mut op, &make_event(i * 100, i));
        }
        assert_eq!(op.buffer_size(), 3);

        // One more event hits the limit and forces a flush.
        let outputs = harness.feed(&mut op, &make_event(400, 4));

        // The flush produced output, and only the overflow event stays buffered.
        assert!(!outputs.is_empty());
        assert_eq!(op.buffer_size(), 1);
    }

    #[test]
    fn test_watermark_sort_checkpoint_restore() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        let mut harness = Harness::new();
        harness.feed(&mut op, &make_event(100, 10));
        harness.feed(&mut op, &make_event(200, 20));

        let checkpoint = op.checkpoint();
        assert_eq!(checkpoint.operator_id, "test_wms");

        let mut restored = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        restored.restore(checkpoint).unwrap();

        assert_eq!(restored.buffer_size(), 2);
        assert_eq!(restored.last_watermark(), i64::MIN);
    }

    #[test]
    fn test_watermark_sort_ascending_and_descending() {
        let mut harness = Harness::new();

        // Feeds the same three events through a fresh operator and returns
        // the emitted values after a watermark at 1000.
        let mut run = |column: TopKSortColumn, id: &str| -> Vec<i64> {
            let mut op = sorter(id, column, 100_000);
            for val in [30i64, 10, 20] {
                harness.feed(&mut op, &make_event(val * 10, val));
            }
            emitted_i64(&op.on_watermark_advance(1000))
        };

        assert_eq!(
            run(TopKSortColumn::ascending("value"), "asc"),
            vec![10, 20, 30]
        );
        assert_eq!(
            run(TopKSortColumn::descending("value"), "desc"),
            vec![30, 20, 10]
        );
    }

    #[test]
    fn test_watermark_sort_preserves_watermarks() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        let mut harness = Harness::new();
        harness.feed(&mut op, &make_event(100, 10));

        let outputs = op.on_watermark_advance(200);

        // The watermark must come after every event it released.
        let Some(Output::Watermark(w)) = outputs.last() else {
            panic!("Expected Watermark as last output")
        };
        assert_eq!(*w, 200);
    }
}