Skip to main content

laminar_core/operator/
changelog.rs

1//! # Changelog and Retraction Support
2//!
3//! Z-set style changelog records with integer weights for incremental computation.
4//! This is the foundation for exactly-once sinks, cascading materialized views,
5//! and CDC connectors.
6//!
7//! ## Key Concepts
8//!
9//! - **Z-sets**: Elements have integer weights. Weight > 0 → insert, weight < 0 → delete.
10//! - **Retraction**: Emitting (-old, +new) pairs to correct previous results.
11//! - **CDC Envelope**: Debezium-compatible format for downstream systems.
12//!
13//! ## Ring Architecture
14//!
15//! - **Ring 0**: `ChangelogRef` and `ChangelogBuffer` for zero-allocation hot path
16//! - **Ring 1**: `LateDataRetractionGenerator` and `CdcEnvelope` serialization
17//! - **Ring 2**: Changelog configuration and CDC format selection
18//!
19//! ## Example
20//!
21//! ```rust,no_run
22//! use laminar_core::operator::changelog::{
23//!     ChangelogBuffer, ChangelogRef, RetractableCountAccumulator,
24//!     RetractableAccumulator, CdcEnvelope, CdcSource,
25//! };
26//! use laminar_core::operator::window::CdcOperation;
27//!
28//! // Ring 0: Zero-allocation changelog tracking
29//! let mut buffer = ChangelogBuffer::with_capacity(1024);
30//! buffer.push(ChangelogRef::insert(0, 0));
31//! buffer.push(ChangelogRef::delete(0, 1));
32//!
33//! // Ring 1: Retractable aggregation
34//! let mut agg = RetractableCountAccumulator::default();
35//! agg.add(());
36//! agg.add(());
37//! assert_eq!(agg.result(), 2);
38//! agg.retract(&());
39//! assert_eq!(agg.result(), 1);
40//!
41//! // CDC envelope for sinks
42//! let source = CdcSource::new("laminardb", "default", "orders");
43//! let envelope = CdcEnvelope::insert(serde_json::json!({"id": 1}), source, 1000);
44//! ```
45
46use super::window::{CdcOperation, WindowId};
47use rustc_hash::FxHashMap;
48use serde::{Deserialize, Serialize};
49
50// Ring 0: Zero-Allocation Types
51
/// Zero-allocation changelog reference for the Ring 0 hot path.
///
/// Rather than allocating a full `ChangelogRecord`, a reference stores
/// offsets into the event batch together with the operation type.
///
/// Size: 12 bytes under `#[repr(C)]` (u32 + u32 + i16 + u8 + 1 byte of padding).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(C)]
pub struct ChangelogRef {
    /// Offset into the current batch
    pub batch_offset: u32,
    /// Row index within the batch
    pub row_index: u32,
    /// Z-set weight; the convenience constructors use +1 (insert-type) or
    /// -1 (delete-type)
    pub weight: i16,
    /// Operation type, stored as the `u8` returned by `CdcOperation::to_u8`
    /// to keep the struct compact
    operation_raw: u8,
}
70
71impl ChangelogRef {
72    /// Creates a new changelog reference.
73    #[inline]
74    #[must_use]
75    pub fn new(batch_offset: u32, row_index: u32, weight: i16, operation: CdcOperation) -> Self {
76        Self {
77            batch_offset,
78            row_index,
79            weight,
80            operation_raw: operation.to_u8(),
81        }
82    }
83
84    /// Creates an insert reference.
85    #[inline]
86    #[must_use]
87    pub fn insert(batch_offset: u32, row_index: u32) -> Self {
88        Self::new(batch_offset, row_index, 1, CdcOperation::Insert)
89    }
90
91    /// Creates a delete reference.
92    #[inline]
93    #[must_use]
94    pub fn delete(batch_offset: u32, row_index: u32) -> Self {
95        Self::new(batch_offset, row_index, -1, CdcOperation::Delete)
96    }
97
98    /// Creates an update-before reference (retraction).
99    #[inline]
100    #[must_use]
101    pub fn update_before(batch_offset: u32, row_index: u32) -> Self {
102        Self::new(batch_offset, row_index, -1, CdcOperation::UpdateBefore)
103    }
104
105    /// Creates an update-after reference.
106    #[inline]
107    #[must_use]
108    pub fn update_after(batch_offset: u32, row_index: u32) -> Self {
109        Self::new(batch_offset, row_index, 1, CdcOperation::UpdateAfter)
110    }
111
112    /// Returns the CDC operation type.
113    #[inline]
114    #[must_use]
115    pub fn operation(&self) -> CdcOperation {
116        CdcOperation::from_u8(self.operation_raw)
117    }
118
119    /// Returns true if this is an insert-type operation.
120    #[inline]
121    #[must_use]
122    pub fn is_insert(&self) -> bool {
123        self.weight > 0
124    }
125
126    /// Returns true if this is a delete-type operation.
127    #[inline]
128    #[must_use]
129    pub fn is_delete(&self) -> bool {
130        self.weight < 0
131    }
132}
133
/// Ring 0 changelog buffer (pre-allocated, reused per epoch).
///
/// The buffer stores changelog references in a vector that is filled to its
/// full capacity at construction, so pushing only overwrites existing slots.
/// When every slot is in use, `push` returns `false` as a backpressure signal
/// instead of growing.
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::changelog::{ChangelogBuffer, ChangelogRef};
///
/// let mut buffer = ChangelogBuffer::with_capacity(1024);
///
/// // Push references (no allocation after warmup)
/// for i in 0..100 {
///     buffer.push(ChangelogRef::insert(i, 0));
/// }
///
/// // Drain for Ring 1 processing
/// for changelog_ref in buffer.drain() {
///     // Process in Ring 1
/// }
/// ```
pub struct ChangelogBuffer {
    /// Pre-allocated changelog references; only the first `len` are live
    refs: Vec<ChangelogRef>,
    /// Current write position (number of live references)
    len: usize,
    /// Maximum number of references the buffer accepts
    capacity: usize,
}
164
165impl ChangelogBuffer {
166    /// Creates a new buffer with the given capacity.
167    #[must_use]
168    pub fn with_capacity(capacity: usize) -> Self {
169        let mut refs = Vec::with_capacity(capacity);
170        // Pre-warm the buffer to avoid allocations during hot path
171        refs.resize(
172            capacity,
173            ChangelogRef {
174                batch_offset: 0,
175                row_index: 0,
176                weight: 0,
177                operation_raw: 0,
178            },
179        );
180        Self {
181            refs,
182            len: 0,
183            capacity,
184        }
185    }
186
187    /// Pushes a changelog reference (no allocation if under capacity).
188    ///
189    /// Returns `true` if the reference was added, `false` if buffer is full
190    /// (backpressure signal).
191    #[inline]
192    pub fn push(&mut self, changelog_ref: ChangelogRef) -> bool {
193        if self.len < self.capacity {
194            self.refs[self.len] = changelog_ref;
195            self.len += 1;
196            true
197        } else {
198            false // Buffer full - backpressure signal
199        }
200    }
201
202    /// Pushes a retraction pair (update-before, update-after).
203    ///
204    /// Returns `true` if both references were added, `false` if buffer is full.
205    #[inline]
206    pub fn push_retraction(
207        &mut self,
208        batch_offset: u32,
209        old_row_index: u32,
210        new_row_index: u32,
211    ) -> bool {
212        if self.len + 2 <= self.capacity {
213            self.refs[self.len] = ChangelogRef::update_before(batch_offset, old_row_index);
214            self.refs[self.len + 1] = ChangelogRef::update_after(batch_offset, new_row_index);
215            self.len += 2;
216            true
217        } else {
218            false
219        }
220    }
221
222    /// Drains references for Ring 1 processing.
223    ///
224    /// After draining, the buffer is empty but retains its capacity.
225    pub fn drain(&mut self) -> impl Iterator<Item = ChangelogRef> + '_ {
226        let len = self.len;
227        self.len = 0;
228        self.refs[..len].iter().copied()
229    }
230
231    /// Returns current count of references.
232    #[inline]
233    #[must_use]
234    pub fn len(&self) -> usize {
235        self.len
236    }
237
238    /// Returns true if the buffer is empty.
239    #[inline]
240    #[must_use]
241    pub fn is_empty(&self) -> bool {
242        self.len == 0
243    }
244
245    /// Returns true if the buffer is full.
246    #[inline]
247    #[must_use]
248    pub fn is_full(&self) -> bool {
249        self.len >= self.capacity
250    }
251
252    /// Returns the buffer capacity.
253    #[inline]
254    #[must_use]
255    pub fn capacity(&self) -> usize {
256        self.capacity
257    }
258
259    /// Returns available space in the buffer.
260    #[inline]
261    #[must_use]
262    pub fn available(&self) -> usize {
263        self.capacity.saturating_sub(self.len)
264    }
265
266    /// Clears the buffer without deallocating.
267    #[inline]
268    pub fn clear(&mut self) {
269        self.len = 0;
270    }
271
272    /// Returns a slice of the current references.
273    #[must_use]
274    pub fn as_slice(&self) -> &[ChangelogRef] {
275        &self.refs[..self.len]
276    }
277}
278
279impl Default for ChangelogBuffer {
280    fn default() -> Self {
281        Self::with_capacity(1024)
282    }
283}
284
285// Retractable Aggregators
286
/// Extension trait for accumulators that support retractions.
///
/// A retractable accumulator can "un-apply" a previously added value, which
/// is essential for:
/// - Late data corrections (emit -old, +new pairs)
/// - Cascading materialized views
/// - Changelog-based downstream consumers
///
/// # Retraction Efficiency
///
/// Some aggregators support O(1) retraction (count, sum, avg), while others
/// may require O(n) recomputation (min, max without value tracking).
/// Use `supports_efficient_retraction()` to check.
pub trait RetractableAccumulator: Default + Clone + Send {
    /// The input type for the aggregation.
    type Input;
    /// The output type produced by the aggregation.
    type Output;

