1use super::{
40 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec,
41 SideOutputData, Timer,
42};
43use crate::state::{StateStore, StateStoreExt};
44use arrow_array::{Array as ArrowArray, Int64Array, RecordBatch};
45use arrow_schema::{DataType, Field, Schema, SchemaRef};
46use rkyv::{
47 api::high::{HighDeserializer, HighSerializer, HighValidator},
48 bytecheck::CheckBytes,
49 rancor::Error as RkyvError,
50 ser::allocator::ArenaHandle,
51 util::AlignedVec,
52 Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
53};
54use rustc_hash::{FxHashMap, FxHashSet};
55use smallvec::SmallVec;
56use std::marker::PhantomData;
57use std::sync::atomic::{AtomicU64, Ordering};
58use std::sync::Arc;
59use std::time::Duration;
60
/// Policy for events that arrive after their window has already closed.
///
/// With no side output configured (the default), late events are dropped.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LateDataConfig {
    /// Name of the side-output stream for late events; `None` means drop.
    side_output: Option<String>,
}

impl LateDataConfig {
    /// Policy that silently discards late events.
    #[must_use]
    pub fn drop() -> Self {
        Self::default()
    }

    /// Policy that routes late events to the side output called `name`.
    #[must_use]
    pub fn with_side_output(name: String) -> Self {
        Self {
            side_output: Some(name),
        }
    }

    /// The configured side-output name, if any.
    #[must_use]
    pub fn side_output(&self) -> Option<&str> {
        self.side_output.as_deref()
    }

    /// True when late events are discarded instead of redirected.
    #[must_use]
    pub fn should_drop(&self) -> bool {
        self.side_output.is_none()
    }
}
119
/// Counters describing how late-arriving events were handled.
#[derive(Debug, Clone, Default)]
#[allow(clippy::struct_field_names)]
pub struct LateDataMetrics {
    /// All late events seen (dropped plus side-output).
    late_events_total: u64,
    /// Late events that were discarded.
    late_events_dropped: u64,
    /// Late events routed to a side output.
    late_events_side_output: u64,
}

impl LateDataMetrics {
    /// Creates a zeroed set of counters.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Total late events observed.
    #[must_use]
    pub fn late_events_total(&self) -> u64 {
        self.late_events_total
    }

    /// Late events that were discarded.
    #[must_use]
    pub fn late_events_dropped(&self) -> u64 {
        self.late_events_dropped
    }

    /// Late events redirected to a side output.
    #[must_use]
    pub fn late_events_side_output(&self) -> u64 {
        self.late_events_side_output
    }

    /// Counts one dropped late event.
    pub fn record_dropped(&mut self) {
        self.late_events_dropped += 1;
        self.late_events_total += 1;
    }

    /// Counts one late event sent to a side output.
    pub fn record_side_output(&mut self) {
        self.late_events_side_output += 1;
        self.late_events_total += 1;
    }

    /// Zeroes every counter.
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
179
/// Latency metrics for window closes.
#[derive(Debug, Clone, Default)]
pub struct WindowCloseMetrics {
    /// Number of windows closed so far.
    windows_closed_total: u64,
    /// Running sum of close latencies (ms), used for the average.
    close_latency_sum_ms: i64,
    /// Largest single close latency observed (ms).
    close_latency_max_ms: i64,
}

impl WindowCloseMetrics {
    /// Creates a zeroed set of metrics.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Total number of windows closed.
    #[must_use]
    pub fn windows_closed_total(&self) -> u64 {
        self.windows_closed_total
    }

    /// Average close latency in milliseconds; 0 before any close.
    #[must_use]
    pub fn avg_close_latency_ms(&self) -> i64 {
        if self.windows_closed_total == 0 {
            0
        } else {
            self.close_latency_sum_ms / i64::try_from(self.windows_closed_total).unwrap_or(i64::MAX)
        }
    }

    /// Maximum close latency in milliseconds.
    #[must_use]
    pub fn max_close_latency_ms(&self) -> i64 {
        self.close_latency_max_ms
    }

    /// Records one window close.
    ///
    /// Latency is `processing_time - window_end`, clamped to zero. The
    /// running sum uses saturating addition: the previous `+=` could
    /// overflow over long uptimes (panic in debug builds, silent wrap in
    /// release), skewing the average.
    pub fn record_close(&mut self, window_end: i64, processing_time: i64) {
        self.windows_closed_total += 1;
        let latency = processing_time.saturating_sub(window_end).max(0);
        self.close_latency_sum_ms = self.close_latency_sum_ms.saturating_add(latency);
        self.close_latency_max_ms = self.close_latency_max_ms.max(latency);
    }

    /// Zeroes every metric.
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
254
/// Controls when a windowed aggregation emits results downstream.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum EmitStrategy {
    /// Emit when the watermark passes the window end (the default).
    #[default]
    OnWatermark,

    /// Emit intermediate results on a fixed interval.
    Periodic(Duration),

    /// Emit a refreshed result on every update.
    OnUpdate,

    /// Emit once, when the window closes.
    OnWindowClose,

    /// Emit a CDC-style changelog stream.
    Changelog,

    /// Emit only the final result; late data is dropped.
    Final,
}

impl EmitStrategy {
    /// True for `Periodic`, which requires a recurring timer.
    #[must_use]
    pub fn needs_periodic_timer(&self) -> bool {
        self.periodic_interval().is_some()
    }

    /// The `Periodic` interval, or `None` for every other strategy.
    #[must_use]
    pub fn periodic_interval(&self) -> Option<Duration> {
        if let Self::Periodic(interval) = self {
            Some(*interval)
        } else {
            None
        }
    }

    /// True when each update triggers an emission.
    #[must_use]
    pub fn emits_on_update(&self) -> bool {
        *self == Self::OnUpdate
    }

    /// True when non-final (intermediate) results may be emitted.
    #[must_use]
    pub fn emits_intermediate(&self) -> bool {
        self.emits_on_update() || self.needs_periodic_timer()
    }

    /// True when a changelog stream must be produced.
    #[must_use]
    pub fn requires_changelog(&self) -> bool {
        *self == Self::Changelog
    }

    /// True for strategies whose output is append-only.
    #[must_use]
    pub fn is_append_only_compatible(&self) -> bool {
        matches!(self, Self::OnWindowClose | Self::Final)
    }

    /// True when previously emitted results may later be retracted.
    /// Note `Periodic` is deliberately excluded here.
    #[must_use]
    pub fn generates_retractions(&self) -> bool {
        matches!(self, Self::OnWatermark | Self::OnUpdate | Self::Changelog)
    }

    /// True when intermediate results are suppressed entirely.
    #[must_use]
    pub fn suppresses_intermediate(&self) -> bool {
        self.is_append_only_compatible()
    }

    /// True when late data is dropped outright (`Final` only).
    #[must_use]
    pub fn drops_late_data(&self) -> bool {
        *self == Self::Final
    }
}
444
445#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Archive, RkyvSerialize, RkyvDeserialize)]
450pub struct WindowId {
451 pub start: i64,
453 pub end: i64,
455}
456
457impl WindowId {
458 #[must_use]
460 pub fn new(start: i64, end: i64) -> Self {
461 Self { start, end }
462 }
463
464 #[must_use]
466 pub fn duration_ms(&self) -> i64 {
467 self.end - self.start
468 }
469
470 #[inline]
475 #[must_use]
476 pub fn to_key(&self) -> super::TimerKey {
477 super::TimerKey::from(self.to_key_inline())
478 }
479
480 #[inline]
485 #[must_use]
486 pub fn to_key_inline(&self) -> [u8; 16] {
487 let mut key = [0u8; 16];
488 key[..8].copy_from_slice(&self.start.to_be_bytes());
489 key[8..16].copy_from_slice(&self.end.to_be_bytes());
490 key
491 }
492
493 #[must_use]
499 pub fn from_key(key: &[u8]) -> Option<Self> {
500 if key.len() != 16 {
501 return None;
502 }
503 let start = i64::from_be_bytes(key[0..8].try_into().ok()?);
504 let end = i64::from_be_bytes(key[8..16].try_into().ok()?);
505 Some(Self { start, end })
506 }
507}
508
509pub type WindowIdVec = SmallVec<[WindowId; 4]>;
515
516#[derive(Debug, Clone, Copy, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
526pub enum CdcOperation {
527 Insert,
529 Delete,
531 UpdateBefore,
533 UpdateAfter,
535}
536
537impl CdcOperation {
538 #[must_use]
543 pub fn weight(&self) -> i32 {
544 match self {
545 Self::Insert | Self::UpdateAfter => 1,
546 Self::Delete | Self::UpdateBefore => -1,
547 }
548 }
549
550 #[must_use]
552 pub fn is_insert(&self) -> bool {
553 matches!(self, Self::Insert | Self::UpdateAfter)
554 }
555
556 #[must_use]
558 pub fn is_delete(&self) -> bool {
559 matches!(self, Self::Delete | Self::UpdateBefore)
560 }
561
562 #[must_use]
568 pub fn debezium_op(&self) -> char {
569 match self {
570 Self::Insert => 'c',
571 Self::Delete => 'd',
572 Self::UpdateBefore | Self::UpdateAfter => 'u',
573 }
574 }
575
576 #[inline]
580 #[must_use]
581 pub fn to_u8(self) -> u8 {
582 match self {
583 Self::Insert => 0,
584 Self::Delete => 1,
585 Self::UpdateBefore => 2,
586 Self::UpdateAfter => 3,
587 }
588 }
589
590 #[inline]
592 #[must_use]
593 pub fn from_u8(value: u8) -> Self {
594 match value {
595 1 => Self::Delete,
596 2 => Self::UpdateBefore,
597 3 => Self::UpdateAfter,
598 _ => Self::Insert,
600 }
601 }
602}
603
604#[derive(Debug, Clone)]
639pub struct ChangelogRecord {
640 pub operation: CdcOperation,
642 pub weight: i32,
644 pub emit_timestamp: i64,
646 pub event: Event,
648}
649
650impl ChangelogRecord {
651 #[must_use]
653 pub fn insert(event: Event, emit_timestamp: i64) -> Self {
654 Self {
655 operation: CdcOperation::Insert,
656 weight: 1,
657 emit_timestamp,
658 event,
659 }
660 }
661
662 #[must_use]
664 pub fn delete(event: Event, emit_timestamp: i64) -> Self {
665 Self {
666 operation: CdcOperation::Delete,
667 weight: -1,
668 emit_timestamp,
669 event,
670 }
671 }
672
673 #[must_use]
679 pub fn update(old_event: Event, new_event: Event, emit_timestamp: i64) -> (Self, Self) {
680 let before = Self {
681 operation: CdcOperation::UpdateBefore,
682 weight: -1,
683 emit_timestamp,
684 event: old_event,
685 };
686 let after = Self {
687 operation: CdcOperation::UpdateAfter,
688 weight: 1,
689 emit_timestamp,
690 event: new_event,
691 };
692 (before, after)
693 }
694
695 #[must_use]
697 pub fn new(operation: CdcOperation, event: Event, emit_timestamp: i64) -> Self {
698 Self {
699 operation,
700 weight: operation.weight(),
701 emit_timestamp,
702 event,
703 }
704 }
705
706 #[must_use]
708 pub fn is_insert(&self) -> bool {
709 self.operation.is_insert()
710 }
711
712 #[must_use]
714 pub fn is_delete(&self) -> bool {
715 self.operation.is_delete()
716 }
717}
718
/// Maps an event timestamp onto the set of windows that contain it.
pub trait WindowAssigner: Send {
    /// Returns every window that `timestamp` falls into.
    fn assign_windows(&self, timestamp: i64) -> WindowIdVec;

