Skip to main content

laminar_core/operator/
topk.rs

1//! # Streaming Top-K Operator
2//!
//! Bounded sorted buffer of top-K items with retraction-based changelog emission.
4//!
5//! Supports `ORDER BY ... LIMIT N` on unbounded streams by maintaining
6//! a sorted buffer of at most K entries. New events that rank within the
7//! top-K cause eviction of the worst entry and optional rank-change
8//! retractions.
9//!
10//! ## Emit Strategies
11//!
12//! - `OnUpdate`: Emit changelog on every state change (lowest latency)
13//! - `OnWatermark`: Buffer changes, emit on watermark advance
14//! - `Periodic(interval)`: Emit on timer interval
15//!
16//! ## Ring 0 Constraints
17//!
18//! - `entries` pre-allocated to capacity K — no reallocation during `process()`
19//! - Sort keys use `Vec<u8>` memcomparable encoding for zero-branch comparison
20//! - Sorted Vec with binary search: O(log K) search + O(K) shift
21
22use super::window::ChangelogRecord;
23use super::{
24    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
25};
26use arrow_array::{Array, Float64Array, Int64Array, StringArray, TimestampMicrosecondArray};
27use arrow_schema::DataType;
28
/// One `ORDER BY` key of the top-K operator: which column to sort on and how.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopKSortColumn {
    /// Name of the column to look up in the event schema.
    pub column_name: String,
    /// `true` sorts this column in descending order, `false` in ascending order.
    pub descending: bool,
    /// `true` places NULL values before non-NULL values.
    pub nulls_first: bool,
}

impl TopKSortColumn {
    /// Shared constructor: every freshly built sort column starts with nulls last.
    fn with_direction(name: impl Into<String>, descending: bool) -> Self {
        Self {
            column_name: name.into(),
            descending,
            nulls_first: false,
        }
    }

    /// Builds an ascending sort column (nulls last).
    #[must_use]
    pub fn ascending(name: impl Into<String>) -> Self {
        Self::with_direction(name, false)
    }

    /// Builds a descending sort column (nulls last).
    #[must_use]
    pub fn descending(name: impl Into<String>) -> Self {
        Self::with_direction(name, true)
    }