    /// Adds a value to the accumulator.
    fn add(&mut self, value: Self::Input);

    /// Retracts (un-applies) a value from the accumulator.
    ///
    /// This is the inverse of `add`. For example:
    /// - Count: decrement by 1
    /// - Sum: subtract the value
    /// - Avg: update sum and count
    fn retract(&mut self, value: &Self::Input);

    /// Merges the state of `other` into this accumulator; `other` is left
    /// untouched.
    fn merge(&mut self, other: &Self);

    /// Computes the current result without consuming the accumulator.
    fn result(&self) -> Self::Output;

    /// Returns true if the accumulator is empty (no values added).
    fn is_empty(&self) -> bool;

    /// Returns true if this accumulator can efficiently retract.
    ///
    /// Some aggregators (like Min/Max without value tracking) may need to
    /// scan all values on retraction if the retracted value was the current
    /// min/max. The default implementation returns `true`.
    fn supports_efficient_retraction(&self) -> bool {
        true
    }

    /// Resets the accumulator to its initial state.
    fn reset(&mut self);
}
337
/// Retractable count accumulator.
///
/// The count is a signed integer so that a retraction arriving before its
/// matching addition can be represented. In a correct pipeline, the count
/// should never end up negative.
#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct RetractableCountAccumulator {
    /// Signed count: +1 per `add`, -1 per `retract`
    count: i64,
}
347
348impl RetractableCountAccumulator {
349    /// Creates a new count accumulator.
350    #[must_use]
351    pub fn new() -> Self {
352        Self::default()
353    }
354
355    /// Returns the current count (may be negative during retraction).
356    #[must_use]
357    pub fn count(&self) -> i64 {
358        self.count
359    }
360}
361
362impl RetractableAccumulator for RetractableCountAccumulator {
363    type Input = ();
364    type Output = i64;
365
366    #[inline]
367    fn add(&mut self, _value: ()) {
368        self.count += 1;
369    }
370
371    #[inline]
372    fn retract(&mut self, _value: &()) {
373        self.count -= 1;
374    }
375
376    fn merge(&mut self, other: &Self) {
377        self.count += other.count;
378    }
379
380    fn result(&self) -> i64 {
381        self.count
382    }
383
384    fn is_empty(&self) -> bool {
385        self.count == 0
386    }
387
388    fn reset(&mut self) {
389        self.count = 0;
390    }
391}
392
/// Retractable sum accumulator.
///
/// Supports O(1) retraction by simple subtraction.
#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct RetractableSumAccumulator {
    /// Running sum (signed)
    sum: i64,
    /// Net number of values (adds minus retractions); used by the `is_empty`
    /// check, since a sum of zero does not imply that no values are present
    count: i64,
}
403
404impl RetractableSumAccumulator {
405    /// Creates a new sum accumulator.
406    #[must_use]
407    pub fn new() -> Self {
408        Self::default()
409    }
410
411    /// Returns the current sum.
412    #[must_use]
413    pub fn sum(&self) -> i64 {
414        self.sum
415    }
416}
417
418impl RetractableAccumulator for RetractableSumAccumulator {
419    type Input = i64;
420    type Output = i64;
421
422    #[inline]
423    fn add(&mut self, value: i64) {
424        self.sum += value;
425        self.count += 1;
426    }
427
428    #[inline]
429    fn retract(&mut self, value: &i64) {
430        self.sum -= value;
431        self.count -= 1;
432    }
433
434    fn merge(&mut self, other: &Self) {
435        self.sum += other.sum;
436        self.count += other.count;
437    }
438
439    fn result(&self) -> i64 {
440        self.sum
441    }
442
443    fn is_empty(&self) -> bool {
444        self.count == 0
445    }
446
447    fn reset(&mut self) {
448        self.sum = 0;
449        self.count = 0;
450    }
451}
452
/// Retractable average accumulator.
///
/// Supports O(1) retraction by updating the running sum and count; the
/// average itself is only computed when the result is requested.
#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct RetractableAvgAccumulator {
    /// Running sum
    sum: i64,
    /// Net number of values (adds minus retractions)
    count: i64,
}
463
464impl RetractableAvgAccumulator {
465    /// Creates a new average accumulator.
466    #[must_use]
467    pub fn new() -> Self {
468        Self::default()
469    }
470
471    /// Returns the current sum.
472    #[must_use]
473    pub fn sum(&self) -> i64 {
474        self.sum
475    }
476
477    /// Returns the current count.
478    #[must_use]
479    pub fn count(&self) -> i64 {
480        self.count
481    }
482}
483
484impl RetractableAccumulator for RetractableAvgAccumulator {
485    type Input = i64;
486    type Output = Option<f64>;
487
488    #[inline]
489    fn add(&mut self, value: i64) {
490        self.sum += value;
491        self.count += 1;
492    }
493
494    #[inline]
495    fn retract(&mut self, value: &i64) {
496        self.sum -= value;
497        self.count -= 1;
498    }
499
500    fn merge(&mut self, other: &Self) {
501        self.sum += other.sum;
502        self.count += other.count;
503    }
504
505    #[allow(clippy::cast_precision_loss)]
506    fn result(&self) -> Option<f64> {
507        if self.count > 0 {
508            Some(self.sum as f64 / self.count as f64)
509        } else {
510            None
511        }
512    }
513
514    fn is_empty(&self) -> bool {
515        self.count == 0
516    }
517
518    fn reset(&mut self) {
519        self.sum = 0;
520        self.count = 0;
521    }
522}
523
/// Retractable min accumulator with counted value tracking.
///
/// Keeps a `BTreeMap<i64, usize>` mapping each distinct value to the number
/// of times it is currently present. Insert and remove are O(log n) in the
/// number of distinct values, and the minimum is simply the map's first key.
/// Memory is proportional to the number of distinct values, not to the total
/// event count.
#[derive(Debug, Clone, Default)]
pub struct RetractableMinAccumulator {
    /// Value → occurrence count (ordered, so the first key is the minimum)
    counts: std::collections::BTreeMap<i64, usize>,
}

impl RetractableMinAccumulator {
    /// Creates an empty min accumulator.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }
}
542
543impl RetractableAccumulator for RetractableMinAccumulator {
544    type Input = i64;
545    type Output = Option<i64>;
546
547    fn add(&mut self, value: i64) {
548        *self.counts.entry(value).or_insert(0) += 1;
549    }
550
551    fn retract(&mut self, value: &i64) {
552        if let Some(count) = self.counts.get_mut(value) {
553            *count -= 1;
554            if *count == 0 {
555                self.counts.remove(value);
556            }
557        }
558    }
559
560    fn merge(&mut self, other: &Self) {
561        for (&val, &cnt) in &other.counts {
562            *self.counts.entry(val).or_insert(0) += cnt;
563        }
564    }
565
566    fn result(&self) -> Option<i64> {
567        self.counts.keys().next().copied()
568    }
569
570    fn is_empty(&self) -> bool {
571        self.counts.is_empty()
572    }
573
574    fn supports_efficient_retraction(&self) -> bool {
575        true
576    }
577
578    fn reset(&mut self) {
579        self.counts.clear();
580    }
581}
582
/// Retractable max accumulator with counted value tracking.
///
/// Keeps a `BTreeMap<i64, usize>` mapping each distinct value to the number
/// of times it is currently present. Insert and remove are O(log n) in the
/// number of distinct values, and the maximum is simply the map's last key.
/// Memory is proportional to the number of distinct values, not to the total
/// event count.
#[derive(Debug, Clone, Default)]
pub struct RetractableMaxAccumulator {
    /// Value → occurrence count (ordered, so the last key is the maximum)
    counts: std::collections::BTreeMap<i64, usize>,
}

