1use super::window::ChangelogRecord;
23use super::{
24 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
25};
26use arrow_array::{Array, Float64Array, Int64Array, StringArray, TimestampMicrosecondArray};
27use arrow_schema::DataType;
28
/// One ranking column of a Top-K query.
///
/// Columns are compared in the order they are listed; the first column is the
/// most significant.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopKSortColumn {
    /// Name of the column looked up in each event's schema.
    pub column_name: String,
    /// When `true`, larger values rank ahead of smaller ones.
    pub descending: bool,
    /// Null-placement flag handed to `encode_null` / `encode_not_null` when
    /// this column's key bytes are built.
    pub nulls_first: bool,
}

impl TopKSortColumn {
    /// Shared constructor: a column with the given direction and
    /// `nulls_first` left at `false`.
    fn directed(name: impl Into<String>, descending: bool) -> Self {
        Self {
            column_name: name.into(),
            descending,
            nulls_first: false,
        }
    }

    /// Ranks `name` from smallest to largest value (`nulls_first = false`).
    #[must_use]
    pub fn ascending(name: impl Into<String>) -> Self {
        Self::directed(name, false)
    }

    /// Ranks `name` from largest to smallest value (`nulls_first = false`).
    #[must_use]
    pub fn descending(name: impl Into<String>) -> Self {
        Self::directed(name, true)
    }