    /// Largest event timestamp still inside a window ending at `window_end`.
    /// Windows are half-open `[start, end)`, hence `end - 1`.
    fn max_timestamp(&self, window_end: i64) -> i64 {
        window_end - 1
    }
}
735
736#[derive(Debug, Clone)]
741pub struct TumblingWindowAssigner {
742 size_ms: i64,
744 offset_ms: i64,
746}
747
748impl TumblingWindowAssigner {
749 #[must_use]
759 pub fn new(size: Duration) -> Self {
760 let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
762 assert!(size_ms > 0, "Window size must be positive");
763 Self {
764 size_ms,
765 offset_ms: 0,
766 }
767 }
768
769 #[must_use]
775 pub fn from_millis(size_ms: i64) -> Self {
776 assert!(size_ms > 0, "Window size must be positive");
777 Self {
778 size_ms,
779 offset_ms: 0,
780 }
781 }
782
783 #[must_use]
785 pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
786 self.offset_ms = offset_ms;
787 self
788 }
789
790 #[must_use]
792 pub fn size_ms(&self) -> i64 {
793 self.size_ms
794 }
795
796 #[must_use]
798 pub fn offset_ms(&self) -> i64 {
799 self.offset_ms
800 }
801
802 #[inline]
807 #[must_use]
808 pub fn assign(&self, timestamp: i64) -> WindowId {
809 let adjusted = timestamp - self.offset_ms;
810 let window_start = if adjusted >= 0 {
811 (adjusted / self.size_ms) * self.size_ms
812 } else {
813 ((adjusted - self.size_ms + 1) / self.size_ms) * self.size_ms
815 };
816 let window_start = window_start + self.offset_ms;
817 let window_end = window_start + self.size_ms;
818 WindowId::new(window_start, window_end)
819 }
820}
821
822impl WindowAssigner for TumblingWindowAssigner {
823 #[inline]
824 fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
825 let mut windows = WindowIdVec::new();
826 windows.push(self.assign(timestamp));
827 windows
828 }
829}
830
/// Lossy conversion of an aggregate result to `i64`.
pub trait ResultToI64 {
    /// Converts the value, clamping or defaulting where an exact `i64`
    /// representation is impossible.
    fn to_i64(&self) -> i64;
}

impl ResultToI64 for u64 {
    /// Values above `i64::MAX` clamp to `i64::MAX`.
    fn to_i64(&self) -> i64 {
        i64::try_from(*self).unwrap_or(i64::MAX)
    }
}

impl ResultToI64 for i64 {
    /// Identity conversion.
    fn to_i64(&self) -> i64 {
        *self
    }
}

impl ResultToI64 for Option<i64> {
    /// `None` maps to 0.
    fn to_i64(&self) -> i64 {
        self.unwrap_or(0)
    }
}

impl ResultToI64 for Option<f64> {
    /// `None` maps to 0; present values truncate toward zero.
    fn to_i64(&self) -> i64 {
        #[allow(clippy::cast_possible_truncation)]
        self.map_or(0, |f| f as i64)
    }
}
864
/// Conversion of an aggregate result into a one-row Arrow array, for
/// assembling output record batches.
pub trait ResultToArrow {
    /// Wraps the value in a single-element Arrow array.
    fn to_arrow_array(&self) -> Arc<dyn ArrowArray>;

    /// The Arrow data type of the array returned by `to_arrow_array`.
    fn arrow_data_type(&self) -> DataType;
}

impl ResultToArrow for i64 {
    fn to_arrow_array(&self) -> Arc<dyn ArrowArray> {
        Arc::new(Int64Array::from(vec![*self]))
    }

    fn arrow_data_type(&self) -> DataType {
        DataType::Int64
    }
}

impl ResultToArrow for u64 {
    fn to_arrow_array(&self) -> Arc<dyn ArrowArray> {
        // Output column is Int64, so values above i64::MAX clamp.
        Arc::new(Int64Array::from(vec![
            i64::try_from(*self).unwrap_or(i64::MAX)
        ]))
    }

    fn arrow_data_type(&self) -> DataType {
        DataType::Int64
    }
}

impl ResultToArrow for Option<i64> {
    fn to_arrow_array(&self) -> Arc<dyn ArrowArray> {
        // None becomes a null entry in the array.
        Arc::new(Int64Array::from(vec![*self]))
    }

    fn arrow_data_type(&self) -> DataType {
        DataType::Int64
    }
}

impl ResultToArrow for Option<f64> {
    fn to_arrow_array(&self) -> Arc<dyn ArrowArray> {
        use arrow_array::Float64Array;
        // None becomes a null entry in the array.
        Arc::new(Float64Array::from(vec![*self]))
    }

    fn arrow_data_type(&self) -> DataType {
        DataType::Float64
    }
}
919
/// Incremental aggregation state: consumes inputs one at a time and can be
/// merged with other accumulators of the same type.
pub trait Accumulator: Default + Clone + Send {
    /// Value type consumed by `add`.
    type Input;
    /// Result type, convertible to `i64` and to an Arrow array.
    type Output: ResultToI64 + ResultToArrow;

    /// Folds one value into the accumulator.
    fn add(&mut self, value: Self::Input);

    /// Combines another accumulator's state into this one.
    fn merge(&mut self, other: &Self);

    /// Current aggregate result.
    fn result(&self) -> Self::Output;

    /// True when nothing has been accumulated yet.
    fn is_empty(&self) -> bool;
}
945
/// Pairs an [`Accumulator`] with the logic that extracts its inputs from
/// events.
pub trait Aggregator: Send + Clone {
    /// The accumulator type this aggregator drives.
    type Acc: Accumulator;

    /// Returns a fresh, empty accumulator.
    fn create_accumulator(&self) -> Self::Acc;

    /// Extracts a single input value from `event`, or `None` when no usable
    /// value is present (missing column, wrong type, all-null).
    fn extract(&self, event: &Event) -> Option<<Self::Acc as Accumulator>::Input>;

    /// Arrow type of the emitted result column; `Int64` by default.
    fn output_data_type(&self) -> DataType {
        DataType::Int64
    }

    /// Whether the emitted result column may contain nulls.
    fn output_nullable(&self) -> bool {
        false
    }