impl RetractableMaxAccumulator {
    /// Creates an empty max accumulator.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }
}
601
602impl RetractableAccumulator for RetractableMaxAccumulator {
603    type Input = i64;
604    type Output = Option<i64>;
605
606    fn add(&mut self, value: i64) {
607        *self.counts.entry(value).or_insert(0) += 1;
608    }
609
610    fn retract(&mut self, value: &i64) {
611        if let Some(count) = self.counts.get_mut(value) {
612            *count -= 1;
613            if *count == 0 {
614                self.counts.remove(value);
615            }
616        }
617    }
618
619    fn merge(&mut self, other: &Self) {
620        for (&val, &cnt) in &other.counts {
621            *self.counts.entry(val).or_insert(0) += cnt;
622        }
623    }
624
625    fn result(&self) -> Option<i64> {
626        self.counts.keys().next_back().copied()
627    }
628
629    fn is_empty(&self) -> bool {
630        self.counts.is_empty()
631    }
632
633    fn supports_efficient_retraction(&self) -> bool {
634        true
635    }
636
637    fn reset(&mut self) {
638        self.counts.clear();
639    }
640}
641
642// Late Data Retraction Generator
643
/// Tracks the most recently emitted result of one window so that a later,
/// different result can be turned into a retraction.
#[derive(Debug, Clone)]
struct EmittedResult {
    /// The emitted data (serialized; compared byte-for-byte against new data)
    data: Vec<u8>,
    /// Timestamp passed by the caller when this data was recorded
    emit_time: i64,
    /// Emission counter (for metrics): 1 for the first emission, incremented
    /// on every changed re-emission
    version: u32,
}
654
/// Generates retractions for late data corrections.
///
/// When late data arrives and updates an already-emitted window result,
/// this generator produces:
/// 1. A retraction (-1 weight) for the old result
/// 2. An insert (+1 weight) for the new result
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::changelog::LateDataRetractionGenerator;
/// use laminar_core::operator::window::WindowId;
///
/// let mut generator = LateDataRetractionGenerator::new(true);
/// let window_id = WindowId::new(0, 60000);
///
/// // First emission - no retraction needed
/// let result1 = generator.check_retraction(&window_id, b"count=5", 1000);
/// assert!(result1.is_none());
///
/// // Late data changes result - generates retraction
/// let result2 = generator.check_retraction(&window_id, b"count=7", 2000);
/// assert!(result2.is_some());
/// let (old, new) = result2.unwrap();
/// assert_eq!(old.as_slice(), b"count=5");
/// assert_eq!(new.as_slice(), b"count=7");
///
/// // Same result - no retraction
/// let result3 = generator.check_retraction(&window_id, b"count=7", 3000);
/// assert!(result3.is_none());
/// ```
pub struct LateDataRetractionGenerator {
    /// Last emitted result per window (the source of retraction payloads)
    emitted_results: FxHashMap<WindowId, EmittedResult>,
    /// Whether retraction generation is enabled; when `false` the check
    /// methods return `None` without recording anything
    enabled: bool,
    /// Metrics: total retractions generated
    retractions_generated: u64,
    /// Metrics: number of first-time window emissions recorded since the last
    /// `reset_metrics`. Note that the public `windows_tracked()` getter
    /// reports the live size of `emitted_results` rather than this counter.
    windows_tracked: u64,
}
696
697impl LateDataRetractionGenerator {
698    /// Creates a new generator.
699    #[must_use]
700    pub fn new(enabled: bool) -> Self {
701        Self {
702            emitted_results: FxHashMap::default(),
703            enabled,
704            retractions_generated: 0,
705            windows_tracked: 0,
706        }
707    }
708
709    /// Creates a disabled generator (no-op).
710    #[must_use]
711    pub fn disabled() -> Self {
712        Self::new(false)
713    }
714
715    /// Returns true if retraction generation is enabled.
716    #[must_use]
717    pub fn is_enabled(&self) -> bool {
718        self.enabled
719    }
720
721    /// Enables or disables retraction generation.
722    pub fn set_enabled(&mut self, enabled: bool) {
723        self.enabled = enabled;
724    }
725
726    /// Checks if we need to generate a retraction for this window.
727    ///
728    /// Returns `Some((old_data, new_data))` if the window was previously
729    /// emitted with different data. Returns `None` if this is the first
730    /// emission or the data hasn't changed.
731    pub fn check_retraction(
732        &mut self,
733        window_id: &WindowId,
734        new_data: &[u8],
735        timestamp: i64,
736    ) -> Option<(Vec<u8>, Vec<u8>)> {
737        if !self.enabled {
738            return None;
739        }
740
741        if let Some(prev) = self.emitted_results.get_mut(window_id) {
742            if prev.data != new_data {
743                let old_data = std::mem::replace(&mut prev.data, new_data.to_vec());
744                prev.emit_time = timestamp;
745                prev.version += 1;
746                self.retractions_generated += 1;
747                return Some((old_data, new_data.to_vec()));
748            }
749        } else {
750            self.emitted_results.insert(
751                *window_id,
752                EmittedResult {
753                    data: new_data.to_vec(),
754                    emit_time: timestamp,
755                    version: 1,
756                },
757            );
758            self.windows_tracked += 1;
759        }
760
761        None
762    }
763
764    /// Checks for retraction and returns borrowed slices (avoiding allocation
765    /// when no retraction is needed).
766    ///
767    /// Returns `Some(old_data)` if retraction is needed. The caller should
768    /// then emit the retraction for `old_data` and insert for `new_data`.
769    pub fn check_retraction_ref(
770        &mut self,
771        window_id: &WindowId,
772        new_data: &[u8],
773        timestamp: i64,
774    ) -> Option<Vec<u8>> {
775        if !self.enabled {
776            return None;
777        }
778
779        if let Some(prev) = self.emitted_results.get_mut(window_id) {
780            if prev.data != new_data {
781                let old_data = std::mem::replace(&mut prev.data, new_data.to_vec());
782                prev.emit_time = timestamp;
783                prev.version += 1;
784                self.retractions_generated += 1;
785                return Some(old_data);
786            }
787        } else {
788            self.emitted_results.insert(
789                *window_id,
790                EmittedResult {
791                    data: new_data.to_vec(),
792                    emit_time: timestamp,
793                    version: 1,
794                },
795            );
796            self.windows_tracked += 1;
797        }
798
799        None
800    }
801
802    /// Cleans up state for closed windows.
803    ///
804    /// Call this when a window is closed to prevent unbounded memory growth.
805    pub fn cleanup_window(&mut self, window_id: &WindowId) {
806        self.emitted_results.remove(window_id);
807    }
808
809    /// Cleans up state for windows that ended before the given watermark.
810    ///
811    /// This should be called periodically to bound memory usage.
812    pub fn cleanup_before_watermark(&mut self, watermark: i64) {
813        self.emitted_results
814            .retain(|window_id, _| window_id.end > watermark);
815    }
816
817    /// Returns the number of retractions generated.
818    #[must_use]
819    pub fn retractions_generated(&self) -> u64 {
820        self.retractions_generated
821    }
822
823    /// Returns the number of windows currently being tracked.
824    #[must_use]
825    pub fn windows_tracked(&self) -> usize {
826        self.emitted_results.len()
827    }
828
829    /// Resets all metrics.
830    pub fn reset_metrics(&mut self) {
831        self.retractions_generated = 0;
832        self.windows_tracked = 0;
833    }
834
835    /// Clears all tracked state.
836    pub fn clear(&mut self) {
837        self.emitted_results.clear();
838        self.reset_metrics();
839    }
840}
841
842impl Default for LateDataRetractionGenerator {
843    fn default() -> Self {
844        Self::new(true)
845    }
846}
847
848// CDC Envelope (Debezium-Compatible)
849
/// Source metadata for CDC envelope.
///
/// Describes where a change event originated: the emitting system, the
/// database/schema, the table or view, and a sequence number for ordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CdcSource {
    /// Source name (e.g., "laminardb")
    pub name: String,
    /// Database/schema name
    pub db: String,
    /// Table/view name
    pub table: String,
    /// Sequence number for ordering; falls back to the `u64` default (0) when
    /// the field is missing during deserialization
    #[serde(default)]
    pub sequence: u64,
}
865
866impl CdcSource {
867    /// Creates a new CDC source.
868    #[must_use]
869    pub fn new(name: impl Into<String>, db: impl Into<String>, table: impl Into<String>) -> Self {
870        Self {
871            name: name.into(),
872            db: db.into(),
873            table: table.into(),
874            sequence: 0,
875        }
876    }
877
878    /// Creates a new CDC source with sequence number.
879    #[must_use]
880    pub fn with_sequence(
881        name: impl Into<String>,
882        db: impl Into<String>,
883        table: impl Into<String>,
884        sequence: u64,
885    ) -> Self {
886        Self {
887            name: name.into(),
888            db: db.into(),
889            table: table.into(),
890            sequence,
891        }
892    }
893
894    /// Increments and returns the sequence number.
895    pub fn next_sequence(&mut self) -> u64 {
896        self.sequence += 1;
897        self.sequence
898    }
899}
900
/// CDC envelope for sink serialization.
///
/// Compatible with Debezium envelope format for interoperability with
/// downstream systems (Kafka Connect, data lakes, etc.).
///
/// # Debezium Operation Codes
///
/// - `"c"`: Create (insert)
/// - `"u"`: Update
/// - `"d"`: Delete
/// - `"r"`: Read (snapshot)
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::changelog::{CdcEnvelope, CdcSource};
/// use serde_json::json;
///
/// let source = CdcSource::new("laminardb", "default", "orders");
///
/// // Insert
/// let insert = CdcEnvelope::insert(json!({"id": 1, "amount": 100}), source.clone(), 1000);
/// assert_eq!(insert.op, "c");
///
/// // Delete
/// let delete = CdcEnvelope::delete(json!({"id": 1}), source.clone(), 2000);
/// assert_eq!(delete.op, "d");
///
/// // Update
/// let update = CdcEnvelope::update(
///     json!({"id": 1, "amount": 100}),
///     json!({"id": 1, "amount": 150}),
///     source,
///     3000,
/// );
/// assert_eq!(update.op, "u");
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcEnvelope<T> {
    /// Operation type: "c" (create), "u" (update), "d" (delete), "r" (read/snapshot)
    pub op: String,
    /// Timestamp in milliseconds since epoch
    pub ts_ms: i64,
    /// Source metadata
    pub source: CdcSource,
    /// Value before change (for updates/deletes); omitted from the serialized
    /// output when `None`
    #[serde(skip_serializing_if = "Option::is_none")]
    pub before: Option<T>,
    /// Value after change (for inserts/updates/reads); omitted from the
    /// serialized output when `None`
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: Option<T>,
}
953
954impl<T> CdcEnvelope<T> {
955    /// Creates an insert (create) envelope.
956    #[must_use]
957    pub fn insert(after: T, source: CdcSource, ts_ms: i64) -> Self {
958        Self {
959            op: "c".to_string(),
960            ts_ms,
961            source,
962            before: None,
963            after: Some(after),
964        }
965    }
966
967    /// Creates a delete envelope.
968    #[must_use]
969    pub fn delete(before: T, source: CdcSource, ts_ms: i64) -> Self {
970        Self {
971            op: "d".to_string(),
972            ts_ms,
973            source,
974            before: Some(before),
975            after: None,
976        }
977    }
978
979    /// Creates an update envelope.
980    #[must_use]
981    pub fn update(before: T, after: T, source: CdcSource, ts_ms: i64) -> Self {
982        Self {
983            op: "u".to_string(),
984            ts_ms,
985            source,
986            before: Some(before),
987            after: Some(after),
988        }
989    }
990
991    /// Creates a read (snapshot) envelope.
992    #[must_use]
993    pub fn read(after: T, source: CdcSource, ts_ms: i64) -> Self {
994        Self {
995            op: "r".to_string(),
996            ts_ms,
997            source,
998            before: None,
999            after: Some(after),
1000        }
1001    }
1002
1003    /// Returns true if this is an insert operation.
1004    #[must_use]
1005    pub fn is_insert(&self) -> bool {
1006        self.op == "c"
1007    }
1008
1009    /// Returns true if this is a delete operation.
1010    #[must_use]
1011    pub fn is_delete(&self) -> bool {
1012        self.op == "d"
1013    }
1014
1015    /// Returns true if this is an update operation.
1016    #[must_use]
1017    pub fn is_update(&self) -> bool {
1018        self.op == "u"
1019    }
1020
1021    /// Returns the Z-set weight for this operation.
1022    ///
1023    /// - Insert/Read: +1
1024    /// - Delete: -1
1025    /// - Update: 0 (net effect of -1 for before + +1 for after)
1026    #[must_use]
1027    pub fn weight(&self) -> i32 {
1028        match self.op.as_str() {
1029            "c" | "r" => 1,
1030            "d" => -1,
1031            // "u" (update) and unknown operations have net weight of 0
1032            _ => 0,
1033        }
1034    }
1035}
1036
impl<T: Serialize> CdcEnvelope<T> {
    /// Serializes the envelope to a compact JSON string via
    /// `serde_json::to_string`.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        serde_json::to_string(self)
    }

    /// Serializes the envelope to a pretty-printed JSON string via
    /// `serde_json::to_string_pretty`.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails.
    pub fn to_json_pretty(&self) -> Result<String, serde_json::Error> {
        serde_json::to_string_pretty(self)
    }

    /// Serializes the envelope to JSON bytes via `serde_json::to_vec`.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails.
    pub fn to_json_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
        serde_json::to_vec(self)
    }
}
1065
1066// Retractable FIRST/LAST Accumulators
1067
/// Retractable `FIRST_VALUE` accumulator for changelog/retraction mode.
///
/// Keeps every `(timestamp, value)` entry, ordered by ascending timestamp,
/// so that the first value can be recomputed after an entry is retracted.
/// This is necessary for `EMIT CHANGES` with OHLC queries where the
/// open price may need to be retracted.
///
/// # Ring Architecture
///
/// This is a Ring 1 structure (allocates). Ring 0 uses the non-retractable
/// [`super::window::FirstValueAccumulator`] via the static dispatch path.
///
/// Storage is a `BTreeMap<i64, Vec<i64>>` (timestamp → values), giving
/// O(log n) insert/remove by timestamp and direct access to the earliest
/// timestamp through the map's first entry.
#[derive(Debug, Clone, Default)]
pub struct RetractableFirstValueAccumulator {
    /// Values grouped by timestamp, ordered by ascending timestamp
    entries: std::collections::BTreeMap<i64, Vec<i64>>,
    /// Total number of stored values across all timestamps
    total_count: usize,
}

