1use super::window::{CdcOperation, WindowId};
47use rustc_hash::FxHashMap;
48use serde::{Deserialize, Serialize};
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59#[repr(C)]
60pub struct ChangelogRef {
61 pub batch_offset: u32,
63 pub row_index: u32,
65 pub weight: i16,
67 operation_raw: u8,
69}
70
71impl ChangelogRef {
72 #[inline]
74 #[must_use]
75 pub fn new(batch_offset: u32, row_index: u32, weight: i16, operation: CdcOperation) -> Self {
76 Self {
77 batch_offset,
78 row_index,
79 weight,
80 operation_raw: operation.to_u8(),
81 }
82 }
83
84 #[inline]
86 #[must_use]
87 pub fn insert(batch_offset: u32, row_index: u32) -> Self {
88 Self::new(batch_offset, row_index, 1, CdcOperation::Insert)
89 }
90
91 #[inline]
93 #[must_use]
94 pub fn delete(batch_offset: u32, row_index: u32) -> Self {
95 Self::new(batch_offset, row_index, -1, CdcOperation::Delete)
96 }
97
98 #[inline]
100 #[must_use]
101 pub fn update_before(batch_offset: u32, row_index: u32) -> Self {
102 Self::new(batch_offset, row_index, -1, CdcOperation::UpdateBefore)
103 }
104
105 #[inline]
107 #[must_use]
108 pub fn update_after(batch_offset: u32, row_index: u32) -> Self {
109 Self::new(batch_offset, row_index, 1, CdcOperation::UpdateAfter)
110 }
111
112 #[inline]
114 #[must_use]
115 pub fn operation(&self) -> CdcOperation {
116 CdcOperation::from_u8(self.operation_raw)
117 }
118
119 #[inline]
121 #[must_use]
122 pub fn is_insert(&self) -> bool {
123 self.weight > 0
124 }
125
126 #[inline]
128 #[must_use]
129 pub fn is_delete(&self) -> bool {
130 self.weight < 0
131 }
132}
133
134pub struct ChangelogBuffer {
157 refs: Vec<ChangelogRef>,
159 len: usize,
161 capacity: usize,
163}
164
165impl ChangelogBuffer {
166 #[must_use]
168 pub fn with_capacity(capacity: usize) -> Self {
169 let mut refs = Vec::with_capacity(capacity);
170 refs.resize(
172 capacity,
173 ChangelogRef {
174 batch_offset: 0,
175 row_index: 0,
176 weight: 0,
177 operation_raw: 0,
178 },
179 );
180 Self {
181 refs,
182 len: 0,
183 capacity,
184 }
185 }
186
187 #[inline]
192 pub fn push(&mut self, changelog_ref: ChangelogRef) -> bool {
193 if self.len < self.capacity {
194 self.refs[self.len] = changelog_ref;
195 self.len += 1;
196 true
197 } else {
198 false }
200 }
201
202 #[inline]
206 pub fn push_retraction(
207 &mut self,
208 batch_offset: u32,
209 old_row_index: u32,
210 new_row_index: u32,
211 ) -> bool {
212 if self.len + 2 <= self.capacity {
213 self.refs[self.len] = ChangelogRef::update_before(batch_offset, old_row_index);
214 self.refs[self.len + 1] = ChangelogRef::update_after(batch_offset, new_row_index);
215 self.len += 2;
216 true
217 } else {
218 false
219 }
220 }
221
222 pub fn drain(&mut self) -> impl Iterator<Item = ChangelogRef> + '_ {
226 let len = self.len;
227 self.len = 0;
228 self.refs[..len].iter().copied()
229 }
230
231 #[inline]
233 #[must_use]
234 pub fn len(&self) -> usize {
235 self.len
236 }
237
238 #[inline]
240 #[must_use]
241 pub fn is_empty(&self) -> bool {
242 self.len == 0
243 }
244
245 #[inline]
247 #[must_use]
248 pub fn is_full(&self) -> bool {
249 self.len >= self.capacity
250 }
251
252 #[inline]
254 #[must_use]
255 pub fn capacity(&self) -> usize {
256 self.capacity
257 }
258
259 #[inline]
261 #[must_use]
262 pub fn available(&self) -> usize {
263 self.capacity.saturating_sub(self.len)
264 }
265
266 #[inline]
268 pub fn clear(&mut self) {
269 self.len = 0;
270 }
271
272 #[must_use]
274 pub fn as_slice(&self) -> &[ChangelogRef] {
275 &self.refs[..self.len]
276 }
277}
278
279impl Default for ChangelogBuffer {
280 fn default() -> Self {
281 Self::with_capacity(1024)
282 }
283}
284
285pub trait RetractableAccumulator: Default + Clone + Send {
300 type Input;
302 type Output;
304
305 fn add(&mut self, value: Self::Input);
307
308 fn retract(&mut self, value: &Self::Input);
315
316 fn merge(&mut self, other: &Self);
318
319 fn result(&self) -> Self::Output;
321
322 fn is_empty(&self) -> bool;
324
325 fn supports_efficient_retraction(&self) -> bool {
331 true
332 }
333
334 fn reset(&mut self);
336}
337
338#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
343pub struct RetractableCountAccumulator {
344 count: i64,
346}
347
348impl RetractableCountAccumulator {
349 #[must_use]
351 pub fn new() -> Self {
352 Self::default()
353 }
354
355 #[must_use]
357 pub fn count(&self) -> i64 {
358 self.count
359 }
360}
361
362impl RetractableAccumulator for RetractableCountAccumulator {
363 type Input = ();
364 type Output = i64;
365
366 #[inline]
367 fn add(&mut self, _value: ()) {
368 self.count += 1;
369 }
370
371 #[inline]
372 fn retract(&mut self, _value: &()) {
373 self.count -= 1;
374 }
375
376 fn merge(&mut self, other: &Self) {
377 self.count += other.count;
378 }
379
380 fn result(&self) -> i64 {
381 self.count
382 }
383
384 fn is_empty(&self) -> bool {
385 self.count == 0
386 }
387
388 fn reset(&mut self) {
389 self.count = 0;
390 }
391}
392
393#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
397pub struct RetractableSumAccumulator {
398 sum: i64,
400 count: i64,
402}
403
404impl RetractableSumAccumulator {
405 #[must_use]
407 pub fn new() -> Self {
408 Self::default()
409 }
410
411 #[must_use]
413 pub fn sum(&self) -> i64 {
414 self.sum
415 }
416}
417
418impl RetractableAccumulator for RetractableSumAccumulator {
419 type Input = i64;
420 type Output = i64;
421
422 #[inline]
423 fn add(&mut self, value: i64) {
424 self.sum += value;
425 self.count += 1;
426 }
427
428 #[inline]
429 fn retract(&mut self, value: &i64) {
430 self.sum -= value;
431 self.count -= 1;
432 }
433
434 fn merge(&mut self, other: &Self) {
435 self.sum += other.sum;
436 self.count += other.count;
437 }
438
439 fn result(&self) -> i64 {
440 self.sum
441 }
442
443 fn is_empty(&self) -> bool {
444 self.count == 0
445 }
446
447 fn reset(&mut self) {
448 self.sum = 0;
449 self.count = 0;
450 }
451}
452
453#[derive(Debug, Clone, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
457pub struct RetractableAvgAccumulator {
458 sum: i64,
460 count: i64,
462}
463
464impl RetractableAvgAccumulator {
465 #[must_use]
467 pub fn new() -> Self {
468 Self::default()
469 }
470
471 #[must_use]
473 pub fn sum(&self) -> i64 {
474 self.sum
475 }
476
477 #[must_use]
479 pub fn count(&self) -> i64 {
480 self.count
481 }
482}
483
484impl RetractableAccumulator for RetractableAvgAccumulator {
485 type Input = i64;
486 type Output = Option<f64>;
487
488 #[inline]
489 fn add(&mut self, value: i64) {
490 self.sum += value;
491 self.count += 1;
492 }
493
494 #[inline]
495 fn retract(&mut self, value: &i64) {
496 self.sum -= value;
497 self.count -= 1;
498 }
499
500 fn merge(&mut self, other: &Self) {
501 self.sum += other.sum;
502 self.count += other.count;
503 }
504
505 #[allow(clippy::cast_precision_loss)]
506 fn result(&self) -> Option<f64> {
507 if self.count > 0 {
508 Some(self.sum as f64 / self.count as f64)
509 } else {
510 None
511 }
512 }
513
514 fn is_empty(&self) -> bool {
515 self.count == 0
516 }
517
518 fn reset(&mut self) {
519 self.sum = 0;
520 self.count = 0;
521 }
522}
523
524#[derive(Debug, Clone, Default)]
530pub struct RetractableMinAccumulator {
531 counts: std::collections::BTreeMap<i64, usize>,
533}
534
535impl RetractableMinAccumulator {
536 #[must_use]
538 pub fn new() -> Self {
539 Self::default()
540 }
541}
542
543impl RetractableAccumulator for RetractableMinAccumulator {
544 type Input = i64;
545 type Output = Option<i64>;
546
547 fn add(&mut self, value: i64) {
548 *self.counts.entry(value).or_insert(0) += 1;
549 }
550
551 fn retract(&mut self, value: &i64) {
552 if let Some(count) = self.counts.get_mut(value) {
553 *count -= 1;
554 if *count == 0 {
555 self.counts.remove(value);
556 }
557 }
558 }
559
560 fn merge(&mut self, other: &Self) {
561 for (&val, &cnt) in &other.counts {
562 *self.counts.entry(val).or_insert(0) += cnt;
563 }
564 }
565
566 fn result(&self) -> Option<i64> {
567 self.counts.keys().next().copied()
568 }
569
570 fn is_empty(&self) -> bool {
571 self.counts.is_empty()
572 }
573
574 fn supports_efficient_retraction(&self) -> bool {
575 true
576 }
577
578 fn reset(&mut self) {
579 self.counts.clear();
580 }
581}
582
583#[derive(Debug, Clone, Default)]
589pub struct RetractableMaxAccumulator {
590 counts: std::collections::BTreeMap<i64, usize>,
592}
593
594impl RetractableMaxAccumulator {
595 #[must_use]
597 pub fn new() -> Self {
598 Self::default()
599 }
600}
601
602impl RetractableAccumulator for RetractableMaxAccumulator {
603 type Input = i64;
604 type Output = Option<i64>;
605
606 fn add(&mut self, value: i64) {
607 *self.counts.entry(value).or_insert(0) += 1;
608 }
609
610 fn retract(&mut self, value: &i64) {
611 if let Some(count) = self.counts.get_mut(value) {
612 *count -= 1;
613 if *count == 0 {
614 self.counts.remove(value);
615 }
616 }
617 }
618
619 fn merge(&mut self, other: &Self) {
620 for (&val, &cnt) in &other.counts {
621 *self.counts.entry(val).or_insert(0) += cnt;
622 }
623 }
624
625 fn result(&self) -> Option<i64> {
626 self.counts.keys().next_back().copied()
627 }
628
629 fn is_empty(&self) -> bool {
630 self.counts.is_empty()
631 }
632
633 fn supports_efficient_retraction(&self) -> bool {
634 true
635 }
636
637 fn reset(&mut self) {
638 self.counts.clear();
639 }
640}
641
642#[derive(Debug, Clone)]
646struct EmittedResult {
647 data: Vec<u8>,
649 emit_time: i64,
651 version: u32,
653}
654
655pub struct LateDataRetractionGenerator {
687 emitted_results: FxHashMap<WindowId, EmittedResult>,
689 enabled: bool,
691 retractions_generated: u64,
693 windows_tracked: u64,
695}
696
697impl LateDataRetractionGenerator {
698 #[must_use]
700 pub fn new(enabled: bool) -> Self {
701 Self {
702 emitted_results: FxHashMap::default(),
703 enabled,
704 retractions_generated: 0,
705 windows_tracked: 0,
706 }
707 }
708
709 #[must_use]
711 pub fn disabled() -> Self {
712 Self::new(false)
713 }
714
715 #[must_use]
717 pub fn is_enabled(&self) -> bool {
718 self.enabled
719 }
720
721 pub fn set_enabled(&mut self, enabled: bool) {
723 self.enabled = enabled;
724 }
725
726 pub fn check_retraction(
732 &mut self,
733 window_id: &WindowId,
734 new_data: &[u8],
735 timestamp: i64,
736 ) -> Option<(Vec<u8>, Vec<u8>)> {
737 if !self.enabled {
738 return None;
739 }
740
741 if let Some(prev) = self.emitted_results.get_mut(window_id) {
742 if prev.data != new_data {
743 let old_data = std::mem::replace(&mut prev.data, new_data.to_vec());
744 prev.emit_time = timestamp;
745 prev.version += 1;
746 self.retractions_generated += 1;
747 return Some((old_data, new_data.to_vec()));
748 }
749 } else {
750 self.emitted_results.insert(
751 *window_id,
752 EmittedResult {
753 data: new_data.to_vec(),
754 emit_time: timestamp,
755 version: 1,
756 },
757 );
758 self.windows_tracked += 1;
759 }
760
761 None
762 }
763
764 pub fn check_retraction_ref(
770 &mut self,
771 window_id: &WindowId,
772 new_data: &[u8],
773 timestamp: i64,
774 ) -> Option<Vec<u8>> {
775 if !self.enabled {
776 return None;
777 }
778
779 if let Some(prev) = self.emitted_results.get_mut(window_id) {
780 if prev.data != new_data {
781 let old_data = std::mem::replace(&mut prev.data, new_data.to_vec());
782 prev.emit_time = timestamp;
783 prev.version += 1;
784 self.retractions_generated += 1;
785 return Some(old_data);
786 }
787 } else {
788 self.emitted_results.insert(
789 *window_id,
790 EmittedResult {
791 data: new_data.to_vec(),
792 emit_time: timestamp,
793 version: 1,
794 },
795 );
796 self.windows_tracked += 1;
797 }
798
799 None
800 }
801
802 pub fn cleanup_window(&mut self, window_id: &WindowId) {
806 self.emitted_results.remove(window_id);
807 }
808
809 pub fn cleanup_before_watermark(&mut self, watermark: i64) {
813 self.emitted_results
814 .retain(|window_id, _| window_id.end > watermark);
815 }
816
817 #[must_use]
819 pub fn retractions_generated(&self) -> u64 {
820 self.retractions_generated
821 }
822
823 #[must_use]
825 pub fn windows_tracked(&self) -> usize {
826 self.emitted_results.len()
827 }
828
829 pub fn reset_metrics(&mut self) {
831 self.retractions_generated = 0;
832 self.windows_tracked = 0;
833 }
834
835 pub fn clear(&mut self) {
837 self.emitted_results.clear();
838 self.reset_metrics();
839 }
840}
841
842impl Default for LateDataRetractionGenerator {
843 fn default() -> Self {
844 Self::new(true)
845 }
846}
847
848#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
854pub struct CdcSource {
855 pub name: String,
857 pub db: String,
859 pub table: String,
861 #[serde(default)]
863 pub sequence: u64,
864}
865
866impl CdcSource {
867 #[must_use]
869 pub fn new(name: impl Into<String>, db: impl Into<String>, table: impl Into<String>) -> Self {
870 Self {
871 name: name.into(),
872 db: db.into(),
873 table: table.into(),
874 sequence: 0,
875 }
876 }
877
878 #[must_use]
880 pub fn with_sequence(
881 name: impl Into<String>,
882 db: impl Into<String>,
883 table: impl Into<String>,
884 sequence: u64,
885 ) -> Self {
886 Self {
887 name: name.into(),
888 db: db.into(),
889 table: table.into(),
890 sequence,
891 }
892 }
893
894 pub fn next_sequence(&mut self) -> u64 {
896 self.sequence += 1;
897 self.sequence
898 }
899}
900
901#[derive(Debug, Clone, Serialize, Deserialize)]
939pub struct CdcEnvelope<T> {
940 pub op: String,
942 pub ts_ms: i64,
944 pub source: CdcSource,
946 #[serde(skip_serializing_if = "Option::is_none")]
948 pub before: Option<T>,
949 #[serde(skip_serializing_if = "Option::is_none")]
951 pub after: Option<T>,
952}
953
954impl<T> CdcEnvelope<T> {
955 #[must_use]
957 pub fn insert(after: T, source: CdcSource, ts_ms: i64) -> Self {
958 Self {
959 op: "c".to_string(),
960 ts_ms,
961 source,
962 before: None,
963 after: Some(after),
964 }
965 }
966
967 #[must_use]
969 pub fn delete(before: T, source: CdcSource, ts_ms: i64) -> Self {
970 Self {
971 op: "d".to_string(),
972 ts_ms,
973 source,
974 before: Some(before),
975 after: None,
976 }
977 }
978
979 #[must_use]
981 pub fn update(before: T, after: T, source: CdcSource, ts_ms: i64) -> Self {
982 Self {
983 op: "u".to_string(),
984 ts_ms,
985 source,
986 before: Some(before),
987 after: Some(after),
988 }
989 }
990
991 #[must_use]
993 pub fn read(after: T, source: CdcSource, ts_ms: i64) -> Self {
994 Self {
995 op: "r".to_string(),
996 ts_ms,
997 source,
998 before: None,
999 after: Some(after),
1000 }
1001 }
1002
1003 #[must_use]
1005 pub fn is_insert(&self) -> bool {
1006 self.op == "c"
1007 }
1008
1009 #[must_use]
1011 pub fn is_delete(&self) -> bool {
1012 self.op == "d"
1013 }
1014
1015 #[must_use]
1017 pub fn is_update(&self) -> bool {
1018 self.op == "u"
1019 }
1020
1021 #[must_use]
1027 pub fn weight(&self) -> i32 {
1028 match self.op.as_str() {
1029 "c" | "r" => 1,
1030 "d" => -1,
1031 _ => 0,
1033 }
1034 }
1035}
1036
1037impl<T: Serialize> CdcEnvelope<T> {
1038 pub fn to_json(&self) -> Result<String, serde_json::Error> {
1044 serde_json::to_string(self)
1045 }
1046
1047 pub fn to_json_pretty(&self) -> Result<String, serde_json::Error> {
1053 serde_json::to_string_pretty(self)
1054 }
1055
1056 pub fn to_json_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
1062 serde_json::to_vec(self)
1063 }
1064}
1065
1066#[derive(Debug, Clone, Default)]
1083pub struct RetractableFirstValueAccumulator {
1084 entries: std::collections::BTreeMap<i64, Vec<i64>>,
1086 total_count: usize,
1088}
1089
1090impl RetractableFirstValueAccumulator {
1091 #[must_use]
1093 pub fn new() -> Self {
1094 Self::default()
1095 }
1096
1097 #[must_use]
1099 pub fn len(&self) -> usize {
1100 self.total_count
1101 }
1102
1103 #[must_use]
1105 pub fn is_empty(&self) -> bool {
1106 self.total_count == 0
1107 }
1108}
1109
1110impl RetractableAccumulator for RetractableFirstValueAccumulator {
1111 type Input = (i64, i64); type Output = Option<i64>;
1113
1114 fn add(&mut self, (timestamp, value): (i64, i64)) {
1115 self.entries.entry(timestamp).or_default().push(value);
1116 self.total_count += 1;
1117 }
1118
1119 fn retract(&mut self, (timestamp, value): &(i64, i64)) {
1120 if let Some(values) = self.entries.get_mut(timestamp) {
1121 if let Some(pos) = values.iter().position(|v| v == value) {
1122 values.swap_remove(pos);
1123 self.total_count -= 1;
1124 if values.is_empty() {
1125 self.entries.remove(timestamp);
1126 }
1127 }
1128 }
1129 }
1130
1131 fn merge(&mut self, other: &Self) {
1132 for (&ts, vals) in &other.entries {
1133 self.entries.entry(ts).or_default().extend(vals);
1134 }
1135 self.total_count += other.total_count;
1136 }
1137
1138 fn result(&self) -> Option<i64> {
1139 self.entries
1140 .values()
1141 .next()
1142 .and_then(|vals| vals.first().copied())
1143 }
1144
1145 fn is_empty(&self) -> bool {
1146 self.total_count == 0
1147 }
1148
1149 fn supports_efficient_retraction(&self) -> bool {
1150 true
1151 }
1152
1153 fn reset(&mut self) {
1154 self.entries.clear();
1155 self.total_count = 0;
1156 }
1157}
1158
1159#[derive(Debug, Clone, Default)]
1164pub struct RetractableLastValueAccumulator {
1165 entries: std::collections::BTreeMap<i64, Vec<i64>>,
1167 total_count: usize,
1169}
1170
1171impl RetractableLastValueAccumulator {
1172 #[must_use]
1174 pub fn new() -> Self {
1175 Self::default()
1176 }
1177
1178 #[must_use]
1180 pub fn len(&self) -> usize {
1181 self.total_count
1182 }
1183
1184 #[must_use]
1186 pub fn is_empty(&self) -> bool {
1187 self.total_count == 0
1188 }
1189}
1190
1191impl RetractableAccumulator for RetractableLastValueAccumulator {
1192 type Input = (i64, i64); type Output = Option<i64>;
1194
1195 fn add(&mut self, (timestamp, value): (i64, i64)) {
1196 self.entries.entry(timestamp).or_default().push(value);
1197 self.total_count += 1;
1198 }
1199
1200 fn retract(&mut self, (timestamp, value): &(i64, i64)) {
1201 if let Some(values) = self.entries.get_mut(timestamp) {
1202 if let Some(pos) = values.iter().position(|v| v == value) {
1203 values.swap_remove(pos);
1204 self.total_count -= 1;
1205 if values.is_empty() {
1206 self.entries.remove(timestamp);
1207 }
1208 }
1209 }
1210 }
1211
1212 fn merge(&mut self, other: &Self) {
1213 for (&ts, vals) in &other.entries {
1214 self.entries.entry(ts).or_default().extend(vals);
1215 }
1216 self.total_count += other.total_count;
1217 }
1218
1219 fn result(&self) -> Option<i64> {
1220 self.entries
1221 .values()
1222 .next_back()
1223 .and_then(|vals| vals.last().copied())
1224 }
1225
1226 fn is_empty(&self) -> bool {
1227 self.total_count == 0
1228 }
1229
1230 fn supports_efficient_retraction(&self) -> bool {
1231 true
1232 }
1233
1234 fn reset(&mut self) {
1235 self.entries.clear();
1236 self.total_count = 0;
1237 }
1238}
1239
1240#[derive(Debug, Clone, Default)]
1245pub struct RetractableFirstValueF64Accumulator {
1246 entries: std::collections::BTreeMap<i64, Vec<i64>>,
1248 total_count: usize,
1250}
1251
1252impl RetractableFirstValueF64Accumulator {
1253 #[must_use]
1255 pub fn new() -> Self {
1256 Self::default()
1257 }
1258
1259 #[must_use]
1261 pub fn len(&self) -> usize {
1262 self.total_count
1263 }
1264
1265 #[must_use]
1267 pub fn is_empty(&self) -> bool {
1268 self.total_count == 0
1269 }
1270
1271 #[must_use]
1273 #[allow(clippy::cast_sign_loss)]
1274 pub fn result_f64(&self) -> Option<f64> {
1275 self.entries
1276 .values()
1277 .next()
1278 .and_then(|vals| vals.first())
1279 .map(|bits| f64::from_bits(*bits as u64))
1280 }
1281}
1282
1283impl RetractableAccumulator for RetractableFirstValueF64Accumulator {
1284 type Input = (i64, f64); type Output = Option<i64>; #[allow(clippy::cast_possible_wrap)]
1288 fn add(&mut self, (timestamp, value): (i64, f64)) {
1289 let value_bits = value.to_bits() as i64;
1290 self.entries.entry(timestamp).or_default().push(value_bits);
1291 self.total_count += 1;
1292 }
1293
1294 fn retract(&mut self, (timestamp, value): &(i64, f64)) {
1295 #[allow(clippy::cast_possible_wrap)]
1296 let value_bits = value.to_bits() as i64;
1297 if let Some(values) = self.entries.get_mut(timestamp) {
1298 if let Some(pos) = values.iter().position(|v| *v == value_bits) {
1299 values.swap_remove(pos);
1300 self.total_count -= 1;
1301 if values.is_empty() {
1302 self.entries.remove(timestamp);
1303 }
1304 }
1305 }
1306 }
1307
1308 fn merge(&mut self, other: &Self) {
1309 for (&ts, vals) in &other.entries {
1310 self.entries.entry(ts).or_default().extend(vals);
1311 }
1312 self.total_count += other.total_count;
1313 }
1314
1315 fn result(&self) -> Option<i64> {
1316 self.entries
1317 .values()
1318 .next()
1319 .and_then(|vals| vals.first().copied())
1320 }
1321
1322 fn is_empty(&self) -> bool {
1323 self.total_count == 0
1324 }
1325
1326 fn supports_efficient_retraction(&self) -> bool {
1327 true
1328 }
1329
1330 fn reset(&mut self) {
1331 self.entries.clear();
1332 self.total_count = 0;
1333 }
1334}
1335
1336#[derive(Debug, Clone, Default)]
1341pub struct RetractableLastValueF64Accumulator {
1342 entries: std::collections::BTreeMap<i64, Vec<i64>>,
1344 total_count: usize,
1346}
1347
1348impl RetractableLastValueF64Accumulator {
1349 #[must_use]
1351 pub fn new() -> Self {
1352 Self::default()
1353 }
1354
1355 #[must_use]
1357 pub fn len(&self) -> usize {
1358 self.total_count
1359 }
1360
1361 #[must_use]
1363 pub fn is_empty(&self) -> bool {
1364 self.total_count == 0
1365 }
1366
1367 #[must_use]
1369 #[allow(clippy::cast_sign_loss)]
1370 pub fn result_f64(&self) -> Option<f64> {
1371 self.entries
1372 .values()
1373 .next_back()
1374 .and_then(|vals| vals.last())
1375 .map(|bits| f64::from_bits(*bits as u64))
1376 }
1377}
1378
1379impl RetractableAccumulator for RetractableLastValueF64Accumulator {
1380 type Input = (i64, f64); type Output = Option<i64>; #[allow(clippy::cast_possible_wrap)]
1384 fn add(&mut self, (timestamp, value): (i64, f64)) {
1385 let value_bits = value.to_bits() as i64;
1386 self.entries.entry(timestamp).or_default().push(value_bits);
1387 self.total_count += 1;
1388 }
1389
1390 fn retract(&mut self, (timestamp, value): &(i64, f64)) {
1391 #[allow(clippy::cast_possible_wrap)]
1392 let value_bits = value.to_bits() as i64;
1393 if let Some(values) = self.entries.get_mut(timestamp) {
1394 if let Some(pos) = values.iter().position(|v| *v == value_bits) {
1395 values.swap_remove(pos);
1396 self.total_count -= 1;
1397 if values.is_empty() {
1398 self.entries.remove(timestamp);
1399 }
1400 }
1401 }
1402 }
1403
1404 fn merge(&mut self, other: &Self) {
1405 for (&ts, vals) in &other.entries {
1406 self.entries.entry(ts).or_default().extend(vals);
1407 }
1408 self.total_count += other.total_count;
1409 }
1410
1411 fn result(&self) -> Option<i64> {
1412 self.entries
1413 .values()
1414 .next_back()
1415 .and_then(|vals| vals.last().copied())
1416 }
1417
1418 fn is_empty(&self) -> bool {
1419 self.total_count == 0
1420 }
1421
1422 fn supports_efficient_retraction(&self) -> bool {
1423 true
1424 }
1425
1426 fn reset(&mut self) {
1427 self.entries.clear();
1428 self.total_count = 0;
1429 }
1430}
1431
1432#[cfg(test)]
1435mod tests {
1436 use super::*;
1437
1438 #[test]
1441 fn test_changelog_ref_insert() {
1442 let cr = ChangelogRef::insert(10, 5);
1443 assert_eq!(cr.batch_offset, 10);
1444 assert_eq!(cr.row_index, 5);
1445 assert_eq!(cr.weight, 1);
1446 assert_eq!(cr.operation(), CdcOperation::Insert);
1447 assert!(cr.is_insert());
1448 assert!(!cr.is_delete());
1449 }
1450
1451 #[test]
1452 fn test_changelog_ref_delete() {
1453 let cr = ChangelogRef::delete(20, 3);
1454 assert_eq!(cr.batch_offset, 20);
1455 assert_eq!(cr.row_index, 3);
1456 assert_eq!(cr.weight, -1);
1457 assert_eq!(cr.operation(), CdcOperation::Delete);
1458 assert!(!cr.is_insert());
1459 assert!(cr.is_delete());
1460 }
1461
1462 #[test]
1463 fn test_changelog_ref_update() {
1464 let before = ChangelogRef::update_before(5, 1);
1465 let after = ChangelogRef::update_after(5, 2);
1466
1467 assert_eq!(before.weight, -1);
1468 assert_eq!(after.weight, 1);
1469 assert_eq!(before.operation(), CdcOperation::UpdateBefore);
1470 assert_eq!(after.operation(), CdcOperation::UpdateAfter);
1471 }
1472
1473 #[test]
1474 fn test_changelog_ref_size() {
1475 assert!(std::mem::size_of::<ChangelogRef>() <= 16);
1477 }
1478
1479 #[test]
1482 fn test_changelog_buffer_basic() {
1483 let mut buffer = ChangelogBuffer::with_capacity(10);
1484 assert!(buffer.is_empty());
1485 assert_eq!(buffer.capacity(), 10);
1486
1487 assert!(buffer.push(ChangelogRef::insert(0, 0)));
1488 assert!(buffer.push(ChangelogRef::delete(1, 0)));
1489
1490 assert_eq!(buffer.len(), 2);
1491 assert!(!buffer.is_empty());
1492 }
1493
1494 #[test]
1495 fn test_changelog_buffer_full() {
1496 let mut buffer = ChangelogBuffer::with_capacity(2);
1497
1498 assert!(buffer.push(ChangelogRef::insert(0, 0)));
1499 assert!(buffer.push(ChangelogRef::insert(1, 0)));
1500 assert!(!buffer.push(ChangelogRef::insert(2, 0))); assert!(buffer.is_full());
1503 assert_eq!(buffer.available(), 0);
1504 }
1505
1506 #[test]
1507 fn test_changelog_buffer_drain() {
1508 let mut buffer = ChangelogBuffer::with_capacity(10);
1509
1510 for i in 0..5 {
1511 buffer.push(ChangelogRef::insert(i, 0));
1512 }
1513
1514 let drained: Vec<_> = buffer.drain().collect();
1515 assert_eq!(drained.len(), 5);
1516 assert!(buffer.is_empty());
1517
1518 for i in 0..3 {
1520 buffer.push(ChangelogRef::delete(i, 0));
1521 }
1522 assert_eq!(buffer.len(), 3);
1523 }
1524
1525 #[test]
1526 fn test_changelog_buffer_retraction() {
1527 let mut buffer = ChangelogBuffer::with_capacity(10);
1528
1529 assert!(buffer.push_retraction(0, 1, 2));
1530 assert_eq!(buffer.len(), 2);
1531
1532 let refs: Vec<_> = buffer.as_slice().to_vec();
1533 assert_eq!(refs[0].operation(), CdcOperation::UpdateBefore);
1534 assert_eq!(refs[0].row_index, 1);
1535 assert_eq!(refs[1].operation(), CdcOperation::UpdateAfter);
1536 assert_eq!(refs[1].row_index, 2);
1537 }
1538
1539 #[test]
1540 fn test_changelog_buffer_zero_alloc_reuse() {
1541 let mut buffer = ChangelogBuffer::with_capacity(100);
1542
1543 for i in 0..50 {
1545 buffer.push(ChangelogRef::insert(i, 0));
1546 }
1547 let _: Vec<_> = buffer.drain().collect();
1548
1549 for i in 0..50 {
1551 buffer.push(ChangelogRef::insert(i, 0));
1552 }
1553
1554 assert_eq!(buffer.len(), 50);
1555 }
1556
1557 #[test]
1560 fn test_retractable_count() {
1561 let mut agg = RetractableCountAccumulator::default();
1562
1563 agg.add(());
1564 agg.add(());
1565 agg.add(());
1566 assert_eq!(agg.result(), 3);
1567
1568 agg.retract(&());
1569 assert_eq!(agg.result(), 2);
1570
1571 agg.retract(&());
1572 agg.retract(&());
1573 assert_eq!(agg.result(), 0);
1574 }
1575
1576 #[test]
1577 fn test_retractable_count_negative() {
1578 let mut agg = RetractableCountAccumulator::default();
1579
1580 agg.add(());
1581 agg.retract(&());
1582 agg.retract(&()); assert_eq!(agg.result(), -1);
1586 }
1587
1588 #[test]
1589 fn test_retractable_sum() {
1590 let mut agg = RetractableSumAccumulator::default();
1591
1592 agg.add(10);
1593 agg.add(20);
1594 agg.add(30);
1595 assert_eq!(agg.result(), 60);
1596
1597 agg.retract(&20);
1598 assert_eq!(agg.result(), 40);
1599
1600 agg.retract(&10);
1601 agg.retract(&30);
1602 assert_eq!(agg.result(), 0);
1603 }
1604
1605 #[test]
1606 fn test_retractable_sum_merge() {
1607 let mut agg1 = RetractableSumAccumulator::default();
1608 agg1.add(10);
1609 agg1.add(20);
1610
1611 let mut agg2 = RetractableSumAccumulator::default();
1612 agg2.add(30);
1613 agg2.retract(&5);
1614
1615 agg1.merge(&agg2);
1616 assert_eq!(agg1.result(), 55); }
1618
1619 #[test]
1620 fn test_retractable_avg() {
1621 let mut agg = RetractableAvgAccumulator::default();
1622
1623 agg.add(10);
1624 agg.add(20);
1625 agg.add(30);
1626 let avg = agg.result().unwrap();
1627 assert!((avg - 20.0).abs() < f64::EPSILON);
1628
1629 agg.retract(&30);
1630 let avg = agg.result().unwrap();
1631 assert!((avg - 15.0).abs() < f64::EPSILON); }
1633
1634 #[test]
1635 fn test_retractable_avg_empty() {
1636 let mut agg = RetractableAvgAccumulator::default();
1637 assert!(agg.result().is_none());
1638
1639 agg.add(10);
1640 agg.retract(&10);
1641 assert!(agg.result().is_none());
1642 }
1643
1644 #[test]
1645 fn test_retractable_min() {
1646 let mut agg = RetractableMinAccumulator::default();
1647
1648 agg.add(30);
1649 agg.add(10);
1650 agg.add(20);
1651 assert_eq!(agg.result(), Some(10));
1652
1653 agg.retract(&10);
1655 assert_eq!(agg.result(), Some(20));
1656
1657 agg.retract(&30);
1659 assert_eq!(agg.result(), Some(20));
1660
1661 agg.retract(&20);
1662 assert_eq!(agg.result(), None);
1663 }
1664
1665 #[test]
1666 fn test_retractable_max() {
1667 let mut agg = RetractableMaxAccumulator::default();
1668
1669 agg.add(10);
1670 agg.add(30);
1671 agg.add(20);
1672 assert_eq!(agg.result(), Some(30));
1673
1674 agg.retract(&30);
1676 assert_eq!(agg.result(), Some(20));
1677
1678 agg.retract(&20);
1679 agg.retract(&10);
1680 assert_eq!(agg.result(), None);
1681 }
1682
1683 #[test]
1684 fn test_retractable_efficiency_flags() {
1685 let count = RetractableCountAccumulator::default();
1686 let sum = RetractableSumAccumulator::default();
1687 let avg = RetractableAvgAccumulator::default();
1688 let min = RetractableMinAccumulator::default();
1689 let max = RetractableMaxAccumulator::default();
1690
1691 assert!(count.supports_efficient_retraction());
1693 assert!(sum.supports_efficient_retraction());
1694 assert!(avg.supports_efficient_retraction());
1695
1696 assert!(min.supports_efficient_retraction());
1698 assert!(max.supports_efficient_retraction());
1699 }
1700
1701 #[test]
1704 fn test_late_data_retraction_first_emission() {
1705 let mut gen = LateDataRetractionGenerator::new(true);
1706 let window_id = WindowId::new(0, 60000);
1707
1708 let result = gen.check_retraction(&window_id, b"count=5", 1000);
1710 assert!(result.is_none());
1711 assert_eq!(gen.windows_tracked(), 1);
1712 }
1713
1714 #[test]
1715 fn test_late_data_retraction_changed_result() {
1716 let mut gen = LateDataRetractionGenerator::new(true);
1717 let window_id = WindowId::new(0, 60000);
1718
1719 gen.check_retraction(&window_id, b"count=5", 1000);
1721
1722 let result = gen.check_retraction(&window_id, b"count=7", 2000);
1724 assert!(result.is_some());
1725
1726 let (old, new) = result.unwrap();
1727 assert_eq!(old, b"count=5");
1728 assert_eq!(new, b"count=7");
1729 assert_eq!(gen.retractions_generated(), 1);
1730 }
1731
1732 #[test]
1733 fn test_late_data_retraction_same_result() {
1734 let mut gen = LateDataRetractionGenerator::new(true);
1735 let window_id = WindowId::new(0, 60000);
1736
1737 gen.check_retraction(&window_id, b"count=5", 1000);
1739
1740 let result = gen.check_retraction(&window_id, b"count=5", 2000);
1742 assert!(result.is_none());
1743 assert_eq!(gen.retractions_generated(), 0);
1744 }
1745
1746 #[test]
1747 fn test_late_data_retraction_disabled() {
1748 let mut gen = LateDataRetractionGenerator::new(false);
1749 let window_id = WindowId::new(0, 60000);
1750
1751 gen.check_retraction(&window_id, b"count=5", 1000);
1752 let result = gen.check_retraction(&window_id, b"count=7", 2000);
1753
1754 assert!(result.is_none());
1756 }
1757
1758 #[test]
1759 fn test_late_data_cleanup() {
1760 let mut gen = LateDataRetractionGenerator::new(true);
1761
1762 let w1 = WindowId::new(0, 1000);
1763 let w2 = WindowId::new(1000, 2000);
1764
1765 gen.check_retraction(&w1, b"a", 100);
1766 gen.check_retraction(&w2, b"b", 200);
1767 assert_eq!(gen.windows_tracked(), 2);
1768
1769 gen.cleanup_window(&w1);
1770 assert_eq!(gen.windows_tracked(), 1);
1771
1772 gen.cleanup_before_watermark(2000);
1773 assert_eq!(gen.windows_tracked(), 0);
1774 }
1775
1776 #[test]
1779 fn test_cdc_envelope_insert() {
1780 let source = CdcSource::new("laminardb", "default", "orders");
1781 let envelope = CdcEnvelope::insert(
1782 serde_json::json!({"id": 1, "amount": 100}),
1783 source,
1784 1_706_140_800_000,
1785 );
1786
1787 assert_eq!(envelope.op, "c");
1788 assert!(envelope.is_insert());
1789 assert!(envelope.before.is_none());
1790 assert!(envelope.after.is_some());
1791 assert_eq!(envelope.weight(), 1);
1792 }
1793
1794 #[test]
1795 fn test_cdc_envelope_delete() {
1796 let source = CdcSource::new("laminardb", "default", "orders");
1797 let envelope = CdcEnvelope::delete(serde_json::json!({"id": 1}), source, 1_706_140_800_000);
1798
1799 assert_eq!(envelope.op, "d");
1800 assert!(envelope.is_delete());
1801 assert!(envelope.before.is_some());
1802 assert!(envelope.after.is_none());
1803 assert_eq!(envelope.weight(), -1);
1804 }
1805
1806 #[test]
1807 fn test_cdc_envelope_update() {
1808 let source = CdcSource::new("laminardb", "default", "orders");
1809 let envelope = CdcEnvelope::update(
1810 serde_json::json!({"id": 1, "amount": 100}),
1811 serde_json::json!({"id": 1, "amount": 150}),
1812 source,
1813 1_706_140_800_000,
1814 );
1815
1816 assert_eq!(envelope.op, "u");
1817 assert!(envelope.is_update());
1818 assert!(envelope.before.is_some());
1819 assert!(envelope.after.is_some());
1820 assert_eq!(envelope.weight(), 0);
1821 }
1822
1823 #[test]
1824 fn test_cdc_envelope_json_serialization() {
1825 let source = CdcSource::new("laminardb", "default", "orders");
1826 let envelope = CdcEnvelope::insert(
1827 serde_json::json!({"id": 1, "amount": 100}),
1828 source,
1829 1_706_140_800_000,
1830 );
1831
1832 let json = envelope.to_json().unwrap();
1833 assert!(json.contains("\"op\":\"c\""));
1834 assert!(json.contains("\"after\""));
1835 assert!(!json.contains("\"before\""));
1836 assert!(json.contains("\"ts_ms\":1706140800000"));
1837 }
1838
1839 #[test]
1840 fn test_cdc_envelope_debezium_compatible() {
1841 let source = CdcSource::with_sequence("laminardb", "test_db", "users", 42);
1842 let envelope = CdcEnvelope::insert(
1843 serde_json::json!({"user_id": 123, "name": "Alice"}),
1844 source,
1845 1_706_140_800_000,
1846 );
1847
1848 let json = envelope.to_json().unwrap();
1849
1850 assert!(json.contains("\"op\":\"c\""));
1852 assert!(json.contains("\"source\""));
1853 assert!(json.contains("\"name\":\"laminardb\""));
1854 assert!(json.contains("\"db\":\"test_db\""));
1855 assert!(json.contains("\"table\":\"users\""));
1856 assert!(json.contains("\"sequence\":42"));
1857 }
1858
1859 #[test]
1860 fn test_cdc_source_sequence() {
1861 let mut source = CdcSource::new("laminardb", "db", "table");
1862 assert_eq!(source.sequence, 0);
1863
1864 assert_eq!(source.next_sequence(), 1);
1865 assert_eq!(source.next_sequence(), 2);
1866 assert_eq!(source.sequence, 2);
1867 }
1868
1869 #[test]
1872 fn test_cdc_operation_roundtrip() {
1873 for op in [
1874 CdcOperation::Insert,
1875 CdcOperation::Delete,
1876 CdcOperation::UpdateBefore,
1877 CdcOperation::UpdateAfter,
1878 ] {
1879 let u8_val = op.to_u8();
1880 let restored = CdcOperation::from_u8(u8_val);
1881 assert_eq!(op, restored);
1882 }
1883 }
1884
1885 #[test]
1886 fn test_cdc_operation_unknown_u8() {
1887 assert_eq!(CdcOperation::from_u8(255), CdcOperation::Insert);
1889 }
1890
1891 #[test]
1898 fn test_retractable_first_value_basic() {
1899 let mut acc = RetractableFirstValueAccumulator::new();
1900 assert!(acc.is_empty());
1901 assert_eq!(acc.result(), None);
1902
1903 acc.add((200, 20));
1905 acc.add((100, 10));
1906 acc.add((300, 30));
1907
1908 assert!(!acc.is_empty());
1909 assert_eq!(acc.len(), 3);
1910 assert_eq!(acc.result(), Some(10));
1912 }
1913
1914 #[test]
1915 fn test_retractable_first_value_retract_non_first() {
1916 let mut acc = RetractableFirstValueAccumulator::new();
1917 acc.add((100, 10));
1918 acc.add((200, 20));
1919 acc.add((300, 30));
1920
1921 acc.retract(&(200, 20));
1923 assert_eq!(acc.len(), 2);
1924 assert_eq!(acc.result(), Some(10));
1925 }
1926
1927 #[test]
1928 fn test_retractable_first_value_retract_first() {
1929 let mut acc = RetractableFirstValueAccumulator::new();
1930 acc.add((100, 10));
1931 acc.add((200, 20));
1932 acc.add((300, 30));
1933
1934 acc.retract(&(100, 10));
1936 assert_eq!(acc.len(), 2);
1937 assert_eq!(acc.result(), Some(20)); }
1939
1940 #[test]
1941 fn test_retractable_first_value_retract_all() {
1942 let mut acc = RetractableFirstValueAccumulator::new();
1943 acc.add((100, 10));
1944 acc.add((200, 20));
1945
1946 acc.retract(&(100, 10));
1947 acc.retract(&(200, 20));
1948 assert!(acc.is_empty());
1949 assert_eq!(acc.result(), None);
1950 }
1951
1952 #[test]
1953 fn test_retractable_first_value_retract_nonexistent() {
1954 let mut acc = RetractableFirstValueAccumulator::new();
1955 acc.add((100, 10));
1956
1957 acc.retract(&(999, 99));
1959 assert_eq!(acc.len(), 1);
1960 assert_eq!(acc.result(), Some(10));
1961 }
1962
1963 #[test]
1964 fn test_retractable_first_value_duplicate_timestamps() {
1965 let mut acc = RetractableFirstValueAccumulator::new();
1966 acc.add((100, 10));
1967 acc.add((100, 20)); assert_eq!(acc.len(), 2);
1970 assert_eq!(acc.result(), Some(10));
1972
1973 acc.retract(&(100, 10));
1975 assert_eq!(acc.result(), Some(20));
1976 }
1977
1978 #[test]
1981 fn test_retractable_last_value_basic() {
1982 let mut acc = RetractableLastValueAccumulator::new();
1983 assert!(acc.is_empty());
1984 assert_eq!(acc.result(), None);
1985
1986 acc.add((100, 10));
1987 acc.add((300, 30));
1988 acc.add((200, 20));
1989
1990 assert_eq!(acc.len(), 3);
1991 assert_eq!(acc.result(), Some(30));
1993 }
1994
1995 #[test]
1996 fn test_retractable_last_value_retract_non_last() {
1997 let mut acc = RetractableLastValueAccumulator::new();
1998 acc.add((100, 10));
1999 acc.add((200, 20));
2000 acc.add((300, 30));
2001
2002 acc.retract(&(200, 20));
2004 assert_eq!(acc.result(), Some(30));
2005 }
2006
2007 #[test]
2008 fn test_retractable_last_value_retract_last() {
2009 let mut acc = RetractableLastValueAccumulator::new();
2010 acc.add((100, 10));
2011 acc.add((200, 20));
2012 acc.add((300, 30));
2013
2014 acc.retract(&(300, 30));
2016 assert_eq!(acc.result(), Some(20)); }
2018
2019 #[test]
2020 fn test_retractable_last_value_retract_all() {
2021 let mut acc = RetractableLastValueAccumulator::new();
2022 acc.add((100, 10));
2023 acc.retract(&(100, 10));
2024 assert!(acc.is_empty());
2025 assert_eq!(acc.result(), None);
2026 }
2027
2028 #[test]
2031 fn test_retractable_first_value_merge() {
2032 let mut acc1 = RetractableFirstValueAccumulator::new();
2033 let mut acc2 = RetractableFirstValueAccumulator::new();
2034
2035 acc1.add((200, 20));
2036 acc1.add((400, 40));
2037 acc2.add((100, 10));
2038 acc2.add((300, 30));
2039
2040 acc1.merge(&acc2);
2041 assert_eq!(acc1.len(), 4);
2042 assert_eq!(acc1.result(), Some(10));
2044 }
2045
2046 #[test]
2047 fn test_retractable_last_value_merge() {
2048 let mut acc1 = RetractableLastValueAccumulator::new();
2049 let mut acc2 = RetractableLastValueAccumulator::new();
2050
2051 acc1.add((100, 10));
2052 acc1.add((300, 30));
2053 acc2.add((200, 20));
2054 acc2.add((400, 40));
2055
2056 acc1.merge(&acc2);
2057 assert_eq!(acc1.len(), 4);
2058 assert_eq!(acc1.result(), Some(40));
2060 }
2061
2062 #[test]
2063 fn test_retractable_first_value_merge_empty() {
2064 let mut acc1 = RetractableFirstValueAccumulator::new();
2065 let acc2 = RetractableFirstValueAccumulator::new();
2066
2067 acc1.add((100, 10));
2068 acc1.merge(&acc2); assert_eq!(acc1.result(), Some(10));
2070
2071 let mut acc3 = RetractableFirstValueAccumulator::new();
2072 let acc4 = RetractableFirstValueAccumulator::new();
2073 acc3.merge(&acc4); assert!(acc3.is_empty());
2075 }
2076
2077 #[test]
2080 fn test_retractable_first_value_reset() {
2081 let mut acc = RetractableFirstValueAccumulator::new();
2082 acc.add((100, 10));
2083 acc.add((200, 20));
2084 assert!(!acc.is_empty());
2085
2086 acc.reset();
2087 assert!(acc.is_empty());
2088 assert_eq!(acc.result(), None);
2089 }
2090
2091 #[test]
2092 fn test_retractable_last_value_reset() {
2093 let mut acc = RetractableLastValueAccumulator::new();
2094 acc.add((100, 10));
2095 acc.reset();
2096 assert!(acc.is_empty());
2097 }
2098
2099 #[test]
2102 fn test_retractable_first_value_f64_basic() {
2103 let mut acc = RetractableFirstValueF64Accumulator::new();
2104 acc.add((200, 20.5));
2105 acc.add((100, 10.5));
2106 acc.add((300, 30.5));
2107
2108 assert_eq!(acc.len(), 3);
2109 let result = acc.result_f64().unwrap();
2111 assert!((result - 10.5).abs() < f64::EPSILON);
2112 }
2113
2114 #[test]
2115 fn test_retractable_first_value_f64_retract() {
2116 let mut acc = RetractableFirstValueF64Accumulator::new();
2117 acc.add((100, 10.5));
2118 acc.add((200, 20.5));
2119
2120 acc.retract(&(100, 10.5));
2122 let result = acc.result_f64().unwrap();
2123 assert!((result - 20.5).abs() < f64::EPSILON);
2124 }
2125
2126 #[test]
2127 fn test_retractable_last_value_f64_basic() {
2128 let mut acc = RetractableLastValueF64Accumulator::new();
2129 acc.add((100, 10.5));
2130 acc.add((300, 30.5));
2131 acc.add((200, 20.5));
2132
2133 let result = acc.result_f64().unwrap();
2134 assert!((result - 30.5).abs() < f64::EPSILON);
2135 }
2136
2137 #[test]
2138 fn test_retractable_last_value_f64_retract() {
2139 let mut acc = RetractableLastValueF64Accumulator::new();
2140 acc.add((100, 10.5));
2141 acc.add((200, 20.5));
2142 acc.add((300, 30.5));
2143
2144 acc.retract(&(300, 30.5));
2145 let result = acc.result_f64().unwrap();
2146 assert!((result - 20.5).abs() < f64::EPSILON);
2147 }
2148
2149 #[test]
2150 fn test_retractable_first_value_f64_merge() {
2151 let mut acc1 = RetractableFirstValueF64Accumulator::new();
2152 let mut acc2 = RetractableFirstValueF64Accumulator::new();
2153 acc1.add((200, 20.5));
2154 acc2.add((100, 10.5));
2155 acc1.merge(&acc2);
2156 let result = acc1.result_f64().unwrap();
2157 assert!((result - 10.5).abs() < f64::EPSILON);
2158 }
2159
2160 #[test]
2161 fn test_retractable_last_value_f64_merge() {
2162 let mut acc1 = RetractableLastValueF64Accumulator::new();
2163 let mut acc2 = RetractableLastValueF64Accumulator::new();
2164 acc1.add((100, 10.5));
2165 acc2.add((300, 30.5));
2166 acc1.merge(&acc2);
2167 let result = acc1.result_f64().unwrap();
2168 assert!((result - 30.5).abs() < f64::EPSILON);
2169 }
2170
2171 #[test]
2174 fn test_retractable_first_value_single_entry() {
2175 let mut acc = RetractableFirstValueAccumulator::new();
2176 acc.add((100, 42));
2177 assert_eq!(acc.result(), Some(42));
2178 acc.retract(&(100, 42));
2179 assert_eq!(acc.result(), None);
2180 }
2181
2182 #[test]
2183 fn test_retractable_last_value_single_entry() {
2184 let mut acc = RetractableLastValueAccumulator::new();
2185 acc.add((100, 42));
2186 assert_eq!(acc.result(), Some(42));
2187 acc.retract(&(100, 42));
2188 assert_eq!(acc.result(), None);
2189 }
2190
2191 #[test]
2192 fn test_retractable_first_value_negative_values() {
2193 let mut acc = RetractableFirstValueAccumulator::new();
2194 acc.add((100, -10));
2195 acc.add((200, -20));
2196 assert_eq!(acc.result(), Some(-10));
2197 }
2198
2199 #[test]
2200 fn test_retractable_supports_efficient_retraction() {
2201 let acc = RetractableFirstValueAccumulator::new();
2202 assert!(acc.supports_efficient_retraction());
2203
2204 let acc2 = RetractableLastValueAccumulator::new();
2205 assert!(acc2.supports_efficient_retraction());
2206
2207 let acc3 = RetractableFirstValueF64Accumulator::new();
2208 assert!(acc3.supports_efficient_retraction());
2209
2210 let acc4 = RetractableLastValueF64Accumulator::new();
2211 assert!(acc4.supports_efficient_retraction());
2212 }
2213
2214 #[test]
2217 fn test_ohlc_retraction_simulation() {
2218 let mut open_acc = RetractableFirstValueAccumulator::new();
2221 let mut close_acc = RetractableLastValueAccumulator::new();
2222
2223 open_acc.add((1000, 100));
2225 close_acc.add((1000, 100));
2226
2227 open_acc.add((2000, 105));
2229 close_acc.add((2000, 105));
2230
2231 open_acc.add((3000, 98));
2233 close_acc.add((3000, 98));
2234
2235 assert_eq!(open_acc.result(), Some(100)); assert_eq!(close_acc.result(), Some(98)); open_acc.retract(&(1000, 100));
2240 close_acc.retract(&(1000, 100));
2241
2242 assert_eq!(open_acc.result(), Some(105));
2244 assert_eq!(close_acc.result(), Some(98));
2246 }
2247
2248 #[test]
2249 fn test_ohlc_retraction_f64_simulation() {
2250 let mut open_acc = RetractableFirstValueF64Accumulator::new();
2251 let mut close_acc = RetractableLastValueF64Accumulator::new();
2252
2253 open_acc.add((1000, 100.50));
2254 close_acc.add((1000, 100.50));
2255 open_acc.add((2000, 105.25));
2256 close_acc.add((2000, 105.25));
2257 open_acc.add((3000, 98.75));
2258 close_acc.add((3000, 98.75));
2259
2260 let open = open_acc.result_f64().unwrap();
2261 let close = close_acc.result_f64().unwrap();
2262 assert!((open - 100.50).abs() < f64::EPSILON);
2263 assert!((close - 98.75).abs() < f64::EPSILON);
2264
2265 open_acc.retract(&(1000, 100.50));
2267 close_acc.retract(&(1000, 100.50));
2268
2269 let open2 = open_acc.result_f64().unwrap();
2270 assert!((open2 - 105.25).abs() < f64::EPSILON);
2271 }
2272}