    /// Extracts inputs from a (possibly multi-row) event. The default
    /// delegates to `extract` and yields at most one value; aggregators
    /// that need one input per row (e.g. `AvgAggregator`) override this.
    fn extract_batch(&self, event: &Event) -> SmallVec<[<Self::Acc as Accumulator>::Input; 4]> {
        let mut values = SmallVec::new();
        if let Some(v) = self.extract(event) {
            values.push(v);
        }
        values
    }
}
990
991#[derive(Debug, Clone, Default)]
993pub struct CountAggregator;
994
995#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
997pub struct CountAccumulator {
998 count: u64,
999}
1000
1001impl CountAggregator {
1002 #[must_use]
1004 pub fn new() -> Self {
1005 Self
1006 }
1007}
1008
1009impl Accumulator for CountAccumulator {
1010 type Input = ();
1011 type Output = u64;
1012
1013 fn add(&mut self, _value: ()) {
1014 self.count += 1;
1015 }
1016
1017 fn merge(&mut self, other: &Self) {
1018 self.count += other.count;
1019 }
1020
1021 fn result(&self) -> u64 {
1022 self.count
1023 }
1024
1025 fn is_empty(&self) -> bool {
1026 self.count == 0
1027 }
1028}
1029
1030impl Aggregator for CountAggregator {
1031 type Acc = CountAccumulator;
1032
1033 fn create_accumulator(&self) -> CountAccumulator {
1034 CountAccumulator::default()
1035 }
1036
1037 fn extract(&self, _event: &Event) -> Option<()> {
1038 Some(())
1039 }
1040}
1041
1042#[derive(Debug, Clone)]
1044pub struct SumAggregator {
1045 column_index: usize,
1047}
1048
1049#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1051pub struct SumAccumulator {
1052 sum: i64,
1053 count: u64,
1054}
1055
1056impl SumAggregator {
1057 #[must_use]
1059 pub fn new(column_index: usize) -> Self {
1060 Self { column_index }
1061 }
1062}
1063
1064impl Accumulator for SumAccumulator {
1065 type Input = i64;
1066 type Output = i64;
1067
1068 fn add(&mut self, value: i64) {
1069 self.sum += value;
1070 self.count += 1;
1071 }
1072
1073 fn merge(&mut self, other: &Self) {
1074 self.sum += other.sum;
1075 self.count += other.count;
1076 }
1077
1078 fn result(&self) -> i64 {
1079 self.sum
1080 }
1081
1082 fn is_empty(&self) -> bool {
1083 self.count == 0
1084 }
1085}
1086
1087impl Aggregator for SumAggregator {
1088 type Acc = SumAccumulator;
1089
1090 fn create_accumulator(&self) -> SumAccumulator {
1091 SumAccumulator::default()
1092 }
1093
1094 fn extract(&self, event: &Event) -> Option<i64> {
1095 use arrow_array::cast::AsArray;
1096 use arrow_array::types::Int64Type;
1097
1098 let batch = &event.data;
1099 if self.column_index >= batch.num_columns() {
1100 return None;
1101 }
1102
1103 let column = batch.column(self.column_index);
1104 let array = column.as_primitive_opt::<Int64Type>()?;
1105
1106 Some(array.iter().flatten().sum())
1108 }
1109}
1110
1111#[derive(Debug, Clone)]
1113pub struct MinAggregator {
1114 column_index: usize,
1115}
1116
1117#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1119pub struct MinAccumulator {
1120 min: Option<i64>,
1121}
1122
1123impl MinAggregator {
1124 #[must_use]
1126 pub fn new(column_index: usize) -> Self {
1127 Self { column_index }
1128 }
1129}
1130
1131impl Accumulator for MinAccumulator {
1132 type Input = i64;
1133 type Output = Option<i64>;
1134
1135 fn add(&mut self, value: i64) {
1136 self.min = Some(self.min.map_or(value, |m| m.min(value)));
1137 }
1138
1139 fn merge(&mut self, other: &Self) {
1140 if let Some(other_min) = other.min {
1141 self.add(other_min);
1142 }
1143 }
1144
1145 fn result(&self) -> Option<i64> {
1146 self.min
1147 }
1148
1149 fn is_empty(&self) -> bool {
1150 self.min.is_none()
1151 }
1152}
1153
1154impl Aggregator for MinAggregator {
1155 type Acc = MinAccumulator;
1156
1157 fn create_accumulator(&self) -> MinAccumulator {
1158 MinAccumulator::default()
1159 }
1160
1161 fn output_nullable(&self) -> bool {
1162 true
1163 }
1164
1165 fn extract(&self, event: &Event) -> Option<i64> {
1166 use arrow_array::cast::AsArray;
1167 use arrow_array::types::Int64Type;
1168
1169 let batch = &event.data;
1170 if self.column_index >= batch.num_columns() {
1171 return None;
1172 }
1173
1174 let column = batch.column(self.column_index);
1175 let array = column.as_primitive_opt::<Int64Type>()?;
1176
1177 array.iter().flatten().min()
1178 }
1179}
1180
1181#[derive(Debug, Clone)]
1183pub struct MaxAggregator {
1184 column_index: usize,
1185}
1186
1187#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1189pub struct MaxAccumulator {
1190 max: Option<i64>,
1191}
1192
1193impl MaxAggregator {
1194 #[must_use]
1196 pub fn new(column_index: usize) -> Self {
1197 Self { column_index }
1198 }
1199}
1200
1201impl Accumulator for MaxAccumulator {
1202 type Input = i64;
1203 type Output = Option<i64>;
1204
1205 fn add(&mut self, value: i64) {
1206 self.max = Some(self.max.map_or(value, |m| m.max(value)));
1207 }
1208
1209 fn merge(&mut self, other: &Self) {
1210 if let Some(other_max) = other.max {
1211 self.add(other_max);
1212 }
1213 }
1214
1215 fn result(&self) -> Option<i64> {
1216 self.max
1217 }
1218
1219 fn is_empty(&self) -> bool {
1220 self.max.is_none()
1221 }
1222}
1223
1224impl Aggregator for MaxAggregator {
1225 type Acc = MaxAccumulator;
1226
1227 fn create_accumulator(&self) -> MaxAccumulator {
1228 MaxAccumulator::default()
1229 }
1230
1231 fn output_nullable(&self) -> bool {
1232 true
1233 }
1234
1235 fn extract(&self, event: &Event) -> Option<i64> {
1236 use arrow_array::cast::AsArray;
1237 use arrow_array::types::Int64Type;
1238
1239 let batch = &event.data;
1240 if self.column_index >= batch.num_columns() {
1241 return None;
1242 }
1243
1244 let column = batch.column(self.column_index);
1245 let array = column.as_primitive_opt::<Int64Type>()?;
1246
1247 array.iter().flatten().max()
1248 }
1249}
1250
1251#[derive(Debug, Clone)]
1253pub struct AvgAggregator {
1254 column_index: usize,
1255}
1256
1257#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1259pub struct AvgAccumulator {
1260 sum: i64,
1261 count: u64,
1262}
1263
1264impl AvgAggregator {
1265 #[must_use]
1267 pub fn new(column_index: usize) -> Self {
1268 Self { column_index }
1269 }
1270}
1271
1272impl Accumulator for AvgAccumulator {
1273 type Input = i64;
1274 type Output = Option<f64>;
1275
1276 fn add(&mut self, value: i64) {
1277 self.sum += value;
1278 self.count += 1;
1279 }
1280
1281 fn merge(&mut self, other: &Self) {
1282 self.sum += other.sum;
1283 self.count += other.count;
1284 }
1285
1286 #[allow(clippy::cast_precision_loss)]
1288 fn result(&self) -> Option<f64> {
1289 if self.count == 0 {
1290 None
1291 } else {
1292 Some(self.sum as f64 / self.count as f64)
1293 }
1294 }
1295
1296 fn is_empty(&self) -> bool {
1297 self.count == 0
1298 }
1299}
1300
1301impl Aggregator for AvgAggregator {
1302 type Acc = AvgAccumulator;
1303
1304 fn create_accumulator(&self) -> AvgAccumulator {
1305 AvgAccumulator::default()
1306 }
1307
1308 fn output_data_type(&self) -> DataType {
1309 DataType::Float64
1310 }
1311
1312 fn output_nullable(&self) -> bool {
1313 true
1314 }
1315
1316 fn extract(&self, event: &Event) -> Option<i64> {
1317 use arrow_array::cast::AsArray;
1318 use arrow_array::types::Int64Type;
1319
1320 let batch = &event.data;
1321 if self.column_index >= batch.num_columns() {
1322 return None;
1323 }
1324
1325 let column = batch.column(self.column_index);
1326 let array = column.as_primitive_opt::<Int64Type>()?;
1327
1328 array.iter().flatten().next()
1330 }
1331
1332 fn extract_batch(&self, event: &Event) -> SmallVec<[i64; 4]> {
1333 use arrow_array::cast::AsArray;
1334 use arrow_array::types::Int64Type;
1335
1336 let batch = &event.data;
1337 if self.column_index >= batch.num_columns() {
1338 return SmallVec::new();
1339 }
1340
1341 let column = batch.column(self.column_index);
1342 let Some(array) = column.as_primitive_opt::<Int64Type>() else {
1343 return SmallVec::new();
1344 };
1345
1346 array.iter().flatten().collect()
1348 }
1349}
1350
1351#[derive(Debug, Clone)]
1367pub struct FirstValueAggregator {
1368 value_column_index: usize,
1370 timestamp_column_index: usize,
1372}
1373
1374#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1378#[rkyv(compare(PartialEq), derive(Debug))]
1379pub struct FirstValueAccumulator {
1380 value: Option<i64>,
1382 timestamp: Option<i64>,
1384}
1385
1386impl FirstValueAggregator {
1387 #[must_use]
1394 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1395 Self {
1396 value_column_index,
1397 timestamp_column_index,
1398 }
1399 }
1400}
1401
1402impl Accumulator for FirstValueAccumulator {
1403 type Input = (i64, i64); type Output = Option<i64>;
1405
1406 fn add(&mut self, (value, timestamp): (i64, i64)) {
1407 match self.timestamp {
1408 None => {
1409 self.value = Some(value);
1411 self.timestamp = Some(timestamp);
1412 }
1413 Some(existing_ts) if timestamp < existing_ts => {
1414 self.value = Some(value);
1416 self.timestamp = Some(timestamp);
1417 }
1418 _ => {
1419 }
1421 }
1422 }
1423
1424 fn merge(&mut self, other: &Self) {
1425 match (self.timestamp, other.timestamp) {
1426 (None, Some(_)) => {
1427 self.value = other.value;
1428 self.timestamp = other.timestamp;
1429 }
1430 (Some(self_ts), Some(other_ts)) if other_ts < self_ts => {
1431 self.value = other.value;
1432 self.timestamp = other.timestamp;
1433 }
1434 _ => {
1435 }
1437 }
1438 }
1439
1440 fn result(&self) -> Option<i64> {
1441 self.value
1442 }
1443
1444 fn is_empty(&self) -> bool {
1445 self.value.is_none()
1446 }
1447}
1448
1449impl Aggregator for FirstValueAggregator {
1450 type Acc = FirstValueAccumulator;
1451
1452 fn create_accumulator(&self) -> FirstValueAccumulator {
1453 FirstValueAccumulator::default()
1454 }
1455
1456 fn extract(&self, event: &Event) -> Option<(i64, i64)> {
1457 use arrow_array::cast::AsArray;
1458 use arrow_array::types::Int64Type;
1459
1460 let batch = &event.data;
1461 if self.value_column_index >= batch.num_columns()
1462 || self.timestamp_column_index >= batch.num_columns()
1463 {
1464 return None;
1465 }
1466
1467 let value_col = batch.column(self.value_column_index);
1469 let value_array = value_col.as_primitive_opt::<Int64Type>()?;
1470 let value = value_array.iter().flatten().next()?;
1471
1472 let ts_col = batch.column(self.timestamp_column_index);
1474 let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1475 let timestamp = ts_array.iter().flatten().next()?;
1476
1477 Some((value, timestamp))
1478 }
1479}
1480
1481#[derive(Debug, Clone)]
1496pub struct LastValueAggregator {
1497 value_column_index: usize,
1499 timestamp_column_index: usize,
1501}
1502
1503#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1507#[rkyv(compare(PartialEq), derive(Debug))]
1508pub struct LastValueAccumulator {
1509 value: Option<i64>,
1511 timestamp: Option<i64>,
1513}
1514
1515impl LastValueAggregator {
1516 #[must_use]
1523 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1524 Self {
1525 value_column_index,
1526 timestamp_column_index,
1527 }
1528 }
1529}
1530
1531impl Accumulator for LastValueAccumulator {
1532 type Input = (i64, i64); type Output = Option<i64>;
1534
1535 fn add(&mut self, (value, timestamp): (i64, i64)) {
1536 match self.timestamp {
1537 None => {
1538 self.value = Some(value);
1540 self.timestamp = Some(timestamp);
1541 }
1542 Some(existing_ts) if timestamp > existing_ts => {
1543 self.value = Some(value);
1545 self.timestamp = Some(timestamp);
1546 }
1547 Some(existing_ts) if timestamp == existing_ts => {
1548 self.value = Some(value);
1550 }
1551 _ => {
1552 }
1554 }
1555 }
1556
1557 fn merge(&mut self, other: &Self) {
1558 match (self.timestamp, other.timestamp) {
1559 (None, Some(_)) => {
1560 self.value = other.value;
1561 self.timestamp = other.timestamp;
1562 }
1563 (Some(self_ts), Some(other_ts)) if other_ts > self_ts => {
1564 self.value = other.value;
1565 self.timestamp = other.timestamp;
1566 }
1567 (Some(self_ts), Some(other_ts)) if other_ts == self_ts => {
1568 self.value = other.value;
1570 }
1571 _ => {
1572 }
1574 }
1575 }
1576
1577 fn result(&self) -> Option<i64> {
1578 self.value
1579 }
1580
1581 fn is_empty(&self) -> bool {
1582 self.value.is_none()
1583 }
1584}
1585
1586impl Aggregator for LastValueAggregator {
1587 type Acc = LastValueAccumulator;
1588
1589 fn create_accumulator(&self) -> LastValueAccumulator {
1590 LastValueAccumulator::default()
1591 }
1592
1593 fn extract(&self, event: &Event) -> Option<(i64, i64)> {
1594 use arrow_array::cast::AsArray;
1595 use arrow_array::types::Int64Type;
1596
1597 let batch = &event.data;
1598 if self.value_column_index >= batch.num_columns()
1599 || self.timestamp_column_index >= batch.num_columns()
1600 {
1601 return None;
1602 }
1603
1604 let value_col = batch.column(self.value_column_index);
1606 let value_array = value_col.as_primitive_opt::<Int64Type>()?;
1607 let value = value_array.iter().flatten().next()?;
1608
1609 let ts_col = batch.column(self.timestamp_column_index);
1611 let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1612 let timestamp = ts_array.iter().flatten().next()?;
1613
1614 Some((value, timestamp))
1615 }
1616}
1617
/// FIRST_VALUE state for `f64` values.
///
/// The float is stored as its raw bit pattern in an `i64`
/// (`f64::to_bits` / `f64::from_bits`) — presumably so the archived state
/// stays integer-only; TODO confirm against the rkyv schema expectations.
#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct FirstValueF64Accumulator {
    // `value` holds f64::to_bits() reinterpreted as i64; `timestamp` orders arrivals.
    value: Option<i64>, timestamp: Option<i64>,
}

impl FirstValueF64Accumulator {
    /// Decodes the stored bit pattern back into the original `f64`.
    #[must_use]
    #[allow(clippy::cast_sign_loss)]
    pub fn result_f64(&self) -> Option<f64> {
        self.value.map(|bits| f64::from_bits(bits as u64))
    }
}

impl Accumulator for FirstValueF64Accumulator {
    type Input = (f64, i64); type Output = Option<f64>;

    /// Keeps the value with the smallest timestamp seen so far.
    fn add(&mut self, (value, timestamp): (f64, i64)) {
        #[allow(clippy::cast_possible_wrap)]
        let value_bits = value.to_bits() as i64;
        match self.timestamp {
            None => {
                self.value = Some(value_bits);
                self.timestamp = Some(timestamp);
            }
            Some(existing_ts) if timestamp < existing_ts => {
                self.value = Some(value_bits);
                self.timestamp = Some(timestamp);
            }
            // Equal or later timestamps never displace the first value.
            _ => {}
        }
    }

    /// Adopts `other`'s entry only when it is strictly earlier (or when this
    /// accumulator is still empty).
    fn merge(&mut self, other: &Self) {
        match (self.timestamp, other.timestamp) {
            (None, Some(_)) => {
                self.value = other.value;
                self.timestamp = other.timestamp;
            }
            (Some(self_ts), Some(other_ts)) if other_ts < self_ts => {
                self.value = other.value;
                self.timestamp = other.timestamp;
            }
            _ => {}
        }
    }

    /// Decoded first value, or `None` when empty.
    #[allow(clippy::cast_sign_loss)]
    fn result(&self) -> Option<f64> {
        self.value.map(|bits| f64::from_bits(bits as u64))
    }

    fn is_empty(&self) -> bool {
        self.value.is_none()
    }
}
1683
1684#[derive(Debug, Clone)]
1686pub struct FirstValueF64Aggregator {
1687 value_column_index: usize,
1689 timestamp_column_index: usize,
1691}
1692
1693impl FirstValueF64Aggregator {
1694 #[must_use]
1696 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1697 Self {
1698 value_column_index,
1699 timestamp_column_index,
1700 }
1701 }
1702}
1703
1704impl Aggregator for FirstValueF64Aggregator {
1705 type Acc = FirstValueF64Accumulator;
1706
1707 fn create_accumulator(&self) -> FirstValueF64Accumulator {
1708 FirstValueF64Accumulator::default()
1709 }
1710
1711 fn extract(&self, event: &Event) -> Option<(f64, i64)> {
1712 use arrow_array::cast::AsArray;
1713 use arrow_array::types::{Float64Type, Int64Type};
1714
1715 let batch = &event.data;
1716 if self.value_column_index >= batch.num_columns()
1717 || self.timestamp_column_index >= batch.num_columns()
1718 {
1719 return None;
1720 }
1721
1722 let value_col = batch.column(self.value_column_index);
1724 let value_array = value_col.as_primitive_opt::<Float64Type>()?;
1725 let value = value_array.iter().flatten().next()?;
1726
1727 let ts_col = batch.column(self.timestamp_column_index);
1729 let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1730 let timestamp = ts_array.iter().flatten().next()?;
1731
1732 Some((value, timestamp))
1733 }
1734}
1735
/// LAST_VALUE state for `f64` values.
///
/// The float is stored as its raw bit pattern in an `i64`
/// (`f64::to_bits` / `f64::from_bits`) — presumably so the archived state
/// stays integer-only; TODO confirm against the rkyv schema expectations.
#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct LastValueF64Accumulator {
    // `value` holds f64::to_bits() reinterpreted as i64; `timestamp` orders arrivals.
    value: Option<i64>, timestamp: Option<i64>,
}

impl LastValueF64Accumulator {
    /// Decodes the stored bit pattern back into the original `f64`.
    #[must_use]
    #[allow(clippy::cast_sign_loss)]
    pub fn result_f64(&self) -> Option<f64> {
        self.value.map(|bits| f64::from_bits(bits as u64))
    }
}

impl Accumulator for LastValueF64Accumulator {
    type Input = (f64, i64); type Output = Option<f64>;