impl RetractableFirstValueAccumulator {
    /// Creates an accumulator with no stored entries.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Total number of stored entries across all timestamps.
    #[must_use]
    pub fn len(&self) -> usize {
        let Self { total_count, .. } = self;
        *total_count
    }

    /// Returns true when the accumulator holds no entries.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
1109
1110impl RetractableAccumulator for RetractableFirstValueAccumulator {
1111    type Input = (i64, i64); // (timestamp, value)
1112    type Output = Option<i64>;
1113
1114    fn add(&mut self, (timestamp, value): (i64, i64)) {
1115        self.entries.entry(timestamp).or_default().push(value);
1116        self.total_count += 1;
1117    }
1118
1119    fn retract(&mut self, (timestamp, value): &(i64, i64)) {
1120        if let Some(values) = self.entries.get_mut(timestamp) {
1121            if let Some(pos) = values.iter().position(|v| v == value) {
1122                values.swap_remove(pos);
1123                self.total_count -= 1;
1124                if values.is_empty() {
1125                    self.entries.remove(timestamp);
1126                }
1127            }
1128        }
1129    }
1130
1131    fn merge(&mut self, other: &Self) {
1132        for (&ts, vals) in &other.entries {
1133            self.entries.entry(ts).or_default().extend(vals);
1134        }
1135        self.total_count += other.total_count;
1136    }
1137
1138    fn result(&self) -> Option<i64> {
1139        self.entries
1140            .values()
1141            .next()
1142            .and_then(|vals| vals.first().copied())
1143    }
1144
1145    fn is_empty(&self) -> bool {
1146        self.total_count == 0
1147    }
1148
1149    fn supports_efficient_retraction(&self) -> bool {
1150        true
1151    }
1152
1153    fn reset(&mut self) {
1154        self.entries.clear();
1155        self.total_count = 0;
1156    }
1157}
1158
/// Retractable `LAST_VALUE` accumulator for changelog/retraction mode.
///
/// Backed by a `BTreeMap<i64, Vec<i64>>` (timestamp → values): buckets are
/// kept in ascending timestamp order, so the latest bucket is the last one
/// the map yields.
#[derive(Debug, Clone, Default)]
pub struct RetractableLastValueAccumulator {
    /// Values grouped by timestamp, ordered by ascending timestamp
    entries: std::collections::BTreeMap<i64, Vec<i64>>,
    /// Number of values held across all timestamp buckets
    total_count: usize,
}

impl RetractableLastValueAccumulator {
    /// Builds an accumulator that holds no entries.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: std::collections::BTreeMap::new(),
            total_count: 0,
        }
    }

    /// Number of entries currently held, counted across all timestamps.
    #[must_use]
    pub fn len(&self) -> usize {
        self.total_count
    }

    /// Whether the accumulator currently holds no entries at all.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.total_count == 0
    }
}
1190
1191impl RetractableAccumulator for RetractableLastValueAccumulator {
1192    type Input = (i64, i64); // (timestamp, value)
1193    type Output = Option<i64>;
1194
1195    fn add(&mut self, (timestamp, value): (i64, i64)) {
1196        self.entries.entry(timestamp).or_default().push(value);
1197        self.total_count += 1;
1198    }
1199
1200    fn retract(&mut self, (timestamp, value): &(i64, i64)) {
1201        if let Some(values) = self.entries.get_mut(timestamp) {
1202            if let Some(pos) = values.iter().position(|v| v == value) {
1203                values.swap_remove(pos);
1204                self.total_count -= 1;
1205                if values.is_empty() {
1206                    self.entries.remove(timestamp);
1207                }
1208            }
1209        }
1210    }
1211
1212    fn merge(&mut self, other: &Self) {
1213        for (&ts, vals) in &other.entries {
1214            self.entries.entry(ts).or_default().extend(vals);
1215        }
1216        self.total_count += other.total_count;
1217    }
1218
1219    fn result(&self) -> Option<i64> {
1220        self.entries
1221            .values()
1222            .next_back()
1223            .and_then(|vals| vals.last().copied())
1224    }
1225
1226    fn is_empty(&self) -> bool {
1227        self.total_count == 0
1228    }
1229
1230    fn supports_efficient_retraction(&self) -> bool {
1231        true
1232    }
1233
1234    fn reset(&mut self) {
1235        self.entries.clear();
1236        self.total_count = 0;
1237    }
1238}
1239
/// Retractable `FIRST_VALUE` accumulator for f64 values.
///
/// Each f64 is stored as its raw bit pattern (`f64::to_bits()` reinterpreted
/// as `i64`) and turned back with `f64::from_bits()`, so the round trip is
/// lossless. Storage is a `BTreeMap<i64, Vec<i64>>` mapping a timestamp to
/// the `value_bits` recorded at that timestamp.
#[derive(Debug, Clone, Default)]
pub struct RetractableFirstValueF64Accumulator {
    /// `value_bits` grouped by timestamp, ordered by ascending timestamp
    entries: std::collections::BTreeMap<i64, Vec<i64>>,
    /// Number of values held across all timestamp buckets
    total_count: usize,
}

