1use super::topk::{
22 encode_f64, encode_i64, encode_not_null, encode_null, encode_utf8, TopKSortColumn,
23};
24use super::{
25 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
26};
27use arrow_array::{Array, Float64Array, Int64Array, StringArray, TimestampMicrosecondArray};
28use arrow_schema::DataType;
29
30struct SortBufferEntry {
32 sort_key: Vec<u8>,
34 event: Event,
36}
37
38pub struct WatermarkBoundedSortOperator {
42 operator_id: String,
44 sort_columns: Vec<TopKSortColumn>,
46 buffer: Vec<SortBufferEntry>,
48 max_buffer_size: usize,
50 last_watermark: i64,
52 late_events_dropped: u64,
54 cached_sort_indices: Vec<Option<usize>>,
56}
57
58impl WatermarkBoundedSortOperator {
59 #[must_use]
61 pub fn new(
62 operator_id: String,
63 sort_columns: Vec<TopKSortColumn>,
64 max_buffer_size: usize,
65 ) -> Self {
66 let num_sort_cols = sort_columns.len();
67 Self {
68 operator_id,
69 sort_columns,
70 buffer: Vec::with_capacity(max_buffer_size),
71 max_buffer_size,
72 last_watermark: i64::MIN,
73 late_events_dropped: 0,
74 cached_sort_indices: vec![None; num_sort_cols],
75 }
76 }
77
78 #[must_use]
80 pub fn buffer_size(&self) -> usize {
81 self.buffer.len()
82 }
83
84 #[must_use]
86 pub fn is_buffer_empty(&self) -> bool {
87 self.buffer.is_empty()
88 }
89
90 #[must_use]
92 pub fn last_watermark(&self) -> i64 {
93 self.last_watermark
94 }
95
96 #[must_use]
98 pub fn late_events_dropped(&self) -> u64 {
99 self.late_events_dropped
100 }
101
102 fn extract_sort_key(&mut self, event: &Event) -> Vec<u8> {
106 let batch = &event.data;
107 let mut key = Vec::new();
108
109 for (i, col_spec) in self.sort_columns.iter().enumerate() {
110 let col_idx = if let Some(idx) = self.cached_sort_indices[i] {
111 idx
112 } else {
113 let Ok(idx) = batch.schema().index_of(&col_spec.column_name) else {
114 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
115 continue;
116 };
117 self.cached_sort_indices[i] = Some(idx);
118 idx
119 };
120
121 let array = batch.column(col_idx);
122
123 if array.is_null(0) {
124 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
125 continue;
126 }
127
128 match array.data_type() {
129 DataType::Int64 => {
130 if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
131 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
132 encode_i64(arr.value(0), col_spec.descending, &mut key);
133 } else {
134 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
135 }
136 }
137 DataType::Float64 => {
138 if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
139 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
140 encode_f64(arr.value(0), col_spec.descending, &mut key);
141 } else {
142 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
143 }
144 }
145 DataType::Utf8 => {
146 if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
147 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
148 encode_utf8(arr.value(0), col_spec.descending, &mut key);
149 } else {
150 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
151 }
152 }
153 DataType::Timestamp(_, _) => {
154 if let Some(arr) = array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
155 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
156 encode_i64(arr.value(0), col_spec.descending, &mut key);
157 } else {
158 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
159 }
160 }
161 _ => {
162 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
163 }
164 }
165 }
166
167 key
168 }
169
170 fn on_watermark_advance(&mut self, new_watermark: i64) -> OutputVec {
172 if new_watermark <= self.last_watermark {
173 return OutputVec::new();
174 }
175
176 self.buffer.sort_by(|a, b| a.sort_key.cmp(&b.sort_key));
178
179 let mut outputs = OutputVec::new();
181 for entry in self.buffer.drain(..) {
182 outputs.push(Output::Event(entry.event));
183 }
184
185 outputs.push(Output::Watermark(new_watermark));
187
188 self.last_watermark = new_watermark;
189 outputs
190 }
191}
192
193impl Operator for WatermarkBoundedSortOperator {
194 fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
195 if event.timestamp <= self.last_watermark {
197 self.late_events_dropped += 1;
198 return OutputVec::new();
199 }
200
201 if self.buffer.len() >= self.max_buffer_size {
203 let outputs = self.on_watermark_advance(event.timestamp);
205 let sort_key = self.extract_sort_key(event);
207 self.buffer.push(SortBufferEntry {
208 sort_key,
209 event: event.clone(),
210 });
211 return outputs;
212 }
213
214 let sort_key = self.extract_sort_key(event);
215 self.buffer.push(SortBufferEntry {
216 sort_key,
217 event: event.clone(),
218 });
219
220 OutputVec::new()
221 }
222
223 fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
224 self.on_watermark_advance(timer.timestamp)
226 }
227
228 fn checkpoint(&self) -> OperatorState {
229 let mut data = Vec::new();
230
231 data.extend_from_slice(&self.last_watermark.to_le_bytes());
233
234 data.extend_from_slice(&self.late_events_dropped.to_le_bytes());
236
237 let buf_len = self.buffer.len() as u64;
239 data.extend_from_slice(&buf_len.to_le_bytes());
240
241 for entry in &self.buffer {
243 data.extend_from_slice(&entry.event.timestamp.to_le_bytes());
244 }
245
246 OperatorState {
247 operator_id: self.operator_id.clone(),
248 data,
249 }
250 }
251
252 #[allow(clippy::cast_possible_truncation)] fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
254 if state.data.len() < 24 {
255 return Err(OperatorError::SerializationFailed(
256 "WatermarkSort checkpoint data too short".to_string(),
257 ));
258 }
259
260 let mut offset = 0;
261 self.last_watermark = i64::from_le_bytes(
262 state.data[offset..offset + 8]
263 .try_into()
264 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
265 );
266 offset += 8;
267
268 self.late_events_dropped = u64::from_le_bytes(
269 state.data[offset..offset + 8]
270 .try_into()
271 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
272 );
273 offset += 8;
274
275 let buf_len = u64::from_le_bytes(
276 state.data[offset..offset + 8]
277 .try_into()
278 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
279 ) as usize;
280 offset += 8;
281
282 self.buffer.clear();
283 for _ in 0..buf_len {
284 if offset + 8 > state.data.len() {
285 return Err(OperatorError::SerializationFailed(
286 "WatermarkSort checkpoint truncated".to_string(),
287 ));
288 }
289 let ts = i64::from_le_bytes(
290 state.data[offset..offset + 8]
291 .try_into()
292 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
293 );
294 offset += 8;
295
296 let batch = arrow_array::RecordBatch::new_empty(std::sync::Arc::new(
297 arrow_schema::Schema::empty(),
298 ));
299 self.buffer.push(SortBufferEntry {
300 sort_key: Vec::new(),
301 event: Event::new(ts, batch),
302 });
303 }
304
305 Ok(())
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
    use arrow_array::{ArrayRef, Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use smallvec::smallvec;
    use std::sync::Arc;

    /// Builds a one-row event whose single non-nullable column `name` is `column`.
    fn one_row_event(timestamp: i64, name: &str, data_type: DataType, column: ArrayRef) -> Event {
        let schema = Arc::new(Schema::new(vec![Field::new(name, data_type, false)]));
        let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
        Event::new(timestamp, batch)
    }

    fn make_event(timestamp: i64, value: i64) -> Event {
        one_row_event(
            timestamp,
            "value",
            DataType::Int64,
            Arc::new(Int64Array::from(vec![value])),
        )
    }

    fn make_event_f64(timestamp: i64, price: f64) -> Event {
        one_row_event(
            timestamp,
            "price",
            DataType::Float64,
            Arc::new(Float64Array::from(vec![price])),
        )
    }

    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn crate::state::StateStore,
        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }

    /// Creates an operator sorting on a single column.
    fn sorter(
        id: &str,
        sort_column: TopKSortColumn,
        max_buffer_size: usize,
    ) -> WatermarkBoundedSortOperator {
        WatermarkBoundedSortOperator::new(id.to_string(), vec![sort_column], max_buffer_size)
    }

    /// Runs one event through `process` with a throwaway context.
    fn process_one(op: &mut WatermarkBoundedSortOperator, event: &Event) -> OutputVec {
        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(event, &mut ctx)
    }

    /// Runs every event through `process`, ignoring the outputs.
    fn feed(op: &mut WatermarkBoundedSortOperator, events: &[Event]) {
        for event in events {
            process_one(op, event);
        }
    }

    fn first_i64(event: &Event) -> i64 {
        event
            .data
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0)
    }

    fn first_f64(event: &Event) -> f64 {
        event
            .data
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap()
            .value(0)
    }

    /// Applies `read` to every emitted event, skipping non-event outputs.
    fn emitted<T>(outputs: &OutputVec, read: fn(&Event) -> T) -> Vec<T> {
        outputs
            .iter()
            .filter_map(|output| match output {
                Output::Event(event) => Some(read(event)),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn test_watermark_sort_basic() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        feed(
            &mut op,
            &[make_event(100, 30), make_event(200, 10), make_event(300, 20)],
        );
        assert_eq!(op.buffer_size(), 3);
    }

    #[test]
    fn test_watermark_sort_out_of_order() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        feed(
            &mut op,
            &[make_event(100, 30), make_event(200, 10), make_event(300, 20)],
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let timer = Timer {
            key: smallvec![],
            timestamp: 500,
        };
        let outputs = op.on_timer(timer, &mut ctx);

        // Three sorted events followed by the watermark.
        assert_eq!(outputs.len(), 4);
        assert_eq!(emitted(&outputs, first_i64), vec![10, 20, 30]);
    }

    #[test]
    fn test_watermark_sort_watermark_advance() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        feed(&mut op, &[make_event(100, 50)]);

        let outputs = op.on_watermark_advance(200);
        assert_eq!(outputs.len(), 2);
        assert!(op.is_buffer_empty());
        assert_eq!(op.last_watermark(), 200);
    }

    #[test]
    fn test_watermark_sort_late_events() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        feed(&mut op, &[make_event(100, 10)]);
        op.on_watermark_advance(200);

        let outputs = process_one(&mut op, &make_event(50, 5));
        assert!(outputs.is_empty());
        assert_eq!(op.late_events_dropped(), 1);
    }

    // Despite the name, this covers a single descending Float64 column.
    #[test]
    fn test_watermark_sort_multi_column() {
        let mut op = sorter("test_wms", TopKSortColumn::descending("price"), 100_000);
        feed(
            &mut op,
            &[
                make_event_f64(100, 100.0),
                make_event_f64(200, 300.0),
                make_event_f64(300, 200.0),
            ],
        );

        let outputs = op.on_watermark_advance(500);
        assert_eq!(emitted(&outputs, first_f64), vec![300.0, 200.0, 100.0]);
    }

    #[test]
    fn test_watermark_sort_empty_buffer() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);

        let outputs = op.on_watermark_advance(100);
        assert_eq!(outputs.len(), 1);
        let Output::Watermark(w) = &outputs[0] else {
            panic!("Expected Watermark output");
        };
        assert_eq!(*w, 100);
    }

    #[test]
    fn test_watermark_sort_buffer_limit() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 3);

        let initial: Vec<Event> = (1..=3).map(|i| make_event(i * 100, i)).collect();
        feed(&mut op, &initial);
        assert_eq!(op.buffer_size(), 3);

        let outputs = process_one(&mut op, &make_event(400, 4));
        assert!(!outputs.is_empty());
        assert_eq!(op.buffer_size(), 1);
    }

    #[test]
    fn test_watermark_sort_checkpoint_restore() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        feed(&mut op, &[make_event(100, 10), make_event(200, 20)]);

        let checkpoint = op.checkpoint();
        assert_eq!(checkpoint.operator_id, "test_wms");

        let mut restored = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        restored.restore(checkpoint).unwrap();

        assert_eq!(restored.buffer_size(), 2);
        assert_eq!(restored.last_watermark(), i64::MIN);
    }

    #[test]
    fn test_watermark_sort_ascending_and_descending() {
        let inputs: Vec<Event> = [30i64, 10, 20]
            .iter()
            .map(|&v| make_event(v * 10, v))
            .collect();

        let mut ascending = sorter("asc", TopKSortColumn::ascending("value"), 100_000);
        feed(&mut ascending, &inputs);
        let asc_out = ascending.on_watermark_advance(1000);
        assert_eq!(emitted(&asc_out, first_i64), vec![10, 20, 30]);

        let mut descending = sorter("desc", TopKSortColumn::descending("value"), 100_000);
        feed(&mut descending, &inputs);
        let desc_out = descending.on_watermark_advance(1000);
        assert_eq!(emitted(&desc_out, first_i64), vec![30, 20, 10]);
    }

    #[test]
    fn test_watermark_sort_preserves_watermarks() {
        let mut op = sorter("test_wms", TopKSortColumn::ascending("value"), 100_000);
        feed(&mut op, &[make_event(100, 10)]);

        let outputs = op.on_watermark_advance(200);
        let Some(Output::Watermark(w)) = outputs.last() else {
            panic!("Expected Watermark as last output");
        };
        assert_eq!(*w, 200);
    }
}