    /// Keeps the value with the largest timestamp; on an equal timestamp the
    /// most recently added value wins (timestamp left unchanged).
    fn add(&mut self, (value, timestamp): (f64, i64)) {
        #[allow(clippy::cast_possible_wrap)]
        let value_bits = value.to_bits() as i64;
        match self.timestamp {
            None => {
                self.value = Some(value_bits);
                self.timestamp = Some(timestamp);
            }
            Some(existing_ts) if timestamp > existing_ts => {
                self.value = Some(value_bits);
                self.timestamp = Some(timestamp);
            }
            Some(existing_ts) if timestamp == existing_ts => {
                self.value = Some(value_bits);
            }
            _ => {}
        }
    }

    /// Adopts `other`'s entry when it is newer; on a timestamp tie the
    /// merged-in side wins the value.
    fn merge(&mut self, other: &Self) {
        match (self.timestamp, other.timestamp) {
            (None, Some(_)) => {
                self.value = other.value;
                self.timestamp = other.timestamp;
            }
            (Some(self_ts), Some(other_ts)) if other_ts > self_ts => {
                self.value = other.value;
                self.timestamp = other.timestamp;
            }
            (Some(self_ts), Some(other_ts)) if other_ts == self_ts => {
                self.value = other.value;
            }
            _ => {}
        }
    }

    /// Decoded last value, or `None` when empty.
    #[allow(clippy::cast_sign_loss)]
    fn result(&self) -> Option<f64> {
        self.value.map(|bits| f64::from_bits(bits as u64))
    }

    fn is_empty(&self) -> bool {
        self.value.is_none()
    }
}
1805
1806#[derive(Debug, Clone)]
1808pub struct LastValueF64Aggregator {
1809 value_column_index: usize,
1811 timestamp_column_index: usize,
1813}
1814
1815impl LastValueF64Aggregator {
1816 #[must_use]
1818 pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1819 Self {
1820 value_column_index,
1821 timestamp_column_index,
1822 }
1823 }
1824}
1825
1826impl Aggregator for LastValueF64Aggregator {
1827 type Acc = LastValueF64Accumulator;
1828
1829 fn create_accumulator(&self) -> LastValueF64Accumulator {
1830 LastValueF64Accumulator::default()
1831 }
1832
1833 fn extract(&self, event: &Event) -> Option<(f64, i64)> {
1834 use arrow_array::cast::AsArray;
1835 use arrow_array::types::{Float64Type, Int64Type};
1836
1837 let batch = &event.data;
1838 if self.value_column_index >= batch.num_columns()
1839 || self.timestamp_column_index >= batch.num_columns()
1840 {
1841 return None;
1842 }
1843
1844 let value_col = batch.column(self.value_column_index);
1846 let value_array = value_col.as_primitive_opt::<Float64Type>()?;
1847 let value = value_array.iter().flatten().next()?;
1848
1849 let ts_col = batch.column(self.timestamp_column_index);
1851 let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1852 let timestamp = ts_array.iter().flatten().next()?;
1853
1854 Some((value, timestamp))
1855 }
1856}
1857
1858#[derive(Debug, Clone, PartialEq)]
1876pub enum ScalarResult {
1877 Int64(i64),
1879 Float64(f64),
1881 UInt64(u64),
1883 OptionalInt64(Option<i64>),
1885 OptionalFloat64(Option<f64>),
1887 Null,
1889}
1890
1891impl ScalarResult {
1892 #[must_use]
1894 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
1895 pub fn to_i64_lossy(&self) -> i64 {
1896 match self {
1897 Self::Int64(v) => *v,
1898 Self::Float64(v) => *v as i64,
1899 Self::UInt64(v) => i64::try_from(*v).unwrap_or(i64::MAX),
1900 Self::OptionalInt64(v) => v.unwrap_or(0),
1901 Self::OptionalFloat64(v) => v.map(|f| f as i64).unwrap_or(0),
1902 Self::Null => 0,
1903 }
1904 }
1905
1906 #[must_use]
1908 #[allow(clippy::cast_precision_loss)]
1909 pub fn to_f64_lossy(&self) -> f64 {
1910 match self {
1911 Self::Int64(v) => *v as f64,
1912 Self::Float64(v) => *v,
1913 Self::UInt64(v) => *v as f64,
1914 Self::OptionalInt64(v) => v.map(|i| i as f64).unwrap_or(0.0),
1915 Self::OptionalFloat64(v) => v.unwrap_or(0.0),
1916 Self::Null => 0.0,
1917 }
1918 }
1919
1920 #[must_use]
1922 pub fn is_null(&self) -> bool {
1923 matches!(
1924 self,
1925 Self::Null | Self::OptionalInt64(None) | Self::OptionalFloat64(None)
1926 )
1927 }
1928
1929 #[must_use]
1931 pub fn data_type(&self) -> DataType {
1932 match self {
1933 Self::Int64(_) | Self::OptionalInt64(_) => DataType::Int64,
1934 Self::Float64(_) | Self::OptionalFloat64(_) => DataType::Float64,
1935 Self::UInt64(_) => DataType::UInt64,
1936 Self::Null => DataType::Null,
1937 }
1938 }
1939}
1940
/// Object-safe accumulator used for heterogeneous aggregate collections.
///
/// Implementations fold events into internal state, merge with peers of the
/// same `type_tag`, and expose their value as a [`ScalarResult`].
pub trait DynAccumulator: Send {
    /// Folds all relevant rows of `event` into this accumulator.
    fn add_event(&mut self, event: &Event);

    /// Merges another accumulator's state into this one.
    ///
    /// State is exchanged through `serialize()` bytes, so `other` is expected
    /// to be the same concrete type (compare `type_tag`).
    fn merge_dyn(&mut self, other: &dyn DynAccumulator);

    /// Current aggregate value.
    fn result_scalar(&self) -> ScalarResult;

    /// `true` when no data has been accumulated yet.
    fn is_empty(&self) -> bool;

    /// Clones this accumulator behind a fresh box.
    fn clone_box(&self) -> Box<dyn DynAccumulator>;

    /// Serializes internal state into a compact little-endian byte layout.
    fn serialize(&self) -> Vec<u8>;

    /// Default Arrow field describing the result column.
    fn result_field(&self) -> Field;

    /// Stable identifier of the concrete accumulator type.
    fn type_tag(&self) -> &'static str;

    /// Enables downcasting to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any;
}
1984
/// Factory producing boxed [`DynAccumulator`]s of one concrete type.
pub trait DynAggregatorFactory: Send + Sync {
    /// Creates a fresh, empty accumulator.
    fn create_accumulator(&self) -> Box<dyn DynAccumulator>;

    /// Arrow field (with the user-chosen name) for this aggregate's output.
    fn result_field(&self) -> Field;

    /// Clones this factory behind a fresh box.
    fn clone_box(&self) -> Box<dyn DynAggregatorFactory>;

    /// Stable identifier matching the accumulators this factory creates.
    fn type_tag(&self) -> &'static str;
}
2002
/// Sum aggregator configuration over a single `f64` column.
#[derive(Debug, Clone)]
pub struct SumF64Aggregator {
    /// Column whose values are summed.
    column_index: usize,
}

/// Running-sum state; also tracks the count to distinguish "empty" from 0.0.
#[derive(Debug, Clone, Default)]
pub struct SumF64Accumulator {
    /// Sum of all non-null values seen so far.
    sum: f64,
    /// Number of non-null values folded in.
    count: u64,
}
2020
2021impl SumF64Aggregator {
2022 #[must_use]
2024 pub fn new(column_index: usize) -> Self {
2025 Self { column_index }
2026 }
2027
2028 #[must_use]
2030 pub fn column_index(&self) -> usize {
2031 self.column_index
2032 }
2033}
2034
2035impl SumF64Accumulator {
2036 #[must_use]
2038 pub fn sum(&self) -> f64 {
2039 self.sum
2040 }
2041}
2042
2043impl DynAccumulator for SumF64Accumulator {
2044 fn add_event(&mut self, event: &Event) {
2045 use arrow_array::cast::AsArray;
2046 use arrow_array::types::Float64Type;
2047
2048 let batch = &event.data;
2051 if batch.num_columns() == 0 {
2052 return;
2053 }
2054 if let Some(array) = batch.column(0).as_primitive_opt::<Float64Type>() {
2056 for val in array.iter().flatten() {
2057 self.sum += val;
2058 self.count += 1;
2059 }
2060 }
2061 }
2062
2063 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2064 let data = other.serialize();
2065 if let (Some(sum_bytes), Some(count_bytes)) = (
2066 data.get(..8).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2067 data.get(8..16).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2068 ) {
2069 self.sum += f64::from_le_bytes(sum_bytes);
2070 self.count += u64::from_le_bytes(count_bytes);
2071 }
2072 }
2073
2074 fn result_scalar(&self) -> ScalarResult {
2075 if self.count == 0 {
2076 ScalarResult::Null
2077 } else {
2078 ScalarResult::Float64(self.sum)
2079 }
2080 }
2081
2082 fn is_empty(&self) -> bool {
2083 self.count == 0
2084 }
2085
2086 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2087 Box::new(self.clone())
2088 }
2089
2090 fn serialize(&self) -> Vec<u8> {
2091 let mut buf = Vec::with_capacity(16);
2092 buf.extend_from_slice(&self.sum.to_le_bytes());
2093 buf.extend_from_slice(&self.count.to_le_bytes());
2094 buf
2095 }
2096
2097 fn result_field(&self) -> Field {
2098 Field::new("sum_f64", DataType::Float64, true)
2099 }
2100
2101 fn type_tag(&self) -> &'static str {
2102 "sum_f64"
2103 }
2104
2105 fn as_any(&self) -> &dyn std::any::Any {
2106 self
2107 }
2108}
2109
/// Factory for column-indexed sum accumulators with a custom output name.
#[derive(Debug, Clone)]
pub struct SumF64Factory {
    /// Column whose values are summed.
    column_index: usize,
    /// Name of the output field in the result schema.
    field_name: String,
}

impl SumF64Factory {
    /// Creates a factory summing `column_index`, emitting field `field_name`.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        Self {
            column_index,
            field_name: field_name.into(),
        }
    }
}
2129
2130impl DynAggregatorFactory for SumF64Factory {
2131 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2132 Box::new(SumF64IndexedAccumulator::new(self.column_index))
2133 }
2134
2135 fn result_field(&self) -> Field {
2136 Field::new(&self.field_name, DataType::Float64, true)
2137 }
2138
2139 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2140 Box::new(self.clone())
2141 }
2142
2143 fn type_tag(&self) -> &'static str {
2144 "sum_f64"
2145 }
2146}
2147
/// Sum accumulator that reads a configurable column (unlike `SumF64Accumulator`).
#[derive(Debug, Clone)]
pub struct SumF64IndexedAccumulator {
    /// Column whose values are summed.
    column_index: usize,
    /// Sum of all non-null values seen so far.
    sum: f64,
    /// Number of non-null values folded in.
    count: u64,
}

impl SumF64IndexedAccumulator {
    /// Creates an empty accumulator reading from `column_index`.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            column_index,
            sum: 0.0,
            count: 0,
        }
    }
}
2170
2171impl DynAccumulator for SumF64IndexedAccumulator {
2172 fn add_event(&mut self, event: &Event) {
2173 use arrow_array::cast::AsArray;
2174 use arrow_array::types::Float64Type;
2175
2176 let batch = &event.data;
2177 if self.column_index >= batch.num_columns() {
2178 return;
2179 }
2180 if let Some(array) = batch
2181 .column(self.column_index)
2182 .as_primitive_opt::<Float64Type>()
2183 {
2184 for val in array.iter().flatten() {
2185 self.sum += val;
2186 self.count += 1;
2187 }
2188 }
2189 }
2190
2191 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2192 let data = other.serialize();
2193 if let (Some(sum_bytes), Some(count_bytes)) = (
2194 data.get(..8).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2195 data.get(8..16).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2196 ) {
2197 self.sum += f64::from_le_bytes(sum_bytes);
2198 self.count += u64::from_le_bytes(count_bytes);
2199 }
2200 }
2201
2202 fn result_scalar(&self) -> ScalarResult {
2203 if self.count == 0 {
2204 ScalarResult::Null
2205 } else {
2206 ScalarResult::Float64(self.sum)
2207 }
2208 }
2209
2210 fn is_empty(&self) -> bool {
2211 self.count == 0
2212 }
2213
2214 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2215 Box::new(self.clone())
2216 }
2217
2218 fn serialize(&self) -> Vec<u8> {
2219 let mut buf = Vec::with_capacity(16);
2220 buf.extend_from_slice(&self.sum.to_le_bytes());
2221 buf.extend_from_slice(&self.count.to_le_bytes());
2222 buf
2223 }
2224
2225 fn result_field(&self) -> Field {
2226 Field::new("sum_f64", DataType::Float64, true)
2227 }
2228
2229 fn type_tag(&self) -> &'static str {
2230 "sum_f64"
2231 }
2232
2233 fn as_any(&self) -> &dyn std::any::Any {
2234 self
2235 }
2236}
2237
/// Factory for column-indexed minimum accumulators with a custom output name.
#[derive(Debug, Clone)]
pub struct MinF64Factory {
    /// Column whose minimum is tracked.
    column_index: usize,
    /// Name of the output field in the result schema.
    field_name: String,
}