impl RetractableFirstValueF64Accumulator {
    /// Builds an accumulator that holds no entries.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: std::collections::BTreeMap::new(),
            total_count: 0,
        }
    }

    /// Number of entries currently held, counted across all timestamps.
    #[must_use]
    pub fn len(&self) -> usize {
        self.total_count
    }

    /// Whether the accumulator currently holds no entries at all.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.total_count == 0
    }

    /// Decodes the first value of the earliest timestamp back into an f64.
    ///
    /// Returns `None` when nothing is stored.
    #[must_use]
    #[allow(clippy::cast_sign_loss)]
    pub fn result_f64(&self) -> Option<f64> {
        let earliest = self.entries.values().next()?;
        let bits = *earliest.first()?;
        // Pure bit reinterpretation: undoes the `to_bits() as i64` on insert.
        Some(f64::from_bits(bits as u64))
    }
}
1282
1283impl RetractableAccumulator for RetractableFirstValueF64Accumulator {
1284    type Input = (i64, f64); // (timestamp, value)
1285    type Output = Option<i64>; // value_bits for compatibility
1286
1287    #[allow(clippy::cast_possible_wrap)]
1288    fn add(&mut self, (timestamp, value): (i64, f64)) {
1289        let value_bits = value.to_bits() as i64;
1290        self.entries.entry(timestamp).or_default().push(value_bits);
1291        self.total_count += 1;
1292    }
1293
1294    fn retract(&mut self, (timestamp, value): &(i64, f64)) {
1295        #[allow(clippy::cast_possible_wrap)]
1296        let value_bits = value.to_bits() as i64;
1297        if let Some(values) = self.entries.get_mut(timestamp) {
1298            if let Some(pos) = values.iter().position(|v| *v == value_bits) {
1299                values.swap_remove(pos);
1300                self.total_count -= 1;
1301                if values.is_empty() {
1302                    self.entries.remove(timestamp);
1303                }
1304            }
1305        }
1306    }
1307
1308    fn merge(&mut self, other: &Self) {
1309        for (&ts, vals) in &other.entries {
1310            self.entries.entry(ts).or_default().extend(vals);
1311        }
1312        self.total_count += other.total_count;
1313    }
1314
1315    fn result(&self) -> Option<i64> {
1316        self.entries
1317            .values()
1318            .next()
1319            .and_then(|vals| vals.first().copied())
1320    }
1321
1322    fn is_empty(&self) -> bool {
1323        self.total_count == 0
1324    }
1325
1326    fn supports_efficient_retraction(&self) -> bool {
1327        true
1328    }
1329
1330    fn reset(&mut self) {
1331        self.entries.clear();
1332        self.total_count = 0;
1333    }
1334}
1335
/// Retractable `LAST_VALUE` accumulator for f64 values.
///
/// Each f64 is stored as its raw bit pattern (`f64::to_bits()` reinterpreted
/// as `i64`) and turned back with `f64::from_bits()`, so the round trip is
/// lossless. Storage is a `BTreeMap<i64, Vec<i64>>` mapping a timestamp to
/// the `value_bits` recorded at that timestamp.
#[derive(Debug, Clone, Default)]
pub struct RetractableLastValueF64Accumulator {
    /// `value_bits` grouped by timestamp, ordered by ascending timestamp
    entries: std::collections::BTreeMap<i64, Vec<i64>>,
    /// Number of values held across all timestamp buckets
    total_count: usize,
}

