Skip to main content

laminar_core/operator/
window_sort.rs

1//! # Window-Local Sort Operator
2//!
3//! ORDER BY within windowed aggregations. Bounded by window size.
4//!
5//! Sits downstream of a window operator. Buffers output events per window.
6//! On watermark advance past window end, sorts the buffer and emits.
7//! Optionally applies a LIMIT for top-N within each window.
8//!
9//! ## Memory Bounds
10//!
11//! Memory is bounded by the maximum number of events per window.
12//! Each window buffer is released immediately after sorting and emission.
13
14use super::topk::{
15    encode_f64, encode_i64, encode_not_null, encode_null, encode_utf8, TopKSortColumn,
16};
17use super::{
18    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
19};
20use arrow_array::{
21    Array, Float64Array, Int64Array, StringArray, TimestampMicrosecondArray,
22    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
23};
24use arrow_schema::{DataType, TimeUnit};
25use rustc_hash::FxHashMap;
26
27/// Window-local sort operator.
28///
29/// Buffers events per window, sorts on window close (watermark advance),
30/// and emits sorted events. Optionally applies a LIMIT.
31pub struct WindowLocalSortOperator {
32    /// Operator identifier for checkpointing.
33    operator_id: String,
34    /// Sort column specifications.
35    sort_columns: Vec<TopKSortColumn>,
36    /// Per-window event buffers, keyed by window start timestamp.
37    window_buffers: FxHashMap<i64, Vec<BufferedEvent>>,
38    /// Current watermark value.
39    current_watermark: i64,
40    /// Optional LIMIT for top-N within each window.
41    limit: Option<usize>,
42    /// Cached column indices for sort columns — resolved on first event.
43    cached_sort_indices: Vec<Option<usize>>,
44}
45
46/// A buffered event with its pre-computed sort key.
47struct BufferedEvent {
48    /// Memcomparable sort key.
49    sort_key: Vec<u8>,
50    /// The original event.
51    event: Event,
52}
53
54impl WindowLocalSortOperator {
55    /// Creates a new window-local sort operator.
56    #[must_use]
57    pub fn new(
58        operator_id: String,
59        sort_columns: Vec<TopKSortColumn>,
60        limit: Option<usize>,
61    ) -> Self {
62        let num_sort_cols = sort_columns.len();
63        Self {
64            operator_id,
65            sort_columns,
66            window_buffers: FxHashMap::default(),
67            current_watermark: i64::MIN,
68            limit,
69            cached_sort_indices: vec![None; num_sort_cols],
70        }
71    }
72
73    /// Returns the number of active window buffers.
74    #[must_use]
75    pub fn active_window_count(&self) -> usize {
76        self.window_buffers.len()
77    }
78
79    /// Returns the total number of buffered events across all windows.
80    #[must_use]
81    pub fn total_buffered_events(&self) -> usize {
82        self.window_buffers.values().map(Vec::len).sum()
83    }
84
85    /// Returns the current watermark value.
86    #[must_use]
87    pub fn current_watermark(&self) -> i64 {
88        self.current_watermark
89    }
90
91    /// Extracts a memcomparable sort key from an event.
92    ///
93    /// Caches column indices on first call to avoid per-event schema lookups.
94    fn extract_sort_key(&mut self, event: &Event) -> Vec<u8> {
95        let batch = &event.data;
96        let mut key = Vec::new();
97
98        for (i, col_spec) in self.sort_columns.iter().enumerate() {
99            let col_idx = if let Some(idx) = self.cached_sort_indices[i] {
100                idx
101            } else {
102                let Ok(idx) = batch.schema().index_of(&col_spec.column_name) else {
103                    encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
104                    continue;
105                };
106                self.cached_sort_indices[i] = Some(idx);
107                idx
108            };
109
110            let array = batch.column(col_idx);
111
112            if array.is_null(0) {
113                encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
114                continue;
115            }
116
117            match array.data_type() {
118                DataType::Int64 => {
119                    if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
120                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
121                        encode_i64(arr.value(0), col_spec.descending, &mut key);
122                    } else {
123                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
124                    }
125                }
126                DataType::Float64 => {
127                    if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
128                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
129                        encode_f64(arr.value(0), col_spec.descending, &mut key);
130                    } else {
131                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
132                    }
133                }
134                DataType::Utf8 => {
135                    if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
136                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
137                        encode_utf8(arr.value(0), col_spec.descending, &mut key);
138                    } else {
139                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
140                    }
141                }
142                DataType::Timestamp(unit, _) => {
143                    // All Arrow timestamp arrays store i64 values internally.
144                    // Extract via the correct concrete type for the TimeUnit.
145                    let val = match unit {
146                        TimeUnit::Second => array
147                            .as_any()
148                            .downcast_ref::<TimestampSecondArray>()
149                            .map(|a| a.value(0)),
150                        TimeUnit::Millisecond => array
151                            .as_any()
152                            .downcast_ref::<TimestampMillisecondArray>()
153                            .map(|a| a.value(0)),
154                        TimeUnit::Microsecond => array
155                            .as_any()
156                            .downcast_ref::<TimestampMicrosecondArray>()
157                            .map(|a| a.value(0)),
158                        TimeUnit::Nanosecond => array
159                            .as_any()
160                            .downcast_ref::<TimestampNanosecondArray>()
161                            .map(|a| a.value(0)),
162                    };
163                    if let Some(v) = val {
164                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
165                        encode_i64(v, col_spec.descending, &mut key);
166                    } else {
167                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
168                    }
169                }
170                _ => {
171                    encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
172                }
173            }
174        }
175
176        key
177    }
178
179    /// Adds an event to the buffer for the given window.
180    fn buffer_event(&mut self, window_start: i64, event: &Event) {
181        let sort_key = self.extract_sort_key(event);
182        let buffered = BufferedEvent {
183            sort_key,
184            event: event.clone(),
185        };
186        self.window_buffers
187            .entry(window_start)
188            .or_default()
189            .push(buffered);
190    }
191
192    /// Sorts and emits a window buffer, applying optional LIMIT.
193    fn sort_and_emit_window(&mut self, window_start: i64) -> OutputVec {
194        let mut outputs = OutputVec::new();
195
196        if let Some(mut buffer) = self.window_buffers.remove(&window_start) {
197            // Sort by memcomparable key
198            buffer.sort_by(|a, b| a.sort_key.cmp(&b.sort_key));
199
200            // Apply LIMIT if configured
201            let limit = self.limit.unwrap_or(buffer.len());
202            for buffered in buffer.into_iter().take(limit) {
203                outputs.push(Output::Event(buffered.event));
204            }
205        }
206
207        outputs
208    }
209
210    /// Checks all windows and emits any that are closed by the current watermark.
211    fn check_and_emit_closed_windows(&mut self) -> OutputVec {
212        let mut outputs = OutputVec::new();
213
214        // Collect window starts that are closed by the watermark
215        let closed_windows: Vec<i64> = self
216            .window_buffers
217            .keys()
218            .filter(|&&ws| ws < self.current_watermark)
219            .copied()
220            .collect();
221
222        // Sort for deterministic output
223        let mut closed_windows = closed_windows;
224        closed_windows.sort_unstable();
225
226        for window_start in closed_windows {
227            let window_outputs = self.sort_and_emit_window(window_start);
228            outputs.extend(window_outputs);
229        }
230
231        outputs
232    }
233}
234
235impl Operator for WindowLocalSortOperator {
236    fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
237        // Use the event timestamp as the window start key.
238        // In practice, the upstream window operator tags events with window_start.
239        // Here we use the event timestamp as a simple window key.
240        let window_start = event.timestamp;
241        self.buffer_event(window_start, event);
242        OutputVec::new()
243    }
244
245    fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
246        // Timer carries window_start as the timestamp.
247        // Update watermark and emit closed windows.
248        self.current_watermark = timer.timestamp;
249        self.check_and_emit_closed_windows()
250    }
251
252    fn checkpoint(&self) -> OperatorState {
253        let mut data = Vec::new();
254
255        // Write watermark
256        data.extend_from_slice(&self.current_watermark.to_le_bytes());
257
258        // Write number of windows
259        let num_windows = self.window_buffers.len() as u64;
260        data.extend_from_slice(&num_windows.to_le_bytes());
261
262        // Write each window: start + event count
263        for (&window_start, buffer) in &self.window_buffers {
264            data.extend_from_slice(&window_start.to_le_bytes());
265            let count = buffer.len() as u64;
266            data.extend_from_slice(&count.to_le_bytes());
267            // Write event timestamps
268            for be in buffer {
269                data.extend_from_slice(&be.event.timestamp.to_le_bytes());
270            }
271        }
272
273        OperatorState {
274            operator_id: self.operator_id.clone(),
275            data,
276        }
277    }
278
279    #[allow(clippy::cast_possible_truncation)] // Checkpoint wire format uses u64 for counts
280    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
281        if state.data.len() < 16 {
282            return Err(OperatorError::SerializationFailed(
283                "WindowLocalSort checkpoint data too short".to_string(),
284            ));
285        }
286
287        let mut offset = 0;
288        self.current_watermark = i64::from_le_bytes(
289            state.data[offset..offset + 8]
290                .try_into()
291                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
292        );
293        offset += 8;
294
295        let num_windows = u64::from_le_bytes(
296            state.data[offset..offset + 8]
297                .try_into()
298                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
299        ) as usize;
300        offset += 8;
301
302        self.window_buffers.clear();
303        for _ in 0..num_windows {
304            if offset + 16 > state.data.len() {
305                return Err(OperatorError::SerializationFailed(
306                    "WindowLocalSort checkpoint truncated".to_string(),
307                ));
308            }
309            let window_start = i64::from_le_bytes(
310                state.data[offset..offset + 8]
311                    .try_into()
312                    .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
313            );
314            offset += 8;
315
316            let count = u64::from_le_bytes(
317                state.data[offset..offset + 8]
318                    .try_into()
319                    .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
320            ) as usize;
321            offset += 8;
322
323            let mut buffer = Vec::with_capacity(count);
324            for _ in 0..count {
325                if offset + 8 > state.data.len() {
326                    return Err(OperatorError::SerializationFailed(
327                        "WindowLocalSort checkpoint truncated at events".to_string(),
328                    ));
329                }
330                let ts = i64::from_le_bytes(
331                    state.data[offset..offset + 8]
332                        .try_into()
333                        .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
334                );
335                offset += 8;
336
337                let batch = arrow_array::RecordBatch::new_empty(std::sync::Arc::new(
338                    arrow_schema::Schema::empty(),
339                ));
340                buffer.push(BufferedEvent {
341                    sort_key: Vec::new(),
342                    event: Event::new(ts, batch),
343                });
344            }
345            self.window_buffers.insert(window_start, buffer);
346        }
347
348        Ok(())
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use smallvec::smallvec;
    use std::sync::Arc;

    /// Builds a single-row event with one non-null `price: Float64` column.
    fn make_event(timestamp: i64, price: f64) -> Event {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "price",
            DataType::Float64,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Float64Array::from(vec![price]))]).unwrap();
        Event::new(timestamp, batch)
    }

    /// Builds a single-row event with one non-null `value: Int64` column.
    fn make_event_i64(timestamp: i64, value: i64) -> Event {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
        Event::new(timestamp, batch)
    }

    /// Builds a single-row event with `group: Int64` and `score: Float64` columns.
    fn make_event_group_score(timestamp: i64, group: i64, score: f64) -> Event {
        let schema = Arc::new(Schema::new(vec![
            Field::new("group", DataType::Int64, false),
            Field::new("score", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![group])),
                Arc::new(Float64Array::from(vec![score])),
            ],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn crate::state::StateStore,
        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }

    /// Ascending-by-`price` operator with the given LIMIT.
    fn price_sorter(limit: Option<usize>) -> WindowLocalSortOperator {
        WindowLocalSortOperator::new(
            "test_ws".to_string(),
            vec![TopKSortColumn::ascending("price")],
            limit,
        )
    }

    /// Fires `on_timer` with a throwaway context and the given timer timestamp.
    fn fire_timer(op: &mut WindowLocalSortOperator, timestamp: i64) -> OutputVec {
        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let timer = Timer {
            key: smallvec![],
            timestamp,
        };
        op.on_timer(timer, &mut ctx)
    }

    /// Row 0 of Float64 column `col` from every emitted event, in output order.
    fn f64_values(outputs: &OutputVec, col: usize) -> Vec<f64> {
        outputs
            .iter()
            .filter_map(|o| match o {
                Output::Event(e) => Some(
                    e.data
                        .column(col)
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .expect("Float64 column")
                        .value(0),
                ),
                _ => None,
            })
            .collect()
    }

    /// Row 0 of Int64 column `col` from every emitted event, in output order.
    fn i64_values(outputs: &OutputVec, col: usize) -> Vec<i64> {
        outputs
            .iter()
            .filter_map(|o| match o {
                Output::Event(e) => Some(
                    e.data
                        .column(col)
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .expect("Int64 column")
                        .value(0),
                ),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn test_window_sort_single_window() {
        let mut op = price_sorter(None);

        op.buffer_event(1000, &make_event(1000, 300.0));
        op.buffer_event(1000, &make_event(1001, 100.0));
        op.buffer_event(1000, &make_event(1002, 200.0));

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(outputs.len(), 3);
        assert_eq!(f64_values(&outputs, 0), vec![100.0, 200.0, 300.0]);
    }

    #[test]
    fn test_window_sort_multiple_windows() {
        let mut op = price_sorter(None);

        op.buffer_event(1000, &make_event(1000, 300.0));
        op.buffer_event(1000, &make_event(1001, 100.0));
        op.buffer_event(2000, &make_event(2000, 250.0));
        op.buffer_event(2000, &make_event(2001, 50.0));

        assert_eq!(op.active_window_count(), 2);
        assert_eq!(op.total_buffered_events(), 4);

        let out1 = op.sort_and_emit_window(1000);
        assert_eq!(out1.len(), 2);
        assert_eq!(op.active_window_count(), 1);

        let out2 = op.sort_and_emit_window(2000);
        assert_eq!(out2.len(), 2);
        assert_eq!(op.active_window_count(), 0);
    }

    #[test]
    fn test_window_sort_int64_column() {
        let mut op = WindowLocalSortOperator::new(
            "test_ws".to_string(),
            vec![TopKSortColumn::ascending("value")],
            None,
        );

        op.buffer_event(1000, &make_event_i64(1000, 30));
        op.buffer_event(1000, &make_event_i64(1001, 10));
        op.buffer_event(1000, &make_event_i64(1002, 20));

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(i64_values(&outputs, 0), vec![10, 20, 30]);
    }

    #[test]
    fn test_window_sort_multi_column() {
        // ORDER BY group ASC, score DESC: ties on `group` are broken by `score`.
        let mut op = WindowLocalSortOperator::new(
            "test_ws".to_string(),
            vec![
                TopKSortColumn::ascending("group"),
                TopKSortColumn::descending("score"),
            ],
            None,
        );

        op.buffer_event(1000, &make_event_group_score(1000, 1, 5.0));
        op.buffer_event(1000, &make_event_group_score(1001, 0, 1.0));
        op.buffer_event(1000, &make_event_group_score(1002, 1, 9.0));
        op.buffer_event(1000, &make_event_group_score(1003, 0, 3.0));

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(i64_values(&outputs, 0), vec![0, 0, 1, 1]);
        assert_eq!(f64_values(&outputs, 1), vec![3.0, 1.0, 9.0, 5.0]);
    }

    #[test]
    fn test_window_sort_with_limit() {
        let mut op = price_sorter(Some(2));

        op.buffer_event(1000, &make_event(1000, 300.0));
        op.buffer_event(1000, &make_event(1001, 100.0));
        op.buffer_event(1000, &make_event(1002, 200.0));

        // LIMIT 2: only the first 2 sorted events
        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(outputs.len(), 2);
        assert_eq!(f64_values(&outputs, 0), vec![100.0, 200.0]);
    }

    #[test]
    fn test_window_sort_limit_zero() {
        let mut op = price_sorter(Some(0));

        op.buffer_event(1000, &make_event(1000, 300.0));
        op.buffer_event(1000, &make_event(1001, 100.0));

        assert!(op.sort_and_emit_window(1000).is_empty());
        // The window is released even though nothing was emitted.
        assert_eq!(op.active_window_count(), 0);
    }

    #[test]
    fn test_window_sort_empty_window() {
        let mut op = price_sorter(None);

        // No events buffered for this window
        let outputs = op.sort_and_emit_window(1000);
        assert!(outputs.is_empty());
    }

    #[test]
    fn test_window_sort_watermark_triggers_emit() {
        let mut op = price_sorter(None);

        op.buffer_event(1000, &make_event(1000, 300.0));
        op.buffer_event(1000, &make_event(1001, 100.0));

        // Watermark advances past 1000
        let outputs = fire_timer(&mut op, 2000);

        assert_eq!(outputs.len(), 2);
        assert_eq!(op.active_window_count(), 0);
        assert_eq!(op.current_watermark(), 2000);
    }

    #[test]
    fn test_window_sort_watermark_emits_only_closed_windows() {
        let mut op = price_sorter(None);

        op.buffer_event(1000, &make_event(1000, 100.0));
        op.buffer_event(3000, &make_event(3000, 300.0));

        let first = fire_timer(&mut op, 2000);
        assert_eq!(f64_values(&first, 0), vec![100.0]);
        assert_eq!(op.active_window_count(), 1);

        let second = fire_timer(&mut op, 4000);
        assert_eq!(f64_values(&second, 0), vec![300.0]);
        assert_eq!(op.active_window_count(), 0);
    }

    #[test]
    fn test_window_sort_closed_windows_emitted_in_key_order() {
        let mut op = price_sorter(None);

        // Buffered out of key order on purpose.
        op.buffer_event(2000, &make_event(2000, 1.0));
        op.buffer_event(1000, &make_event(1000, 2.0));

        let outputs = fire_timer(&mut op, 5000);
        assert_eq!(f64_values(&outputs, 0), vec![2.0, 1.0]);
    }

    #[test]
    fn test_window_sort_preserves_other_outputs() {
        // Window buffers for different windows are independent
        let mut op = price_sorter(None);

        op.buffer_event(1000, &make_event(1000, 100.0));
        op.buffer_event(2000, &make_event(2000, 200.0));

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(outputs.len(), 1);
        assert_eq!(op.active_window_count(), 1);
        assert_eq!(op.total_buffered_events(), 1);
    }

    #[test]
    fn test_window_sort_checkpoint_restore() {
        let mut op = price_sorter(Some(5));

        op.buffer_event(1000, &make_event(1000, 100.0));
        op.buffer_event(1000, &make_event(1001, 150.0));
        op.buffer_event(2000, &make_event(2000, 200.0));
        // Advance the watermark without closing any window.
        assert!(fire_timer(&mut op, 500).is_empty());

        let checkpoint = op.checkpoint();
        assert_eq!(checkpoint.operator_id, "test_ws");

        let mut op2 = price_sorter(Some(5));
        op2.restore(checkpoint).unwrap();

        assert_eq!(op2.active_window_count(), 2);
        assert_eq!(op2.total_buffered_events(), 3);
        assert_eq!(op2.current_watermark(), 500);
    }

    #[test]
    fn test_window_sort_restore_rejects_truncated_checkpoint() {
        let mut op = price_sorter(None);

        // Shorter than the 16-byte header.
        let short = OperatorState {
            operator_id: "test_ws".to_string(),
            data: vec![0u8; 8],
        };
        assert!(op.restore(short).is_err());

        // Header claims one window but no window data follows.
        let mut missing_window = Vec::new();
        missing_window.extend_from_slice(&0i64.to_le_bytes());
        missing_window.extend_from_slice(&1u64.to_le_bytes());
        let state = OperatorState {
            operator_id: "test_ws".to_string(),
            data: missing_window,
        };
        assert!(op.restore(state).is_err());

        // Window claims two events but only one timestamp follows.
        let mut missing_event = Vec::new();
        missing_event.extend_from_slice(&0i64.to_le_bytes());
        missing_event.extend_from_slice(&1u64.to_le_bytes());
        missing_event.extend_from_slice(&1000i64.to_le_bytes());
        missing_event.extend_from_slice(&2u64.to_le_bytes());
        missing_event.extend_from_slice(&1000i64.to_le_bytes());
        let state = OperatorState {
            operator_id: "test_ws".to_string(),
            data: missing_event,
        };
        assert!(op.restore(state).is_err());
    }

    #[test]
    fn test_window_sort_descending() {
        let mut op = WindowLocalSortOperator::new(
            "test_ws".to_string(),
            vec![TopKSortColumn::descending("price")],
            None,
        );

        op.buffer_event(1000, &make_event(1000, 100.0));
        op.buffer_event(1000, &make_event(1001, 300.0));
        op.buffer_event(1000, &make_event(1002, 200.0));

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(f64_values(&outputs, 0), vec![300.0, 200.0, 100.0]);
    }

    #[test]
    fn test_window_sort_large_window() {
        let mut op = WindowLocalSortOperator::new(
            "test_ws".to_string(),
            vec![TopKSortColumn::ascending("value")],
            None,
        );

        // Buffer 100 events in reverse order
        for i in (0..100).rev() {
            op.buffer_event(1000, &make_event_i64(1000 + i, i));
        }

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(outputs.len(), 100);

        let expected: Vec<i64> = (0..100).collect();
        assert_eq!(i64_values(&outputs, 0), expected);
    }
}