impl MinF64Factory {
    /// Creates a factory tracking the minimum of `column_index`.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        Self {
            column_index,
            field_name: field_name.into(),
        }
    }
}
2257
2258impl DynAggregatorFactory for MinF64Factory {
2259 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2260 Box::new(MinF64IndexedAccumulator::new(self.column_index))
2261 }
2262
2263 fn result_field(&self) -> Field {
2264 Field::new(&self.field_name, DataType::Float64, true)
2265 }
2266
2267 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2268 Box::new(self.clone())
2269 }
2270
2271 fn type_tag(&self) -> &'static str {
2272 "min_f64"
2273 }
2274}
2275
/// Running-minimum state over a configurable `f64` column.
#[derive(Debug, Clone)]
pub struct MinF64IndexedAccumulator {
    /// Column whose minimum is tracked.
    column_index: usize,
    /// Smallest value seen so far; `None` until a value arrives.
    min: Option<f64>,
}

impl MinF64IndexedAccumulator {
    /// Creates an empty accumulator reading from `column_index`.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            column_index,
            min: None,
        }
    }
}
2295
2296impl DynAccumulator for MinF64IndexedAccumulator {
2297 fn add_event(&mut self, event: &Event) {
2298 use arrow_array::cast::AsArray;
2299 use arrow_array::types::Float64Type;
2300
2301 let batch = &event.data;
2302 if self.column_index >= batch.num_columns() {
2303 return;
2304 }
2305 if let Some(array) = batch
2306 .column(self.column_index)
2307 .as_primitive_opt::<Float64Type>()
2308 {
2309 for val in array.iter().flatten() {
2310 self.min = Some(self.min.map_or(val, |m: f64| m.min(val)));
2311 }
2312 }
2313 }
2314
2315 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2316 let data = other.serialize();
2317 if data.len() >= 9 && data[0] == 1 {
2318 if let Ok(bytes) = <[u8; 8]>::try_from(&data[1..9]) {
2319 let other_min = f64::from_le_bytes(bytes);
2320 self.min = Some(self.min.map_or(other_min, |m: f64| m.min(other_min)));
2321 }
2322 }
2323 }
2324
2325 fn result_scalar(&self) -> ScalarResult {
2326 ScalarResult::OptionalFloat64(self.min)
2327 }
2328
2329 fn is_empty(&self) -> bool {
2330 self.min.is_none()
2331 }
2332
2333 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2334 Box::new(self.clone())
2335 }
2336
2337 fn serialize(&self) -> Vec<u8> {
2338 match self.min {
2339 Some(v) => {
2340 let mut buf = Vec::with_capacity(9);
2341 buf.push(1); buf.extend_from_slice(&v.to_le_bytes());
2343 buf
2344 }
2345 None => vec![0],
2346 }
2347 }
2348
2349 fn result_field(&self) -> Field {
2350 Field::new("min_f64", DataType::Float64, true)
2351 }
2352
2353 fn type_tag(&self) -> &'static str {
2354 "min_f64"
2355 }
2356
2357 fn as_any(&self) -> &dyn std::any::Any {
2358 self
2359 }
2360}
2361
/// Factory for column-indexed maximum accumulators with a custom output name.
#[derive(Debug, Clone)]
pub struct MaxF64Factory {
    /// Column whose maximum is tracked.
    column_index: usize,
    /// Name of the output field in the result schema.
    field_name: String,
}

impl MaxF64Factory {
    /// Creates a factory tracking the maximum of `column_index`.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        Self {
            column_index,
            field_name: field_name.into(),
        }
    }
}
2381
2382impl DynAggregatorFactory for MaxF64Factory {
2383 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2384 Box::new(MaxF64IndexedAccumulator::new(self.column_index))
2385 }
2386
2387 fn result_field(&self) -> Field {
2388 Field::new(&self.field_name, DataType::Float64, true)
2389 }
2390
2391 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2392 Box::new(self.clone())
2393 }
2394
2395 fn type_tag(&self) -> &'static str {
2396 "max_f64"
2397 }
2398}
2399
/// Running-maximum state over a configurable `f64` column.
#[derive(Debug, Clone)]
pub struct MaxF64IndexedAccumulator {
    /// Column whose maximum is tracked.
    column_index: usize,
    /// Largest value seen so far; `None` until a value arrives.
    max: Option<f64>,
}

impl MaxF64IndexedAccumulator {
    /// Creates an empty accumulator reading from `column_index`.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            column_index,
            max: None,
        }
    }
}
2419
2420impl DynAccumulator for MaxF64IndexedAccumulator {
2421 fn add_event(&mut self, event: &Event) {
2422 use arrow_array::cast::AsArray;
2423 use arrow_array::types::Float64Type;
2424
2425 let batch = &event.data;
2426 if self.column_index >= batch.num_columns() {
2427 return;
2428 }
2429 if let Some(array) = batch
2430 .column(self.column_index)
2431 .as_primitive_opt::<Float64Type>()
2432 {
2433 for val in array.iter().flatten() {
2434 self.max = Some(self.max.map_or(val, |m: f64| m.max(val)));
2435 }
2436 }
2437 }
2438
2439 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2440 let data = other.serialize();
2441 if data.len() >= 9 && data[0] == 1 {
2442 if let Ok(bytes) = <[u8; 8]>::try_from(&data[1..9]) {
2443 let other_max = f64::from_le_bytes(bytes);
2444 self.max = Some(self.max.map_or(other_max, |m: f64| m.max(other_max)));
2445 }
2446 }
2447 }
2448
2449 fn result_scalar(&self) -> ScalarResult {
2450 ScalarResult::OptionalFloat64(self.max)
2451 }
2452
2453 fn is_empty(&self) -> bool {
2454 self.max.is_none()
2455 }
2456
2457 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2458 Box::new(self.clone())
2459 }
2460
2461 fn serialize(&self) -> Vec<u8> {
2462 match self.max {
2463 Some(v) => {
2464 let mut buf = Vec::with_capacity(9);
2465 buf.push(1);
2466 buf.extend_from_slice(&v.to_le_bytes());
2467 buf
2468 }
2469 None => vec![0],
2470 }
2471 }
2472
2473 fn result_field(&self) -> Field {
2474 Field::new("max_f64", DataType::Float64, true)
2475 }
2476
2477 fn type_tag(&self) -> &'static str {
2478 "max_f64"
2479 }
2480
2481 fn as_any(&self) -> &dyn std::any::Any {
2482 self
2483 }
2484}
2485
/// Factory for column-indexed average accumulators with a custom output name.
#[derive(Debug, Clone)]
pub struct AvgF64Factory {
    /// Column whose average is computed.
    column_index: usize,
    /// Name of the output field in the result schema.
    field_name: String,
}

impl AvgF64Factory {
    /// Creates a factory averaging `column_index`, emitting field `field_name`.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        Self {
            column_index,
            field_name: field_name.into(),
        }
    }
}
2505
2506impl DynAggregatorFactory for AvgF64Factory {
2507 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2508 Box::new(AvgF64IndexedAccumulator::new(self.column_index))
2509 }
2510
2511 fn result_field(&self) -> Field {
2512 Field::new(&self.field_name, DataType::Float64, true)
2513 }
2514
2515 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2516 Box::new(self.clone())
2517 }
2518
2519 fn type_tag(&self) -> &'static str {
2520 "avg_f64"
2521 }
2522}
2523
/// Running sum + count over a configurable `f64` column; result is the mean.
#[derive(Debug, Clone)]
pub struct AvgF64IndexedAccumulator {
    /// Column whose average is computed.
    column_index: usize,
    /// Sum of all non-null values seen so far.
    sum: f64,
    /// Number of non-null values folded in.
    count: u64,
}

impl AvgF64IndexedAccumulator {
    /// Creates an empty accumulator reading from `column_index`.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            column_index,
            sum: 0.0,
            count: 0,
        }
    }
}
2546
2547impl DynAccumulator for AvgF64IndexedAccumulator {
2548 fn add_event(&mut self, event: &Event) {
2549 use arrow_array::cast::AsArray;
2550 use arrow_array::types::Float64Type;
2551
2552 let batch = &event.data;
2553 if self.column_index >= batch.num_columns() {
2554 return;
2555 }
2556 if let Some(array) = batch
2557 .column(self.column_index)
2558 .as_primitive_opt::<Float64Type>()
2559 {
2560 for val in array.iter().flatten() {
2561 self.sum += val;
2562 self.count += 1;
2563 }
2564 }
2565 }
2566
2567 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2568 let data = other.serialize();
2569 if let (Some(sum_bytes), Some(count_bytes)) = (
2570 data.get(..8).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2571 data.get(8..16).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2572 ) {
2573 self.sum += f64::from_le_bytes(sum_bytes);
2574 self.count += u64::from_le_bytes(count_bytes);
2575 }
2576 }
2577
2578 #[allow(clippy::cast_precision_loss)]
2580 fn result_scalar(&self) -> ScalarResult {
2581 if self.count == 0 {
2582 ScalarResult::Null
2583 } else {
2584 ScalarResult::Float64(self.sum / self.count as f64)
2585 }
2586 }
2587
2588 fn is_empty(&self) -> bool {
2589 self.count == 0
2590 }
2591
2592 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2593 Box::new(self.clone())
2594 }
2595
2596 fn serialize(&self) -> Vec<u8> {
2597 let mut buf = Vec::with_capacity(16);
2598 buf.extend_from_slice(&self.sum.to_le_bytes());
2599 buf.extend_from_slice(&self.count.to_le_bytes());
2600 buf
2601 }
2602
2603 fn result_field(&self) -> Field {
2604 Field::new("avg_f64", DataType::Float64, true)
2605 }
2606
2607 fn type_tag(&self) -> &'static str {
2608 "avg_f64"
2609 }
2610
2611 fn as_any(&self) -> &dyn std::any::Any {
2612 self
2613 }
2614}
2615
/// Factory for row-count accumulators with a custom output name.
#[derive(Debug, Clone)]
pub struct CountDynFactory {
    /// Name of the output field in the result schema.
    field_name: String,
}

impl CountDynFactory {
    /// Creates a count factory emitting field `field_name`.
    #[must_use]
    pub fn new(field_name: impl Into<String>) -> Self {
        Self {
            field_name: field_name.into(),
        }
    }
}
2634
2635impl DynAggregatorFactory for CountDynFactory {
2636 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2637 Box::new(CountDynAccumulator::default())
2638 }
2639
2640 fn result_field(&self) -> Field {
2641 Field::new(&self.field_name, DataType::Int64, false)
2642 }
2643
2644 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2645 Box::new(self.clone())
2646 }
2647
2648 fn type_tag(&self) -> &'static str {
2649 "count"
2650 }
2651}
2652
/// Row-count state: counts every row in every batch it sees.
#[derive(Debug, Clone, Default)]
pub struct CountDynAccumulator {
    /// Total rows counted so far.
    count: u64,
}
2658
2659impl DynAccumulator for CountDynAccumulator {
2660 fn add_event(&mut self, event: &Event) {
2661 let rows = event.data.num_rows();
2662 self.count += rows as u64;
2663 }
2664
2665 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2666 let data = other.serialize();
2667 if let Some(bytes) = data.get(..8).and_then(|s| <[u8; 8]>::try_from(s).ok()) {
2668 self.count += u64::from_le_bytes(bytes);
2669 }
2670 }
2671
2672 fn result_scalar(&self) -> ScalarResult {
2673 ScalarResult::Int64(i64::try_from(self.count).unwrap_or(i64::MAX))
2674 }
2675
2676 fn is_empty(&self) -> bool {
2677 self.count == 0
2678 }
2679
2680 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2681 Box::new(self.clone())
2682 }
2683
2684 fn serialize(&self) -> Vec<u8> {
2685 self.count.to_le_bytes().to_vec()
2686 }
2687
2688 fn result_field(&self) -> Field {
2689 Field::new("count", DataType::Int64, false)
2690 }
2691
2692 fn type_tag(&self) -> &'static str {
2693 "count"
2694 }
2695
2696 fn as_any(&self) -> &dyn std::any::Any {
2697 self
2698 }
2699}
2700
/// Factory for first-value accumulators (value with the smallest timestamp).
#[derive(Debug, Clone)]
pub struct FirstValueF64DynFactory {
    /// Column holding the `f64` values.
    value_column_index: usize,
    /// Column holding the `i64` event timestamps.
    timestamp_column_index: usize,
    /// Name of the output field in the result schema.
    field_name: String,
}