    /// Returns the same sort column with its null placement replaced.
    #[must_use]
    pub fn with_nulls_first(self, nulls_first: bool) -> Self {
        Self {
            nulls_first,
            ..self
        }
    }
}
68
/// Emit strategy for the streaming top-K operator.
///
/// Decides whether changelog records produced while processing an event are
/// returned immediately or parked in the operator's pending buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopKEmitStrategy {
    /// Emit changelog on every state change (lowest latency, highest volume).
    OnUpdate,
    /// Buffer changes, emit batch when watermark advances.
    OnWatermark,
    /// Emit on timer at the given interval in microseconds.
    ///
    /// Buffered changes are flushed from the operator's timer callback.
    Periodic(i64),
}
79
/// An entry in the top-K buffer: an event paired with its precomputed sort key.
#[derive(Debug, Clone)]
struct TopKEntry {
    /// Memcomparable sort key for efficient comparison.
    ///
    /// Entries are ordered by plain byte-wise comparison of this key;
    /// the smallest key ranks best.
    sort_key: Vec<u8>,
    /// The original event.
    event: Event,
}
88
/// Streaming top-K operator for `ORDER BY ... LIMIT N`.
///
/// Maintains a sorted buffer of at most K entries. Each incoming event
/// is checked against the current worst entry. If better, it replaces
/// the worst and changelog records are emitted.
pub struct StreamingTopKOperator {
    /// Operator identifier for checkpointing.
    operator_id: String,
    /// Number of top entries to maintain.
    k: usize,
    /// Sort column specifications, in `ORDER BY` priority order.
    sort_columns: Vec<TopKSortColumn>,
    /// Sorted entries (best first). Pre-allocated to capacity K.
    entries: Vec<TopKEntry>,
    /// Emission strategy.
    emit_strategy: TopKEmitStrategy,
    /// Pending changelog records (for OnWatermark/Periodic strategies).
    pending_changes: Vec<ChangelogRecord>,
    /// Monotonic sequence counter for changelog ordering.
    /// Incremented once for every event that enters the buffer.
    sequence_counter: u64,
    /// Current watermark value. Starts at `i64::MIN`.
    current_watermark: i64,
    /// Cached column indices for sort columns — resolved on first event.
    /// One slot per sort column; `None` until that column has been found in a schema.
    cached_sort_indices: Vec<Option<usize>>,
}
114
115impl StreamingTopKOperator {
116    /// Creates a new streaming top-K operator.
117    #[must_use]
118    pub fn new(
119        operator_id: String,
120        k: usize,
121        sort_columns: Vec<TopKSortColumn>,
122        emit_strategy: TopKEmitStrategy,
123    ) -> Self {
124        let num_sort_cols = sort_columns.len();
125        Self {
126            operator_id,
127            k,
128            sort_columns,
129            entries: Vec::with_capacity(k),
130            emit_strategy,
131            pending_changes: Vec::new(),
132            sequence_counter: 0,
133            current_watermark: i64::MIN,
134            cached_sort_indices: vec![None; num_sort_cols],
135        }
136    }
137
    /// Returns how many entries the top-K buffer currently holds.
    #[must_use]
    pub fn len(&self) -> usize {
        self.entries.len()
    }
143
    /// Returns `true` when the top-K buffer holds no entries.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
149
150    /// Returns the current entries as events (best first).
151    #[must_use]
152    pub fn entries(&self) -> Vec<&Event> {
153        self.entries.iter().map(|e| &e.event).collect()
154    }
155
    /// Returns the watermark value currently stored in the operator
    /// (`i64::MIN` until one has been set or restored).
    #[must_use]
    pub fn current_watermark(&self) -> i64 {
        self.current_watermark
    }
161
    /// Returns the number of changelog records buffered and not yet flushed.
    #[must_use]
    pub fn pending_changes_count(&self) -> usize {
        self.pending_changes.len()
    }
167
168    /// Extracts a memcomparable sort key from an event.
169    ///
170    /// Caches column indices on first call to avoid per-event schema lookups.
171    fn extract_sort_key(&mut self, event: &Event) -> Vec<u8> {
172        let batch = &event.data;
173        let mut key = Vec::new();
174
175        for (i, col_spec) in self.sort_columns.iter().enumerate() {
176            let col_idx = if let Some(idx) = self.cached_sort_indices[i] {
177                idx
178            } else {
179                let Ok(idx) = batch.schema().index_of(&col_spec.column_name) else {
180                    encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
181                    continue;
182                };
183                self.cached_sort_indices[i] = Some(idx);
184                idx
185            };
186
187            let array = batch.column(col_idx);
188
189            if array.is_null(0) {
190                encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
191                continue;
192            }
193
194            match array.data_type() {
195                DataType::Int64 => {
196                    if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
197                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
198                        encode_i64(arr.value(0), col_spec.descending, &mut key);
199                    } else {
200                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
201                    }
202                }
203                DataType::Float64 => {
204                    if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
205                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
206                        encode_f64(arr.value(0), col_spec.descending, &mut key);
207                    } else {
208                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
209                    }
210                }
211                DataType::Utf8 => {
212                    if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
213                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
214                        encode_utf8(arr.value(0), col_spec.descending, &mut key);
215                    } else {
216                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
217                    }
218                }
219                DataType::Timestamp(_, _) => {
220                    if let Some(arr) = array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
221                        encode_not_null(col_spec.nulls_first, col_spec.descending, &mut key);
222                        encode_i64(arr.value(0), col_spec.descending, &mut key);
223                    } else {
224                        encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
225                    }
226                }
227                _ => {
228                    // Unsupported type: treat as null
229                    encode_null(col_spec.nulls_first, col_spec.descending, &mut key);
230                }
231            }
232        }
233
234        key
235    }
236
237    /// Finds the insertion position for a sort key using binary search.
238    /// Returns the index where the key should be inserted to maintain sorted order.
239    fn find_insert_position(&self, sort_key: &[u8]) -> usize {
240        self.entries
241            .binary_search_by(|entry| entry.sort_key.as_slice().cmp(sort_key))
242            .unwrap_or_else(|pos| pos)
243    }
244
245    /// Checks if an event with the given sort key would enter the top-K.
246    fn would_enter_topk(&self, sort_key: &[u8]) -> bool {
247        if self.entries.len() < self.k {
248            return true;
249        }
250        // Compare with the worst (last) entry
251        if let Some(worst) = self.entries.last() {
252            sort_key < worst.sort_key.as_slice()
253        } else {
254            true
255        }
256    }
257
258    /// Processes a single event, returning changelog records for the changes.
259    fn process_event(&mut self, event: &Event, emit_timestamp: i64) -> Vec<ChangelogRecord> {
260        let sort_key = self.extract_sort_key(event);
261
262        if !self.would_enter_topk(&sort_key) {
263            return Vec::new();
264        }
265
266        let insert_pos = self.find_insert_position(&sort_key);
267        let mut changes = Vec::new();
268
269        // Insert the new entry
270        let new_entry = TopKEntry {
271            sort_key,
272            event: event.clone(),
273        };
274        self.entries.insert(insert_pos, new_entry);
275
276        // Generate insert changelog
277        changes.push(ChangelogRecord::insert(event.clone(), emit_timestamp));
278
279        // Generate rank change retractions for entries that shifted down
280        for i in (insert_pos + 1)..self.entries.len().min(self.k) {
281            let shifted_event = &self.entries[i].event;
282            // Emit update: rank changed from (i-1) to i
283            let (before, after) = ChangelogRecord::update(
284                shifted_event.clone(),
285                shifted_event.clone(),
286                emit_timestamp,
287            );
288            changes.push(before);
289            changes.push(after);
290        }
291
292        // Evict worst entry if over capacity
293        if self.entries.len() > self.k {
294            let evicted = self.entries.pop().unwrap();
295            changes.push(ChangelogRecord::delete(evicted.event, emit_timestamp));
296        }
297
298        self.sequence_counter += 1;
299        changes
300    }
301
302    /// Flushes pending changelog records as Output.
303    fn flush_pending(&mut self) -> OutputVec {
304        let mut outputs = OutputVec::new();
305        for record in self.pending_changes.drain(..) {
306            outputs.push(Output::Changelog(record));
307        }
308        outputs
309    }
310}
311
312impl Operator for StreamingTopKOperator {
313    fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
314        let emit_timestamp = event.timestamp;
315        let changes = self.process_event(event, emit_timestamp);
316
317        match &self.emit_strategy {
318            TopKEmitStrategy::OnUpdate => {
319                let mut outputs = OutputVec::new();
320                for record in changes {
321                    outputs.push(Output::Changelog(record));
322                }
323                outputs
324            }
325            TopKEmitStrategy::OnWatermark | TopKEmitStrategy::Periodic(_) => {
326                self.pending_changes.extend(changes);
327                OutputVec::new()
328            }
329        }
330    }
331
332    fn on_timer(&mut self, _timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
333        // For Periodic strategy: flush pending changes on timer
334        match &self.emit_strategy {
335            TopKEmitStrategy::Periodic(_) => self.flush_pending(),
336            _ => OutputVec::new(),
337        }
338    }
339
340    fn checkpoint(&self) -> OperatorState {
341        // Serialize entry count + sort keys + timestamps
342        // For simplicity, serialize as JSON-like format
343        let mut data = Vec::new();
344
345        // Write entry count
346        let count = self.entries.len() as u64;
347        data.extend_from_slice(&count.to_le_bytes());
348
349        // Write watermark
350        data.extend_from_slice(&self.current_watermark.to_le_bytes());
351
352        // Write sequence counter
353        data.extend_from_slice(&self.sequence_counter.to_le_bytes());
354
355        // Write each entry's sort key length + sort key
356        for entry in &self.entries {
357            let key_len = entry.sort_key.len() as u64;
358            data.extend_from_slice(&key_len.to_le_bytes());
359            data.extend_from_slice(&entry.sort_key);
360            data.extend_from_slice(&entry.event.timestamp.to_le_bytes());
361        }
362
363        OperatorState {
364            operator_id: self.operator_id.clone(),
365            data,
366        }
367    }
368
369    #[allow(clippy::cast_possible_truncation)] // Checkpoint wire format uses u64 for counts
370    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
371        if state.data.len() < 24 {
372            return Err(OperatorError::SerializationFailed(
373                "TopK checkpoint data too short".to_string(),
374            ));
375        }
376
377        let mut offset = 0;
378        let count = u64::from_le_bytes(
379            state.data[offset..offset + 8]
380                .try_into()
381                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
382        ) as usize;
383        offset += 8;
384
385        self.current_watermark = i64::from_le_bytes(
386            state.data[offset..offset + 8]
387                .try_into()
388                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
389        );
390        offset += 8;
391
392        self.sequence_counter = u64::from_le_bytes(
393            state.data[offset..offset + 8]
394                .try_into()
395                .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
396        );
397        offset += 8;
398
399        // Restore sort keys (events are not fully restored — only sort key metadata)
400        self.entries.clear();
401        for _ in 0..count {
402            if offset + 8 > state.data.len() {
403                return Err(OperatorError::SerializationFailed(
404                    "TopK checkpoint truncated".to_string(),
405                ));
406            }
407            let key_len = u64::from_le_bytes(
408                state.data[offset..offset + 8]
409                    .try_into()
410                    .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
411            ) as usize;
412            offset += 8;
413
414            if offset + key_len + 8 > state.data.len() {
415                return Err(OperatorError::SerializationFailed(
416                    "TopK checkpoint truncated at key".to_string(),
417                ));
418            }
419            let sort_key = state.data[offset..offset + key_len].to_vec();
420            offset += key_len;
421
422            let timestamp = i64::from_le_bytes(
423                state.data[offset..offset + 8]
424                    .try_into()
425                    .map_err(|e| OperatorError::SerializationFailed(format!("{e}")))?,
426            );
427            offset += 8;
428
429            // Create a minimal event placeholder for the restored entry
430            let batch = arrow_array::RecordBatch::new_empty(std::sync::Arc::new(
431                arrow_schema::Schema::empty(),
432            ));
433            self.entries.push(TopKEntry {
434                sort_key,
435                event: Event::new(timestamp, batch),
436            });
437        }
438
439        Ok(())
440    }
441}
442
443// === Sort key encoding helpers ===
444
/// Encodes a null value marker into the sort key.
///
/// Keys are compared byte-wise and the smallest key ranks first, so the
/// marker alone decides where NULLs land relative to non-NULL values:
///
/// - `nulls_first = true`  => `0x00` (below the non-null marker `0x01`)
/// - `nulls_first = false` => `0xFF` (above the non-null marker `0x00`)
///
/// `descending` only reverses the order of the *values*; it must not move
/// NULLs, so the marker is the same for both directions. The parameter is
/// kept for signature compatibility.
pub fn encode_null(nulls_first: bool, descending: bool, key: &mut Vec<u8>) {
    // Deliberately unused: null placement is independent of sort direction.
    let _ = descending;
    key.push(if nulls_first { 0x00 } else { 0xFF });
}