    /// Builder-style setter returning `self` with `nulls_first` replaced.
    #[must_use]
    pub fn with_nulls_first(self, nulls_first: bool) -> Self {
        Self {
            nulls_first,
            ..self
        }
    }
}
68
/// Controls when the Top-K operator releases its changelog records.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TopKEmitStrategy {
    /// Records are returned from `process` as soon as the ranking changes.
    OnUpdate,
    /// Records are buffered in `pending_changes`.
    /// NOTE(review): nothing in this file flushes that buffer for this
    /// variant — confirm how it is meant to be drained.
    OnWatermark,
    /// Records are buffered and flushed when `on_timer` fires.
    /// The payload is not read in this file; presumably an interval — TODO
    /// confirm its unit.
    Periodic(i64),
}
79
80#[derive(Debug, Clone)]
82struct TopKEntry {
83 sort_key: Vec<u8>,
85 event: Event,
87}
88
89pub struct StreamingTopKOperator {
95 operator_id: String,
97 k: usize,
99 sort_columns: Vec<TopKSortColumn>,
101 entries: Vec<TopKEntry>,
103 emit_strategy: TopKEmitStrategy,
105 pending_changes: Vec<ChangelogRecord>,
107 sequence_counter: u64,
109 current_watermark: i64,
111 cached_sort_indices: Vec<Option<usize>>,
113}
114
115impl StreamingTopKOperator {
116 #[must_use]
118 pub fn new(
119 operator_id: String,
120 k: usize,
121 sort_columns: Vec<TopKSortColumn>,
122 emit_strategy: TopKEmitStrategy,
123 ) -> Self {
124 let num_sort_cols = sort_columns.len();
125 Self {
126 operator_id,
127 k,
128 sort_columns,
129 entries: Vec::with_capacity(k),
130 emit_strategy,
131 pending_changes: Vec::new(),
132 sequence_counter: 0,
133 current_watermark: i64::MIN,
134 cached_sort_indices: vec![None; num_sort_cols],
135 }
136 }
137
138 #[must_use]
140 pub fn len(&self) -> usize {
141 self.entries.len()
142 }
143
144 #[must_use]
146 pub fn is_empty(&self) -> bool {
147 self.entries.is_empty()
148 }
149
150 #[must_use]
152 pub fn entries(&self) -> Vec<&Event> {
153 self.entries.iter().map(|e| &e.event).collect()
154 }
155
156 #[must_use]
158 pub fn current_watermark(&self) -> i64 {
159 self.current_watermark
160 }
161
162 #[must_use]
164 pub fn pending_changes_count(&self) -> usize {
165 self.pending_changes.len()
166 }
167
168 fn extract_sort_key(&mut self, event: &Event) -> Vec<u8> {
172 let batch = &event.data;
173 let mut key = Vec::new();
174
175 for (i, col_spec) in self.sort_columns.iter().enumerate() {
176 let col_idx = if let Some(idx) = self.cached_sort_indices[i] {
177 idx
178 } else {
179 let Ok(idx) = batch.schema().index_of(&col_spec.column_name) else {
180 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
181 continue;
182 };
183 self.cached_sort_indices[i] = Some(idx);
184 idx
185 };
186
187 let array = batch.column(col_idx);
188
189 if array.is_null(0) {
190 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
191 continue;
192 }
193
194 match array.data_type() {
195 DataType::Int64 => {
196 if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
197 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
198 encode_i64(arr.value(0), col_spec.descending, &mut key);
199 } else {
200 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
201 }
202 }
203 DataType::Float64 => {
204 if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
205 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
206 encode_f64(arr.value(0), col_spec.descending, &mut key);
207 } else {
208 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
209 }
210 }
211 DataType::Utf8 => {
212 if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
213 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
214 encode_utf8(arr.value(0), col_spec.descending, &mut key);
215 } else {
216 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
217 }
218 }
219 DataType::Timestamp(_, _) => {
220 if let Some(arr) = array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
221 encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
222 encode_i64(arr.value(0), col_spec.descending, &mut key);
223 } else {
224 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
225 }
226 }
227 _ => {
228 encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
230 }
231 }
232 }
233
234 key
235 }
236
237 fn find_insert_position(&self, sort_key: &[u8]) -> usize {
240 self.entries
241 .binary_search_by(|entry| entry.sort_key.as_slice().cmp(sort_key))
242 .unwrap_or_else(|pos| pos)
243 }
244
245 fn would_enter_topk(&self, sort_key: &[u8]) -> bool {
247 if self.entries.len() < self.k {
248 return true;
249 }
250 if let Some(worst) = self.entries.last() {
252 sort_key < worst.sort_key.as_slice()
253 } else {
254 true
255 }
256 }
257
258 fn process_event(&mut self, event: &Event, emit_timestamp: i64) -> Vec<ChangelogRecord> {
260 let sort_key = self.extract_sort_key(event);
261
262 if !self.would_enter_topk(&sort_key) {
263 return Vec::new();
264 }
265
266 let insert_pos = self.find_insert_position(&sort_key);
267 let mut changes = Vec::new();
268
269 let new_entry = TopKEntry {
271 sort_key,
272 event: event.clone(),
273 };
274 self.entries.insert(insert_pos, new_entry);
275
276 changes.push(ChangelogRecord::insert(event.clone(), emit_timestamp));
278
279 for i in (insert_pos + 1)..self.entries.len().min(self.k) {
281 let shifted_event = &self.entries[i].event;
282 let (before, after) = ChangelogRecord::update(
284 shifted_event.clone(),
285 shifted_event.clone(),
286 emit_timestamp,
287 );
288 changes.push(before);
289 changes.push(after);
290 }
291
292 if self.entries.len() > self.k {
294 let evicted = self.entries.pop().unwrap();
295 changes.push(ChangelogRecord::delete(evicted.event, emit_timestamp));
296 }
297
298 self.sequence_counter += 1;
299 changes
300 }
301
302 fn flush_pending(&mut self) -> OutputVec {
304 let mut outputs = OutputVec::new();
305 for record in self.pending_changes.drain(..) {
306 outputs.push(Output::Changelog(record));
307 }
308 outputs
309 }
310}
311
312impl Operator for StreamingTopKOperator {
313 fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
314 let emit_timestamp = event.timestamp;
315 let changes = self.process_event(event, emit_timestamp);
316
317 match &self.emit_strategy {
318 TopKEmitStrategy::OnUpdate => {
319 let mut outputs = OutputVec::new();
320 for record in changes {
321 outputs.push(Output::Changelog(record));
322 }
323 outputs
324 }
325 TopKEmitStrategy::OnWatermark | TopKEmitStrategy::Periodic(_) => {
326 self.pending_changes.extend(changes);
327 OutputVec::new()
328 }
329 }
330 }
331
332 fn on_timer(&mut self, _timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
333 match &self.emit_strategy {
335 TopKEmitStrategy::Periodic(_) => self.flush_pending(),
336 _ => OutputVec::new(),
337 }
338 }
339
340 fn checkpoint(&self) -> OperatorState {
341 let mut data = Vec::new();
344
345 let count = self.entries.len() as u64;
347 data.extend_from_slice(&count.to_le_bytes());
348
349 data.extend_from_slice(&self.current_watermark.to_le_bytes());
351
352 data.extend_from_slice(&self.sequence_counter.to_le_bytes());
354
355 for entry in &self.entries {
357 let key_len = entry.sort_key.len() as u64;
358 data.extend_from_slice(&key_len.to_le_bytes());
359 data.extend_from_slice(&entry.sort_key);
360 data.extend_from_slice(&entry.event.timestamp.to_le_bytes());
361 }
362
363 OperatorState {
364 operator_id: self.operator_id.clone(),
365 data,
366 }
367 }
368
369 #[allow(clippy::cast_possible_truncation)] fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
371 if state.data.len() < 24 {
372 return Err(OperatorError::SerializationFailed(
373 "TopK checkpoint data too short".to_string(),
374 ));
375 }
376
377 let mut offset = 0;
378 let count = u64::from_le_bytes(
379 state.data[offset..offset + 8]
380 .try_into()
381 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
382 ) as usize;
383 offset += 8;
384
385 self.current_watermark = i64::from_le_bytes(
386 state.data[offset..offset + 8]
387 .try_into()
388 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
389 );
390 offset += 8;
391
392 self.sequence_counter = u64::from_le_bytes(
393 state.data[offset..offset + 8]
394 .try_into()
395 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
396 );
397 offset += 8;
398
399 self.entries.clear();
401 for _ in 0..count {
402 if offset + 8 > state.data.len() {
403 return Err(OperatorError::SerializationFailed(
404 "TopK checkpoint truncated".to_string(),
405 ));
406 }
407 let key_len = u64::from_le_bytes(
408 state.data[offset..offset + 8]
409 .try_into()
410 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
411 ) as usize;
412 offset += 8;
413
414 if offset + key_len + 8 > state.data.len() {
415 return Err(OperatorError::SerializationFailed(
416 "TopK checkpoint truncated at key".to_string(),
417 ));
418 }
419 let sort_key = state.data[offset..offset + key_len].to_vec();
420 offset += key_len;
421
422 let timestamp = i64::from_le_bytes(
423 state.data[offset..offset + 8]
424 .try_into()
425 .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
426 );
427 offset += 8;
428
429 let batch = arrow_array::RecordBatch::new_empty(std::sync::Arc::new(
431 arrow_schema::Schema::empty(),
432 ));
433 self.entries.push(TopKEntry {
434 sort_key,
435 event: Event::new(timestamp, batch),
436 });
437 }
438
439 Ok(())
440 }
441}
442
/// Appends the marker byte for a NULL sort value to `key`.
///
/// Null placement depends only on `nulls_first`:
/// * `nulls_first == true`: the null marker `0x00` sorts before the non-null
///   marker `0x01` written by [`encode_not_null`].
/// * otherwise: the null marker `0xFF` sorts after the non-null marker `0x00`.
///
/// Keys are compared byte-wise with no global inversion, and `descending` only
/// reverses the value bytes (see `encode_i64` / `encode_f64` / `encode_utf8`).
/// If the marker were also flipped for descending columns, nulls would land
/// opposite to what `nulls_first` requests. The `_descending` parameter is
/// therefore accepted only for signature compatibility.
pub fn encode_null(nulls_first: bool, _descending: bool, key: &mut Vec<u8>) {
    key.push(if nulls_first { 0x00 } else { 0xFF });
}