impl FirstValueF64DynFactory {
    /// Creates a factory tracking the earliest value of `value_column_index`,
    /// ordered by `timestamp_column_index`.
    #[must_use]
    pub fn new(
        value_column_index: usize,
        timestamp_column_index: usize,
        field_name: impl Into<String>,
    ) -> Self {
        Self {
            value_column_index,
            timestamp_column_index,
            field_name: field_name.into(),
        }
    }
}
2729
2730impl DynAggregatorFactory for FirstValueF64DynFactory {
2731 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2732 Box::new(FirstValueF64DynAccumulator::new(
2733 self.value_column_index,
2734 self.timestamp_column_index,
2735 ))
2736 }
2737
2738 fn result_field(&self) -> Field {
2739 Field::new(&self.field_name, DataType::Float64, true)
2740 }
2741
2742 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2743 Box::new(self.clone())
2744 }
2745
2746 fn type_tag(&self) -> &'static str {
2747 "first_value_f64"
2748 }
2749}
2750
/// State for first-value: keeps the value with the smallest timestamp seen.
#[derive(Debug, Clone)]
pub struct FirstValueF64DynAccumulator {
    /// Column holding the `f64` values.
    value_column_index: usize,
    /// Column holding the `i64` event timestamps.
    timestamp_column_index: usize,
    /// Value belonging to the earliest timestamp, if any.
    value: Option<f64>,
    /// Earliest timestamp seen so far, if any.
    timestamp: Option<i64>,
}

impl FirstValueF64DynAccumulator {
    /// Creates an empty accumulator reading the given columns.
    #[must_use]
    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
        Self {
            value_column_index,
            timestamp_column_index,
            value: None,
            timestamp: None,
        }
    }
}
2772
2773impl DynAccumulator for FirstValueF64DynAccumulator {
2774 fn add_event(&mut self, event: &Event) {
2775 use arrow_array::cast::AsArray;
2776 use arrow_array::types::{Float64Type, Int64Type};
2777
2778 let batch = &event.data;
2779 if self.value_column_index >= batch.num_columns()
2780 || self.timestamp_column_index >= batch.num_columns()
2781 {
2782 return;
2783 }
2784
2785 let val_col = batch.column(self.value_column_index);
2786 let ts_col = batch.column(self.timestamp_column_index);
2787
2788 let Some(val_array) = val_col.as_primitive_opt::<Float64Type>() else {
2789 return;
2790 };
2791 let Some(ts_array) = ts_col.as_primitive_opt::<Int64Type>() else {
2792 return;
2793 };
2794
2795 for i in 0..batch.num_rows() {
2796 if val_array.is_null(i) || ts_array.is_null(i) {
2797 continue;
2798 }
2799 let val = val_array.value(i);
2800 let ts = ts_array.value(i);
2801
2802 match self.timestamp {
2803 None => {
2804 self.value = Some(val);
2805 self.timestamp = Some(ts);
2806 }
2807 Some(existing_ts) if ts < existing_ts => {
2808 self.value = Some(val);
2809 self.timestamp = Some(ts);
2810 }
2811 _ => {}
2812 }
2813 }
2814 }
2815
2816 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2817 let data = other.serialize();
2818 if data.len() >= 17 && data[0] == 1 {
2819 let (Some(val_bytes), Some(ts_bytes)) = (
2820 <[u8; 8]>::try_from(&data[1..9]).ok(),
2821 <[u8; 8]>::try_from(&data[9..17]).ok(),
2822 ) else {
2823 return;
2824 };
2825 let other_val = f64::from_le_bytes(val_bytes);
2826 let other_ts = i64::from_le_bytes(ts_bytes);
2827 match self.timestamp {
2828 None => {
2829 self.value = Some(other_val);
2830 self.timestamp = Some(other_ts);
2831 }
2832 Some(self_ts) if other_ts < self_ts => {
2833 self.value = Some(other_val);
2834 self.timestamp = Some(other_ts);
2835 }
2836 _ => {}
2837 }
2838 }
2839 }
2840
2841 fn result_scalar(&self) -> ScalarResult {
2842 ScalarResult::OptionalFloat64(self.value)
2843 }
2844
2845 fn is_empty(&self) -> bool {
2846 self.value.is_none()
2847 }
2848
2849 fn clone_box(&self) -> Box<dyn DynAccumulator> {
2850 Box::new(self.clone())
2851 }
2852
2853 fn serialize(&self) -> Vec<u8> {
2854 match (self.value, self.timestamp) {
2855 (Some(v), Some(ts)) => {
2856 let mut buf = Vec::with_capacity(17);
2857 buf.push(1);
2858 buf.extend_from_slice(&v.to_le_bytes());
2859 buf.extend_from_slice(&ts.to_le_bytes());
2860 buf
2861 }
2862 _ => vec![0],
2863 }
2864 }
2865
2866 fn result_field(&self) -> Field {
2867 Field::new("first_value_f64", DataType::Float64, true)
2868 }
2869
2870 fn type_tag(&self) -> &'static str {
2871 "first_value_f64"
2872 }
2873
2874 fn as_any(&self) -> &dyn std::any::Any {
2875 self
2876 }
2877}
2878
/// Factory for last-value accumulators (value with the greatest timestamp).
#[derive(Debug, Clone)]
pub struct LastValueF64DynFactory {
    /// Column holding the `f64` values.
    value_column_index: usize,
    /// Column holding the `i64` event timestamps.
    timestamp_column_index: usize,
    /// Name of the output field in the result schema.
    field_name: String,
}

impl LastValueF64DynFactory {
    /// Creates a factory tracking the latest value of `value_column_index`,
    /// ordered by `timestamp_column_index`.
    #[must_use]
    pub fn new(
        value_column_index: usize,
        timestamp_column_index: usize,
        field_name: impl Into<String>,
    ) -> Self {
        Self {
            value_column_index,
            timestamp_column_index,
            field_name: field_name.into(),
        }
    }
}
2905
2906impl DynAggregatorFactory for LastValueF64DynFactory {
2907 fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2908 Box::new(LastValueF64DynAccumulator::new(
2909 self.value_column_index,
2910 self.timestamp_column_index,
2911 ))
2912 }
2913
2914 fn result_field(&self) -> Field {
2915 Field::new(&self.field_name, DataType::Float64, true)
2916 }
2917
2918 fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2919 Box::new(self.clone())
2920 }
2921
2922 fn type_tag(&self) -> &'static str {
2923 "last_value_f64"
2924 }
2925}
2926
/// State for last-value: keeps the value with the greatest timestamp seen.
#[derive(Debug, Clone)]
pub struct LastValueF64DynAccumulator {
    /// Column holding the `f64` values.
    value_column_index: usize,
    /// Column holding the `i64` event timestamps.
    timestamp_column_index: usize,
    /// Value belonging to the latest timestamp, if any.
    value: Option<f64>,
    /// Latest timestamp seen so far, if any.
    timestamp: Option<i64>,
}

impl LastValueF64DynAccumulator {
    /// Creates an empty accumulator reading the given columns.
    #[must_use]
    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
        Self {
            value_column_index,
            timestamp_column_index,
            value: None,
            timestamp: None,
        }
    }
}
2948
2949impl DynAccumulator for LastValueF64DynAccumulator {
2950 fn add_event(&mut self, event: &Event) {
2951 use arrow_array::cast::AsArray;
2952 use arrow_array::types::{Float64Type, Int64Type};
2953
2954 let batch = &event.data;
2955 if self.value_column_index >= batch.num_columns()
2956 || self.timestamp_column_index >= batch.num_columns()
2957 {
2958 return;
2959 }
2960
2961 let val_col = batch.column(self.value_column_index);
2962 let ts_col = batch.column(self.timestamp_column_index);
2963
2964 let Some(val_array) = val_col.as_primitive_opt::<Float64Type>() else {
2965 return;
2966 };
2967 let Some(ts_array) = ts_col.as_primitive_opt::<Int64Type>() else {
2968 return;
2969 };
2970
2971 for i in 0..batch.num_rows() {
2972 if val_array.is_null(i) || ts_array.is_null(i) {
2973 continue;
2974 }
2975 let val = val_array.value(i);
2976 let ts = ts_array.value(i);
2977
2978 match self.timestamp {
2979 None => {
2980 self.value = Some(val);
2981 self.timestamp = Some(ts);
2982 }
2983 Some(existing_ts) if ts >= existing_ts => {
2984 self.value = Some(val);
2985 self.timestamp = Some(ts);
2986 }
2987 _ => {}
2988 }
2989 }
2990 }
2991
2992 fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2993 let data = other.serialize();
2994 if data.len() >= 17 && data[0] == 1 {
2995 let (Some(val_bytes), Some(ts_bytes)) = (
2996 <[u8; 8]>::try_from(&data[1..9]).ok(),
2997 <[u8; 8]>::try_from(&data[9..17]).ok(),
2998 ) else {
2999 return;
3000 };
3001 let other_val = f64::from_le_bytes(val_bytes);
3002 let other_ts = i64::from_le_bytes(ts_bytes);
3003 match self.timestamp {
3004 None => {
3005 self.value = Some(other_val);
3006 self.timestamp = Some(other_ts);
3007 }
3008 Some(self_ts) if other_ts >= self_ts => {
3009 self.value = Some(other_val);
3010 self.timestamp = Some(other_ts);
3011 }
3012 _ => {}
3013 }
3014 }
3015 }
3016
3017 fn result_scalar(&self) -> ScalarResult {
3018 ScalarResult::OptionalFloat64(self.value)
3019 }
3020
3021 fn is_empty(&self) -> bool {
3022 self.value.is_none()
3023 }
3024
3025 fn clone_box(&self) -> Box<dyn DynAccumulator> {
3026 Box::new(self.clone())
3027 }
3028
3029 fn serialize(&self) -> Vec<u8> {
3030 match (self.value, self.timestamp) {
3031 (Some(v), Some(ts)) => {
3032 let mut buf = Vec::with_capacity(17);
3033 buf.push(1);
3034 buf.extend_from_slice(&v.to_le_bytes());
3035 buf.extend_from_slice(&ts.to_le_bytes());
3036 buf
3037 }
3038 _ => vec![0],
3039 }
3040 }
3041
3042 fn result_field(&self) -> Field {
3043 Field::new("last_value_f64", DataType::Float64, true)
3044 }
3045
3046 fn type_tag(&self) -> &'static str {
3047 "last_value_f64"
3048 }
3049
3050 fn as_any(&self) -> &dyn std::any::Any {
3051 self
3052 }
3053}
3054
/// Runs several dyn-dispatch aggregates side by side over the same events.
///
/// The output schema is `window_start`, `window_end`, then one column per
/// factory, and is computed once at construction.
pub struct CompositeAggregator {
    /// One factory per aggregate column, in output order.
    factories: Vec<Box<dyn DynAggregatorFactory>>,
    /// Precomputed output schema (window bounds + one field per factory).
    cached_schema: SchemaRef,
}
3081
3082impl std::fmt::Debug for CompositeAggregator {
3083 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3084 f.debug_struct("CompositeAggregator")
3085 .field("num_factories", &self.factories.len())
3086 .finish_non_exhaustive()
3087 }
3088}
3089
3090impl CompositeAggregator {
3091 #[must_use]
3093 pub fn new(factories: Vec<Box<dyn DynAggregatorFactory>>) -> Self {
3094 let mut fields = vec![
3095 Field::new("window_start", DataType::Int64, false),
3096 Field::new("window_end", DataType::Int64, false),
3097 ];
3098 fields.extend(factories.iter().map(|f| f.result_field()));
3099 let cached_schema = Arc::new(Schema::new(fields));
3100 Self {
3101 factories,
3102 cached_schema,
3103 }
3104 }
3105
3106 #[must_use]
3108 pub fn num_aggregates(&self) -> usize {
3109 self.factories.len()
3110 }
3111
3112 #[must_use]
3114 pub fn create_accumulator(&self) -> CompositeAccumulator {
3115 let accumulators = self
3116 .factories
3117 .iter()
3118 .map(|f| f.create_accumulator())
3119 .collect();
3120 CompositeAccumulator { accumulators }
3121 }
3122
3123 #[must_use]
3125 pub fn result_fields(&self) -> Vec<Field> {
3126 self.factories.iter().map(|f| f.result_field()).collect()
3127 }
3128
3129 #[must_use]
3131 pub fn output_schema(&self) -> SchemaRef {
3132 Arc::clone(&self.cached_schema)
3133 }
3134}
3135
3136impl Clone for CompositeAggregator {
3137 fn clone(&self) -> Self {
3138 let factories: Vec<Box<dyn DynAggregatorFactory>> =
3139 self.factories.iter().map(|f| f.clone_box()).collect();
3140 Self {
3141 cached_schema: Arc::clone(&self.cached_schema),
3142 factories,
3143 }
3144 }
3145}
3146
/// Per-window state of a [`CompositeAggregator`]: one accumulator per
/// aggregate column, in the same order as the factories that created them.
pub struct CompositeAccumulator {
    /// Accumulators positionally matching the aggregator's factories.
    accumulators: Vec<Box<dyn DynAccumulator>>,
}
3155
3156impl std::fmt::Debug for CompositeAccumulator {
3157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3158 f.debug_struct("CompositeAccumulator")
3159 .field("num_accumulators", &self.accumulators.len())
3160 .finish()
3161 }
3162}
3163
3164impl CompositeAccumulator {
3165 pub fn add_event(&mut self, event: &Event) {
3167 for acc in &mut self.accumulators {
3168 acc.add_event(event);
3169 }
3170 }
3171
3172 pub fn merge(&mut self, other: &Self) -> Result<(), &'static str> {
3179 if self.accumulators.len() != other.accumulators.len() {
3180 return Err("cannot merge composite accumulators with different sizes");
3181 }
3182 for (self_acc, other_acc) in self.accumulators.iter_mut().zip(&other.accumulators) {
3183 self_acc.merge_dyn(other_acc.as_ref());
3184 }
3185 Ok(())
3186 }
3187
3188 #[must_use]
3190 pub fn results(&self) -> Vec<ScalarResult> {
3191 self.accumulators
3192 .iter()
3193 .map(|a| a.result_scalar())
3194 .collect()
3195 }
3196
3197 #[must_use]
3199 pub fn is_empty(&self) -> bool {
3200 self.accumulators.iter().all(|a| a.is_empty())
3201 }
3202
3203 #[must_use]
3205 #[allow(clippy::cast_possible_truncation)] pub fn serialize(&self) -> Vec<u8> {
3207 let mut buf = Vec::new();
3208 let n = self.accumulators.len() as u32;
3210 buf.extend_from_slice(&n.to_le_bytes());
3211 for acc in &self.accumulators {
3212 let tag = acc.type_tag();
3213 let tag_bytes = tag.as_bytes();
3214 buf.extend_from_slice(&(tag_bytes.len() as u16).to_le_bytes());
3216 buf.extend_from_slice(tag_bytes);
3217 let data = acc.serialize();
3218 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
3219 buf.extend_from_slice(&data);
3220 }
3221 buf
3222 }
3223
3224 #[must_use]
3232 pub fn to_record_batch(&self, window_id: &WindowId, schema: &SchemaRef) -> Option<RecordBatch> {
3233 use arrow_array::{Float64Array, UInt64Array};
3234
3235 let mut columns: Vec<Arc<dyn arrow_array::Array>> = vec![
3236 Arc::new(Int64Array::from(vec![window_id.start])),
3237 Arc::new(Int64Array::from(vec![window_id.end])),
3238 ];
3239
3240 for result in self.results() {
3241 let col: Arc<dyn arrow_array::Array> = match result {
3242 ScalarResult::Int64(v) => Arc::new(Int64Array::from(vec![v])),
3243 ScalarResult::Float64(v) => Arc::new(Float64Array::from(vec![v])),
3244 ScalarResult::UInt64(v) => Arc::new(UInt64Array::from(vec![v])),
3245 ScalarResult::OptionalInt64(v) => Arc::new(Int64Array::from(vec![v])),
3246 ScalarResult::OptionalFloat64(v) => Arc::new(Float64Array::from(vec![v])),
3247 ScalarResult::Null => Arc::new(Int64Array::new_null(1)),
3248 };
3249 columns.push(col);
3250 }
3251
3252 RecordBatch::try_new(Arc::clone(schema), columns).ok()
3253 }
3254
3255 #[must_use]
3257 pub fn num_accumulators(&self) -> usize {
3258 self.accumulators.len()
3259 }
3260}
3261
3262impl Clone for CompositeAccumulator {
3263 fn clone(&self) -> Self {
3264 Self {
3265 accumulators: self.accumulators.iter().map(|a| a.clone_box()).collect(),
3266 }
3267 }
3268}
3269
/// Key prefix for per-window state entries in the state store.
const WINDOW_STATE_PREFIX: &[u8; 4] = b"win:";

