1use super::topk::{
15 encode_f64, encode_i64, encode_not_null, encode_null, encode_utf8, TopKSortColumn,
16};
17use super::{
18 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
19};
20use arrow_array::{
21 Array, Float64Array, Int64Array, StringArray, TimestampMicrosecondArray,
22 TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
23};
24use arrow_schema::{DataType, TimeUnit};
25use rustc_hash::FxHashMap;
26
27pub struct WindowLocalSortOperator {
32 operator_id: String,
34 sort_columns: Vec<TopKSortColumn>,
36 window_buffers: FxHashMap<i64, Vec<BufferedEvent>>,
38 current_watermark: i64,
40 limit: Option<usize>,
42 cached_sort_indices: Vec<Option<usize>>,
44}
45
46struct BufferedEvent {
48 sort_key: Vec<u8>,
50 event: Event,
52}
53
54impl WindowLocalSortOperator {
55 #[must_use]
57 pub fn new(
58 operator_id: String,
59 sort_columns: Vec<TopKSortColumn>,
60 limit: Option<usize>,
61 ) -> Self {
62 let num_sort_cols = sort_columns.len();
63 Self {
64 operator_id,
65 sort_columns,
66 window_buffers: FxHashMap::default(),
67 current_watermark: i64::MIN,
68 limit,
69 cached_sort_indices: vec![None; num_sort_cols],
70 }
71 }
72
73 #[must_use]
75 pub fn active_window_count(&self) -> usize {
76 self.window_buffers.len()
77 }
78
79 #[must_use]
81 pub fn total_buffered_events(&self) -> usize {
82 self.window_buffers.values().map(Vec::len).sum()
83 }
84
85 #[must_use]
87 pub fn current_watermark(&self) -> i64 {
88 self.current_watermark
89 }
90
91 fn extract_sort_key(&mut self, event: &Event) -> Vec<u8> {
95 let batch = &event.data;
96 let mut key = Vec::new();
97
98 for (i, col_spec) in self.sort_columns.iter().enumerate() {
99 let col_idx = if let Some(idx) = self.cached_sort_indices[i] {
100 idx
101 } else {
102 let Ok(idx) = batch.schema().index_of(&col_spec.column_name) else {
103 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
104 continue;
105 };
106 self.cached_sort_indices[i] = Some(idx);
107 idx
108 };
109
110 let array = batch.column(col_idx);
111
112 if array.is_null(0) {
113 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
114 continue;
115 }
116
117 match array.data_type() {
118 DataType::Int64 => {
119 if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
120 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
121 encode_i64(arr.value(0), col_spec.descending, &mut key);
122 } else {
123 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
124 }
125 }
126 DataType::Float64 => {
127 if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
128 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
129 encode_f64(arr.value(0), col_spec.descending, &mut key);
130 } else {
131 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
132 }
133 }
134 DataType::Utf8 => {
135 if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
136 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
137 encode_utf8(arr.value(0), col_spec.descending, &mut key);
138 } else {
139 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
140 }
141 }
142 DataType::Timestamp(unit, _) => {
143 let val = match unit {
146 TimeUnit::Second => array
147 .as_any()
148 .downcast_ref::<TimestampSecondArray>()
149 .map(|a| a.value(0)),
150 TimeUnit::Millisecond => array
151 .as_any()
152 .downcast_ref::<TimestampMillisecondArray>()
153 .map(|a| a.value(0)),
154 TimeUnit::Microsecond => array
155 .as_any()
156 .downcast_ref::<TimestampMicrosecondArray>()
157 .map(|a| a.value(0)),
158 TimeUnit::Nanosecond => array
159 .as_any()
160 .downcast_ref::<TimestampNanosecondArray>()
161 .map(|a| a.value(0)),
162 };
163 if let Some(v) = val {
164 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
165 encode_i64(v, col_spec.descending, &mut key);
166 } else {
167 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
168 }
169 }
170 _ => {
171 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
172 }
173 }
174 }
175
176 key
177 }
178
179 fn buffer_event(&mut self, window_start: i64, event: &Event) {
181 let sort_key = self.extract_sort_key(event);
182 let buffered = BufferedEvent {
183 sort_key,
184 event: event.clone(),
185 };
186 self.window_buffers
187 .entry(window_start)
188 .or_default()
189 .push(buffered);
190 }
191
192 fn sort_and_emit_window(&mut self, window_start: i64) -> OutputVec {
194 let mut outputs = OutputVec::new();
195
196 if let Some(mut buffer) = self.window_buffers.remove(&window_start) {
197 buffer.sort_by(|a, b| a.sort_key.cmp(&b.sort_key));
199
200 let limit = self.limit.unwrap_or(buffer.len());
202 for buffered in buffer.into_iter().take(limit) {
203 outputs.push(Output::Event(buffered.event));
204 }
205 }
206
207 outputs
208 }
209
210 fn check_and_emit_closed_windows(&mut self) -> OutputVec {
212 let mut outputs = OutputVec::new();
213
214 let closed_windows: Vec<i64> = self
216 .window_buffers
217 .keys()
218 .filter(|&&ws| ws < self.current_watermark)
219 .copied()
220 .collect();
221
222 let mut closed_windows = closed_windows;
224 closed_windows.sort_unstable();
225
226 for window_start in closed_windows {
227 let window_outputs = self.sort_and_emit_window(window_start);
228 outputs.extend(window_outputs);
229 }
230
231 outputs
232 }
233}
234
235impl Operator for WindowLocalSortOperator {
236 fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
237 let window_start = event.timestamp;
241 self.buffer_event(window_start, event);
242 OutputVec::new()
243 }
244
245 fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
246 self.current_watermark = timer.timestamp;
249 self.check_and_emit_closed_windows()
250 }
251
252 fn checkpoint(&self) -> OperatorState {
253 let mut data = Vec::new();
254
255 data.extend_from_slice(&self.current_watermark.to_le_bytes());
257
258 let num_windows = self.window_buffers.len() as u64;
260 data.extend_from_slice(&num_windows.to_le_bytes());
261
262 for (&window_start, buffer) in &self.window_buffers {
264 data.extend_from_slice(&window_start.to_le_bytes());
265 let count = buffer.len() as u64;
266 data.extend_from_slice(&count.to_le_bytes());
267 for be in buffer {
269 data.extend_from_slice(&be.event.timestamp.to_le_bytes());
270 }
271 }
272
273 OperatorState {
274 operator_id: self.operator_id.clone(),
275 data,
276 }
277 }
278
279 #[allow(clippy::cast_possible_truncation)] fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
281 if state.data.len() < 16 {
282 return Err(OperatorError::SerializationFailed(
283 "WindowLocalSort checkpoint data too short".to_string(),
284 ));
285 }
286
287 let mut offset = 0;
288 self.current_watermark = i64::from_le_bytes(
289 state.data[offset..offset + 8]
290 .try_into()
291 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
292 );
293 offset += 8;
294
295 let num_windows = u64::from_le_bytes(
296 state.data[offset..offset + 8]
297 .try_into()
298 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
299 ) as usize;
300 offset += 8;
301
302 self.window_buffers.clear();
303 for _ in 0..num_windows {
304 if offset + 16 > state.data.len() {
305 return Err(OperatorError::SerializationFailed(
306 "WindowLocalSort checkpoint truncated".to_string(),
307 ));
308 }
309 let window_start = i64::from_le_bytes(
310 state.data[offset..offset + 8]
311 .try_into()
312 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
313 );
314 offset += 8;
315
316 let count = u64::from_le_bytes(
317 state.data[offset..offset + 8]
318 .try_into()
319 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
320 ) as usize;
321 offset += 8;
322
323 let mut buffer = Vec::with_capacity(count);
324 for _ in 0..count {
325 if offset + 8 > state.data.len() {
326 return Err(OperatorError::SerializationFailed(
327 "WindowLocalSort checkpoint truncated at events".to_string(),
328 ));
329 }
330 let ts = i64::from_le_bytes(
331 state.data[offset..offset + 8]
332 .try_into()
333 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
334 );
335 offset += 8;
336
337 let batch = arrow_array::RecordBatch::new_empty(std::sync::Arc::new(
338 arrow_schema::Schema::empty(),
339 ));
340 buffer.push(BufferedEvent {
341 sort_key: Vec::new(),
342 event: Event::new(ts, batch),
343 });
344 }
345 self.window_buffers.insert(window_start, buffer);
346 }
347
348 Ok(())
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use smallvec::smallvec;
    use std::sync::Arc;

    /// Builds a one-row event whose only column is a non-nullable `price: Float64`.
    fn make_event(timestamp: i64, price: f64) -> Event {
        let fields = vec![Field::new("price", DataType::Float64, false)];
        let column: Arc<dyn Array> = Arc::new(Float64Array::from(vec![price]));
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();
        Event::new(timestamp, batch)
    }

    /// Builds a one-row event whose only column is a non-nullable `value: Int64`.
    fn make_event_i64(timestamp: i64, value: i64) -> Event {
        let fields = vec![Field::new("value", DataType::Int64, false)];
        let column: Arc<dyn Array> = Arc::new(Int64Array::from(vec![value]));
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![column]).unwrap();
        Event::new(timestamp, batch)
    }

    /// Assembles an `OperatorContext` with zeroed times over the given services.
    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn crate::state::StateStore,
        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }

    /// Creates a `"test_ws"` operator sorting on a single column spec.
    fn sorter(column: TopKSortColumn, limit: Option<usize>) -> WindowLocalSortOperator {
        WindowLocalSortOperator::new("test_ws".to_string(), vec![column], limit)
    }

    /// Row 0 of column 0 of every emitted event, read as `f64`.
    fn emitted_f64(outputs: &OutputVec) -> Vec<f64> {
        let mut values = Vec::new();
        for output in outputs.iter() {
            if let Output::Event(event) = output {
                let column = event
                    .data
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap();
                values.push(column.value(0));
            }
        }
        values
    }

    /// Row 0 of column 0 of every emitted event, read as `i64`.
    fn emitted_i64(outputs: &OutputVec) -> Vec<i64> {
        let mut values = Vec::new();
        for output in outputs.iter() {
            if let Output::Event(event) = output {
                let column = event
                    .data
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap();
                values.push(column.value(0));
            }
        }
        values
    }

    #[test]
    fn test_window_sort_single_window() {
        let mut op = sorter(TopKSortColumn::ascending("price"), None);

        op.buffer_event(1000, &make_event(1000, 300.0));
        op.buffer_event(1000, &make_event(1001, 100.0));
        op.buffer_event(1000, &make_event(1002, 200.0));

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(outputs.len(), 3);
        assert_eq!(emitted_f64(&outputs), vec![100.0, 200.0, 300.0]);
    }

    #[test]
    fn test_window_sort_multiple_windows() {
        let mut op = sorter(TopKSortColumn::ascending("price"), None);

        op.buffer_event(1000, &make_event(1000, 300.0));
        op.buffer_event(1000, &make_event(1001, 100.0));
        op.buffer_event(2000, &make_event(2000, 250.0));
        op.buffer_event(2000, &make_event(2001, 50.0));

        assert_eq!(op.active_window_count(), 2);
        assert_eq!(op.total_buffered_events(), 4);

        // Emitting a window removes it from the buffer map.
        let first = op.sort_and_emit_window(1000);
        assert_eq!(first.len(), 2);
        assert_eq!(op.active_window_count(), 1);

        let second = op.sort_and_emit_window(2000);
        assert_eq!(second.len(), 2);
        assert_eq!(op.active_window_count(), 0);
    }

    #[test]
    fn test_window_sort_multi_column() {
        let mut op = sorter(TopKSortColumn::ascending("value"), None);

        op.buffer_event(1000, &make_event_i64(1000, 30));
        op.buffer_event(1000, &make_event_i64(1001, 10));
        op.buffer_event(1000, &make_event_i64(1002, 20));

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(emitted_i64(&outputs), vec![10, 20, 30]);
    }

    #[test]
    fn test_window_sort_with_limit() {
        let mut op = sorter(TopKSortColumn::ascending("price"), Some(2));

        op.buffer_event(1000, &make_event(1000, 300.0));
        op.buffer_event(1000, &make_event(1001, 100.0));
        op.buffer_event(1000, &make_event(1002, 200.0));

        // Only the two smallest prices survive the limit.
        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(outputs.len(), 2);
        assert_eq!(emitted_f64(&outputs), vec![100.0, 200.0]);
    }

    #[test]
    fn test_window_sort_empty_window() {
        let mut op = sorter(TopKSortColumn::ascending("price"), None);

        let outputs = op.sort_and_emit_window(1000);
        assert!(outputs.is_empty());
    }

    #[test]
    fn test_window_sort_watermark_triggers_emit() {
        let mut op = sorter(TopKSortColumn::ascending("price"), None);

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        op.buffer_event(1000, &make_event(1000, 300.0));
        op.buffer_event(1000, &make_event(1001, 100.0));

        // A timer at 2000 is past window 1000, so that window must be flushed.
        let timer = Timer {
            key: smallvec![],
            timestamp: 2000,
        };
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.on_timer(timer, &mut ctx);

        assert_eq!(outputs.len(), 2);
        assert_eq!(op.active_window_count(), 0);
    }

    #[test]
    fn test_window_sort_preserves_other_outputs() {
        let mut op = sorter(TopKSortColumn::ascending("price"), None);

        op.buffer_event(1000, &make_event(1000, 100.0));
        op.buffer_event(2000, &make_event(2000, 200.0));

        // Emitting window 1000 must leave window 2000 buffered.
        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(outputs.len(), 1);
        assert_eq!(op.active_window_count(), 1);
        assert_eq!(op.total_buffered_events(), 1);
    }

    #[test]
    fn test_window_sort_checkpoint_restore() {
        let mut op = sorter(TopKSortColumn::ascending("price"), Some(5));

        op.buffer_event(1000, &make_event(1000, 100.0));
        op.buffer_event(2000, &make_event(2000, 200.0));

        let checkpoint = op.checkpoint();
        assert_eq!(checkpoint.operator_id, "test_ws");

        let mut op2 = sorter(TopKSortColumn::ascending("price"), Some(5));
        op2.restore(checkpoint).unwrap();

        assert_eq!(op2.active_window_count(), 2);
    }

    #[test]
    fn test_window_sort_descending() {
        let mut op = sorter(TopKSortColumn::descending("price"), None);

        op.buffer_event(1000, &make_event(1000, 100.0));
        op.buffer_event(1000, &make_event(1001, 300.0));
        op.buffer_event(1000, &make_event(1002, 200.0));

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(emitted_f64(&outputs), vec![300.0, 200.0, 100.0]);
    }

    #[test]
    fn test_window_sort_large_window() {
        let mut op = sorter(TopKSortColumn::ascending("value"), None);

        // Insert in reverse so the sort has real work to do.
        for value in (0..100).rev() {
            op.buffer_event(1000, &make_event_i64(1000 + value, value));
        }

        let outputs = op.sort_and_emit_window(1000);
        assert_eq!(outputs.len(), 100);

        let expected: Vec<i64> = (0..100).collect();
        assert_eq!(emitted_i64(&outputs), expected);
    }
}