/// Appends the marker byte for a non-NULL sort value to `key`.
///
/// The value's own encoding is expected to follow. The marker pairs with
/// [`encode_null`] so that nulls sort first when `nulls_first` is `true` and
/// last otherwise, regardless of sort direction.
pub fn encode_not_null(nulls_first: bool, _descending: bool, key: &mut Vec<u8>) {
    key.push(if nulls_first { 0x01 } else { 0x00 });
}
478
/// Appends an 8-byte, byte-comparable encoding of `val` to `key`.
///
/// The big-endian two's-complement bytes get their sign bit flipped, so
/// `i64::MIN` becomes all zeros and `i64::MAX` all ones; unsigned byte-wise
/// comparison then matches signed numeric order. For `descending`, every byte
/// is inverted to reverse that order.
pub fn encode_i64(val: i64, descending: bool, key: &mut Vec<u8>) {
    let mut bytes = val.to_be_bytes();
    bytes[0] ^= 0x80;
    if descending {
        bytes.iter_mut().for_each(|byte| *byte = !*byte);
    }
    key.extend_from_slice(&bytes);
}
493
/// Appends an 8-byte, byte-comparable encoding of `val` to `key`.
///
/// Values with a clear sign bit get the sign bit set; values with a set sign
/// bit (negatives, `-0.0`, negative NaNs) have all bits inverted. Big-endian
/// byte order then follows the IEEE-754 total order:
/// `-NaN < -inf < … < -0.0 < +0.0 < … < +inf < +NaN`.
/// For `descending`, every bit is inverted to reverse that order.
pub fn encode_f64(val: f64, descending: bool, key: &mut Vec<u8>) {
    let raw = val.to_bits();
    let ascending = if val.is_sign_negative() {
        !raw
    } else {
        raw | (1u64 << 63)
    };
    let ordered = if descending { !ascending } else { ascending };
    key.extend_from_slice(&ordered.to_be_bytes());
}
513
/// Appends a byte-comparable encoding of `val` to `key`.
///
/// Ascending: the raw UTF-8 bytes followed by a `0x00` terminator.
/// Descending: every byte inverted, followed by a `0xFF` terminator.
/// The terminator keeps a shorter string ahead of any longer string it prefixes
/// (or behind it, when descending).
///
/// Caveat: the terminator is not escaped, so a value containing U+0000 can
/// compare inconsistently with its prefixes.
pub fn encode_utf8(val: &str, descending: bool, key: &mut Vec<u8>) {
    // The XOR mask doubles as the terminator: 0x00 ascending, 0xFF descending.
    let mask: u8 = if descending { 0xFF } else { 0x00 };
    key.extend(val.bytes().map(|byte| byte ^ mask));
    key.push(mask);
}
527
#[cfg(test)]
// Several tests convert `enumerate()` indices with `i as i64`.
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::super::window::CdcOperation;
    use super::*;
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
    use arrow_array::{Float64Array, Int64Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// One-row event with a non-nullable `price: Float64` column.
    fn make_event(timestamp: i64, price: f64) -> Event {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "price",
            DataType::Float64,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Float64Array::from(vec![price]))]).unwrap();
        Event::new(timestamp, batch)
    }

    /// One-row event with a non-nullable `value: Int64` column.
    fn make_event_i64(timestamp: i64, value: i64) -> Event {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
        Event::new(timestamp, batch)
    }

    /// One-row event with a non-nullable `name: Utf8` column.
    fn make_event_str(timestamp: i64, name: &str) -> Event {
        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec![name]))]).unwrap();
        Event::new(timestamp, batch)
    }

    /// One-row event with `category: Utf8` and `price: Float64` columns.
    fn make_multi_column_event(timestamp: i64, category: &str, price: f64) -> Event {
        let schema = Arc::new(Schema::new(vec![
            Field::new("category", DataType::Utf8, false),
            Field::new("price", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![category])),
                Arc::new(Float64Array::from(vec![price])),
            ],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    /// Builds an operator with the fixed id `"test_topk"`.
    fn create_topk(
        k: usize,
        sort_columns: Vec<TopKSortColumn>,
        strategy: TopKEmitStrategy,
    ) -> StreamingTopKOperator {
        StreamingTopKOperator::new("test_topk".to_string(), k, sort_columns, strategy)
    }

    /// Context with zeroed times and operator index 0, borrowing the given services.
    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn crate::state::StateStore,
        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }

    // --- Sort-key extraction ---

    // Ascending Int64 keys must order like the integers, negatives included.
    #[test]
    fn test_topk_sort_key_extraction_int64() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::ascending("value")],
            TopKEmitStrategy::OnUpdate,
        );
        let e1 = make_event_i64(1, 100);
        let e2 = make_event_i64(2, 200);
        let e3 = make_event_i64(3, -50);

        let k1 = op.extract_sort_key(&e1);
        let k2 = op.extract_sort_key(&e2);
        let k3 = op.extract_sort_key(&e3);

        // -50 < 100 < 200
        assert!(k3 < k1);
        assert!(k1 < k2);
    }

    // Descending Float64: a higher price produces a smaller key.
    #[test]
    fn test_topk_sort_key_extraction_float64() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );
        let e1 = make_event(1, 150.0);
        let e2 = make_event(2, 200.0);
        let e3 = make_event(3, 100.0);

        let k1 = op.extract_sort_key(&e1);
        let k2 = op.extract_sort_key(&e2);
        let k3 = op.extract_sort_key(&e3);

        // 200 ranks ahead of 150, which ranks ahead of 100.
        assert!(k2 < k1);
        assert!(k1 < k3);
    }

    // Ascending Utf8 keys follow lexicographic order.
    #[test]
    fn test_topk_sort_key_extraction_utf8() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::ascending("name")],
            TopKEmitStrategy::OnUpdate,
        );
        let e1 = make_event_str(1, "apple");
        let e2 = make_event_str(2, "banana");
        let e3 = make_event_str(3, "cherry");

        let k1 = op.extract_sort_key(&e1);
        let k2 = op.extract_sort_key(&e2);
        let k3 = op.extract_sort_key(&e3);

        // apple < banana < cherry
        assert!(k1 < k2);
        assert!(k2 < k3);
    }

    // A microsecond timestamp column yields a non-empty key.
    #[test]
    fn test_topk_sort_key_extraction_timestamp() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
            false,
        )]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(arrow_array::TimestampMicrosecondArray::from(
                vec![1000],
            ))],
        )
        .unwrap();
        let event = Event::new(1, batch);

        let mut op = create_topk(
            3,
            vec![TopKSortColumn::ascending("ts")],
            TopKEmitStrategy::OnUpdate,
        );
        let key = op.extract_sort_key(&event);
        assert!(!key.is_empty());
    }

    // --- Insertion and eviction ---

    // Below capacity, an event is always admitted and produces output.
    #[test]
    fn test_topk_insert_below_capacity() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let event = make_event(1, 150.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.process(&event, &mut ctx);

        assert_eq!(op.len(), 1);
        assert!(!outputs.is_empty());
    }

    // At capacity, a better event is admitted and the size stays at k.
    #[test]
    fn test_topk_insert_at_capacity_better_entry() {
        let mut op = create_topk(
            2,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        // Fill the two slots.
        for (i, price) in [100.0, 150.0].iter().enumerate() {
            let event = make_event(i as i64, *price);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
            op.process(&event, &mut ctx);
        }
        assert_eq!(op.len(), 2);

        // 200 beats both entries.
        let better = make_event(3, 200.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.process(&better, &mut ctx);

        assert_eq!(op.len(), 2);
        // At least the insert plus the eviction.
        assert!(outputs.len() >= 2);
    }

    // At capacity, a worse event is rejected without output.
    #[test]
    fn test_topk_insert_at_capacity_worse_entry() {
        let mut op = create_topk(
            2,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        // Fill the two slots.
        for (i, price) in [200.0, 150.0].iter().enumerate() {
            let event = make_event(i as i64, *price);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
            op.process(&event, &mut ctx);
        }

        // 50 is worse than every held entry.
        let worse = make_event(3, 50.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.process(&worse, &mut ctx);

        assert_eq!(op.len(), 2);
        assert!(outputs.is_empty());
    }

    // --- Ordering ---

    // Ascending ranking returns the smallest values first.
    #[test]
    fn test_topk_ascending_order() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::ascending("value")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        // Insert out of order.
        for (i, val) in [30i64, 10, 20].iter().enumerate() {
            let event = make_event_i64(i as i64, *val);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
            op.process(&event, &mut ctx);
        }

        assert_eq!(op.len(), 3);
        let entries = op.entries();
        // Read back the single `value` cell of each ranked event.
        let vals: Vec<i64> = entries
            .iter()
            .map(|e| {
                e.data
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap()
                    .value(0)
            })
            .collect();
        assert_eq!(vals, vec![10, 20, 30]);
    }

    // Descending ranking returns the largest values first.
    #[test]
    fn test_topk_descending_order() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        // Insert out of order.
        for (i, price) in [100.0, 200.0, 150.0].iter().enumerate() {
            let event = make_event(i as i64, *price);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
            op.process(&event, &mut ctx);
        }

        let entries = op.entries();
        let prices: Vec<f64> = entries
            .iter()
            .map(|e| {
                e.data
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .value(0)
            })
            .collect();
        assert_eq!(prices, vec![200.0, 150.0, 100.0]);
    }

    // The first sort column dominates; the second only breaks ties.
    #[test]
    fn test_topk_multi_column_sort() {
        let mut op = create_topk(
            3,
            vec![
                TopKSortColumn::ascending("category"),
                TopKSortColumn::descending("price"),
            ],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let events = vec![
            make_multi_column_event(1, "B", 100.0),
            make_multi_column_event(2, "A", 200.0),
            make_multi_column_event(3, "A", 150.0),
        ];

        for event in &events {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
            op.process(event, &mut ctx);
        }

        let entries = op.entries();
        // Both "A" rows rank ahead of "B" despite "B" arriving first.
        let cats: Vec<&str> = entries
            .iter()
            .map(|e| {
                e.data
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap()
                    .value(0)
            })
            .collect();
        assert_eq!(cats, vec!["A", "A", "B"]);
    }

    // --- Null handling ---

    // Ascending with nulls_first: the null event ranks ahead of a value.
    #[test]
    fn test_topk_nulls_first() {
        let sort_cols = vec![TopKSortColumn::ascending("value").with_nulls_first(true)];
        let mut op = create_topk(3, sort_cols, TopKEmitStrategy::OnUpdate);

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        // Nullable `value` column holding a single null.
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            true,
        )]));
        let null_array = Int64Array::new_null(1);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(null_array)]).unwrap();
        let null_event = Event::new(1, batch);

        let val_event = make_event_i64(2, 100);

        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&val_event, &mut ctx);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&null_event, &mut ctx);

        let entries = op.entries();
        // The null arrived second but ranks first.
        assert_eq!(entries.len(), 2);
        assert!(entries[0].data.column(0).is_null(0));
    }

    // Ascending with nulls last: a value ranks ahead of the null event.
    #[test]
    fn test_topk_nulls_last() {
        let sort_cols = vec![TopKSortColumn::ascending("value").with_nulls_first(false)];
        let mut op = create_topk(3, sort_cols, TopKEmitStrategy::OnUpdate);

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            true,
        )]));
        let null_array = Int64Array::new_null(1);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(null_array)]).unwrap();
        let null_event = Event::new(1, batch);

        let val_event = make_event_i64(2, 100);

        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&null_event, &mut ctx);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&val_event, &mut ctx);

        let entries = op.entries();
        // The value arrived second but ranks first.
        assert_eq!(entries.len(), 2);
        assert!(!entries[0].data.column(0).is_null(0));
    }

    // --- Emission ---

    // A first insert under OnUpdate emits exactly one Insert with weight 1.
    #[test]
    fn test_topk_emit_on_update_insert() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let event = make_event(1, 150.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.process(&event, &mut ctx);

        assert_eq!(outputs.len(), 1);
        match &outputs[0] {
            Output::Changelog(rec) => {
                assert_eq!(rec.operation, CdcOperation::Insert);
                assert_eq!(rec.weight, 1);
            }
            _ => panic!("Expected Changelog output"),
        }
    }

    // With k = 1, a better event emits an Insert and evicts the old entry with a Delete.
    #[test]
    fn test_topk_emit_on_update_eviction() {
        let mut op = create_topk(
            1,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let event1 = make_event(1, 100.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&event1, &mut ctx);

        let event2 = make_event(2, 200.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.process(&event2, &mut ctx);

        let mut has_insert = false;
        let mut has_delete = false;
        for output in &outputs {
            if let Output::Changelog(rec) = output {
                match rec.operation {
                    CdcOperation::Insert => has_insert = true,
                    CdcOperation::Delete => has_delete = true,
                    _ => {}
                }
            }
        }
        assert!(has_insert);
        assert!(has_delete);
    }

    // Inserting between existing entries emits an update pair for the entry pushed down.
    #[test]
    fn test_topk_emit_on_update_rank_change() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        // Ranking becomes [200, 100].
        let e1 = make_event(1, 100.0);
        let e2 = make_event(2, 200.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&e1, &mut ctx);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&e2, &mut ctx);

        // 150 lands between them and pushes 100 down one rank.
        let e3 = make_event(3, 150.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.process(&e3, &mut ctx);

        let mut has_insert = false;
        let mut has_update_before = false;
        let mut has_update_after = false;
        for output in &outputs {
            if let Output::Changelog(rec) = output {
                match rec.operation {
                    CdcOperation::Insert => has_insert = true,
                    CdcOperation::UpdateBefore => has_update_before = true,
                    CdcOperation::UpdateAfter => has_update_after = true,
                    CdcOperation::Delete => {}
                }
            }
        }
        assert!(has_insert);
        assert!(has_update_before);
        assert!(has_update_after);
    }

    // OnWatermark buffers records instead of returning them from `process`.
    #[test]
    fn test_topk_emit_on_watermark_batched() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnWatermark,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let e1 = make_event(1, 100.0);
        let e2 = make_event(2, 200.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let out1 = op.process(&e1, &mut ctx);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let out2 = op.process(&e2, &mut ctx);

        assert!(out1.is_empty());
        assert!(out2.is_empty());
        assert!(op.pending_changes_count() > 0);
    }

    // Periodic buffers records until `on_timer`, which drains the buffer.
    #[test]
    fn test_topk_emit_periodic() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::Periodic(1000),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let e1 = make_event(1, 100.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&e1, &mut ctx);

        assert!(op.pending_changes_count() > 0);

        // Fire the timer by hand; nothing registers it in this module.
        let timer = Timer {
            key: smallvec::smallvec![],
            timestamp: 1000,
        };
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.on_timer(timer, &mut ctx);

        assert!(!outputs.is_empty());
        assert_eq!(op.pending_changes_count(), 0);
    }

    // --- Edge cases ---

    // A fresh operator holds nothing.
    #[test]
    fn test_topk_empty_heap() {
        let op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );
        assert!(op.is_empty());
        assert_eq!(op.len(), 0);
    }

    // With k = 1 only the single best event is kept.
    #[test]
    fn test_topk_k_equals_one() {
        let mut op = create_topk(
            1,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let e1 = make_event(1, 100.0);
        let e2 = make_event(2, 200.0);
        let e3 = make_event(3, 50.0);

        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&e1, &mut ctx);
        assert_eq!(op.len(), 1);

        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&e2, &mut ctx);
        assert_eq!(op.len(), 1);

        let entries = op.entries();
        // 200 replaced 100.
        let price = entries[0]
            .data
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap()
            .value(0);
        assert!((price - 200.0).abs() < f64::EPSILON);

        // 50 cannot displace 200, so nothing is emitted.
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.process(&e3, &mut ctx);
        assert!(outputs.is_empty());
    }

    // Fewer events than k are all retained.
    #[test]
    fn test_topk_large_k() {
        let mut op = create_topk(
            100,
            vec![TopKSortColumn::ascending("value")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        for i in 0..50 {
            let event = make_event_i64(i, i * 10);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
            op.process(&event, &mut ctx);
        }

        assert_eq!(op.len(), 50);
    }

    // Events with identical sort keys are all admitted while below capacity.
    #[test]
    fn test_topk_duplicate_sort_keys() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        for i in 0..3 {
            let event = make_event(i, 100.0);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
            op.process(&event, &mut ctx);
        }

        assert_eq!(op.len(), 3);
    }

    // --- Checkpointing ---

    // checkpoint -> restore preserves the operator id and the entry count.
    #[test]
    fn test_topk_checkpoint_roundtrip() {
        let mut op = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        for (i, price) in [150.0, 200.0, 100.0].iter().enumerate() {
            let event = make_event(i as i64, *price);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
            op.process(&event, &mut ctx);
        }

        let checkpoint = op.checkpoint();
        assert_eq!(checkpoint.operator_id, "test_topk");
        assert!(!checkpoint.data.is_empty());

        // Restore into a fresh operator with the same configuration.
        let mut op2 = create_topk(
            3,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );
        op2.restore(checkpoint).unwrap();

        assert_eq!(op2.len(), 3);
    }

    // Restoring a single-entry checkpoint into a fresh operator yields one entry.
    #[test]
    fn test_topk_restore_and_continue() {
        let mut op = create_topk(
            2,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let event = make_event(1, 150.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&event, &mut ctx);

        let checkpoint = op.checkpoint();

        let mut op2 = create_topk(
            2,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );
        op2.restore(checkpoint).unwrap();

        assert_eq!(op2.len(), 1);
    }

    // --- Changelog record helpers ---

    // Constructor sanity: operation kinds and weights of ChangelogRecord.
    #[test]
    fn test_topk_changelog_record_types() {
        let record = ChangelogRecord::insert(make_event(1, 100.0), 1);
        assert_eq!(record.operation, CdcOperation::Insert);
        assert_eq!(record.weight, 1);

        let record = ChangelogRecord::delete(make_event(1, 100.0), 1);
        assert_eq!(record.operation, CdcOperation::Delete);
        assert_eq!(record.weight, -1);

        let (before, after) =
            ChangelogRecord::update(make_event(1, 100.0), make_event(2, 200.0), 1);
        assert_eq!(before.operation, CdcOperation::UpdateBefore);
        assert_eq!(after.operation, CdcOperation::UpdateAfter);
    }

    // A full operator ignores an event that does not make the cut.
    #[test]
    fn test_topk_no_emission_on_no_change() {
        let mut op = create_topk(
            2,
            vec![TopKSortColumn::descending("price")],
            TopKEmitStrategy::OnUpdate,
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut wm = BoundedOutOfOrdernessGenerator::new(0);

        let e1 = make_event(1, 200.0);
        let e2 = make_event(2, 150.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&e1, &mut ctx);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        op.process(&e2, &mut ctx);

        // 50 is worse than both held entries.
        let e3 = make_event(3, 50.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
        let outputs = op.process(&e3, &mut ctx);

        assert!(outputs.is_empty());
    }
}