/// Full state-key size: the 4-byte prefix plus 16 bytes of window identity
/// (presumably the two i64 window bounds — confirm against the key encoder).
const WINDOW_STATE_KEY_SIZE: usize = 4 + 16;
3275
/// Streaming operator that assigns events to tumbling windows, aggregates
/// them with `A`, and emits one result per window.
pub struct TumblingWindowOperator<A: Aggregator> {
    /// Maps event timestamps to tumbling windows.
    assigner: TumblingWindowAssigner,
    /// Typed aggregator applied to each window's events.
    aggregator: A,
    /// How long after the watermark a window stays open for late events.
    allowed_lateness_ms: i64,
    /// Windows that currently have a close timer registered.
    registered_windows: FxHashSet<WindowId>,
    /// Windows with a periodic (early-emit) timer registered.
    periodic_timer_windows: FxHashSet<WindowId>,
    /// When/how intermediate and final results are emitted.
    emit_strategy: EmitStrategy,
    /// Drop vs. side-output policy for late events.
    late_data_config: LateDataConfig,
    /// Counters for late-event handling.
    late_data_metrics: LateDataMetrics,
    /// Counters for window closes.
    window_close_metrics: WindowCloseMetrics,
    /// Unique id used for state keys / diagnostics.
    operator_id: String,
    /// Schema of emitted batches: window_start, window_end, result.
    output_schema: SchemaRef,
    /// Last result emitted per window (used by incremental emit strategies
    /// — confirm exact use against the emit logic below).
    last_emitted: FxHashMap<WindowId, Event>,
    /// Cached side-output name shared across emitted events.
    side_output_arc: Option<Arc<str>>,
    /// Ties the operator to its accumulator type without storing one.
    _phantom: PhantomData<A::Acc>,
}
3336
/// Process-wide counter used to generate unique default operator ids.
static OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
3339
3340impl<A: Aggregator> TumblingWindowOperator<A>
3341where
3342 A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
3343 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
3344 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
3345{
    /// Creates an operator with an auto-generated id (`tumbling_window_<n>`).
    ///
    /// # Panics
    /// Panics if `allowed_lateness` in milliseconds does not fit in `i64`.
    #[must_use]
    pub fn new(
        assigner: TumblingWindowAssigner,
        aggregator: A,
        allowed_lateness: Duration,
    ) -> Self {
        // Global counter keeps default ids unique within the process.
        let operator_num = OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
        // Fixed three-column output: window bounds plus the aggregate result.
        let output_schema = Arc::new(Schema::new(vec![
            Field::new("window_start", DataType::Int64, false),
            Field::new("window_end", DataType::Int64, false),
            Field::new(
                "result",
                aggregator.output_data_type(),
                aggregator.output_nullable(),
            ),
        ]));
        Self {
            assigner,
            aggregator,
            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
                .expect("Allowed lateness must fit in i64"),
            registered_windows: FxHashSet::default(),
            periodic_timer_windows: FxHashSet::default(),
            emit_strategy: EmitStrategy::default(),
            late_data_config: LateDataConfig::default(),
            late_data_metrics: LateDataMetrics::new(),
            window_close_metrics: WindowCloseMetrics::new(),
            operator_id: format!("tumbling_window_{operator_num}"),
            output_schema,
            last_emitted: FxHashMap::default(),
            side_output_arc: None,
            _phantom: PhantomData,
        }
    }
3391
3392 #[must_use]
3397 pub fn with_id(
3398 assigner: TumblingWindowAssigner,
3399 aggregator: A,
3400 allowed_lateness: Duration,
3401 operator_id: String,
3402 ) -> Self {
3403 let output_schema = Arc::new(Schema::new(vec![
3404 Field::new("window_start", DataType::Int64, false),
3405 Field::new("window_end", DataType::Int64, false),
3406 Field::new(
3407 "result",
3408 aggregator.output_data_type(),
3409 aggregator.output_nullable(),
3410 ),
3411 ]));
3412 Self {
3413 assigner,
3414 aggregator,
3415 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
3417 .expect("Allowed lateness must fit in i64"),
3418 registered_windows: FxHashSet::default(),
3419 periodic_timer_windows: FxHashSet::default(),
3420 emit_strategy: EmitStrategy::default(),
3421 late_data_config: LateDataConfig::default(),
3422 late_data_metrics: LateDataMetrics::new(),
3423 window_close_metrics: WindowCloseMetrics::new(),
3424 operator_id,
3425 output_schema,
3426 last_emitted: FxHashMap::default(),
3427 side_output_arc: None,
3428 _phantom: PhantomData,
3429 }
3430 }
3431
3432 pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
3457 self.emit_strategy = strategy;
3458 }
3459
3460 #[must_use]
3462 pub fn emit_strategy(&self) -> &EmitStrategy {
3463 &self.emit_strategy
3464 }
3465
3466 pub fn set_late_data_config(&mut self, config: LateDataConfig) {
3491 self.late_data_config = config;
3492 }
3493
3494 #[must_use]
3496 pub fn late_data_config(&self) -> &LateDataConfig {
3497 &self.late_data_config
3498 }
3499
3500 #[must_use]
3504 pub fn late_data_metrics(&self) -> &LateDataMetrics {
3505 &self.late_data_metrics
3506 }
3507
3508 pub fn reset_late_data_metrics(&mut self) {
3510 self.late_data_metrics.reset();
3511 }
3512
3513 #[must_use]
3517 pub fn window_close_metrics(&self) -> &WindowCloseMetrics {
3518 &self.window_close_metrics
3519 }
3520
3521 pub fn reset_window_close_metrics(&mut self) {
3523 self.window_close_metrics.reset();
3524 }
3525
3526 #[must_use]
3528 pub fn active_windows_count(&self) -> usize {
3529 self.registered_windows.len()
3530 }
3531
3532 #[must_use]
3534 pub fn assigner(&self) -> &TumblingWindowAssigner {
3535 &self.assigner
3536 }
3537
3538 #[must_use]
3540 pub fn allowed_lateness_ms(&self) -> i64 {
3541 self.allowed_lateness_ms
3542 }
3543
3544 #[inline]
3549 fn state_key(window_id: &WindowId) -> [u8; WINDOW_STATE_KEY_SIZE] {
3550 let mut key = [0u8; WINDOW_STATE_KEY_SIZE];
3551 key[..4].copy_from_slice(WINDOW_STATE_PREFIX);
3552 let window_key = window_id.to_key_inline();
3553 key[4..20].copy_from_slice(&window_key);
3554 key
3555 }
3556
3557 fn get_accumulator(&self, window_id: &WindowId, state: &dyn StateStore) -> A::Acc {
3559 let key = Self::state_key(window_id);
3560 state
3561 .get_typed::<A::Acc>(&key)
3562 .ok()
3563 .flatten()
3564 .unwrap_or_else(|| self.aggregator.create_accumulator())
3565 }
3566
3567 fn put_accumulator(
3569 window_id: &WindowId,
3570 acc: &A::Acc,
3571 state: &mut dyn StateStore,
3572 ) -> Result<(), OperatorError> {
3573 let key = Self::state_key(window_id);
3574 state
3575 .put_typed(&key, acc)
3576 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
3577 }
3578
3579 fn delete_accumulator(
3581 window_id: &WindowId,
3582 state: &mut dyn StateStore,
3583 ) -> Result<(), OperatorError> {
3584 let key = Self::state_key(window_id);
3585 state
3586 .delete(&key)
3587 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
3588 }
3589
3590 fn is_late(&self, event_time: i64, watermark: i64) -> bool {
3592 let window_id = self.assigner.assign(event_time);
3593 let cleanup_time = window_id.end + self.allowed_lateness_ms;
3594 watermark >= cleanup_time
3595 }
3596
3597 fn maybe_register_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
3599 if !self.registered_windows.contains(&window_id) {
3600 let trigger_time = window_id.end + self.allowed_lateness_ms;
3602 ctx.timers.register_timer(
3603 trigger_time,
3604 Some(window_id.to_key()),
3605 Some(ctx.operator_index),
3606 );
3607 self.registered_windows.insert(window_id);
3608 }
3609 }
3610
3611 fn maybe_register_periodic_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
3617 if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
3618 if !self.periodic_timer_windows.contains(&window_id) {
3619 let interval_ms =
3621 i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
3622 let trigger_time = ctx.processing_time + interval_ms;
3623
3624 let key = Self::periodic_timer_key(&window_id);
3626
3627 ctx.timers
3628 .register_timer(trigger_time, Some(key), Some(ctx.operator_index));
3629 self.periodic_timer_windows.insert(window_id);
3630 }
3631 }
3632 }
3633
3634 #[inline]
3639 fn periodic_timer_key(window_id: &WindowId) -> super::TimerKey {
3640 let mut key = window_id.to_key();
3641 if !key.is_empty() {
3643 key[0] |= 0x80;
3644 }
3645 key
3646 }
3647
3648 #[inline]
3650 fn is_periodic_timer_key(key: &[u8]) -> bool {
3651 !key.is_empty() && (key[0] & 0x80) != 0
3652 }
3653
3654 #[inline]
3656 fn window_id_from_periodic_key(key: &[u8]) -> Option<WindowId> {
3657 if key.len() != 16 {
3658 return None;
3659 }
3660 let mut clean_key = [0u8; 16];
3661 clean_key.copy_from_slice(key);
3662 clean_key[0] &= 0x7F;
3664 WindowId::from_key(&clean_key)
3665 }
3666
3667 fn create_intermediate_result(
3675 &self,
3676 window_id: &WindowId,
3677 state: &dyn crate::state::StateStore,
3678 ) -> Option<Event> {
3679 let acc = self.get_accumulator(window_id, state);
3680
3681 if acc.is_empty() {
3682 return None;
3683 }
3684
3685 let result = acc.result();
3686 let result_array = result.to_arrow_array();
3687
3688 let batch = RecordBatch::try_new(
3689 Arc::clone(&self.output_schema),
3690 vec![
3691 Arc::new(Int64Array::from(vec![window_id.start])),
3692 Arc::new(Int64Array::from(vec![window_id.end])),
3693 result_array,
3694 ],
3695 )
3696 .ok()?;
3697
3698 Some(Event::new(window_id.end, batch))
3699 }
3700
3701 fn handle_periodic_timer(
3703 &mut self,
3704 window_id: WindowId,
3705 ctx: &mut OperatorContext,
3706 ) -> OutputVec {
3707 let mut output = OutputVec::new();
3708
3709 if !self.registered_windows.contains(&window_id) {
3711 self.periodic_timer_windows.remove(&window_id);
3713 return output;
3714 }
3715
3716 if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
3718 output.push(Output::Event(event));
3719 }
3720
3721 if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
3723 let interval_ms =
3724 i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
3725 let next_trigger = ctx.processing_time + interval_ms;
3726
3727 let window_close_time = window_id.end + self.allowed_lateness_ms;
3729 if next_trigger < window_close_time {
3730 let key = Self::periodic_timer_key(&window_id);
3731 ctx.timers
3732 .register_timer(next_trigger, Some(key), Some(ctx.operator_index));
3733 }
3734 }
3735
3736 output
3737 }
3738}
3739
impl<A: Aggregator> Operator for TumblingWindowOperator<A>
where
    A::Acc: 'static
        + Archive
        + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
{
    /// Processes one input event: advances the watermark, filters/routes late
    /// events, folds the event's values into the window accumulator, registers
    /// close (and possibly periodic) timers, and — depending on the emit
    /// strategy — emits an intermediate result or changelog pair.
    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        let event_time = event.timestamp;

        // Feed the watermark generator; it may emit an advanced watermark.
        let emitted_watermark = ctx.watermark_generator.on_event(event_time);

        // Late-event path. The i64::MIN guard skips the check before any
        // watermark has been established.
        // NOTE(review): when the event is late, `emitted_watermark` is not
        // forwarded — confirm downstream watermark progress does not rely on
        // watermarks produced by late events.
        let current_wm = ctx.watermark_generator.current_watermark();
        if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
            let mut output = OutputVec::new();

            // Some strategies unconditionally drop late data, regardless of
            // any configured side output.
            if self.emit_strategy.drops_late_data() {
                self.late_data_metrics.record_dropped();
                return output;
            }

            if let Some(side_output_name) = self.late_data_config.side_output() {
                self.late_data_metrics.record_side_output();
                // Cache the name as Arc<str> once; later late events clone
                // the Arc instead of re-allocating the string.
                let name_arc = self
                    .side_output_arc
                    .get_or_insert_with(|| Arc::from(side_output_name))
                    .clone();
                output.push(Output::SideOutput(Box::new(SideOutputData {
                    name: name_arc,
                    event: event.clone(),
                })));
            } else {
                // No side output configured: count as dropped but still emit
                // a LateEvent marker.
                // NOTE(review): counting "dropped" while also emitting the
                // event as Output::LateEvent looks intentional (marker for
                // downstream observability) — confirm.
                self.late_data_metrics.record_dropped();
                output.push(Output::LateEvent(event.clone()));
            }
            return output;
        }

        let window_id = self.assigner.assign(event_time);

        // Tracks whether the accumulator was actually persisted; only then
        // may intermediate results be emitted below.
        let mut state_updated = false;

        let values = self.aggregator.extract_batch(event);
        if !values.is_empty() {
            let mut acc = self.get_accumulator(&window_id, ctx.state);
            for value in values {
                acc.add(value);
            }
            if let Err(e) = Self::put_accumulator(&window_id, &acc, ctx.state) {
                // Best-effort: log and continue without emitting, rather than
                // failing the whole pipeline on a state-store error.
                tracing::error!("Failed to store window state: {e}");
            } else {
                state_updated = true;
            }
        }

        // Ensure the window-close timer exists (idempotent per window).
        self.maybe_register_timer(window_id, ctx);

        // Periodic timers are pointless for strategies that suppress
        // intermediate output.
        if !self.emit_strategy.suppresses_intermediate() {
            self.maybe_register_periodic_timer(window_id, ctx);
        }

        let mut output = OutputVec::new();
        if let Some(wm) = emitted_watermark {
            output.push(Output::Watermark(wm.timestamp()));
        }

        if state_updated {
            match &self.emit_strategy {
                // Emit a refreshed result after every update.
                EmitStrategy::OnUpdate => {
                    if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
                        output.push(Output::Event(event));
                    }
                }
                // Emit retract/insert pairs: delete the previously emitted
                // value (if any), then insert the new one.
                EmitStrategy::Changelog => {
                    if let Some(new_event) = self.create_intermediate_result(&window_id, ctx.state)
                    {
                        if let Some(old_event) = self.last_emitted.get(&window_id) {
                            let delete =
                                ChangelogRecord::delete(old_event.clone(), ctx.processing_time);
                            output.push(Output::Changelog(delete));
                        }
                        let insert =
                            ChangelogRecord::insert(new_event.clone(), ctx.processing_time);
                        output.push(Output::Changelog(insert));
                        self.last_emitted.insert(window_id, new_event);
                    }
                }
                // These strategies emit only from timers, never per event.
                EmitStrategy::OnWatermark
                | EmitStrategy::Periodic(_)
                | EmitStrategy::OnWindowClose
                | EmitStrategy::Final => {}
            }
        }

        output
    }

    /// Handles a timer firing. The high bit of the timer key distinguishes
    /// periodic-emission timers from window-close timers; close timers emit
    /// the final window result and clear all per-window state.
    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
        if Self::is_periodic_timer_key(&timer.key) {
            // Strategy may have changed since the timer was armed; if
            // intermediate output is now suppressed, just drop the timer's
            // bookkeeping entry.
            if self.emit_strategy.suppresses_intermediate() {
                if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
                    self.periodic_timer_windows.remove(&window_id);
                }
                return OutputVec::new();
            }

            if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
                return self.handle_periodic_timer(window_id, ctx);
            }
            // Malformed key: nothing sensible to do.
            return OutputVec::new();
        }

        // Window-close timer from here on.
        let Some(window_id) = WindowId::from_key(&timer.key) else {
            return OutputVec::new();
        };

        let acc = self.get_accumulator(&window_id, ctx.state);

        // Empty window: clean up state/bookkeeping and emit nothing.
        if acc.is_empty() {
            let _ = Self::delete_accumulator(&window_id, ctx.state);
            self.registered_windows.remove(&window_id);
            self.periodic_timer_windows.remove(&window_id);
            self.last_emitted.remove(&window_id);
            return OutputVec::new();
        }

        let result = acc.result();

        // Finalize: drop the accumulator and all per-window tracking before
        // building the output batch. Delete errors are intentionally ignored
        // (best-effort cleanup).
        let _ = Self::delete_accumulator(&window_id, ctx.state);
        self.registered_windows.remove(&window_id);
        self.periodic_timer_windows.remove(&window_id);
        self.last_emitted.remove(&window_id);

        let result_array = result.to_arrow_array();

        let batch = RecordBatch::try_new(
            Arc::clone(&self.output_schema),
            vec![
                Arc::new(Int64Array::from(vec![window_id.start])),
                Arc::new(Int64Array::from(vec![window_id.end])),
                result_array,
            ],
        );

        let mut output = OutputVec::new();
        match batch {
            Ok(data) => {
                // The emitted event is timestamped at the window end.
                let event = Event::new(window_id.end, data);

                self.window_close_metrics
                    .record_close(window_id.end, ctx.processing_time);

                match &self.emit_strategy {
                    // Changelog consumers get the final value as an insert.
                    EmitStrategy::Changelog => {
                        let record = ChangelogRecord::insert(event, ctx.processing_time);
                        output.push(Output::Changelog(record));
                    }
                    // All other strategies emit the final result as a plain
                    // event on window close.
                    EmitStrategy::OnWatermark
                    | EmitStrategy::Periodic(_)
                    | EmitStrategy::OnUpdate
                    | EmitStrategy::OnWindowClose
                    | EmitStrategy::Final => {
                        output.push(Output::Event(event));
                    }
                }
            }
            Err(e) => {
                // The window state is already cleared at this point, so the
                // result is lost; log and move on.
                tracing::error!("Failed to create output batch: {e}");
            }
        }
        output
    }

    /// Serializes the timer bookkeeping (registered windows and periodic
    /// windows) with rkyv. Accumulator contents live in the state store and
    /// are not part of this snapshot.
    ///
    /// NOTE(review): a serialization failure silently produces an empty
    /// payload — confirm that is acceptable for checkpoint integrity.
    fn checkpoint(&self) -> OperatorState {
        let windows: Vec<_> = self.registered_windows.iter().copied().collect();
        let periodic_windows: Vec<_> = self.periodic_timer_windows.iter().copied().collect();

        let checkpoint_data = (windows, periodic_windows);
        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
            .map(|v| v.to_vec())
            .unwrap_or_default();

        OperatorState {
            operator_id: self.operator_id.clone(),
            data,
        }
    }

    /// Restores timer bookkeeping from a checkpoint.
    ///
    /// Tries the current `(registered, periodic)` tuple format first, then
    /// falls back to a legacy format that stored only the registered-window
    /// list (periodic timers are reset to empty in that case).
    ///
    /// # Errors
    /// Returns `StateAccessFailed` on operator-id mismatch and
    /// `SerializationFailed` if neither format can be decoded.
    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
        if state.operator_id != self.operator_id {
            return Err(OperatorError::StateAccessFailed(format!(
                "Operator ID mismatch: expected {}, got {}",
                self.operator_id, state.operator_id
            )));
        }

        // Current format: (registered_windows, periodic_timer_windows).
        if let Ok(archived) =
            rkyv::access::<rkyv::Archived<(Vec<WindowId>, Vec<WindowId>)>, RkyvError>(&state.data)
        {
            if let Ok((windows, periodic_windows)) =
                rkyv::deserialize::<(Vec<WindowId>, Vec<WindowId>), RkyvError>(archived)
            {
                self.registered_windows = windows.into_iter().collect();
                self.periodic_timer_windows = periodic_windows.into_iter().collect();
                return Ok(());
            }
        }

        // Legacy format: registered windows only.
        let archived = rkyv::access::<rkyv::Archived<Vec<WindowId>>, RkyvError>(&state.data)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        let windows: Vec<WindowId> = rkyv::deserialize::<Vec<WindowId>, RkyvError>(archived)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;

        self.registered_windows = windows.into_iter().collect();
        self.periodic_timer_windows = FxHashSet::default();
        Ok(())
    }
}
3999
// Unit tests live in the sibling `tests` module file; compiled only for
// test builds.
#[cfg(test)]
mod tests;