/// Encodes a non-null value marker into the sort key.
///
/// Counterpart of [`encode_null`]: `0x01` when nulls sort first (so values
/// follow the `0x00` null marker), `0x00` when nulls sort last (so values
/// precede the `0xFF` null marker). Independent of `descending`, which is
/// kept for signature compatibility.
pub fn encode_not_null(nulls_first: bool, descending: bool, key: &mut Vec<u8>) {
    // Deliberately unused: null placement is independent of sort direction.
    let _ = descending;
    key.push(if nulls_first { 0x01 } else { 0x00 });
}
478
/// Appends an `i64` to `key` as eight memcomparable bytes.
///
/// The sign bit is toggled so that signed order matches unsigned order, and
/// the result is written big-endian so byte-wise comparison matches numeric
/// comparison. For descending order every bit is inverted, which reverses
/// the comparison.
pub fn encode_i64(val: i64, descending: bool, key: &mut Vec<u8>) {
    #[allow(clippy::cast_sign_loss)]
    let biased = (val as u64) ^ (1u64 << 63);
    // Inverting the whole word is the same as inverting each of its bytes.
    let ordered = if descending { !biased } else { biased };
    key.extend_from_slice(&ordered.to_be_bytes());
}
493
/// Appends an `f64` to `key` as eight memcomparable bytes.
///
/// Works on the IEEE 754 bit pattern: values with a clear sign bit get the
/// sign bit set, values with the sign bit set get every bit inverted. The
/// result is written big-endian, so byte-wise comparison orders negative
/// values below positive ones and larger magnitudes correctly within each
/// sign. `-0.0` encodes just below `+0.0`; a NaN sorts beyond the infinity
/// of its own sign. For descending order every bit is inverted once more.
pub fn encode_f64(val: f64, descending: bool, key: &mut Vec<u8>) {
    const SIGN_BIT: u64 = 1u64 << 63;

    let bits = val.to_bits();
    // XOR mask: only the sign bit for non-negative patterns, all bits otherwise.
    let mask = if bits & SIGN_BIT == 0 {
        SIGN_BIT
    } else {
        u64::MAX
    };
    let mut encoded = bits ^ mask;
    if descending {
        encoded = !encoded;
    }
    key.extend_from_slice(&encoded.to_be_bytes());
}
513
/// Appends a UTF-8 string to `key` as memcomparable bytes.
///
/// Ascending: the raw bytes followed by a `0x00` terminator. Descending: the
/// same sequence with every bit inverted, so the terminator becomes `0xFF`.
/// The bytes are not escaped, so a `0x00` byte inside the string is
/// indistinguishable from the terminator.
pub fn encode_utf8(val: &str, descending: bool, key: &mut Vec<u8>) {
    // XOR with 0xFF inverts a byte; XOR with 0x00 leaves it untouched.
    let flip: u8 = if descending { 0xFF } else { 0x00 };
    key.reserve(val.len() + 1);
    key.extend(val.bytes().map(|byte| byte ^ flip));
    key.push(flip); // the 0x00 terminator, inverted along with the payload
}
527
528#[cfg(test)]
529#[allow(clippy::cast_possible_wrap)]
530mod tests {
531    use super::super::window::CdcOperation;
532    use super::*;
533    use crate::state::InMemoryStore;
534    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
535    use arrow_array::{Float64Array, Int64Array, RecordBatch, StringArray};
536    use arrow_schema::{DataType, Field, Schema};
537    use std::sync::Arc;
538
539    fn make_event(timestamp: i64, price: f64) -> Event {
540        let schema = Arc::new(Schema::new(vec![Field::new(
541            "price",
542            DataType::Float64,
543            false,
544        )]));
545        let batch =
546            RecordBatch::try_new(schema, vec![Arc::new(Float64Array::from(vec![price]))]).unwrap();
547        Event::new(timestamp, batch)
548    }
549
550    fn make_event_i64(timestamp: i64, value: i64) -> Event {
551        let schema = Arc::new(Schema::new(vec![Field::new(
552            "value",
553            DataType::Int64,
554            false,
555        )]));
556        let batch =
557            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
558        Event::new(timestamp, batch)
559    }
560
561    fn make_event_str(timestamp: i64, name: &str) -> Event {
562        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
563        let batch =
564            RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec![name]))]).unwrap();
565        Event::new(timestamp, batch)
566    }
567
568    fn make_multi_column_event(timestamp: i64, category: &str, price: f64) -> Event {
569        let schema = Arc::new(Schema::new(vec![
570            Field::new("category", DataType::Utf8, false),
571            Field::new("price", DataType::Float64, false),
572        ]));
573        let batch = RecordBatch::try_new(
574            schema,
575            vec![
576                Arc::new(StringArray::from(vec![category])),
577                Arc::new(Float64Array::from(vec![price])),
578            ],
579        )
580        .unwrap();
581        Event::new(timestamp, batch)
582    }
583
584    fn create_topk(
585        k: usize,
586        sort_columns: Vec<TopKSortColumn>,
587        strategy: TopKEmitStrategy,
588    ) -> StreamingTopKOperator {
589        StreamingTopKOperator::new("test_topk".to_string(), k, sort_columns, strategy)
590    }
591
    /// Assembles an `OperatorContext` for tests from borrowed collaborators.
    ///
    /// Event time, processing time and operator index are all fixed at zero.
    /// The context borrows its inputs mutably, so callers build a fresh one
    /// for every `process` call.
    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn crate::state::StateStore,
        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }
606
607    // --- Sort key encoding tests ---
608
609    #[test]
610    fn test_topk_sort_key_extraction_int64() {
611        let mut op = create_topk(
612            3,
613            vec![TopKSortColumn::ascending("value")],
614            TopKEmitStrategy::OnUpdate,
615        );
616        let e1 = make_event_i64(1, 100);
617        let e2 = make_event_i64(2, 200);
618        let e3 = make_event_i64(3, -50);
619
620        let k1 = op.extract_sort_key(&e1);
621        let k2 = op.extract_sort_key(&e2);
622        let k3 = op.extract_sort_key(&e3);
623
624        // Ascending: -50 < 100 < 200
625        assert!(k3 < k1);
626        assert!(k1 < k2);
627    }
628
629    #[test]
630    fn test_topk_sort_key_extraction_float64() {
631        let mut op = create_topk(
632            3,
633            vec![TopKSortColumn::descending("price")],
634            TopKEmitStrategy::OnUpdate,
635        );
636        let e1 = make_event(1, 150.0);
637        let e2 = make_event(2, 200.0);
638        let e3 = make_event(3, 100.0);
639
640        let k1 = op.extract_sort_key(&e1);
641        let k2 = op.extract_sort_key(&e2);
642        let k3 = op.extract_sort_key(&e3);
643
644        // Descending: 200 < 150 < 100 (in sort key order)
645        assert!(k2 < k1);
646        assert!(k1 < k3);
647    }
648
649    #[test]
650    fn test_topk_sort_key_extraction_utf8() {
651        let mut op = create_topk(
652            3,
653            vec![TopKSortColumn::ascending("name")],
654            TopKEmitStrategy::OnUpdate,
655        );
656        let e1 = make_event_str(1, "apple");
657        let e2 = make_event_str(2, "banana");
658        let e3 = make_event_str(3, "cherry");
659
660        let k1 = op.extract_sort_key(&e1);
661        let k2 = op.extract_sort_key(&e2);
662        let k3 = op.extract_sort_key(&e3);
663
664        // Ascending: apple < banana < cherry
665        assert!(k1 < k2);
666        assert!(k2 < k3);
667    }
668
669    #[test]
670    fn test_topk_sort_key_extraction_timestamp() {
671        let schema = Arc::new(Schema::new(vec![Field::new(
672            "ts",
673            DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
674            false,
675        )]));
676        let batch = RecordBatch::try_new(
677            schema,
678            vec![Arc::new(arrow_array::TimestampMicrosecondArray::from(
679                vec![1000],
680            ))],
681        )
682        .unwrap();
683        let event = Event::new(1, batch);
684
685        let mut op = create_topk(
686            3,
687            vec![TopKSortColumn::ascending("ts")],
688            TopKEmitStrategy::OnUpdate,
689        );
690        let key = op.extract_sort_key(&event);
691        assert!(!key.is_empty());
692    }
693
694    // --- Insertion tests ---
695
696    #[test]
697    fn test_topk_insert_below_capacity() {
698        let mut op = create_topk(
699            3,
700            vec![TopKSortColumn::descending("price")],
701            TopKEmitStrategy::OnUpdate,
702        );
703
704        let mut timers = TimerService::new();
705        let mut state = InMemoryStore::new();
706        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
707
708        let event = make_event(1, 150.0);
709        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
710        let outputs = op.process(&event, &mut ctx);
711
712        assert_eq!(op.len(), 1);
713        // Should emit an Insert changelog
714        assert!(!outputs.is_empty());
715    }
716
717    #[test]
718    fn test_topk_insert_at_capacity_better_entry() {
719        let mut op = create_topk(
720            2,
721            vec![TopKSortColumn::descending("price")],
722            TopKEmitStrategy::OnUpdate,
723        );
724
725        let mut timers = TimerService::new();
726        let mut state = InMemoryStore::new();
727        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
728
729        // Fill to capacity
730        for (i, price) in [100.0, 150.0].iter().enumerate() {
731            let event = make_event(i as i64, *price);
732            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
733            op.process(&event, &mut ctx);
734        }
735        assert_eq!(op.len(), 2);
736
737        // Insert a better entry (200 > 100, which is worst)
738        let better = make_event(3, 200.0);
739        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
740        let outputs = op.process(&better, &mut ctx);
741
742        assert_eq!(op.len(), 2);
743        // Should have Insert + rank changes + Delete for evicted
744        assert!(outputs.len() >= 2);
745    }
746
747    #[test]
748    fn test_topk_insert_at_capacity_worse_entry() {
749        let mut op = create_topk(
750            2,
751            vec![TopKSortColumn::descending("price")],
752            TopKEmitStrategy::OnUpdate,
753        );
754
755        let mut timers = TimerService::new();
756        let mut state = InMemoryStore::new();
757        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
758
759        // Fill with good entries
760        for (i, price) in [200.0, 150.0].iter().enumerate() {
761            let event = make_event(i as i64, *price);
762            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
763            op.process(&event, &mut ctx);
764        }
765
766        // Insert a worse entry (50 < 150)
767        let worse = make_event(3, 50.0);
768        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
769        let outputs = op.process(&worse, &mut ctx);
770
771        assert_eq!(op.len(), 2);
772        // No emission - entry doesn't enter top-K
773        assert!(outputs.is_empty());
774    }
775
776    #[test]
777    fn test_topk_ascending_order() {
778        let mut op = create_topk(
779            3,
780            vec![TopKSortColumn::ascending("value")],
781            TopKEmitStrategy::OnUpdate,
782        );
783
784        let mut timers = TimerService::new();
785        let mut state = InMemoryStore::new();
786        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
787
788        // Insert values: 30, 10, 20 -> top-3 ascending should be [10, 20, 30]
789        for (i, val) in [30i64, 10, 20].iter().enumerate() {
790            let event = make_event_i64(i as i64, *val);
791            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
792            op.process(&event, &mut ctx);
793        }
794
795        assert_eq!(op.len(), 3);
796        // Entries should be sorted by ascending value
797        let entries = op.entries();
798        let vals: Vec<i64> = entries
799            .iter()
800            .map(|e| {
801                e.data
802                    .column(0)
803                    .as_any()
804                    .downcast_ref::<Int64Array>()
805                    .unwrap()
806                    .value(0)
807            })
808            .collect();
809        assert_eq!(vals, vec![10, 20, 30]);
810    }
811
812    #[test]
813    fn test_topk_descending_order() {
814        let mut op = create_topk(
815            3,
816            vec![TopKSortColumn::descending("price")],
817            TopKEmitStrategy::OnUpdate,
818        );
819
820        let mut timers = TimerService::new();
821        let mut state = InMemoryStore::new();
822        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
823
824        // Insert: 100, 200, 150 -> top-3 descending should have 200 first
825        for (i, price) in [100.0, 200.0, 150.0].iter().enumerate() {
826            let event = make_event(i as i64, *price);
827            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
828            op.process(&event, &mut ctx);
829        }
830
831        let entries = op.entries();
832        let prices: Vec<f64> = entries
833            .iter()
834            .map(|e| {
835                e.data
836                    .column(0)
837                    .as_any()
838                    .downcast_ref::<Float64Array>()
839                    .unwrap()
840                    .value(0)
841            })
842            .collect();
843        assert_eq!(prices, vec![200.0, 150.0, 100.0]);
844    }
845
846    #[test]
847    fn test_topk_multi_column_sort() {
848        let mut op = create_topk(
849            3,
850            vec![
851                TopKSortColumn::ascending("category"),
852                TopKSortColumn::descending("price"),
853            ],
854            TopKEmitStrategy::OnUpdate,
855        );
856
857        let mut timers = TimerService::new();
858        let mut state = InMemoryStore::new();
859        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
860
861        let events = vec![
862            make_multi_column_event(1, "B", 100.0),
863            make_multi_column_event(2, "A", 200.0),
864            make_multi_column_event(3, "A", 150.0),
865        ];
866
867        for event in &events {
868            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
869            op.process(event, &mut ctx);
870        }
871
872        // Sort: category ASC, then price DESC within same category
873        // A/200, A/150, B/100
874        let entries = op.entries();
875        let cats: Vec<&str> = entries
876            .iter()
877            .map(|e| {
878                e.data
879                    .column(0)
880                    .as_any()
881                    .downcast_ref::<StringArray>()
882                    .unwrap()
883                    .value(0)
884            })
885            .collect();
886        assert_eq!(cats, vec!["A", "A", "B"]);
887    }
888
889    #[test]
890    fn test_topk_nulls_first() {
891        let sort_cols = vec![TopKSortColumn::ascending("value").with_nulls_first(true)];
892        let mut op = create_topk(3, sort_cols, TopKEmitStrategy::OnUpdate);
893
894        let mut timers = TimerService::new();
895        let mut state = InMemoryStore::new();
896        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
897
898        // Create a null value event
899        let schema = Arc::new(Schema::new(vec![Field::new(
900            "value",
901            DataType::Int64,
902            true,
903        )]));
904        let null_array = Int64Array::new_null(1);
905        let batch = RecordBatch::try_new(schema, vec![Arc::new(null_array)]).unwrap();
906        let null_event = Event::new(1, batch);
907
908        let val_event = make_event_i64(2, 100);
909
910        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
911        op.process(&val_event, &mut ctx);
912        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
913        op.process(&null_event, &mut ctx);
914
915        // With nulls_first, the null should sort before 100
916        let entries = op.entries();
917        assert_eq!(entries.len(), 2);
918        assert!(entries[0].data.column(0).is_null(0));
919    }
920
921    #[test]
922    fn test_topk_nulls_last() {
923        let sort_cols = vec![TopKSortColumn::ascending("value").with_nulls_first(false)];
924        let mut op = create_topk(3, sort_cols, TopKEmitStrategy::OnUpdate);
925
926        let mut timers = TimerService::new();
927        let mut state = InMemoryStore::new();
928        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
929
930        let schema = Arc::new(Schema::new(vec![Field::new(
931            "value",
932            DataType::Int64,
933            true,
934        )]));
935        let null_array = Int64Array::new_null(1);
936        let batch = RecordBatch::try_new(schema, vec![Arc::new(null_array)]).unwrap();
937        let null_event = Event::new(1, batch);
938
939        let val_event = make_event_i64(2, 100);
940
941        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
942        op.process(&null_event, &mut ctx);
943        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
944        op.process(&val_event, &mut ctx);
945
946        // With nulls_last, the value should sort before null
947        let entries = op.entries();
948        assert_eq!(entries.len(), 2);
949        assert!(!entries[0].data.column(0).is_null(0));
950    }
951
952    // --- Emit strategy tests ---
953
954    #[test]
955    fn test_topk_emit_on_update_insert() {
956        let mut op = create_topk(
957            3,
958            vec![TopKSortColumn::descending("price")],
959            TopKEmitStrategy::OnUpdate,
960        );
961
962        let mut timers = TimerService::new();
963        let mut state = InMemoryStore::new();
964        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
965
966        let event = make_event(1, 150.0);
967        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
968        let outputs = op.process(&event, &mut ctx);
969
970        // Should emit Insert changelog immediately
971        assert_eq!(outputs.len(), 1);
972        match &outputs[0] {
973            Output::Changelog(rec) => {
974                assert_eq!(rec.operation, CdcOperation::Insert);
975                assert_eq!(rec.weight, 1);
976            }
977            _ => panic!("Expected Changelog output"),
978        }
979    }
980
981    #[test]
982    fn test_topk_emit_on_update_eviction() {
983        let mut op = create_topk(
984            1,
985            vec![TopKSortColumn::descending("price")],
986            TopKEmitStrategy::OnUpdate,
987        );
988
989        let mut timers = TimerService::new();
990        let mut state = InMemoryStore::new();
991        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
992
993        // Fill with one entry
994        let event1 = make_event(1, 100.0);
995        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
996        op.process(&event1, &mut ctx);
997
998        // Better entry evicts the first
999        let event2 = make_event(2, 200.0);
1000        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1001        let outputs = op.process(&event2, &mut ctx);
1002
1003        // Should have Insert + Delete (eviction)
1004        let mut has_insert = false;
1005        let mut has_delete = false;
1006        for output in &outputs {
1007            if let Output::Changelog(rec) = output {
1008                match rec.operation {
1009                    CdcOperation::Insert => has_insert = true,
1010                    CdcOperation::Delete => has_delete = true,
1011                    _ => {}
1012                }
1013            }
1014        }
1015        assert!(has_insert);
1016        assert!(has_delete);
1017    }
1018
1019    #[test]
1020    fn test_topk_emit_on_update_rank_change() {
1021        let mut op = create_topk(
1022            3,
1023            vec![TopKSortColumn::descending("price")],
1024            TopKEmitStrategy::OnUpdate,
1025        );
1026
1027        let mut timers = TimerService::new();
1028        let mut state = InMemoryStore::new();
1029        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1030
1031        // Insert two entries
1032        let e1 = make_event(1, 100.0);
1033        let e2 = make_event(2, 200.0);
1034        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1035        op.process(&e1, &mut ctx);
1036        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1037        op.process(&e2, &mut ctx);
1038
1039        // Insert between them: 150 goes between 200 and 100, shifting 100's rank
1040        let e3 = make_event(3, 150.0);
1041        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1042        let outputs = op.process(&e3, &mut ctx);
1043
1044        // Should have Insert for 150 + UpdateBefore/UpdateAfter for 100's rank change
1045        let mut has_insert = false;
1046        let mut has_update_before = false;
1047        let mut has_update_after = false;
1048        for output in &outputs {
1049            if let Output::Changelog(rec) = output {
1050                match rec.operation {
1051                    CdcOperation::Insert => has_insert = true,
1052                    CdcOperation::UpdateBefore => has_update_before = true,
1053                    CdcOperation::UpdateAfter => has_update_after = true,
1054                    CdcOperation::Delete => {}
1055                }
1056            }
1057        }
1058        assert!(has_insert);
1059        assert!(has_update_before);
1060        assert!(has_update_after);
1061    }
1062
1063    #[test]
1064    fn test_topk_emit_on_watermark_batched() {
1065        let mut op = create_topk(
1066            3,
1067            vec![TopKSortColumn::descending("price")],
1068            TopKEmitStrategy::OnWatermark,
1069        );
1070
1071        let mut timers = TimerService::new();
1072        let mut state = InMemoryStore::new();
1073        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1074
1075        // Insert events — should not emit immediately
1076        let e1 = make_event(1, 100.0);
1077        let e2 = make_event(2, 200.0);
1078        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1079        let out1 = op.process(&e1, &mut ctx);
1080        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1081        let out2 = op.process(&e2, &mut ctx);
1082
1083        assert!(out1.is_empty());
1084        assert!(out2.is_empty());
1085        assert!(op.pending_changes_count() > 0);
1086    }
1087
1088    #[test]
1089    fn test_topk_emit_periodic() {
1090        let mut op = create_topk(
1091            3,
1092            vec![TopKSortColumn::descending("price")],
1093            TopKEmitStrategy::Periodic(1000),
1094        );
1095
1096        let mut timers = TimerService::new();
1097        let mut state = InMemoryStore::new();
1098        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1099
1100        // Insert events — buffered
1101        let e1 = make_event(1, 100.0);
1102        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1103        op.process(&e1, &mut ctx);
1104
1105        assert!(op.pending_changes_count() > 0);
1106
1107        // Timer triggers flush
1108        let timer = Timer {
1109            key: smallvec::smallvec![],
1110            timestamp: 1000,
1111        };
1112        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1113        let outputs = op.on_timer(timer, &mut ctx);
1114
1115        assert!(!outputs.is_empty());
1116        assert_eq!(op.pending_changes_count(), 0);
1117    }
1118
1119    // --- Edge case tests ---
1120
1121    #[test]
1122    fn test_topk_empty_heap() {
1123        let op = create_topk(
1124            3,
1125            vec![TopKSortColumn::descending("price")],
1126            TopKEmitStrategy::OnUpdate,
1127        );
1128        assert!(op.is_empty());
1129        assert_eq!(op.len(), 0);
1130    }
1131
1132    #[test]
1133    fn test_topk_k_equals_one() {
1134        let mut op = create_topk(
1135            1,
1136            vec![TopKSortColumn::descending("price")],
1137            TopKEmitStrategy::OnUpdate,
1138        );
1139
1140        let mut timers = TimerService::new();
1141        let mut state = InMemoryStore::new();
1142        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1143
1144        let e1 = make_event(1, 100.0);
1145        let e2 = make_event(2, 200.0);
1146        let e3 = make_event(3, 50.0);
1147
1148        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1149        op.process(&e1, &mut ctx);
1150        assert_eq!(op.len(), 1);
1151
1152        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1153        op.process(&e2, &mut ctx);
1154        assert_eq!(op.len(), 1);
1155
1156        // Verify it kept the best (200)
1157        let entries = op.entries();
1158        let price = entries[0]
1159            .data
1160            .column(0)
1161            .as_any()
1162            .downcast_ref::<Float64Array>()
1163            .unwrap()
1164            .value(0);
1165        assert!((price - 200.0).abs() < f64::EPSILON);
1166
1167        // Worse entry doesn't change anything
1168        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1169        let outputs = op.process(&e3, &mut ctx);
1170        assert!(outputs.is_empty());
1171    }
1172
1173    #[test]
1174    fn test_topk_large_k() {
1175        let mut op = create_topk(
1176            100,
1177            vec![TopKSortColumn::ascending("value")],
1178            TopKEmitStrategy::OnUpdate,
1179        );
1180
1181        let mut timers = TimerService::new();
1182        let mut state = InMemoryStore::new();
1183        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1184
1185        for i in 0..50 {
1186            let event = make_event_i64(i, i * 10);
1187            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1188            op.process(&event, &mut ctx);
1189        }
1190
1191        assert_eq!(op.len(), 50);
1192    }
1193
1194    #[test]
1195    fn test_topk_duplicate_sort_keys() {
1196        let mut op = create_topk(
1197            3,
1198            vec![TopKSortColumn::descending("price")],
1199            TopKEmitStrategy::OnUpdate,
1200        );
1201
1202        let mut timers = TimerService::new();
1203        let mut state = InMemoryStore::new();
1204        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1205
1206        // Insert three events with the same price
1207        for i in 0..3 {
1208            let event = make_event(i, 100.0);
1209            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1210            op.process(&event, &mut ctx);
1211        }
1212
1213        assert_eq!(op.len(), 3);
1214    }
1215
1216    // --- Checkpoint/restore tests ---
1217
1218    #[test]
1219    fn test_topk_checkpoint_roundtrip() {
1220        let mut op = create_topk(
1221            3,
1222            vec![TopKSortColumn::descending("price")],
1223            TopKEmitStrategy::OnUpdate,
1224        );
1225
1226        let mut timers = TimerService::new();
1227        let mut state = InMemoryStore::new();
1228        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1229
1230        for (i, price) in [150.0, 200.0, 100.0].iter().enumerate() {
1231            let event = make_event(i as i64, *price);
1232            let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1233            op.process(&event, &mut ctx);
1234        }
1235
1236        let checkpoint = op.checkpoint();
1237        assert_eq!(checkpoint.operator_id, "test_topk");
1238        assert!(!checkpoint.data.is_empty());
1239
1240        // Restore to a new operator
1241        let mut op2 = create_topk(
1242            3,
1243            vec![TopKSortColumn::descending("price")],
1244            TopKEmitStrategy::OnUpdate,
1245        );
1246        op2.restore(checkpoint).unwrap();
1247
1248        assert_eq!(op2.len(), 3);
1249    }
1250
1251    #[test]
1252    fn test_topk_restore_and_continue() {
1253        let mut op = create_topk(
1254            2,
1255            vec![TopKSortColumn::descending("price")],
1256            TopKEmitStrategy::OnUpdate,
1257        );
1258
1259        let mut timers = TimerService::new();
1260        let mut state = InMemoryStore::new();
1261        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1262
1263        let event = make_event(1, 150.0);
1264        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1265        op.process(&event, &mut ctx);
1266
1267        let checkpoint = op.checkpoint();
1268
1269        let mut op2 = create_topk(
1270            2,
1271            vec![TopKSortColumn::descending("price")],
1272            TopKEmitStrategy::OnUpdate,
1273        );
1274        op2.restore(checkpoint).unwrap();
1275
1276        // Should be able to continue processing
1277        assert_eq!(op2.len(), 1);
1278    }
1279
1280    // --- Changelog record tests ---
1281
1282    #[test]
1283    fn test_topk_changelog_record_types() {
1284        let record = ChangelogRecord::insert(make_event(1, 100.0), 1);
1285        assert_eq!(record.operation, CdcOperation::Insert);
1286        assert_eq!(record.weight, 1);
1287
1288        let record = ChangelogRecord::delete(make_event(1, 100.0), 1);
1289        assert_eq!(record.operation, CdcOperation::Delete);
1290        assert_eq!(record.weight, -1);
1291
1292        let (before, after) =
1293            ChangelogRecord::update(make_event(1, 100.0), make_event(2, 200.0), 1);
1294        assert_eq!(before.operation, CdcOperation::UpdateBefore);
1295        assert_eq!(after.operation, CdcOperation::UpdateAfter);
1296    }
1297
1298    #[test]
1299    fn test_topk_no_emission_on_no_change() {
1300        let mut op = create_topk(
1301            2,
1302            vec![TopKSortColumn::descending("price")],
1303            TopKEmitStrategy::OnUpdate,
1304        );
1305
1306        let mut timers = TimerService::new();
1307        let mut state = InMemoryStore::new();
1308        let mut wm = BoundedOutOfOrdernessGenerator::new(0);
1309
1310        // Fill to capacity with good entries
1311        let e1 = make_event(1, 200.0);
1312        let e2 = make_event(2, 150.0);
1313        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1314        op.process(&e1, &mut ctx);
1315        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1316        op.process(&e2, &mut ctx);
1317
1318        // Worse entry — no change
1319        let e3 = make_event(3, 50.0);
1320        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm);
1321        let outputs = op.process(&e3, &mut ctx);
1322
1323        assert!(outputs.is_empty());
1324    }
1325}