impl RetractableLastValueF64Accumulator {
    /// Builds an accumulator that holds no entries.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: std::collections::BTreeMap::new(),
            total_count: 0,
        }
    }

    /// Number of entries currently held, counted across all timestamps.
    #[must_use]
    pub fn len(&self) -> usize {
        self.total_count
    }

    /// Whether the accumulator currently holds no entries at all.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.total_count == 0
    }

    /// Decodes the last value of the latest timestamp back into an f64.
    ///
    /// Returns `None` when nothing is stored.
    #[must_use]
    #[allow(clippy::cast_sign_loss)]
    pub fn result_f64(&self) -> Option<f64> {
        let latest = self.entries.values().next_back()?;
        let bits = *latest.last()?;
        // Pure bit reinterpretation: undoes the `to_bits() as i64` on insert.
        Some(f64::from_bits(bits as u64))
    }
}
1378
1379impl RetractableAccumulator for RetractableLastValueF64Accumulator {
1380    type Input = (i64, f64); // (timestamp, value)
1381    type Output = Option<i64>; // value_bits for compatibility
1382
1383    #[allow(clippy::cast_possible_wrap)]
1384    fn add(&mut self, (timestamp, value): (i64, f64)) {
1385        let value_bits = value.to_bits() as i64;
1386        self.entries.entry(timestamp).or_default().push(value_bits);
1387        self.total_count += 1;
1388    }
1389
1390    fn retract(&mut self, (timestamp, value): &(i64, f64)) {
1391        #[allow(clippy::cast_possible_wrap)]
1392        let value_bits = value.to_bits() as i64;
1393        if let Some(values) = self.entries.get_mut(timestamp) {
1394            if let Some(pos) = values.iter().position(|v| *v == value_bits) {
1395                values.swap_remove(pos);
1396                self.total_count -= 1;
1397                if values.is_empty() {
1398                    self.entries.remove(timestamp);
1399                }
1400            }
1401        }
1402    }
1403
1404    fn merge(&mut self, other: &Self) {
1405        for (&ts, vals) in &other.entries {
1406            self.entries.entry(ts).or_default().extend(vals);
1407        }
1408        self.total_count += other.total_count;
1409    }
1410
1411    fn result(&self) -> Option<i64> {
1412        self.entries
1413            .values()
1414            .next_back()
1415            .and_then(|vals| vals.last().copied())
1416    }
1417
1418    fn is_empty(&self) -> bool {
1419        self.total_count == 0
1420    }
1421
1422    fn supports_efficient_retraction(&self) -> bool {
1423        true
1424    }
1425
1426    fn reset(&mut self) {
1427        self.entries.clear();
1428        self.total_count = 0;
1429    }
1430}
1431
1432// Tests
1433
1434#[cfg(test)]
1435mod tests {
1436    use super::*;
1437
1438    // ChangelogRef Tests
1439
1440    #[test]
1441    fn test_changelog_ref_insert() {
1442        let cr = ChangelogRef::insert(10, 5);
1443        assert_eq!(cr.batch_offset, 10);
1444        assert_eq!(cr.row_index, 5);
1445        assert_eq!(cr.weight, 1);
1446        assert_eq!(cr.operation(), CdcOperation::Insert);
1447        assert!(cr.is_insert());
1448        assert!(!cr.is_delete());
1449    }
1450
1451    #[test]
1452    fn test_changelog_ref_delete() {
1453        let cr = ChangelogRef::delete(20, 3);
1454        assert_eq!(cr.batch_offset, 20);
1455        assert_eq!(cr.row_index, 3);
1456        assert_eq!(cr.weight, -1);
1457        assert_eq!(cr.operation(), CdcOperation::Delete);
1458        assert!(!cr.is_insert());
1459        assert!(cr.is_delete());
1460    }
1461
1462    #[test]
1463    fn test_changelog_ref_update() {
1464        let before = ChangelogRef::update_before(5, 1);
1465        let after = ChangelogRef::update_after(5, 2);
1466
1467        assert_eq!(before.weight, -1);
1468        assert_eq!(after.weight, 1);
1469        assert_eq!(before.operation(), CdcOperation::UpdateBefore);
1470        assert_eq!(after.operation(), CdcOperation::UpdateAfter);
1471    }
1472
1473    #[test]
1474    fn test_changelog_ref_size() {
1475        // Verify compact size
1476        assert!(std::mem::size_of::<ChangelogRef>() <= 16);
1477    }
1478
1479    // ChangelogBuffer Tests
1480
1481    #[test]
1482    fn test_changelog_buffer_basic() {
1483        let mut buffer = ChangelogBuffer::with_capacity(10);
1484        assert!(buffer.is_empty());
1485        assert_eq!(buffer.capacity(), 10);
1486
1487        assert!(buffer.push(ChangelogRef::insert(0, 0)));
1488        assert!(buffer.push(ChangelogRef::delete(1, 0)));
1489
1490        assert_eq!(buffer.len(), 2);
1491        assert!(!buffer.is_empty());
1492    }
1493
1494    #[test]
1495    fn test_changelog_buffer_full() {
1496        let mut buffer = ChangelogBuffer::with_capacity(2);
1497
1498        assert!(buffer.push(ChangelogRef::insert(0, 0)));
1499        assert!(buffer.push(ChangelogRef::insert(1, 0)));
1500        assert!(!buffer.push(ChangelogRef::insert(2, 0))); // Full
1501
1502        assert!(buffer.is_full());
1503        assert_eq!(buffer.available(), 0);
1504    }
1505
1506    #[test]
1507    fn test_changelog_buffer_drain() {
1508        let mut buffer = ChangelogBuffer::with_capacity(10);
1509
1510        for i in 0..5 {
1511            buffer.push(ChangelogRef::insert(i, 0));
1512        }
1513
1514        let drained: Vec<_> = buffer.drain().collect();
1515        assert_eq!(drained.len(), 5);
1516        assert!(buffer.is_empty());
1517
1518        // Buffer can be reused
1519        for i in 0..3 {
1520            buffer.push(ChangelogRef::delete(i, 0));
1521        }
1522        assert_eq!(buffer.len(), 3);
1523    }
1524
1525    #[test]
1526    fn test_changelog_buffer_retraction() {
1527        let mut buffer = ChangelogBuffer::with_capacity(10);
1528
1529        assert!(buffer.push_retraction(0, 1, 2));
1530        assert_eq!(buffer.len(), 2);
1531
1532        let refs: Vec<_> = buffer.as_slice().to_vec();
1533        assert_eq!(refs[0].operation(), CdcOperation::UpdateBefore);
1534        assert_eq!(refs[0].row_index, 1);
1535        assert_eq!(refs[1].operation(), CdcOperation::UpdateAfter);
1536        assert_eq!(refs[1].row_index, 2);
1537    }
1538
1539    #[test]
1540    fn test_changelog_buffer_zero_alloc_reuse() {
1541        let mut buffer = ChangelogBuffer::with_capacity(100);
1542
1543        // First pass
1544        for i in 0..50 {
1545            buffer.push(ChangelogRef::insert(i, 0));
1546        }
1547        let _: Vec<_> = buffer.drain().collect();
1548
1549        // Second pass - should not allocate
1550        for i in 0..50 {
1551            buffer.push(ChangelogRef::insert(i, 0));
1552        }
1553
1554        assert_eq!(buffer.len(), 50);
1555    }
1556
1557    // Retractable Accumulator Tests
1558
1559    #[test]
1560    fn test_retractable_count() {
1561        let mut agg = RetractableCountAccumulator::default();
1562
1563        agg.add(());
1564        agg.add(());
1565        agg.add(());
1566        assert_eq!(agg.result(), 3);
1567
1568        agg.retract(&());
1569        assert_eq!(agg.result(), 2);
1570
1571        agg.retract(&());
1572        agg.retract(&());
1573        assert_eq!(agg.result(), 0);
1574    }
1575
1576    #[test]
1577    fn test_retractable_count_negative() {
1578        let mut agg = RetractableCountAccumulator::default();
1579
1580        agg.add(());
1581        agg.retract(&());
1582        agg.retract(&()); // Extra retraction
1583
1584        // Count can go negative (indicates an error in the pipeline)
1585        assert_eq!(agg.result(), -1);
1586    }
1587
1588    #[test]
1589    fn test_retractable_sum() {
1590        let mut agg = RetractableSumAccumulator::default();
1591
1592        agg.add(10);
1593        agg.add(20);
1594        agg.add(30);
1595        assert_eq!(agg.result(), 60);
1596
1597        agg.retract(&20);
1598        assert_eq!(agg.result(), 40);
1599
1600        agg.retract(&10);
1601        agg.retract(&30);
1602        assert_eq!(agg.result(), 0);
1603    }
1604
1605    #[test]
1606    fn test_retractable_sum_merge() {
1607        let mut agg1 = RetractableSumAccumulator::default();
1608        agg1.add(10);
1609        agg1.add(20);
1610
1611        let mut agg2 = RetractableSumAccumulator::default();
1612        agg2.add(30);
1613        agg2.retract(&5);
1614
1615        agg1.merge(&agg2);
1616        assert_eq!(agg1.result(), 55); // 10 + 20 + 30 - 5
1617    }
1618
1619    #[test]
1620    fn test_retractable_avg() {
1621        let mut agg = RetractableAvgAccumulator::default();
1622
1623        agg.add(10);
1624        agg.add(20);
1625        agg.add(30);
1626        let avg = agg.result().unwrap();
1627        assert!((avg - 20.0).abs() < f64::EPSILON);
1628
1629        agg.retract(&30);
1630        let avg = agg.result().unwrap();
1631        assert!((avg - 15.0).abs() < f64::EPSILON); // (10 + 20) / 2
1632    }
1633
1634    #[test]
1635    fn test_retractable_avg_empty() {
1636        let mut agg = RetractableAvgAccumulator::default();
1637        assert!(agg.result().is_none());
1638
1639        agg.add(10);
1640        agg.retract(&10);
1641        assert!(agg.result().is_none());
1642    }
1643
1644    #[test]
1645    fn test_retractable_min() {
1646        let mut agg = RetractableMinAccumulator::default();
1647
1648        agg.add(30);
1649        agg.add(10);
1650        agg.add(20);
1651        assert_eq!(agg.result(), Some(10));
1652
1653        // Retract the minimum
1654        agg.retract(&10);
1655        assert_eq!(agg.result(), Some(20));
1656
1657        // Retract a non-minimum
1658        agg.retract(&30);
1659        assert_eq!(agg.result(), Some(20));
1660
1661        agg.retract(&20);
1662        assert_eq!(agg.result(), None);
1663    }
1664
1665    #[test]
1666    fn test_retractable_max() {
1667        let mut agg = RetractableMaxAccumulator::default();
1668
1669        agg.add(10);
1670        agg.add(30);
1671        agg.add(20);
1672        assert_eq!(agg.result(), Some(30));
1673
1674        // Retract the maximum
1675        agg.retract(&30);
1676        assert_eq!(agg.result(), Some(20));
1677
1678        agg.retract(&20);
1679        agg.retract(&10);
1680        assert_eq!(agg.result(), None);
1681    }
1682
1683    #[test]
1684    fn test_retractable_efficiency_flags() {
1685        let count = RetractableCountAccumulator::default();
1686        let sum = RetractableSumAccumulator::default();
1687        let avg = RetractableAvgAccumulator::default();
1688        let min = RetractableMinAccumulator::default();
1689        let max = RetractableMaxAccumulator::default();
1690
1691        // Count, sum, avg have O(1) retraction
1692        assert!(count.supports_efficient_retraction());
1693        assert!(sum.supports_efficient_retraction());
1694        assert!(avg.supports_efficient_retraction());
1695
1696        // Min/max use BTreeMap — O(log n) retraction, no recomputation
1697        assert!(min.supports_efficient_retraction());
1698        assert!(max.supports_efficient_retraction());
1699    }
1700
1701    // LateDataRetractionGenerator Tests
1702
1703    #[test]
1704    fn test_late_data_retraction_first_emission() {
1705        let mut gen = LateDataRetractionGenerator::new(true);
1706        let window_id = WindowId::new(0, 60000);
1707
1708        // First emission - no retraction
1709        let result = gen.check_retraction(&window_id, b"count=5", 1000);
1710        assert!(result.is_none());
1711        assert_eq!(gen.windows_tracked(), 1);
1712    }
1713
1714    #[test]
1715    fn test_late_data_retraction_changed_result() {
1716        let mut gen = LateDataRetractionGenerator::new(true);
1717        let window_id = WindowId::new(0, 60000);
1718
1719        // First emission
1720        gen.check_retraction(&window_id, b"count=5", 1000);
1721
1722        // Late data causes different result - generates retraction
1723        let result = gen.check_retraction(&window_id, b"count=7", 2000);
1724        assert!(result.is_some());
1725
1726        let (old, new) = result.unwrap();
1727        assert_eq!(old, b"count=5");
1728        assert_eq!(new, b"count=7");
1729        assert_eq!(gen.retractions_generated(), 1);
1730    }
1731
1732    #[test]
1733    fn test_late_data_retraction_same_result() {
1734        let mut gen = LateDataRetractionGenerator::new(true);
1735        let window_id = WindowId::new(0, 60000);
1736
1737        // First emission
1738        gen.check_retraction(&window_id, b"count=5", 1000);
1739
1740        // Same result - no retraction
1741        let result = gen.check_retraction(&window_id, b"count=5", 2000);
1742        assert!(result.is_none());
1743        assert_eq!(gen.retractions_generated(), 0);
1744    }
1745
1746    #[test]
1747    fn test_late_data_retraction_disabled() {
1748        let mut gen = LateDataRetractionGenerator::new(false);
1749        let window_id = WindowId::new(0, 60000);
1750
1751        gen.check_retraction(&window_id, b"count=5", 1000);
1752        let result = gen.check_retraction(&window_id, b"count=7", 2000);
1753
1754        // No retraction when disabled
1755        assert!(result.is_none());
1756    }
1757
1758    #[test]
1759    fn test_late_data_cleanup() {
1760        let mut gen = LateDataRetractionGenerator::new(true);
1761
1762        let w1 = WindowId::new(0, 1000);
1763        let w2 = WindowId::new(1000, 2000);
1764
1765        gen.check_retraction(&w1, b"a", 100);
1766        gen.check_retraction(&w2, b"b", 200);
1767        assert_eq!(gen.windows_tracked(), 2);
1768
1769        gen.cleanup_window(&w1);
1770        assert_eq!(gen.windows_tracked(), 1);
1771
1772        gen.cleanup_before_watermark(2000);
1773        assert_eq!(gen.windows_tracked(), 0);
1774    }
1775
1776    // CdcEnvelope Tests
1777
1778    #[test]
1779    fn test_cdc_envelope_insert() {
1780        let source = CdcSource::new("laminardb", "default", "orders");
1781        let envelope = CdcEnvelope::insert(
1782            serde_json::json!({"id": 1, "amount": 100}),
1783            source,
1784            1_706_140_800_000,
1785        );
1786
1787        assert_eq!(envelope.op, "c");
1788        assert!(envelope.is_insert());
1789        assert!(envelope.before.is_none());
1790        assert!(envelope.after.is_some());
1791        assert_eq!(envelope.weight(), 1);
1792    }
1793
1794    #[test]
1795    fn test_cdc_envelope_delete() {
1796        let source = CdcSource::new("laminardb", "default", "orders");
1797        let envelope = CdcEnvelope::delete(serde_json::json!({"id": 1}), source, 1_706_140_800_000);
1798
1799        assert_eq!(envelope.op, "d");
1800        assert!(envelope.is_delete());
1801        assert!(envelope.before.is_some());
1802        assert!(envelope.after.is_none());
1803        assert_eq!(envelope.weight(), -1);
1804    }
1805
1806    #[test]
1807    fn test_cdc_envelope_update() {
1808        let source = CdcSource::new("laminardb", "default", "orders");
1809        let envelope = CdcEnvelope::update(
1810            serde_json::json!({"id": 1, "amount": 100}),
1811            serde_json::json!({"id": 1, "amount": 150}),
1812            source,
1813            1_706_140_800_000,
1814        );
1815
1816        assert_eq!(envelope.op, "u");
1817        assert!(envelope.is_update());
1818        assert!(envelope.before.is_some());
1819        assert!(envelope.after.is_some());
1820        assert_eq!(envelope.weight(), 0);
1821    }
1822
1823    #[test]
1824    fn test_cdc_envelope_json_serialization() {
1825        let source = CdcSource::new("laminardb", "default", "orders");
1826        let envelope = CdcEnvelope::insert(
1827            serde_json::json!({"id": 1, "amount": 100}),
1828            source,
1829            1_706_140_800_000,
1830        );
1831
1832        let json = envelope.to_json().unwrap();
1833        assert!(json.contains("\"op\":\"c\""));
1834        assert!(json.contains("\"after\""));
1835        assert!(!json.contains("\"before\""));
1836        assert!(json.contains("\"ts_ms\":1706140800000"));
1837    }
1838
1839    #[test]
1840    fn test_cdc_envelope_debezium_compatible() {
1841        let source = CdcSource::with_sequence("laminardb", "test_db", "users", 42);
1842        let envelope = CdcEnvelope::insert(
1843            serde_json::json!({"user_id": 123, "name": "Alice"}),
1844            source,
1845            1_706_140_800_000,
1846        );
1847
1848        let json = envelope.to_json().unwrap();
1849
1850        // Verify Debezium-compatible fields
1851        assert!(json.contains("\"op\":\"c\""));
1852        assert!(json.contains("\"source\""));
1853        assert!(json.contains("\"name\":\"laminardb\""));
1854        assert!(json.contains("\"db\":\"test_db\""));
1855        assert!(json.contains("\"table\":\"users\""));
1856        assert!(json.contains("\"sequence\":42"));
1857    }
1858
1859    #[test]
1860    fn test_cdc_source_sequence() {
1861        let mut source = CdcSource::new("laminardb", "db", "table");
1862        assert_eq!(source.sequence, 0);
1863
1864        assert_eq!(source.next_sequence(), 1);
1865        assert_eq!(source.next_sequence(), 2);
1866        assert_eq!(source.sequence, 2);
1867    }
1868
1869    // CdcOperation Tests
1870
1871    #[test]
1872    fn test_cdc_operation_roundtrip() {
1873        for op in [
1874            CdcOperation::Insert,
1875            CdcOperation::Delete,
1876            CdcOperation::UpdateBefore,
1877            CdcOperation::UpdateAfter,
1878        ] {
1879            let u8_val = op.to_u8();
1880            let restored = CdcOperation::from_u8(u8_val);
1881            assert_eq!(op, restored);
1882        }
1883    }
1884
1885    #[test]
1886    fn test_cdc_operation_unknown_u8() {
1887        // Unknown values default to Insert
1888        assert_eq!(CdcOperation::from_u8(255), CdcOperation::Insert);
1889    }
1890
1891    // ════════════════════════════════════════════════════════════════════════
1892    // Retractable FIRST/LAST Accumulator Tests
1893    // ════════════════════════════════════════════════════════════════════════
1894
1895    // ── RetractableFirstValueAccumulator ─────────────────────────────────────
1896
1897    #[test]
1898    fn test_retractable_first_value_basic() {
1899        let mut acc = RetractableFirstValueAccumulator::new();
1900        assert!(acc.is_empty());
1901        assert_eq!(acc.result(), None);
1902
1903        // Add entries out of order
1904        acc.add((200, 20));
1905        acc.add((100, 10));
1906        acc.add((300, 30));
1907
1908        assert!(!acc.is_empty());
1909        assert_eq!(acc.len(), 3);
1910        // First value = earliest timestamp (100) → value 10
1911        assert_eq!(acc.result(), Some(10));
1912    }
1913
1914    #[test]
1915    fn test_retractable_first_value_retract_non_first() {
1916        let mut acc = RetractableFirstValueAccumulator::new();
1917        acc.add((100, 10));
1918        acc.add((200, 20));
1919        acc.add((300, 30));
1920
1921        // Retract a non-first entry → first value unchanged
1922        acc.retract(&(200, 20));
1923        assert_eq!(acc.len(), 2);
1924        assert_eq!(acc.result(), Some(10));
1925    }
1926
1927    #[test]
1928    fn test_retractable_first_value_retract_first() {
1929        let mut acc = RetractableFirstValueAccumulator::new();
1930        acc.add((100, 10));
1931        acc.add((200, 20));
1932        acc.add((300, 30));
1933
1934        // Retract the first entry → next earliest becomes first
1935        acc.retract(&(100, 10));
1936        assert_eq!(acc.len(), 2);
1937        assert_eq!(acc.result(), Some(20)); // ts=200
1938    }
1939
1940    #[test]
1941    fn test_retractable_first_value_retract_all() {
1942        let mut acc = RetractableFirstValueAccumulator::new();
1943        acc.add((100, 10));
1944        acc.add((200, 20));
1945
1946        acc.retract(&(100, 10));
1947        acc.retract(&(200, 20));
1948        assert!(acc.is_empty());
1949        assert_eq!(acc.result(), None);
1950    }
1951
1952    #[test]
1953    fn test_retractable_first_value_retract_nonexistent() {
1954        let mut acc = RetractableFirstValueAccumulator::new();
1955        acc.add((100, 10));
1956
1957        // Retract something that doesn't exist → no effect
1958        acc.retract(&(999, 99));
1959        assert_eq!(acc.len(), 1);
1960        assert_eq!(acc.result(), Some(10));
1961    }
1962
1963    #[test]
1964    fn test_retractable_first_value_duplicate_timestamps() {
1965        let mut acc = RetractableFirstValueAccumulator::new();
1966        acc.add((100, 10));
1967        acc.add((100, 20)); // Same timestamp, different value
1968
1969        assert_eq!(acc.len(), 2);
1970        // First at timestamp 100, first inserted value
1971        assert_eq!(acc.result(), Some(10));
1972
1973        // Retract one → other remains
1974        acc.retract(&(100, 10));
1975        assert_eq!(acc.result(), Some(20));
1976    }
1977
1978    // ── RetractableLastValueAccumulator ──────────────────────────────────────
1979
1980    #[test]
1981    fn test_retractable_last_value_basic() {
1982        let mut acc = RetractableLastValueAccumulator::new();
1983        assert!(acc.is_empty());
1984        assert_eq!(acc.result(), None);
1985
1986        acc.add((100, 10));
1987        acc.add((300, 30));
1988        acc.add((200, 20));
1989
1990        assert_eq!(acc.len(), 3);
1991        // Last value = latest timestamp (300) → value 30
1992        assert_eq!(acc.result(), Some(30));
1993    }
1994
1995    #[test]
1996    fn test_retractable_last_value_retract_non_last() {
1997        let mut acc = RetractableLastValueAccumulator::new();
1998        acc.add((100, 10));
1999        acc.add((200, 20));
2000        acc.add((300, 30));
2001
2002        // Retract a non-last entry → last value unchanged
2003        acc.retract(&(200, 20));
2004        assert_eq!(acc.result(), Some(30));
2005    }
2006
2007    #[test]
2008    fn test_retractable_last_value_retract_last() {
2009        let mut acc = RetractableLastValueAccumulator::new();
2010        acc.add((100, 10));
2011        acc.add((200, 20));
2012        acc.add((300, 30));
2013
2014        // Retract the last entry → next latest becomes last
2015        acc.retract(&(300, 30));
2016        assert_eq!(acc.result(), Some(20)); // ts=200
2017    }
2018
2019    #[test]
2020    fn test_retractable_last_value_retract_all() {
2021        let mut acc = RetractableLastValueAccumulator::new();
2022        acc.add((100, 10));
2023        acc.retract(&(100, 10));
2024        assert!(acc.is_empty());
2025        assert_eq!(acc.result(), None);
2026    }
2027
2028    // ── Merge tests ─────────────────────────────────────────────────────────
2029
2030    #[test]
2031    fn test_retractable_first_value_merge() {
2032        let mut acc1 = RetractableFirstValueAccumulator::new();
2033        let mut acc2 = RetractableFirstValueAccumulator::new();
2034
2035        acc1.add((200, 20));
2036        acc1.add((400, 40));
2037        acc2.add((100, 10));
2038        acc2.add((300, 30));
2039
2040        acc1.merge(&acc2);
2041        assert_eq!(acc1.len(), 4);
2042        // Merged: sorted by timestamp, first = (100, 10)
2043        assert_eq!(acc1.result(), Some(10));
2044    }
2045
2046    #[test]
2047    fn test_retractable_last_value_merge() {
2048        let mut acc1 = RetractableLastValueAccumulator::new();
2049        let mut acc2 = RetractableLastValueAccumulator::new();
2050
2051        acc1.add((100, 10));
2052        acc1.add((300, 30));
2053        acc2.add((200, 20));
2054        acc2.add((400, 40));
2055
2056        acc1.merge(&acc2);
2057        assert_eq!(acc1.len(), 4);
2058        // Last = (400, 40)
2059        assert_eq!(acc1.result(), Some(40));
2060    }
2061
2062    #[test]
2063    fn test_retractable_first_value_merge_empty() {
2064        let mut acc1 = RetractableFirstValueAccumulator::new();
2065        let acc2 = RetractableFirstValueAccumulator::new();
2066
2067        acc1.add((100, 10));
2068        acc1.merge(&acc2); // Merge empty into non-empty
2069        assert_eq!(acc1.result(), Some(10));
2070
2071        let mut acc3 = RetractableFirstValueAccumulator::new();
2072        let acc4 = RetractableFirstValueAccumulator::new();
2073        acc3.merge(&acc4); // Merge empty into empty
2074        assert!(acc3.is_empty());
2075    }
2076
2077    // ── Reset/clear tests ───────────────────────────────────────────────────
2078
2079    #[test]
2080    fn test_retractable_first_value_reset() {
2081        let mut acc = RetractableFirstValueAccumulator::new();
2082        acc.add((100, 10));
2083        acc.add((200, 20));
2084        assert!(!acc.is_empty());
2085
2086        acc.reset();
2087        assert!(acc.is_empty());
2088        assert_eq!(acc.result(), None);
2089    }
2090
2091    #[test]
2092    fn test_retractable_last_value_reset() {
2093        let mut acc = RetractableLastValueAccumulator::new();
2094        acc.add((100, 10));
2095        acc.reset();
2096        assert!(acc.is_empty());
2097    }
2098
2099    // ── f64 variant tests ───────────────────────────────────────────────────
2100
2101    #[test]
2102    fn test_retractable_first_value_f64_basic() {
2103        let mut acc = RetractableFirstValueF64Accumulator::new();
2104        acc.add((200, 20.5));
2105        acc.add((100, 10.5));
2106        acc.add((300, 30.5));
2107
2108        assert_eq!(acc.len(), 3);
2109        // First = earliest timestamp (100) → value 10.5
2110        let result = acc.result_f64().unwrap();
2111        assert!((result - 10.5).abs() < f64::EPSILON);
2112    }
2113
2114    #[test]
2115    fn test_retractable_first_value_f64_retract() {
2116        let mut acc = RetractableFirstValueF64Accumulator::new();
2117        acc.add((100, 10.5));
2118        acc.add((200, 20.5));
2119
2120        // Retract first → next becomes first
2121        acc.retract(&(100, 10.5));
2122        let result = acc.result_f64().unwrap();
2123        assert!((result - 20.5).abs() < f64::EPSILON);
2124    }
2125
2126    #[test]
2127    fn test_retractable_last_value_f64_basic() {
2128        let mut acc = RetractableLastValueF64Accumulator::new();
2129        acc.add((100, 10.5));
2130        acc.add((300, 30.5));
2131        acc.add((200, 20.5));
2132
2133        let result = acc.result_f64().unwrap();
2134        assert!((result - 30.5).abs() < f64::EPSILON);
2135    }
2136
2137    #[test]
2138    fn test_retractable_last_value_f64_retract() {
2139        let mut acc = RetractableLastValueF64Accumulator::new();
2140        acc.add((100, 10.5));
2141        acc.add((200, 20.5));
2142        acc.add((300, 30.5));
2143
2144        acc.retract(&(300, 30.5));
2145        let result = acc.result_f64().unwrap();
2146        assert!((result - 20.5).abs() < f64::EPSILON);
2147    }
2148
2149    #[test]
2150    fn test_retractable_first_value_f64_merge() {
2151        let mut acc1 = RetractableFirstValueF64Accumulator::new();
2152        let mut acc2 = RetractableFirstValueF64Accumulator::new();
2153        acc1.add((200, 20.5));
2154        acc2.add((100, 10.5));
2155        acc1.merge(&acc2);
2156        let result = acc1.result_f64().unwrap();
2157        assert!((result - 10.5).abs() < f64::EPSILON);
2158    }
2159
2160    #[test]
2161    fn test_retractable_last_value_f64_merge() {
2162        let mut acc1 = RetractableLastValueF64Accumulator::new();
2163        let mut acc2 = RetractableLastValueF64Accumulator::new();
2164        acc1.add((100, 10.5));
2165        acc2.add((300, 30.5));
2166        acc1.merge(&acc2);
2167        let result = acc1.result_f64().unwrap();
2168        assert!((result - 30.5).abs() < f64::EPSILON);
2169    }
2170
2171    // ── Edge cases ──────────────────────────────────────────────────────────
2172
2173    #[test]
2174    fn test_retractable_first_value_single_entry() {
2175        let mut acc = RetractableFirstValueAccumulator::new();
2176        acc.add((100, 42));
2177        assert_eq!(acc.result(), Some(42));
2178        acc.retract(&(100, 42));
2179        assert_eq!(acc.result(), None);
2180    }
2181
2182    #[test]
2183    fn test_retractable_last_value_single_entry() {
2184        let mut acc = RetractableLastValueAccumulator::new();
2185        acc.add((100, 42));
2186        assert_eq!(acc.result(), Some(42));
2187        acc.retract(&(100, 42));
2188        assert_eq!(acc.result(), None);
2189    }
2190
2191    #[test]
2192    fn test_retractable_first_value_negative_values() {
2193        let mut acc = RetractableFirstValueAccumulator::new();
2194        acc.add((100, -10));
2195        acc.add((200, -20));
2196        assert_eq!(acc.result(), Some(-10));
2197    }
2198
2199    #[test]
2200    fn test_retractable_supports_efficient_retraction() {
2201        let acc = RetractableFirstValueAccumulator::new();
2202        assert!(acc.supports_efficient_retraction());
2203
2204        let acc2 = RetractableLastValueAccumulator::new();
2205        assert!(acc2.supports_efficient_retraction());
2206
2207        let acc3 = RetractableFirstValueF64Accumulator::new();
2208        assert!(acc3.supports_efficient_retraction());
2209
2210        let acc4 = RetractableLastValueF64Accumulator::new();
2211        assert!(acc4.supports_efficient_retraction());
2212    }
2213
2214    // ── OHLC retraction simulation ──────────────────────────────────────────
2215
2216    #[test]
2217    fn test_ohlc_retraction_simulation() {
2218        // Simulate an OHLC window where trades arrive out of order
2219        // and one needs to be retracted
2220        let mut open_acc = RetractableFirstValueAccumulator::new();
2221        let mut close_acc = RetractableLastValueAccumulator::new();
2222
2223        // Trade 1: price=100 at t=1000
2224        open_acc.add((1000, 100));
2225        close_acc.add((1000, 100));
2226
2227        // Trade 2: price=105 at t=2000
2228        open_acc.add((2000, 105));
2229        close_acc.add((2000, 105));
2230
2231        // Trade 3: price=98 at t=3000
2232        open_acc.add((3000, 98));
2233        close_acc.add((3000, 98));
2234
2235        assert_eq!(open_acc.result(), Some(100)); // Open = earliest
2236        assert_eq!(close_acc.result(), Some(98)); // Close = latest
2237
2238        // Retract trade 1 (correction: it was a bad trade)
2239        open_acc.retract(&(1000, 100));
2240        close_acc.retract(&(1000, 100));
2241
2242        // Open now = trade 2 (earliest remaining)
2243        assert_eq!(open_acc.result(), Some(105));
2244        // Close still = trade 3 (latest)
2245        assert_eq!(close_acc.result(), Some(98));
2246    }
2247
2248    #[test]
2249    fn test_ohlc_retraction_f64_simulation() {
2250        let mut open_acc = RetractableFirstValueF64Accumulator::new();
2251        let mut close_acc = RetractableLastValueF64Accumulator::new();
2252
2253        open_acc.add((1000, 100.50));
2254        close_acc.add((1000, 100.50));
2255        open_acc.add((2000, 105.25));
2256        close_acc.add((2000, 105.25));
2257        open_acc.add((3000, 98.75));
2258        close_acc.add((3000, 98.75));
2259
2260        let open = open_acc.result_f64().unwrap();
2261        let close = close_acc.result_f64().unwrap();
2262        assert!((open - 100.50).abs() < f64::EPSILON);
2263        assert!((close - 98.75).abs() < f64::EPSILON);
2264
2265        // Retract trade at t=1000
2266        open_acc.retract(&(1000, 100.50));
2267        close_acc.retract(&(1000, 100.50));
2268
2269        let open2 = open_acc.result_f64().unwrap();
2270        assert!((open2 - 105.25).abs() < f64::EPSILON);
2271    }
2272}