// laminar_core/operator/window.rs
//! # Window Operators
//!
//! Implementation of various window types for stream processing.
//!
//! ## Window Types
//!
//! - **Tumbling**: Fixed-size, non-overlapping windows (implemented)
//! - **Sliding**: Fixed-size, overlapping windows (future)
//! - **Session**: Dynamic windows based on activity gaps (future)
//!
//! ## Emit Strategies
//!
//! Windows support different emission strategies via [`EmitStrategy`]:
//!
//! - `OnWatermark` (default): Emit results when watermark passes window end
//! - `Periodic`: Emit intermediate results at fixed intervals
//! - `OnUpdate`: Emit after every state change (most expensive)
//!
//! ## Example
//!
//! ```rust,no_run
//! use laminar_core::operator::window::{
//!     TumblingWindowAssigner, TumblingWindowOperator, CountAggregator, EmitStrategy,
//! };
//! use std::time::Duration;
//!
//! // Create a 1-minute tumbling window with count aggregation
//! let assigner = TumblingWindowAssigner::new(Duration::from_secs(60));
//! let mut operator = TumblingWindowOperator::new(
//!     assigner,
//!     CountAggregator::new(),
//!     Duration::from_secs(5), // 5 second grace period
//! );
//!
//! // Emit intermediate results every 10 seconds
//! operator.set_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(10)));
//! ```

39use super::{
40    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec,
41    SideOutputData, Timer,
42};
43use crate::state::{StateStore, StateStoreExt};
44use arrow_array::{Array as ArrowArray, Int64Array, RecordBatch};
45use arrow_schema::{DataType, Field, Schema, SchemaRef};
46use rkyv::{
47    api::high::{HighDeserializer, HighSerializer, HighValidator},
48    bytecheck::CheckBytes,
49    rancor::Error as RkyvError,
50    ser::allocator::ArenaHandle,
51    util::AlignedVec,
52    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
53};
54use rustc_hash::{FxHashMap, FxHashSet};
55use smallvec::SmallVec;
56use std::marker::PhantomData;
57use std::sync::atomic::{AtomicU64, Ordering};
58use std::sync::Arc;
59use std::time::Duration;
60
/// Configuration for late data handling.
///
/// Decides the fate of events that arrive after their window has closed
/// (i.e., once `watermark >= window_end + allowed_lateness`).
///
/// This matters most for [`EmitStrategy::OnWindowClose`], where late events
/// are **never** folded back into a closed window: they are either dropped
/// silently or routed to a named side output (e.g., a `late_events` topic
/// or table) for separate processing.
///
/// For [`EmitStrategy::Final`], late events are always silently dropped,
/// regardless of this configuration.
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::window::LateDataConfig;
/// use std::time::Duration;
///
/// // Route late events to a side output called "late_events"
/// let config = LateDataConfig::with_side_output("late_events".to_string());
///
/// // Drop late events (default behavior)
/// let config = LateDataConfig::drop();
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LateDataConfig {
    /// Side output name for late data; `None` means late events are dropped.
    side_output: Option<String>,
}

impl LateDataConfig {
    /// Creates a config that drops late events (the default behavior).
    #[must_use]
    pub fn drop() -> Self {
        Self::default()
    }

    /// Creates a config that routes late events to the named side output.
    #[must_use]
    pub fn with_side_output(name: String) -> Self {
        let side_output = Some(name);
        Self { side_output }
    }

    /// Returns the configured side output name, if any.
    #[must_use]
    pub fn side_output(&self) -> Option<&str> {
        match &self.side_output {
            Some(name) => Some(name.as_str()),
            None => None,
        }
    }

    /// Returns true when no side output is configured, i.e. late events
    /// should be dropped.
    #[must_use]
    pub fn should_drop(&self) -> bool {
        self.side_output().is_none()
    }
}
119
/// Metrics for tracking late data.
///
/// Counters describing how the late-data handling path behaved; useful
/// for monitoring and alerting.
#[derive(Debug, Clone, Default)]
#[allow(clippy::struct_field_names)]
pub struct LateDataMetrics {
    /// Count of every late event observed, regardless of outcome.
    late_events_total: u64,
    /// Count of late events discarded (no side output configured).
    late_events_dropped: u64,
    /// Count of late events forwarded to the configured side output.
    late_events_side_output: u64,
}

impl LateDataMetrics {
    /// Creates a fresh tracker with all counters at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Total number of late events received.
    #[must_use]
    pub fn late_events_total(&self) -> u64 {
        self.late_events_total
    }

    /// Number of late events that were dropped.
    #[must_use]
    pub fn late_events_dropped(&self) -> u64 {
        self.late_events_dropped
    }

    /// Number of late events routed to a side output.
    #[must_use]
    pub fn late_events_side_output(&self) -> u64 {
        self.late_events_side_output
    }

    /// Records one late event that was dropped.
    pub fn record_dropped(&mut self) {
        self.late_events_dropped += 1;
        self.late_events_total += 1;
    }

    /// Records one late event that was routed to the side output.
    pub fn record_side_output(&mut self) {
        self.late_events_side_output += 1;
        self.late_events_total += 1;
    }

    /// Resets every counter back to zero.
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
179
/// Metrics for tracking window close behavior.
///
/// These counters track window lifecycle events and can be used for
/// monitoring watermark lag and window throughput. Particularly useful
/// for [`EmitStrategy::OnWindowClose`] where each window emits exactly
/// once.
#[derive(Debug, Clone, Default)]
pub struct WindowCloseMetrics {
    /// Total number of windows that have emitted and closed
    windows_closed_total: u64,
    /// Sum of close latencies in milliseconds (for computing averages)
    close_latency_sum_ms: i64,
    /// Maximum close latency observed (milliseconds)
    close_latency_max_ms: i64,
}

impl WindowCloseMetrics {
    /// Creates a new metrics tracker with all counters at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the total number of windows that have emitted and closed.
    #[must_use]
    pub fn windows_closed_total(&self) -> u64 {
        self.windows_closed_total
    }

    /// Returns the average window close latency in milliseconds.
    ///
    /// Close latency measures the delay between `window_end` and the
    /// actual emission time (`processing_time`). This reflects watermark
    /// lag — how long after the window boundary the watermark advances
    /// enough to trigger emission.
    ///
    /// Returns 0 if no windows have been closed.
    #[must_use]
    pub fn avg_close_latency_ms(&self) -> i64 {
        if self.windows_closed_total == 0 {
            0
        } else {
            self.close_latency_sum_ms / i64::try_from(self.windows_closed_total).unwrap_or(i64::MAX)
        }
    }

    /// Returns the maximum close latency observed (milliseconds).
    #[must_use]
    pub fn max_close_latency_ms(&self) -> i64 {
        self.close_latency_max_ms
    }

    /// Records a window close event.
    ///
    /// # Arguments
    ///
    /// * `window_end` - The exclusive upper bound of the closed window
    /// * `processing_time` - The wall-clock time at which the window emitted
    pub fn record_close(&mut self, window_end: i64, processing_time: i64) {
        self.windows_closed_total += 1;
        // Negative latencies (emission before the window boundary, e.g. due
        // to clock skew) are clamped to zero.
        let latency = processing_time.saturating_sub(window_end).max(0);
        // Saturate instead of `+=`: over a long uptime the running sum could
        // overflow i64, which would panic in debug builds and wrap in release.
        self.close_latency_sum_ms = self.close_latency_sum_ms.saturating_add(latency);
        self.close_latency_max_ms = self.close_latency_max_ms.max(latency);
    }

    /// Resets all counters to zero.
    pub fn reset(&mut self) {
        self.windows_closed_total = 0;
        self.close_latency_sum_ms = 0;
        self.close_latency_max_ms = 0;
    }
}
254
/// Strategy for when window results should be emitted.
///
/// Each strategy trades result freshness against efficiency:
/// - `OnWatermark` is the cheapest but has the highest latency
/// - `Periodic` balances freshness and cost
/// - `OnUpdate` gives the lowest latency at the highest overhead
/// - `OnWindowClose` targets append-only sinks
/// - `Changelog` emits Z-set weighted records for CDC
/// - `Final` suppresses every intermediate result
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum EmitStrategy {
    /// Emit final results once the watermark passes the window end (default).
    ///
    /// The most efficient strategy: it emits only once per window, and the
    /// result is complete within allowed-lateness bounds. May emit
    /// retractions when late data arrives inside the lateness window.
    #[default]
    OnWatermark,

    /// Emit intermediate results at a fixed interval.
    ///
    /// Suited to dashboards and monitoring that want updates before the
    /// window closes; the final result still goes out on watermark.
    ///
    /// The duration is the gap between periodic emissions.
    Periodic(Duration),

    /// Emit an updated result after every state change.
    ///
    /// Lowest result latency, highest overhead — every incoming event
    /// triggers an emission. Use with caution on high-volume streams.
    OnUpdate,

    /// Emit ONLY when the watermark passes the window end; nothing between.
    ///
    /// **Critical for append-only sinks** (Kafka, S3, Delta Lake, Iceberg).
    /// Guarantees exactly one emission per window and strictly append-only
    /// output — no retractions, no updates.
    ///
    /// # Window Close Condition
    ///
    /// A window closes when `watermark >= window_end + allowed_lateness`.
    /// At that point the timer fires, the final result is emitted, and the
    /// window state is purged immediately.
    ///
    /// # Late Data Policy
    ///
    /// Events arriving after window close are classified as **late** and
    /// handled by [`LateDataConfig`]:
    /// - **Default**: dropped (increments the `late_events_dropped` metric)
    /// - **With side output**: routed to the named side output for separate
    ///   processing (increments the `late_events_side_output` metric)
    ///
    /// Late data **never** re-opens a closed window. The single emission is
    /// final and immutable — the key contract that makes EOWC safe for
    /// append-only sinks.
    ///
    /// # Requires
    ///
    /// - A watermark definition on the source (otherwise timers never fire)
    /// - A windowed aggregation query
    ///
    /// SQL: `EMIT ON WINDOW CLOSE`
    OnWindowClose,

    /// Emit changelog records carrying Z-set weights.
    ///
    /// Every emission includes an operation type and weight:
    /// - Insert (+1 weight)
    /// - Delete (-1 weight)
    /// - Update (retraction pair: -1 old, +1 new)
    ///
    /// Required for CDC pipelines, cascading materialized views, and any
    /// downstream consumer that needs to track changes.
    ///
    /// SQL: `EMIT CHANGES`
    Changelog,

    /// Suppress ALL intermediate output and emit only the finalized result.
    ///
    /// Like `OnWindowClose`, but additionally suppresses periodic emissions
    /// (even if `Periodic` was configured elsewhere) and drops late data
    /// entirely after window close. Intended for BI reporting where only
    /// final, exact results matter.
    ///
    /// SQL: `EMIT FINAL`
    Final,
}

impl EmitStrategy {
    /// Returns true if this strategy requires periodic timer registration.
    #[must_use]
    pub fn needs_periodic_timer(&self) -> bool {
        self.periodic_interval().is_some()
    }

    /// Returns the emission interval for the periodic strategy, else `None`.
    #[must_use]
    pub fn periodic_interval(&self) -> Option<Duration> {
        if let Self::Periodic(interval) = self {
            Some(*interval)
        } else {
            None
        }
    }

    /// Returns true if results should be emitted on every update.
    #[must_use]
    pub fn emits_on_update(&self) -> bool {
        *self == Self::OnUpdate
    }

    // === Helper Methods ===

    /// Returns true if this strategy emits intermediate results before the
    /// window closes (`OnUpdate` after every change, `Periodic` at fixed
    /// intervals). All other strategies wait for the watermark or the
    /// window close.
    #[must_use]
    pub fn emits_intermediate(&self) -> bool {
        self.emits_on_update() || self.needs_periodic_timer()
    }

    /// Returns true if this strategy requires changelog/Z-set support, i.e.
    /// tracking previous values and emitting weighted insert/delete/update
    /// records.
    #[must_use]
    pub fn requires_changelog(&self) -> bool {
        *self == Self::Changelog
    }

    /// Returns true if this strategy is safe for append-only sinks
    /// (Kafka, S3, Delta Lake, Iceberg), which cannot handle retractions
    /// or updates.
    #[must_use]
    pub fn is_append_only_compatible(&self) -> bool {
        match self {
            Self::OnWindowClose | Self::Final => true,
            Self::OnWatermark | Self::Periodic(_) | Self::OnUpdate | Self::Changelog => false,
        }
    }

    /// Returns true if late data should generate retractions:
    /// `OnWatermark` may retract a previous result, `OnUpdate` immediately
    /// re-emits, and `Changelog` emits a -old/+new pair. The remaining
    /// strategies drop late data or route it to a side output.
    #[must_use]
    pub fn generates_retractions(&self) -> bool {
        match self {
            Self::OnWatermark | Self::OnUpdate | Self::Changelog => true,
            Self::Periodic(_) | Self::OnWindowClose | Self::Final => false,
        }
    }

    /// Returns true if intermediate emissions must be suppressed, overriding
    /// any periodic timers that would otherwise be active.
    #[must_use]
    pub fn suppresses_intermediate(&self) -> bool {
        match self {
            Self::OnWindowClose | Self::Final => true,
            Self::OnWatermark | Self::Periodic(_) | Self::OnUpdate | Self::Changelog => false,
        }
    }

    /// Returns true if late data should be dropped entirely. Only `Final`
    /// does this, to guarantee that nothing but exact, finalized results
    /// are ever emitted.
    #[must_use]
    pub fn drops_late_data(&self) -> bool {
        *self == Self::Final
    }
}
444
/// Unique identifier for a window.
///
/// Windows are identified by their start and end timestamps (in milliseconds).
/// For tumbling windows, these are non-overlapping intervals. The pair also
/// serializes to a fixed 16-byte state-store key (see `to_key_inline`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct WindowId {
    /// Window start timestamp (inclusive, in milliseconds)
    pub start: i64,
    /// Window end timestamp (exclusive, in milliseconds)
    pub end: i64,
}
456
457impl WindowId {
458    /// Creates a new window ID.
459    #[must_use]
460    pub fn new(start: i64, end: i64) -> Self {
461        Self { start, end }
462    }
463
464    /// Returns the window duration in milliseconds.
465    #[must_use]
466    pub fn duration_ms(&self) -> i64 {
467        self.end - self.start
468    }
469
470    /// Converts the window ID to a byte key for state storage.
471    ///
472    /// Uses `TimerKey` (`SmallVec`) which stores the 16-byte key inline,
473    /// avoiding heap allocation on the hot path.
474    #[inline]
475    #[must_use]
476    pub fn to_key(&self) -> super::TimerKey {
477        super::TimerKey::from(self.to_key_inline())
478    }
479
480    /// Converts the window ID to a stack-allocated byte key.
481    ///
482    /// This is the zero-allocation version for Ring 0 hot path operations.
483    /// Returns a fixed-size array that can be used directly with state stores.
484    #[inline]
485    #[must_use]
486    pub fn to_key_inline(&self) -> [u8; 16] {
487        let mut key = [0u8; 16];
488        key[..8].copy_from_slice(&self.start.to_be_bytes());
489        key[8..16].copy_from_slice(&self.end.to_be_bytes());
490        key
491    }
492
493    /// Parses a window ID from a byte key.
494    ///
495    /// # Errors
496    ///
497    /// Returns `None` if the key is not exactly 16 bytes.
498    #[must_use]
499    pub fn from_key(key: &[u8]) -> Option<Self> {
500        if key.len() != 16 {
501            return None;
502        }
503        let start = i64::from_be_bytes(key[0..8].try_into().ok()?);
504        let end = i64::from_be_bytes(key[8..16].try_into().ok()?);
505        Some(Self { start, end })
506    }
507}
508
/// Collection type for window assignments.
///
/// Uses `SmallVec` to avoid heap allocation for common cases:
/// - 1 window: tumbling windows (most common)
/// - 2-4 windows: sliding windows with small overlap
///
/// Spills to the heap only when more than 4 windows are assigned.
pub type WindowIdVec = SmallVec<[WindowId; 4]>;
515
516// === Changelog/Z-Set Support ===
517
/// CDC operation type for changelog records.
///
/// These map to Z-set weights:
/// - `Insert`: +1 weight
/// - `Delete`: -1 weight
/// - `UpdateBefore`: -1 weight (first half of an update pair)
/// - `UpdateAfter`: +1 weight (second half of an update pair)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
pub enum CdcOperation {
    /// Insert a new record (+1 weight)
    Insert,
    /// Delete an existing record (-1 weight)
    Delete,
    /// Retraction of previous value before update (-1 weight)
    UpdateBefore,
    /// New value after update (+1 weight)
    UpdateAfter,
}
536
537impl CdcOperation {
538    /// Returns the Z-set weight for this operation.
539    ///
540    /// - Insert/UpdateAfter: +1
541    /// - Delete/UpdateBefore: -1
542    #[must_use]
543    pub fn weight(&self) -> i32 {
544        match self {
545            Self::Insert | Self::UpdateAfter => 1,
546            Self::Delete | Self::UpdateBefore => -1,
547        }
548    }
549
550    /// Returns true if this is an insert-type operation.
551    #[must_use]
552    pub fn is_insert(&self) -> bool {
553        matches!(self, Self::Insert | Self::UpdateAfter)
554    }
555
556    /// Returns true if this is a delete-type operation.
557    #[must_use]
558    pub fn is_delete(&self) -> bool {
559        matches!(self, Self::Delete | Self::UpdateBefore)
560    }
561
562    /// Returns the Debezium-compatible operation code.
563    ///
564    /// - 'c': create (insert)
565    /// - 'd': delete
566    /// - 'u': update (used for both before/after in Debezium)
567    #[must_use]
568    pub fn debezium_op(&self) -> char {
569        match self {
570            Self::Insert => 'c',
571            Self::Delete => 'd',
572            Self::UpdateBefore | Self::UpdateAfter => 'u',
573        }
574    }
575
576    /// Converts the operation to a u8 for compact storage.
577    ///
578    /// Used by `ChangelogRef` to store operation type in a single byte.
579    #[inline]
580    #[must_use]
581    pub fn to_u8(self) -> u8 {
582        match self {
583            Self::Insert => 0,
584            Self::Delete => 1,
585            Self::UpdateBefore => 2,
586            Self::UpdateAfter => 3,
587        }
588    }
589
590    /// Converts from u8 (defaults to Insert for unknown values).
591    #[inline]
592    #[must_use]
593    pub fn from_u8(value: u8) -> Self {
594        match value {
595            1 => Self::Delete,
596            2 => Self::UpdateBefore,
597            3 => Self::UpdateAfter,
598            // 0 and unknown values default to Insert
599            _ => Self::Insert,
600        }
601    }
602}
603
/// A changelog record with Z-set weight for CDC pipelines.
///
/// This wraps an event with the metadata needed for change data capture:
/// - Operation type (insert/delete/update)
/// - Z-set weight (+1/-1), always consistent with the operation
/// - Timestamp of the change
///
/// Used by `EmitStrategy::Changelog` to emit structured change records
/// that can be consumed by downstream systems expecting CDC format.
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::window::{ChangelogRecord, CdcOperation};
/// use laminar_core::operator::Event;
/// # use std::sync::Arc;
/// # use arrow_array::RecordBatch;
/// # use arrow_schema::Schema;
/// # let schema = Arc::new(Schema::empty());
/// # let batch = RecordBatch::new_empty(schema);
/// # let event = Event::new(0, batch.clone());
/// # let old_event = event.clone();
/// # let new_event = event.clone();
///
/// // Create an insert record
/// let record = ChangelogRecord::insert(event, 1000);
/// assert_eq!(record.operation, CdcOperation::Insert);
/// assert_eq!(record.weight, 1);
///
/// // Create a retraction pair for an update
/// let (before, after) = ChangelogRecord::update(old_event, new_event, 1000);
/// assert_eq!(before.weight, -1);  // Retract old
/// assert_eq!(after.weight, 1);    // Insert new
/// ```
#[derive(Debug, Clone)]
pub struct ChangelogRecord {
    /// The CDC operation type
    pub operation: CdcOperation,
    /// Z-set weight (+1 for insert, -1 for delete)
    pub weight: i32,
    /// Timestamp when this change was emitted
    pub emit_timestamp: i64,
    /// The event data
    pub event: Event,
}
649
650impl ChangelogRecord {
651    /// Creates an insert changelog record.
652    #[must_use]
653    pub fn insert(event: Event, emit_timestamp: i64) -> Self {
654        Self {
655            operation: CdcOperation::Insert,
656            weight: 1,
657            emit_timestamp,
658            event,
659        }
660    }
661
662    /// Creates a delete changelog record.
663    #[must_use]
664    pub fn delete(event: Event, emit_timestamp: i64) -> Self {
665        Self {
666            operation: CdcOperation::Delete,
667            weight: -1,
668            emit_timestamp,
669            event,
670        }
671    }
672
673    /// Creates an update retraction pair (before and after records).
674    ///
675    /// Returns a tuple of (`UpdateBefore`, `UpdateAfter`) records.
676    /// The first should be emitted before the second to properly
677    /// retract the old value.
678    #[must_use]
679    pub fn update(old_event: Event, new_event: Event, emit_timestamp: i64) -> (Self, Self) {
680        let before = Self {
681            operation: CdcOperation::UpdateBefore,
682            weight: -1,
683            emit_timestamp,
684            event: old_event,
685        };
686        let after = Self {
687            operation: CdcOperation::UpdateAfter,
688            weight: 1,
689            emit_timestamp,
690            event: new_event,
691        };
692        (before, after)
693    }
694
695    /// Creates a changelog record from raw parts.
696    #[must_use]
697    pub fn new(operation: CdcOperation, event: Event, emit_timestamp: i64) -> Self {
698        Self {
699            operation,
700            weight: operation.weight(),
701            emit_timestamp,
702            event,
703        }
704    }
705
706    /// Returns true if this is an insert-type record.
707    #[must_use]
708    pub fn is_insert(&self) -> bool {
709        self.operation.is_insert()
710    }
711
712    /// Returns true if this is a delete-type record.
713    #[must_use]
714    pub fn is_delete(&self) -> bool {
715        self.operation.is_delete()
716    }
717}
718
/// Trait for assigning events to windows.
pub trait WindowAssigner: Send {
    /// Assigns an event timestamp to zero or more windows.
    ///
    /// For tumbling windows, this returns exactly one window.
    /// For sliding windows, this may return multiple windows.
    fn assign_windows(&self, timestamp: i64) -> WindowIdVec;

    /// Returns the maximum timestamp that could still be assigned to a window
    /// ending at `window_end`.
    ///
    /// Used for determining when a window can be safely triggered.
    /// The default assumes `window_end` is exclusive, so the last valid
    /// event timestamp is `window_end - 1`.
    fn max_timestamp(&self, window_end: i64) -> i64 {
        window_end - 1
    }
}
735
/// Tumbling window assigner.
///
/// Assigns each event to exactly one non-overlapping window based on its
/// timestamp. Windows are aligned to epoch (timestamp 0) unless an offset
/// is configured (e.g. for timezone-aligned windows).
#[derive(Debug, Clone)]
pub struct TumblingWindowAssigner {
    /// Window size in milliseconds
    size_ms: i64,
    /// Offset in milliseconds for timezone-aligned windows
    offset_ms: i64,
}
747
748impl TumblingWindowAssigner {
749    /// Creates a new tumbling window assigner.
750    ///
751    /// # Arguments
752    ///
753    /// * `size` - The duration of each window
754    ///
755    /// # Panics
756    ///
757    /// Panics if the size is zero.
758    #[must_use]
759    pub fn new(size: Duration) -> Self {
760        // Ensure window size fits in i64 and is positive
761        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
762        assert!(size_ms > 0, "Window size must be positive");
763        Self {
764            size_ms,
765            offset_ms: 0,
766        }
767    }
768
769    /// Creates a new tumbling window assigner with size in milliseconds.
770    ///
771    /// # Panics
772    ///
773    /// Panics if the size is zero or negative.
774    #[must_use]
775    pub fn from_millis(size_ms: i64) -> Self {
776        assert!(size_ms > 0, "Window size must be positive");
777        Self {
778            size_ms,
779            offset_ms: 0,
780        }
781    }
782
783    /// Set window offset in milliseconds.
784    #[must_use]
785    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
786        self.offset_ms = offset_ms;
787        self
788    }
789
790    /// Returns the window size in milliseconds.
791    #[must_use]
792    pub fn size_ms(&self) -> i64 {
793        self.size_ms
794    }
795
796    /// Returns the window offset in milliseconds.
797    #[must_use]
798    pub fn offset_ms(&self) -> i64 {
799        self.offset_ms
800    }
801
802    /// Assigns a timestamp to a window.
803    ///
804    /// This is the core window assignment function with O(1) complexity.
805    /// When an offset is set, windows are shifted: `floor((ts - offset) / size) * size + offset`.
806    #[inline]
807    #[must_use]
808    pub fn assign(&self, timestamp: i64) -> WindowId {
809        let adjusted = timestamp - self.offset_ms;
810        let window_start = if adjusted >= 0 {
811            (adjusted / self.size_ms) * self.size_ms
812        } else {
813            // For negative timestamps, we need to floor divide
814            ((adjusted - self.size_ms + 1) / self.size_ms) * self.size_ms
815        };
816        let window_start = window_start + self.offset_ms;
817        let window_end = window_start + self.size_ms;
818        WindowId::new(window_start, window_end)
819    }
820}
821
822impl WindowAssigner for TumblingWindowAssigner {
823    #[inline]
824    fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
825        let mut windows = WindowIdVec::new();
826        windows.push(self.assign(timestamp));
827        windows
828    }
829}
830
/// Trait for converting aggregation results to i64 for output.
///
/// Needed to produce Arrow `RecordBatch` outputs with numeric results.
pub trait ResultToI64 {
    /// Converts the result to an i64 value.
    fn to_i64(&self) -> i64;
}

impl ResultToI64 for u64 {
    fn to_i64(&self) -> i64 {
        // Saturate at i64::MAX rather than wrapping.
        (*self).try_into().unwrap_or(i64::MAX)
    }
}

impl ResultToI64 for i64 {
    fn to_i64(&self) -> i64 {
        *self
    }
}

impl ResultToI64 for Option<i64> {
    fn to_i64(&self) -> i64 {
        // None (e.g. an empty window) collapses to 0.
        self.unwrap_or_default()
    }
}

impl ResultToI64 for Option<f64> {
    fn to_i64(&self) -> i64 {
        // Standard SQL behavior: truncate the float toward zero; None -> 0.
        #[allow(clippy::cast_possible_truncation)]
        self.map_or(0, |f| f as i64)
    }
}
864
/// Trait for converting aggregation results to Arrow arrays.
///
/// Unlike [`ResultToI64`], this preserves the native type (e.g. `Float64`
/// for AVG) and nullable semantics (empty windows produce null, not zero).
pub trait ResultToArrow {
    /// Converts the result to a single-element Arrow array.
    fn to_arrow_array(&self) -> Arc<dyn ArrowArray>;

    /// Returns the Arrow [`DataType`] for this result.
    fn arrow_data_type(&self) -> DataType;
}
876
877impl ResultToArrow for i64 {
878    fn to_arrow_array(&self) -> Arc<dyn ArrowArray> {
879        Arc::new(Int64Array::from(vec![*self]))
880    }
881
882    fn arrow_data_type(&self) -> DataType {
883        DataType::Int64
884    }
885}
886
887impl ResultToArrow for u64 {
888    fn to_arrow_array(&self) -> Arc<dyn ArrowArray> {
889        Arc::new(Int64Array::from(vec![
890            i64::try_from(*self).unwrap_or(i64::MAX)
891        ]))
892    }
893
894    fn arrow_data_type(&self) -> DataType {
895        DataType::Int64
896    }
897}
898
impl ResultToArrow for Option<i64> {
    fn to_arrow_array(&self) -> Arc<dyn ArrowArray> {
        // Building from Vec<Option<i64>> maps None to an Arrow null, so an
        // empty window surfaces as SQL NULL rather than 0.
        Arc::new(Int64Array::from(vec![*self]))
    }

    fn arrow_data_type(&self) -> DataType {
        DataType::Int64
    }
}
908
impl ResultToArrow for Option<f64> {
    fn to_arrow_array(&self) -> Arc<dyn ArrowArray> {
        // Local import: Float64Array is only needed by this impl.
        use arrow_array::Float64Array;
        // None maps to an Arrow null, preserving nullable semantics
        // (e.g. AVG over an empty window).
        Arc::new(Float64Array::from(vec![*self]))
    }

    fn arrow_data_type(&self) -> DataType {
        DataType::Float64
    }
}
919
/// Accumulator state for aggregations.
///
/// This is the per-window state kept in the state store; different
/// aggregators store different accumulator types.
///
/// Implementors should derive `rkyv::Archive`, `rkyv::Serialize`, and
/// `rkyv::Deserialize` for zero-copy serialization on the hot path.
pub trait Accumulator: Default + Clone + Send {
    /// The input type for the aggregation.
    type Input;
    /// The output type produced by the aggregation.
    type Output: ResultToI64 + ResultToArrow;

    /// Adds a value to the accumulator.
    fn add(&mut self, value: Self::Input);

    /// Merges another accumulator into this one (e.g. when combining
    /// partial window state).
    fn merge(&mut self, other: &Self);

    /// Extracts the final result from the accumulator.
    fn result(&self) -> Self::Output;

    /// Returns true if the accumulator is empty (no values added).
    fn is_empty(&self) -> bool;
}
945
946/// Trait for window aggregation functions.
947///
948/// Aggregators define how events are combined within a window.
949/// They must be serializable for checkpointing.
950pub trait Aggregator: Send + Clone {
951    /// The accumulator type used by this aggregator.
952    type Acc: Accumulator;
953
954    /// Creates a new empty accumulator.
955    fn create_accumulator(&self) -> Self::Acc;
956
957    /// Extracts a value from an event to be aggregated.
958    ///
959    /// Returns `None` if the event should be skipped.
960    fn extract(&self, event: &Event) -> Option<<Self::Acc as Accumulator>::Input>;
961
962    /// Returns the Arrow [`DataType`] for this aggregator's output.
963    ///
964    /// Defaults to `Int64`. Override for aggregators that produce different types
965    /// (e.g., `AvgAggregator` returns `Float64`).
966    fn output_data_type(&self) -> DataType {
967        DataType::Int64
968    }
969
970    /// Returns whether the output is nullable (e.g., empty windows produce null).
971    ///
972    /// Defaults to `false`. Override for aggregators like MIN/MAX/AVG.
973    fn output_nullable(&self) -> bool {
974        false
975    }
976
977    /// Extracts all values from a multi-row batch event.
978    ///
979    /// The default implementation calls [`extract()`](Self::extract) and wraps
980    /// the single result. Override this in aggregators that need to process
981    /// all rows in a batch (e.g., `AvgAggregator`).
982    fn extract_batch(&self, event: &Event) -> SmallVec<[<Self::Acc as Accumulator>::Input; 4]> {
983        let mut values = SmallVec::new();
984        if let Some(v) = self.extract(event) {
985            values.push(v);
986        }
987        values
988    }
989}
990
991/// Count aggregator - counts the number of events in a window.
992#[derive(Debug, Clone, Default)]
993pub struct CountAggregator;
994
995/// Accumulator for count aggregation.
996#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
997pub struct CountAccumulator {
998    count: u64,
999}
1000
1001impl CountAggregator {
1002    /// Creates a new count aggregator.
1003    #[must_use]
1004    pub fn new() -> Self {
1005        Self
1006    }
1007}
1008
1009impl Accumulator for CountAccumulator {
1010    type Input = ();
1011    type Output = u64;
1012
1013    fn add(&mut self, _value: ()) {
1014        self.count += 1;
1015    }
1016
1017    fn merge(&mut self, other: &Self) {
1018        self.count += other.count;
1019    }
1020
1021    fn result(&self) -> u64 {
1022        self.count
1023    }
1024
1025    fn is_empty(&self) -> bool {
1026        self.count == 0
1027    }
1028}
1029
1030impl Aggregator for CountAggregator {
1031    type Acc = CountAccumulator;
1032
1033    fn create_accumulator(&self) -> CountAccumulator {
1034        CountAccumulator::default()
1035    }
1036
1037    fn extract(&self, _event: &Event) -> Option<()> {
1038        Some(())
1039    }
1040}
1041
1042/// Sum aggregator - sums i64 values from events.
1043#[derive(Debug, Clone)]
1044pub struct SumAggregator {
1045    /// Column index to sum (0-based)
1046    column_index: usize,
1047}
1048
1049/// Accumulator for sum aggregation.
1050#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1051pub struct SumAccumulator {
1052    sum: i64,
1053    count: u64,
1054}
1055
1056impl SumAggregator {
1057    /// Creates a new sum aggregator for the specified column.
1058    #[must_use]
1059    pub fn new(column_index: usize) -> Self {
1060        Self { column_index }
1061    }
1062}
1063
1064impl Accumulator for SumAccumulator {
1065    type Input = i64;
1066    type Output = i64;
1067
1068    fn add(&mut self, value: i64) {
1069        self.sum += value;
1070        self.count += 1;
1071    }
1072
1073    fn merge(&mut self, other: &Self) {
1074        self.sum += other.sum;
1075        self.count += other.count;
1076    }
1077
1078    fn result(&self) -> i64 {
1079        self.sum
1080    }
1081
1082    fn is_empty(&self) -> bool {
1083        self.count == 0
1084    }
1085}
1086
1087impl Aggregator for SumAggregator {
1088    type Acc = SumAccumulator;
1089
1090    fn create_accumulator(&self) -> SumAccumulator {
1091        SumAccumulator::default()
1092    }
1093
1094    fn extract(&self, event: &Event) -> Option<i64> {
1095        use arrow_array::cast::AsArray;
1096        use arrow_array::types::Int64Type;
1097
1098        let batch = &event.data;
1099        if self.column_index >= batch.num_columns() {
1100            return None;
1101        }
1102
1103        let column = batch.column(self.column_index);
1104        let array = column.as_primitive_opt::<Int64Type>()?;
1105
1106        // Sum all values in the array
1107        Some(array.iter().flatten().sum())
1108    }
1109}
1110
1111/// Min aggregator - tracks minimum i64 value.
1112#[derive(Debug, Clone)]
1113pub struct MinAggregator {
1114    column_index: usize,
1115}
1116
1117/// Accumulator for min aggregation.
1118#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1119pub struct MinAccumulator {
1120    min: Option<i64>,
1121}
1122
1123impl MinAggregator {
1124    /// Creates a new min aggregator for the specified column.
1125    #[must_use]
1126    pub fn new(column_index: usize) -> Self {
1127        Self { column_index }
1128    }
1129}
1130
1131impl Accumulator for MinAccumulator {
1132    type Input = i64;
1133    type Output = Option<i64>;
1134
1135    fn add(&mut self, value: i64) {
1136        self.min = Some(self.min.map_or(value, |m| m.min(value)));
1137    }
1138
1139    fn merge(&mut self, other: &Self) {
1140        if let Some(other_min) = other.min {
1141            self.add(other_min);
1142        }
1143    }
1144
1145    fn result(&self) -> Option<i64> {
1146        self.min
1147    }
1148
1149    fn is_empty(&self) -> bool {
1150        self.min.is_none()
1151    }
1152}
1153
1154impl Aggregator for MinAggregator {
1155    type Acc = MinAccumulator;
1156
1157    fn create_accumulator(&self) -> MinAccumulator {
1158        MinAccumulator::default()
1159    }
1160
1161    fn output_nullable(&self) -> bool {
1162        true
1163    }
1164
1165    fn extract(&self, event: &Event) -> Option<i64> {
1166        use arrow_array::cast::AsArray;
1167        use arrow_array::types::Int64Type;
1168
1169        let batch = &event.data;
1170        if self.column_index >= batch.num_columns() {
1171            return None;
1172        }
1173
1174        let column = batch.column(self.column_index);
1175        let array = column.as_primitive_opt::<Int64Type>()?;
1176
1177        array.iter().flatten().min()
1178    }
1179}
1180
1181/// Max aggregator - tracks maximum i64 value.
1182#[derive(Debug, Clone)]
1183pub struct MaxAggregator {
1184    column_index: usize,
1185}
1186
1187/// Accumulator for max aggregation.
1188#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1189pub struct MaxAccumulator {
1190    max: Option<i64>,
1191}
1192
1193impl MaxAggregator {
1194    /// Creates a new max aggregator for the specified column.
1195    #[must_use]
1196    pub fn new(column_index: usize) -> Self {
1197        Self { column_index }
1198    }
1199}
1200
1201impl Accumulator for MaxAccumulator {
1202    type Input = i64;
1203    type Output = Option<i64>;
1204
1205    fn add(&mut self, value: i64) {
1206        self.max = Some(self.max.map_or(value, |m| m.max(value)));
1207    }
1208
1209    fn merge(&mut self, other: &Self) {
1210        if let Some(other_max) = other.max {
1211            self.add(other_max);
1212        }
1213    }
1214
1215    fn result(&self) -> Option<i64> {
1216        self.max
1217    }
1218
1219    fn is_empty(&self) -> bool {
1220        self.max.is_none()
1221    }
1222}
1223
1224impl Aggregator for MaxAggregator {
1225    type Acc = MaxAccumulator;
1226
1227    fn create_accumulator(&self) -> MaxAccumulator {
1228        MaxAccumulator::default()
1229    }
1230
1231    fn output_nullable(&self) -> bool {
1232        true
1233    }
1234
1235    fn extract(&self, event: &Event) -> Option<i64> {
1236        use arrow_array::cast::AsArray;
1237        use arrow_array::types::Int64Type;
1238
1239        let batch = &event.data;
1240        if self.column_index >= batch.num_columns() {
1241            return None;
1242        }
1243
1244        let column = batch.column(self.column_index);
1245        let array = column.as_primitive_opt::<Int64Type>()?;
1246
1247        array.iter().flatten().max()
1248    }
1249}
1250
1251/// Average aggregator - computes average of i64 values.
1252#[derive(Debug, Clone)]
1253pub struct AvgAggregator {
1254    column_index: usize,
1255}
1256
1257/// Accumulator for average aggregation.
1258#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1259pub struct AvgAccumulator {
1260    sum: i64,
1261    count: u64,
1262}
1263
1264impl AvgAggregator {
1265    /// Creates a new average aggregator for the specified column.
1266    #[must_use]
1267    pub fn new(column_index: usize) -> Self {
1268        Self { column_index }
1269    }
1270}
1271
1272impl Accumulator for AvgAccumulator {
1273    type Input = i64;
1274    type Output = Option<f64>;
1275
1276    fn add(&mut self, value: i64) {
1277        self.sum += value;
1278        self.count += 1;
1279    }
1280
1281    fn merge(&mut self, other: &Self) {
1282        self.sum += other.sum;
1283        self.count += other.count;
1284    }
1285
1286    // Precision loss is acceptable for arithmetic mean
1287    #[allow(clippy::cast_precision_loss)]
1288    fn result(&self) -> Option<f64> {
1289        if self.count == 0 {
1290            None
1291        } else {
1292            Some(self.sum as f64 / self.count as f64)
1293        }
1294    }
1295
1296    fn is_empty(&self) -> bool {
1297        self.count == 0
1298    }
1299}
1300
1301impl Aggregator for AvgAggregator {
1302    type Acc = AvgAccumulator;
1303
1304    fn create_accumulator(&self) -> AvgAccumulator {
1305        AvgAccumulator::default()
1306    }
1307
1308    fn output_data_type(&self) -> DataType {
1309        DataType::Float64
1310    }
1311
1312    fn output_nullable(&self) -> bool {
1313        true
1314    }
1315
1316    fn extract(&self, event: &Event) -> Option<i64> {
1317        use arrow_array::cast::AsArray;
1318        use arrow_array::types::Int64Type;
1319
1320        let batch = &event.data;
1321        if self.column_index >= batch.num_columns() {
1322            return None;
1323        }
1324
1325        let column = batch.column(self.column_index);
1326        let array = column.as_primitive_opt::<Int64Type>()?;
1327
1328        // Return the first value for single-row batches
1329        array.iter().flatten().next()
1330    }
1331
1332    fn extract_batch(&self, event: &Event) -> SmallVec<[i64; 4]> {
1333        use arrow_array::cast::AsArray;
1334        use arrow_array::types::Int64Type;
1335
1336        let batch = &event.data;
1337        if self.column_index >= batch.num_columns() {
1338            return SmallVec::new();
1339        }
1340
1341        let column = batch.column(self.column_index);
1342        let Some(array) = column.as_primitive_opt::<Int64Type>() else {
1343            return SmallVec::new();
1344        };
1345
1346        // Return ALL values from the batch for correct multi-row averaging
1347        array.iter().flatten().collect()
1348    }
1349}
1350
1351// FIRST_VALUE / LAST_VALUE Aggregators
1352
1353/// `FIRST_VALUE` aggregator - returns the first value seen in a window.
1354///
1355/// Tracks the value with the earliest timestamp in the window.
1356/// For deterministic results, uses event timestamp, not arrival order.
1357///
1358/// # Example
1359///
1360/// ```rust,no_run
1361/// use laminar_core::operator::window::FirstValueAggregator;
1362///
1363/// // Track first price by timestamp
1364/// let first_price = FirstValueAggregator::new(0, 1); // price col 0, timestamp col 1
1365/// ```
1366#[derive(Debug, Clone)]
1367pub struct FirstValueAggregator {
1368    /// Column index to extract value from
1369    value_column_index: usize,
1370    /// Column index for event timestamp (for ordering)
1371    timestamp_column_index: usize,
1372}
1373
1374/// Accumulator for `FIRST_VALUE` aggregation.
1375///
1376/// Stores the value with the earliest timestamp seen so far.
1377#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1378#[rkyv(compare(PartialEq), derive(Debug))]
1379pub struct FirstValueAccumulator {
1380    /// The first value seen (None if no values yet)
1381    value: Option<i64>,
1382    /// Timestamp of the first value (for merge ordering)
1383    timestamp: Option<i64>,
1384}
1385
1386impl FirstValueAggregator {
1387    /// Creates a new `FIRST_VALUE` aggregator.
1388    ///
1389    /// # Arguments
1390    ///
1391    /// * `value_column_index` - Column to extract value from
1392    /// * `timestamp_column_index` - Column for event timestamp ordering
1393    #[must_use]
1394    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1395        Self {
1396            value_column_index,
1397            timestamp_column_index,
1398        }
1399    }
1400}
1401
1402impl Accumulator for FirstValueAccumulator {
1403    type Input = (i64, i64); // (value, timestamp)
1404    type Output = Option<i64>;
1405
1406    fn add(&mut self, (value, timestamp): (i64, i64)) {
1407        match self.timestamp {
1408            None => {
1409                // First value
1410                self.value = Some(value);
1411                self.timestamp = Some(timestamp);
1412            }
1413            Some(existing_ts) if timestamp < existing_ts => {
1414                // Earlier timestamp - replace
1415                self.value = Some(value);
1416                self.timestamp = Some(timestamp);
1417            }
1418            _ => {
1419                // Later or equal timestamp - keep existing
1420            }
1421        }
1422    }
1423
1424    fn merge(&mut self, other: &Self) {
1425        match (self.timestamp, other.timestamp) {
1426            (None, Some(_)) => {
1427                self.value = other.value;
1428                self.timestamp = other.timestamp;
1429            }
1430            (Some(self_ts), Some(other_ts)) if other_ts < self_ts => {
1431                self.value = other.value;
1432                self.timestamp = other.timestamp;
1433            }
1434            _ => {
1435                // Keep self
1436            }
1437        }
1438    }
1439
1440    fn result(&self) -> Option<i64> {
1441        self.value
1442    }
1443
1444    fn is_empty(&self) -> bool {
1445        self.value.is_none()
1446    }
1447}
1448
1449impl Aggregator for FirstValueAggregator {
1450    type Acc = FirstValueAccumulator;
1451
1452    fn create_accumulator(&self) -> FirstValueAccumulator {
1453        FirstValueAccumulator::default()
1454    }
1455
1456    fn extract(&self, event: &Event) -> Option<(i64, i64)> {
1457        use arrow_array::cast::AsArray;
1458        use arrow_array::types::Int64Type;
1459
1460        let batch = &event.data;
1461        if self.value_column_index >= batch.num_columns()
1462            || self.timestamp_column_index >= batch.num_columns()
1463        {
1464            return None;
1465        }
1466
1467        // Extract value
1468        let value_col = batch.column(self.value_column_index);
1469        let value_array = value_col.as_primitive_opt::<Int64Type>()?;
1470        let value = value_array.iter().flatten().next()?;
1471
1472        // Extract timestamp
1473        let ts_col = batch.column(self.timestamp_column_index);
1474        let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1475        let timestamp = ts_array.iter().flatten().next()?;
1476
1477        Some((value, timestamp))
1478    }
1479}
1480
1481/// `LAST_VALUE` aggregator - returns the last value seen in a window.
1482///
1483/// Tracks the value with the latest timestamp in the window.
1484/// For deterministic results, uses event timestamp, not arrival order.
1485/// When timestamps are equal, the later arrival wins.
1486///
1487/// # Example
1488///
1489/// ```rust,no_run
1490/// use laminar_core::operator::window::LastValueAggregator;
1491///
1492/// // Track last (closing) price by timestamp
1493/// let last_price = LastValueAggregator::new(0, 1); // price col 0, timestamp col 1
1494/// ```
1495#[derive(Debug, Clone)]
1496pub struct LastValueAggregator {
1497    /// Column index to extract value from
1498    value_column_index: usize,
1499    /// Column index for event timestamp (for ordering)
1500    timestamp_column_index: usize,
1501}
1502
1503/// Accumulator for `LAST_VALUE` aggregation.
1504///
1505/// Stores the value with the latest timestamp seen so far.
1506#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1507#[rkyv(compare(PartialEq), derive(Debug))]
1508pub struct LastValueAccumulator {
1509    /// The last value seen (None if no values yet)
1510    value: Option<i64>,
1511    /// Timestamp of the last value (for merge ordering)
1512    timestamp: Option<i64>,
1513}
1514
1515impl LastValueAggregator {
1516    /// Creates a new `LAST_VALUE` aggregator.
1517    ///
1518    /// # Arguments
1519    ///
1520    /// * `value_column_index` - Column to extract value from
1521    /// * `timestamp_column_index` - Column for event timestamp ordering
1522    #[must_use]
1523    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1524        Self {
1525            value_column_index,
1526            timestamp_column_index,
1527        }
1528    }
1529}
1530
1531impl Accumulator for LastValueAccumulator {
1532    type Input = (i64, i64); // (value, timestamp)
1533    type Output = Option<i64>;
1534
1535    fn add(&mut self, (value, timestamp): (i64, i64)) {
1536        match self.timestamp {
1537            None => {
1538                // First value
1539                self.value = Some(value);
1540                self.timestamp = Some(timestamp);
1541            }
1542            Some(existing_ts) if timestamp > existing_ts => {
1543                // Later timestamp - replace
1544                self.value = Some(value);
1545                self.timestamp = Some(timestamp);
1546            }
1547            Some(existing_ts) if timestamp == existing_ts => {
1548                // Same timestamp - keep latest arrival (replace)
1549                self.value = Some(value);
1550            }
1551            _ => {
1552                // Earlier timestamp - keep existing
1553            }
1554        }
1555    }
1556
1557    fn merge(&mut self, other: &Self) {
1558        match (self.timestamp, other.timestamp) {
1559            (None, Some(_)) => {
1560                self.value = other.value;
1561                self.timestamp = other.timestamp;
1562            }
1563            (Some(self_ts), Some(other_ts)) if other_ts > self_ts => {
1564                self.value = other.value;
1565                self.timestamp = other.timestamp;
1566            }
1567            (Some(self_ts), Some(other_ts)) if other_ts == self_ts => {
1568                // Same timestamp - use other (simulate later arrival)
1569                self.value = other.value;
1570            }
1571            _ => {
1572                // Keep self
1573            }
1574        }
1575    }
1576
1577    fn result(&self) -> Option<i64> {
1578        self.value
1579    }
1580
1581    fn is_empty(&self) -> bool {
1582        self.value.is_none()
1583    }
1584}
1585
1586impl Aggregator for LastValueAggregator {
1587    type Acc = LastValueAccumulator;
1588
1589    fn create_accumulator(&self) -> LastValueAccumulator {
1590        LastValueAccumulator::default()
1591    }
1592
1593    fn extract(&self, event: &Event) -> Option<(i64, i64)> {
1594        use arrow_array::cast::AsArray;
1595        use arrow_array::types::Int64Type;
1596
1597        let batch = &event.data;
1598        if self.value_column_index >= batch.num_columns()
1599            || self.timestamp_column_index >= batch.num_columns()
1600        {
1601            return None;
1602        }
1603
1604        // Extract value
1605        let value_col = batch.column(self.value_column_index);
1606        let value_array = value_col.as_primitive_opt::<Int64Type>()?;
1607        let value = value_array.iter().flatten().next()?;
1608
1609        // Extract timestamp
1610        let ts_col = batch.column(self.timestamp_column_index);
1611        let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1612        let timestamp = ts_array.iter().flatten().next()?;
1613
1614        Some((value, timestamp))
1615    }
1616}
1617
1618// FIRST_VALUE / LAST_VALUE for Float64
1619
1620/// Accumulator for `FIRST_VALUE` aggregation on f64 values.
1621#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1622#[rkyv(compare(PartialEq), derive(Debug))]
1623pub struct FirstValueF64Accumulator {
1624    /// The first value seen (None if no values yet)
1625    value: Option<i64>, // Store as bits for rkyv compatibility
1626    /// Timestamp of the first value (for merge ordering)
1627    timestamp: Option<i64>,
1628}
1629
1630impl FirstValueF64Accumulator {
1631    /// Gets the result as f64.
1632    #[must_use]
1633    #[allow(clippy::cast_sign_loss)]
1634    pub fn result_f64(&self) -> Option<f64> {
1635        self.value.map(|bits| f64::from_bits(bits as u64))
1636    }
1637}
1638
1639impl Accumulator for FirstValueF64Accumulator {
1640    type Input = (f64, i64); // (value, timestamp)
1641    type Output = Option<f64>;
1642
1643    fn add(&mut self, (value, timestamp): (f64, i64)) {
1644        // SAFETY: We strictly use this as storage bits and convert back via from_bits
1645        #[allow(clippy::cast_possible_wrap)]
1646        let value_bits = value.to_bits() as i64;
1647        match self.timestamp {
1648            None => {
1649                self.value = Some(value_bits);
1650                self.timestamp = Some(timestamp);
1651            }
1652            Some(existing_ts) if timestamp < existing_ts => {
1653                self.value = Some(value_bits);
1654                self.timestamp = Some(timestamp);
1655            }
1656            _ => {}
1657        }
1658    }
1659
1660    fn merge(&mut self, other: &Self) {
1661        match (self.timestamp, other.timestamp) {
1662            (None, Some(_)) => {
1663                self.value = other.value;
1664                self.timestamp = other.timestamp;
1665            }
1666            (Some(self_ts), Some(other_ts)) if other_ts < self_ts => {
1667                self.value = other.value;
1668                self.timestamp = other.timestamp;
1669            }
1670            _ => {}
1671        }
1672    }
1673
1674    #[allow(clippy::cast_sign_loss)]
1675    fn result(&self) -> Option<f64> {
1676        self.value.map(|bits| f64::from_bits(bits as u64))
1677    }
1678
1679    fn is_empty(&self) -> bool {
1680        self.value.is_none()
1681    }
1682}
1683
1684/// `FIRST_VALUE` aggregator for f64 columns.
1685#[derive(Debug, Clone)]
1686pub struct FirstValueF64Aggregator {
1687    /// Column index to extract value from
1688    value_column_index: usize,
1689    /// Column index for event timestamp (for ordering)
1690    timestamp_column_index: usize,
1691}
1692
1693impl FirstValueF64Aggregator {
1694    /// Creates a new `FIRST_VALUE` aggregator for f64 columns.
1695    #[must_use]
1696    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1697        Self {
1698            value_column_index,
1699            timestamp_column_index,
1700        }
1701    }
1702}
1703
1704impl Aggregator for FirstValueF64Aggregator {
1705    type Acc = FirstValueF64Accumulator;
1706
1707    fn create_accumulator(&self) -> FirstValueF64Accumulator {
1708        FirstValueF64Accumulator::default()
1709    }
1710
1711    fn extract(&self, event: &Event) -> Option<(f64, i64)> {
1712        use arrow_array::cast::AsArray;
1713        use arrow_array::types::{Float64Type, Int64Type};
1714
1715        let batch = &event.data;
1716        if self.value_column_index >= batch.num_columns()
1717            || self.timestamp_column_index >= batch.num_columns()
1718        {
1719            return None;
1720        }
1721
1722        // Extract value as f64
1723        let value_col = batch.column(self.value_column_index);
1724        let value_array = value_col.as_primitive_opt::<Float64Type>()?;
1725        let value = value_array.iter().flatten().next()?;
1726
1727        // Extract timestamp
1728        let ts_col = batch.column(self.timestamp_column_index);
1729        let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1730        let timestamp = ts_array.iter().flatten().next()?;
1731
1732        Some((value, timestamp))
1733    }
1734}
1735
1736/// Accumulator for `LAST_VALUE` aggregation on f64 values.
1737#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
1738#[rkyv(compare(PartialEq), derive(Debug))]
1739pub struct LastValueF64Accumulator {
1740    /// The last value seen (None if no values yet)
1741    value: Option<i64>, // Store as bits for rkyv compatibility
1742    /// Timestamp of the last value (for merge ordering)
1743    timestamp: Option<i64>,
1744}
1745
1746impl LastValueF64Accumulator {
1747    /// Gets the result as f64.
1748    #[must_use]
1749    #[allow(clippy::cast_sign_loss)]
1750    pub fn result_f64(&self) -> Option<f64> {
1751        self.value.map(|bits| f64::from_bits(bits as u64))
1752    }
1753}
1754
1755impl Accumulator for LastValueF64Accumulator {
1756    type Input = (f64, i64); // (value, timestamp)
1757    type Output = Option<f64>;
1758
1759    fn add(&mut self, (value, timestamp): (f64, i64)) {
1760        // SAFETY: We strictly use this as storage bits and convert back via from_bits
1761        #[allow(clippy::cast_possible_wrap)]
1762        let value_bits = value.to_bits() as i64;
1763        match self.timestamp {
1764            None => {
1765                self.value = Some(value_bits);
1766                self.timestamp = Some(timestamp);
1767            }
1768            Some(existing_ts) if timestamp > existing_ts => {
1769                self.value = Some(value_bits);
1770                self.timestamp = Some(timestamp);
1771            }
1772            Some(existing_ts) if timestamp == existing_ts => {
1773                self.value = Some(value_bits);
1774            }
1775            _ => {}
1776        }
1777    }
1778
1779    fn merge(&mut self, other: &Self) {
1780        match (self.timestamp, other.timestamp) {
1781            (None, Some(_)) => {
1782                self.value = other.value;
1783                self.timestamp = other.timestamp;
1784            }
1785            (Some(self_ts), Some(other_ts)) if other_ts > self_ts => {
1786                self.value = other.value;
1787                self.timestamp = other.timestamp;
1788            }
1789            (Some(self_ts), Some(other_ts)) if other_ts == self_ts => {
1790                self.value = other.value;
1791            }
1792            _ => {}
1793        }
1794    }
1795
1796    #[allow(clippy::cast_sign_loss)]
1797    fn result(&self) -> Option<f64> {
1798        self.value.map(|bits| f64::from_bits(bits as u64))
1799    }
1800
1801    fn is_empty(&self) -> bool {
1802        self.value.is_none()
1803    }
1804}
1805
1806/// `LAST_VALUE` aggregator for f64 columns.
1807#[derive(Debug, Clone)]
1808pub struct LastValueF64Aggregator {
1809    /// Column index to extract value from
1810    value_column_index: usize,
1811    /// Column index for event timestamp (for ordering)
1812    timestamp_column_index: usize,
1813}
1814
1815impl LastValueF64Aggregator {
1816    /// Creates a new `LAST_VALUE` aggregator for f64 columns.
1817    #[must_use]
1818    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
1819        Self {
1820            value_column_index,
1821            timestamp_column_index,
1822        }
1823    }
1824}
1825
1826impl Aggregator for LastValueF64Aggregator {
1827    type Acc = LastValueF64Accumulator;
1828
1829    fn create_accumulator(&self) -> LastValueF64Accumulator {
1830        LastValueF64Accumulator::default()
1831    }
1832
1833    fn extract(&self, event: &Event) -> Option<(f64, i64)> {
1834        use arrow_array::cast::AsArray;
1835        use arrow_array::types::{Float64Type, Int64Type};
1836
1837        let batch = &event.data;
1838        if self.value_column_index >= batch.num_columns()
1839            || self.timestamp_column_index >= batch.num_columns()
1840        {
1841            return None;
1842        }
1843
1844        // Extract value as f64
1845        let value_col = batch.column(self.value_column_index);
1846        let value_array = value_col.as_primitive_opt::<Float64Type>()?;
1847        let value = value_array.iter().flatten().next()?;
1848
1849        // Extract timestamp
1850        let ts_col = batch.column(self.timestamp_column_index);
1851        let ts_array = ts_col.as_primitive_opt::<Int64Type>()?;
1852        let timestamp = ts_array.iter().flatten().next()?;
1853
1854        Some((value, timestamp))
1855    }
1856}
1857
1858// Composite Aggregator & f64 Type Support
1859
1860/// Scalar result type supporting multiple numeric types.
1861///
1862/// Used by [`DynAccumulator`] for dynamic-dispatch aggregation where
1863/// the result type is determined at runtime. This enables composite
1864/// aggregation (multiple aggregates per window) with mixed types.
1865///
1866/// # Example
1867///
1868/// ```rust,no_run
1869/// use laminar_core::operator::window::ScalarResult;
1870///
1871/// let r = ScalarResult::Float64(3.14);
1872/// assert_eq!(r.to_i64_lossy(), 3);
1873/// assert_eq!(r.to_f64_lossy(), 3.14);
1874/// ```
1875#[derive(Debug, Clone, PartialEq)]
1876pub enum ScalarResult {
1877    /// 64-bit signed integer
1878    Int64(i64),
1879    /// 64-bit floating point
1880    Float64(f64),
1881    /// 64-bit unsigned integer
1882    UInt64(u64),
1883    /// Optional 64-bit signed integer
1884    OptionalInt64(Option<i64>),
1885    /// Optional 64-bit floating point
1886    OptionalFloat64(Option<f64>),
1887    /// Null / no value
1888    Null,
1889}
1890
1891impl ScalarResult {
1892    /// Converts to i64, truncating floats and saturating unsigned values.
1893    #[must_use]
1894    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
1895    pub fn to_i64_lossy(&self) -> i64 {
1896        match self {
1897            Self::Int64(v) => *v,
1898            Self::Float64(v) => *v as i64,
1899            Self::UInt64(v) => i64::try_from(*v).unwrap_or(i64::MAX),
1900            Self::OptionalInt64(v) => v.unwrap_or(0),
1901            Self::OptionalFloat64(v) => v.map(|f| f as i64).unwrap_or(0),
1902            Self::Null => 0,
1903        }
1904    }
1905
1906    /// Converts to f64, with potential precision loss for large integers.
1907    #[must_use]
1908    #[allow(clippy::cast_precision_loss)]
1909    pub fn to_f64_lossy(&self) -> f64 {
1910        match self {
1911            Self::Int64(v) => *v as f64,
1912            Self::Float64(v) => *v,
1913            Self::UInt64(v) => *v as f64,
1914            Self::OptionalInt64(v) => v.map(|i| i as f64).unwrap_or(0.0),
1915            Self::OptionalFloat64(v) => v.unwrap_or(0.0),
1916            Self::Null => 0.0,
1917        }
1918    }
1919
1920    /// Returns true if this is a null or None value.
1921    #[must_use]
1922    pub fn is_null(&self) -> bool {
1923        matches!(
1924            self,
1925            Self::Null | Self::OptionalInt64(None) | Self::OptionalFloat64(None)
1926        )
1927    }
1928
1929    /// Returns the Arrow [`DataType`] for this result.
1930    #[must_use]
1931    pub fn data_type(&self) -> DataType {
1932        match self {
1933            Self::Int64(_) | Self::OptionalInt64(_) => DataType::Int64,
1934            Self::Float64(_) | Self::OptionalFloat64(_) => DataType::Float64,
1935            Self::UInt64(_) => DataType::UInt64,
1936            Self::Null => DataType::Null,
1937        }
1938    }
1939}
1940
/// Dynamic accumulator trait for composite aggregation.
///
/// Unlike the static [`Accumulator`] trait, this works with events directly
/// and returns [`ScalarResult`] for type-flexible output. Used by
/// [`CompositeAggregator`] to combine multiple aggregates per window.
///
/// # Ring Architecture
///
/// Dynamic dispatch has overhead (~2-5ns per vtable call), so composite
/// aggregation is intended for Ring 1 workloads. Ring 0 continues to use
/// the static [`Aggregator`] + [`Accumulator`] path.
pub trait DynAccumulator: Send {
    /// Adds an event to the accumulator.
    ///
    /// Implementations in this module read values from the event's
    /// `RecordBatch`; columns of an unexpected type are silently skipped.
    fn add_event(&mut self, event: &Event);

    /// Merges another accumulator of the same type into this one.
    ///
    /// Implementations in this module merge by decoding `other.serialize()`,
    /// so `other` must share this accumulator's serialized byte layout.
    ///
    /// # Panics
    ///
    /// May panic if `other` is not the same concrete type.
    fn merge_dyn(&mut self, other: &dyn DynAccumulator);

    /// Returns the current aggregate result.
    fn result_scalar(&self) -> ScalarResult;

    /// Returns true if no values have been accumulated.
    fn is_empty(&self) -> bool;

    /// Creates a boxed clone of this accumulator.
    fn clone_box(&self) -> Box<dyn DynAccumulator>;

    /// Serializes the accumulator state to bytes (for checkpointing).
    ///
    /// Also used by `merge_dyn` implementations as the merge wire format.
    fn serialize(&self) -> Vec<u8>;

    /// Returns the Arrow field descriptor for this accumulator's output.
    fn result_field(&self) -> Field;

    /// Returns a type tag for deserialization dispatch.
    fn type_tag(&self) -> &'static str;

    /// Returns self as `Any` for downcasting (used by `DataFusion` bridge).
    fn as_any(&self) -> &dyn std::any::Any;
}
1984
/// Factory trait for creating [`DynAccumulator`] instances.
///
/// Each factory corresponds to one aggregate function (e.g., SUM, COUNT).
/// The [`CompositeAggregator`] holds multiple factories.
pub trait DynAggregatorFactory: Send + Sync {
    /// Creates a new empty accumulator.
    fn create_accumulator(&self) -> Box<dyn DynAccumulator>;

    /// Returns the Arrow field descriptor for results.
    ///
    /// Unlike the accumulator's own `result_field`, the factory variant
    /// carries the user-configured output field name.
    fn result_field(&self) -> Field;

    /// Creates a boxed clone of this factory.
    fn clone_box(&self) -> Box<dyn DynAggregatorFactory>;

    /// Returns a type tag for deserialization dispatch.
    fn type_tag(&self) -> &'static str;
}
2002
2003// ── f64 Aggregators ─────────────────────────────────────────────────────────
2004
/// Sum aggregator for f64 columns.
#[derive(Debug, Clone)]
pub struct SumF64Aggregator {
    /// Column index to sum
    column_index: usize,
}

/// Accumulator for f64 sum aggregation.
///
/// NOTE(review): this accumulator carries no column index — its
/// [`DynAccumulator::add_event`] impl always reads column 0. Use
/// [`SumF64IndexedAccumulator`] to aggregate a different column.
#[derive(Debug, Clone, Default)]
pub struct SumF64Accumulator {
    /// Running sum
    sum: f64,
    /// Count of values; drives `is_empty` and the NULL-on-empty result
    count: u64,
}

impl SumF64Aggregator {
    /// Creates a new f64 sum aggregator for the specified column.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self { column_index }
    }

    /// Returns the column index.
    #[must_use]
    pub fn column_index(&self) -> usize {
        self.column_index
    }
}

impl SumF64Accumulator {
    /// Returns the current sum.
    #[must_use]
    pub fn sum(&self) -> f64 {
        self.sum
    }
}
2042
2043impl DynAccumulator for SumF64Accumulator {
2044    fn add_event(&mut self, event: &Event) {
2045        use arrow_array::cast::AsArray;
2046        use arrow_array::types::Float64Type;
2047
2048        // Extract from first column by default (factory sets column_index)
2049        // Note: column_index is embedded in the accumulator at construction
2050        let batch = &event.data;
2051        if batch.num_columns() == 0 {
2052            return;
2053        }
2054        // Try first column as f64
2055        if let Some(array) = batch.column(0).as_primitive_opt::<Float64Type>() {
2056            for val in array.iter().flatten() {
2057                self.sum += val;
2058                self.count += 1;
2059            }
2060        }
2061    }
2062
2063    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2064        let data = other.serialize();
2065        if let (Some(sum_bytes), Some(count_bytes)) = (
2066            data.get(..8).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2067            data.get(8..16).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2068        ) {
2069            self.sum += f64::from_le_bytes(sum_bytes);
2070            self.count += u64::from_le_bytes(count_bytes);
2071        }
2072    }
2073
2074    fn result_scalar(&self) -> ScalarResult {
2075        if self.count == 0 {
2076            ScalarResult::Null
2077        } else {
2078            ScalarResult::Float64(self.sum)
2079        }
2080    }
2081
2082    fn is_empty(&self) -> bool {
2083        self.count == 0
2084    }
2085
2086    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2087        Box::new(self.clone())
2088    }
2089
2090    fn serialize(&self) -> Vec<u8> {
2091        let mut buf = Vec::with_capacity(16);
2092        buf.extend_from_slice(&self.sum.to_le_bytes());
2093        buf.extend_from_slice(&self.count.to_le_bytes());
2094        buf
2095    }
2096
2097    fn result_field(&self) -> Field {
2098        Field::new("sum_f64", DataType::Float64, true)
2099    }
2100
2101    fn type_tag(&self) -> &'static str {
2102        "sum_f64"
2103    }
2104
2105    fn as_any(&self) -> &dyn std::any::Any {
2106        self
2107    }
2108}
2109
/// Factory for [`SumF64IndexedAccumulator`].
#[derive(Debug, Clone)]
pub struct SumF64Factory {
    /// Index of the column to sum
    column_index: usize,
    /// Name of the output field in the result schema
    field_name: String,
}

impl SumF64Factory {
    /// Creates a new f64 sum factory.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        let field_name = field_name.into();
        Self {
            column_index,
            field_name,
        }
    }
}
2129
2130impl DynAggregatorFactory for SumF64Factory {
2131    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2132        Box::new(SumF64IndexedAccumulator::new(self.column_index))
2133    }
2134
2135    fn result_field(&self) -> Field {
2136        Field::new(&self.field_name, DataType::Float64, true)
2137    }
2138
2139    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2140        Box::new(self.clone())
2141    }
2142
2143    fn type_tag(&self) -> &'static str {
2144        "sum_f64"
2145    }
2146}
2147
/// f64 sum accumulator with embedded column index.
#[derive(Debug, Clone)]
pub struct SumF64IndexedAccumulator {
    /// Index of the column to extract values from
    column_index: usize,
    /// Running sum of observed values
    sum: f64,
    /// Number of observed (non-null) values
    count: u64,
}

impl SumF64IndexedAccumulator {
    /// Creates a new indexed sum accumulator starting from zero.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            sum: 0.0,
            count: 0,
            column_index,
        }
    }
}
2170
2171impl DynAccumulator for SumF64IndexedAccumulator {
2172    fn add_event(&mut self, event: &Event) {
2173        use arrow_array::cast::AsArray;
2174        use arrow_array::types::Float64Type;
2175
2176        let batch = &event.data;
2177        if self.column_index >= batch.num_columns() {
2178            return;
2179        }
2180        if let Some(array) = batch
2181            .column(self.column_index)
2182            .as_primitive_opt::<Float64Type>()
2183        {
2184            for val in array.iter().flatten() {
2185                self.sum += val;
2186                self.count += 1;
2187            }
2188        }
2189    }
2190
2191    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2192        let data = other.serialize();
2193        if let (Some(sum_bytes), Some(count_bytes)) = (
2194            data.get(..8).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2195            data.get(8..16).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2196        ) {
2197            self.sum += f64::from_le_bytes(sum_bytes);
2198            self.count += u64::from_le_bytes(count_bytes);
2199        }
2200    }
2201
2202    fn result_scalar(&self) -> ScalarResult {
2203        if self.count == 0 {
2204            ScalarResult::Null
2205        } else {
2206            ScalarResult::Float64(self.sum)
2207        }
2208    }
2209
2210    fn is_empty(&self) -> bool {
2211        self.count == 0
2212    }
2213
2214    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2215        Box::new(self.clone())
2216    }
2217
2218    fn serialize(&self) -> Vec<u8> {
2219        let mut buf = Vec::with_capacity(16);
2220        buf.extend_from_slice(&self.sum.to_le_bytes());
2221        buf.extend_from_slice(&self.count.to_le_bytes());
2222        buf
2223    }
2224
2225    fn result_field(&self) -> Field {
2226        Field::new("sum_f64", DataType::Float64, true)
2227    }
2228
2229    fn type_tag(&self) -> &'static str {
2230        "sum_f64"
2231    }
2232
2233    fn as_any(&self) -> &dyn std::any::Any {
2234        self
2235    }
2236}
2237
/// Factory producing MIN accumulators for f64 columns.
#[derive(Debug, Clone)]
pub struct MinF64Factory {
    /// Index of the column to take the minimum over
    column_index: usize,
    /// Name of the output field in the result schema
    field_name: String,
}

impl MinF64Factory {
    /// Creates a new f64 min factory.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        let field_name = field_name.into();
        Self {
            column_index,
            field_name,
        }
    }
}
2257
2258impl DynAggregatorFactory for MinF64Factory {
2259    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2260        Box::new(MinF64IndexedAccumulator::new(self.column_index))
2261    }
2262
2263    fn result_field(&self) -> Field {
2264        Field::new(&self.field_name, DataType::Float64, true)
2265    }
2266
2267    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2268        Box::new(self.clone())
2269    }
2270
2271    fn type_tag(&self) -> &'static str {
2272        "min_f64"
2273    }
2274}
2275
/// f64 min accumulator with embedded column index.
#[derive(Debug, Clone)]
pub struct MinF64IndexedAccumulator {
    /// Index of the column to scan
    column_index: usize,
    /// Smallest value seen so far; `None` until a value arrives
    min: Option<f64>,
}

impl MinF64IndexedAccumulator {
    /// Creates a new indexed min accumulator with no values observed.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            min: None,
            column_index,
        }
    }
}
2295
2296impl DynAccumulator for MinF64IndexedAccumulator {
2297    fn add_event(&mut self, event: &Event) {
2298        use arrow_array::cast::AsArray;
2299        use arrow_array::types::Float64Type;
2300
2301        let batch = &event.data;
2302        if self.column_index >= batch.num_columns() {
2303            return;
2304        }
2305        if let Some(array) = batch
2306            .column(self.column_index)
2307            .as_primitive_opt::<Float64Type>()
2308        {
2309            for val in array.iter().flatten() {
2310                self.min = Some(self.min.map_or(val, |m: f64| m.min(val)));
2311            }
2312        }
2313    }
2314
2315    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2316        let data = other.serialize();
2317        if data.len() >= 9 && data[0] == 1 {
2318            if let Ok(bytes) = <[u8; 8]>::try_from(&data[1..9]) {
2319                let other_min = f64::from_le_bytes(bytes);
2320                self.min = Some(self.min.map_or(other_min, |m: f64| m.min(other_min)));
2321            }
2322        }
2323    }
2324
2325    fn result_scalar(&self) -> ScalarResult {
2326        ScalarResult::OptionalFloat64(self.min)
2327    }
2328
2329    fn is_empty(&self) -> bool {
2330        self.min.is_none()
2331    }
2332
2333    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2334        Box::new(self.clone())
2335    }
2336
2337    fn serialize(&self) -> Vec<u8> {
2338        match self.min {
2339            Some(v) => {
2340                let mut buf = Vec::with_capacity(9);
2341                buf.push(1); // has value marker
2342                buf.extend_from_slice(&v.to_le_bytes());
2343                buf
2344            }
2345            None => vec![0],
2346        }
2347    }
2348
2349    fn result_field(&self) -> Field {
2350        Field::new("min_f64", DataType::Float64, true)
2351    }
2352
2353    fn type_tag(&self) -> &'static str {
2354        "min_f64"
2355    }
2356
2357    fn as_any(&self) -> &dyn std::any::Any {
2358        self
2359    }
2360}
2361
/// Factory producing MAX accumulators for f64 columns.
#[derive(Debug, Clone)]
pub struct MaxF64Factory {
    /// Index of the column to take the maximum over
    column_index: usize,
    /// Name of the output field in the result schema
    field_name: String,
}

impl MaxF64Factory {
    /// Creates a new f64 max factory.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        let field_name = field_name.into();
        Self {
            column_index,
            field_name,
        }
    }
}
2381
2382impl DynAggregatorFactory for MaxF64Factory {
2383    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2384        Box::new(MaxF64IndexedAccumulator::new(self.column_index))
2385    }
2386
2387    fn result_field(&self) -> Field {
2388        Field::new(&self.field_name, DataType::Float64, true)
2389    }
2390
2391    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2392        Box::new(self.clone())
2393    }
2394
2395    fn type_tag(&self) -> &'static str {
2396        "max_f64"
2397    }
2398}
2399
/// f64 max accumulator with embedded column index.
#[derive(Debug, Clone)]
pub struct MaxF64IndexedAccumulator {
    /// Index of the column to scan
    column_index: usize,
    /// Largest value seen so far; `None` until a value arrives
    max: Option<f64>,
}

impl MaxF64IndexedAccumulator {
    /// Creates a new indexed max accumulator with no values observed.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            max: None,
            column_index,
        }
    }
}
2419
2420impl DynAccumulator for MaxF64IndexedAccumulator {
2421    fn add_event(&mut self, event: &Event) {
2422        use arrow_array::cast::AsArray;
2423        use arrow_array::types::Float64Type;
2424
2425        let batch = &event.data;
2426        if self.column_index >= batch.num_columns() {
2427            return;
2428        }
2429        if let Some(array) = batch
2430            .column(self.column_index)
2431            .as_primitive_opt::<Float64Type>()
2432        {
2433            for val in array.iter().flatten() {
2434                self.max = Some(self.max.map_or(val, |m: f64| m.max(val)));
2435            }
2436        }
2437    }
2438
2439    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2440        let data = other.serialize();
2441        if data.len() >= 9 && data[0] == 1 {
2442            if let Ok(bytes) = <[u8; 8]>::try_from(&data[1..9]) {
2443                let other_max = f64::from_le_bytes(bytes);
2444                self.max = Some(self.max.map_or(other_max, |m: f64| m.max(other_max)));
2445            }
2446        }
2447    }
2448
2449    fn result_scalar(&self) -> ScalarResult {
2450        ScalarResult::OptionalFloat64(self.max)
2451    }
2452
2453    fn is_empty(&self) -> bool {
2454        self.max.is_none()
2455    }
2456
2457    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2458        Box::new(self.clone())
2459    }
2460
2461    fn serialize(&self) -> Vec<u8> {
2462        match self.max {
2463            Some(v) => {
2464                let mut buf = Vec::with_capacity(9);
2465                buf.push(1);
2466                buf.extend_from_slice(&v.to_le_bytes());
2467                buf
2468            }
2469            None => vec![0],
2470        }
2471    }
2472
2473    fn result_field(&self) -> Field {
2474        Field::new("max_f64", DataType::Float64, true)
2475    }
2476
2477    fn type_tag(&self) -> &'static str {
2478        "max_f64"
2479    }
2480
2481    fn as_any(&self) -> &dyn std::any::Any {
2482        self
2483    }
2484}
2485
/// Factory producing AVG accumulators for f64 columns.
#[derive(Debug, Clone)]
pub struct AvgF64Factory {
    /// Index of the column to average
    column_index: usize,
    /// Name of the output field in the result schema
    field_name: String,
}

impl AvgF64Factory {
    /// Creates a new f64 avg factory.
    #[must_use]
    pub fn new(column_index: usize, field_name: impl Into<String>) -> Self {
        let field_name = field_name.into();
        Self {
            column_index,
            field_name,
        }
    }
}
2505
2506impl DynAggregatorFactory for AvgF64Factory {
2507    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2508        Box::new(AvgF64IndexedAccumulator::new(self.column_index))
2509    }
2510
2511    fn result_field(&self) -> Field {
2512        Field::new(&self.field_name, DataType::Float64, true)
2513    }
2514
2515    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2516        Box::new(self.clone())
2517    }
2518
2519    fn type_tag(&self) -> &'static str {
2520        "avg_f64"
2521    }
2522}
2523
/// f64 avg accumulator with embedded column index.
#[derive(Debug, Clone)]
pub struct AvgF64IndexedAccumulator {
    /// Index of the column to average
    column_index: usize,
    /// Running sum of observed values
    sum: f64,
    /// Number of observed (non-null) values
    count: u64,
}

impl AvgF64IndexedAccumulator {
    /// Creates a new indexed avg accumulator starting from zero.
    #[must_use]
    pub fn new(column_index: usize) -> Self {
        Self {
            sum: 0.0,
            count: 0,
            column_index,
        }
    }
}
2546
2547impl DynAccumulator for AvgF64IndexedAccumulator {
2548    fn add_event(&mut self, event: &Event) {
2549        use arrow_array::cast::AsArray;
2550        use arrow_array::types::Float64Type;
2551
2552        let batch = &event.data;
2553        if self.column_index >= batch.num_columns() {
2554            return;
2555        }
2556        if let Some(array) = batch
2557            .column(self.column_index)
2558            .as_primitive_opt::<Float64Type>()
2559        {
2560            for val in array.iter().flatten() {
2561                self.sum += val;
2562                self.count += 1;
2563            }
2564        }
2565    }
2566
2567    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2568        let data = other.serialize();
2569        if let (Some(sum_bytes), Some(count_bytes)) = (
2570            data.get(..8).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2571            data.get(8..16).and_then(|s| <[u8; 8]>::try_from(s).ok()),
2572        ) {
2573            self.sum += f64::from_le_bytes(sum_bytes);
2574            self.count += u64::from_le_bytes(count_bytes);
2575        }
2576    }
2577
2578    // Precision loss is acceptable for arithmetic mean
2579    #[allow(clippy::cast_precision_loss)]
2580    fn result_scalar(&self) -> ScalarResult {
2581        if self.count == 0 {
2582            ScalarResult::Null
2583        } else {
2584            ScalarResult::Float64(self.sum / self.count as f64)
2585        }
2586    }
2587
2588    fn is_empty(&self) -> bool {
2589        self.count == 0
2590    }
2591
2592    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2593        Box::new(self.clone())
2594    }
2595
2596    fn serialize(&self) -> Vec<u8> {
2597        let mut buf = Vec::with_capacity(16);
2598        buf.extend_from_slice(&self.sum.to_le_bytes());
2599        buf.extend_from_slice(&self.count.to_le_bytes());
2600        buf
2601    }
2602
2603    fn result_field(&self) -> Field {
2604        Field::new("avg_f64", DataType::Float64, true)
2605    }
2606
2607    fn type_tag(&self) -> &'static str {
2608        "avg_f64"
2609    }
2610
2611    fn as_any(&self) -> &dyn std::any::Any {
2612        self
2613    }
2614}
2615
2616// ── Count DynAccumulator ────────────────────────────────────────────────────
2617
/// Count factory for [`DynAccumulator`].
#[derive(Debug, Clone)]
pub struct CountDynFactory {
    /// Name of the output field in the result schema
    field_name: String,
}

impl CountDynFactory {
    /// Creates a new count factory.
    #[must_use]
    pub fn new(field_name: impl Into<String>) -> Self {
        let field_name = field_name.into();
        Self { field_name }
    }
}
2634
2635impl DynAggregatorFactory for CountDynFactory {
2636    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2637        Box::new(CountDynAccumulator::default())
2638    }
2639
2640    fn result_field(&self) -> Field {
2641        Field::new(&self.field_name, DataType::Int64, false)
2642    }
2643
2644    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2645        Box::new(self.clone())
2646    }
2647
2648    fn type_tag(&self) -> &'static str {
2649        "count"
2650    }
2651}
2652
/// Count accumulator implementing [`DynAccumulator`].
#[derive(Debug, Clone, Default)]
pub struct CountDynAccumulator {
    // Number of rows seen so far across all added events.
    count: u64,
}
2658
2659impl DynAccumulator for CountDynAccumulator {
2660    fn add_event(&mut self, event: &Event) {
2661        let rows = event.data.num_rows();
2662        self.count += rows as u64;
2663    }
2664
2665    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2666        let data = other.serialize();
2667        if let Some(bytes) = data.get(..8).and_then(|s| <[u8; 8]>::try_from(s).ok()) {
2668            self.count += u64::from_le_bytes(bytes);
2669        }
2670    }
2671
2672    fn result_scalar(&self) -> ScalarResult {
2673        ScalarResult::Int64(i64::try_from(self.count).unwrap_or(i64::MAX))
2674    }
2675
2676    fn is_empty(&self) -> bool {
2677        self.count == 0
2678    }
2679
2680    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2681        Box::new(self.clone())
2682    }
2683
2684    fn serialize(&self) -> Vec<u8> {
2685        self.count.to_le_bytes().to_vec()
2686    }
2687
2688    fn result_field(&self) -> Field {
2689        Field::new("count", DataType::Int64, false)
2690    }
2691
2692    fn type_tag(&self) -> &'static str {
2693        "count"
2694    }
2695
2696    fn as_any(&self) -> &dyn std::any::Any {
2697        self
2698    }
2699}
2700
2701// ── FirstValue / LastValue DynAccumulator ───────────────────────────────────
2702
/// `FIRST_VALUE` factory for f64 columns via [`DynAccumulator`].
#[derive(Debug, Clone)]
pub struct FirstValueF64DynFactory {
    /// Index of the column holding the value to pick
    value_column_index: usize,
    /// Index of the column holding the event timestamp
    timestamp_column_index: usize,
    /// Name of the output field in the result schema
    field_name: String,
}

impl FirstValueF64DynFactory {
    /// Creates a new `FIRST_VALUE` factory for f64 columns.
    #[must_use]
    pub fn new(
        value_column_index: usize,
        timestamp_column_index: usize,
        field_name: impl Into<String>,
    ) -> Self {
        let field_name = field_name.into();
        Self {
            value_column_index,
            timestamp_column_index,
            field_name,
        }
    }
}
2729
2730impl DynAggregatorFactory for FirstValueF64DynFactory {
2731    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2732        Box::new(FirstValueF64DynAccumulator::new(
2733            self.value_column_index,
2734            self.timestamp_column_index,
2735        ))
2736    }
2737
2738    fn result_field(&self) -> Field {
2739        Field::new(&self.field_name, DataType::Float64, true)
2740    }
2741
2742    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2743        Box::new(self.clone())
2744    }
2745
2746    fn type_tag(&self) -> &'static str {
2747        "first_value_f64"
2748    }
2749}
2750
/// `FIRST_VALUE` accumulator for f64 columns via [`DynAccumulator`].
#[derive(Debug, Clone)]
pub struct FirstValueF64DynAccumulator {
    value_column_index: usize,
    timestamp_column_index: usize,
    value: Option<f64>,
    timestamp: Option<i64>,
}

impl FirstValueF64DynAccumulator {
    /// Creates a new `FIRST_VALUE` dyn accumulator with no value observed.
    #[must_use]
    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
        Self {
            value: None,
            timestamp: None,
            value_column_index,
            timestamp_column_index,
        }
    }
}
2772
2773impl DynAccumulator for FirstValueF64DynAccumulator {
2774    fn add_event(&mut self, event: &Event) {
2775        use arrow_array::cast::AsArray;
2776        use arrow_array::types::{Float64Type, Int64Type};
2777
2778        let batch = &event.data;
2779        if self.value_column_index >= batch.num_columns()
2780            || self.timestamp_column_index >= batch.num_columns()
2781        {
2782            return;
2783        }
2784
2785        let val_col = batch.column(self.value_column_index);
2786        let ts_col = batch.column(self.timestamp_column_index);
2787
2788        let Some(val_array) = val_col.as_primitive_opt::<Float64Type>() else {
2789            return;
2790        };
2791        let Some(ts_array) = ts_col.as_primitive_opt::<Int64Type>() else {
2792            return;
2793        };
2794
2795        for i in 0..batch.num_rows() {
2796            if val_array.is_null(i) || ts_array.is_null(i) {
2797                continue;
2798            }
2799            let val = val_array.value(i);
2800            let ts = ts_array.value(i);
2801
2802            match self.timestamp {
2803                None => {
2804                    self.value = Some(val);
2805                    self.timestamp = Some(ts);
2806                }
2807                Some(existing_ts) if ts < existing_ts => {
2808                    self.value = Some(val);
2809                    self.timestamp = Some(ts);
2810                }
2811                _ => {}
2812            }
2813        }
2814    }
2815
2816    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2817        let data = other.serialize();
2818        if data.len() >= 17 && data[0] == 1 {
2819            let (Some(val_bytes), Some(ts_bytes)) = (
2820                <[u8; 8]>::try_from(&data[1..9]).ok(),
2821                <[u8; 8]>::try_from(&data[9..17]).ok(),
2822            ) else {
2823                return;
2824            };
2825            let other_val = f64::from_le_bytes(val_bytes);
2826            let other_ts = i64::from_le_bytes(ts_bytes);
2827            match self.timestamp {
2828                None => {
2829                    self.value = Some(other_val);
2830                    self.timestamp = Some(other_ts);
2831                }
2832                Some(self_ts) if other_ts < self_ts => {
2833                    self.value = Some(other_val);
2834                    self.timestamp = Some(other_ts);
2835                }
2836                _ => {}
2837            }
2838        }
2839    }
2840
2841    fn result_scalar(&self) -> ScalarResult {
2842        ScalarResult::OptionalFloat64(self.value)
2843    }
2844
2845    fn is_empty(&self) -> bool {
2846        self.value.is_none()
2847    }
2848
2849    fn clone_box(&self) -> Box<dyn DynAccumulator> {
2850        Box::new(self.clone())
2851    }
2852
2853    fn serialize(&self) -> Vec<u8> {
2854        match (self.value, self.timestamp) {
2855            (Some(v), Some(ts)) => {
2856                let mut buf = Vec::with_capacity(17);
2857                buf.push(1);
2858                buf.extend_from_slice(&v.to_le_bytes());
2859                buf.extend_from_slice(&ts.to_le_bytes());
2860                buf
2861            }
2862            _ => vec![0],
2863        }
2864    }
2865
2866    fn result_field(&self) -> Field {
2867        Field::new("first_value_f64", DataType::Float64, true)
2868    }
2869
2870    fn type_tag(&self) -> &'static str {
2871        "first_value_f64"
2872    }
2873
2874    fn as_any(&self) -> &dyn std::any::Any {
2875        self
2876    }
2877}
2878
/// `LAST_VALUE` factory for f64 columns via [`DynAccumulator`].
#[derive(Debug, Clone)]
pub struct LastValueF64DynFactory {
    /// Index of the column holding the value to pick
    value_column_index: usize,
    /// Index of the column holding the event timestamp
    timestamp_column_index: usize,
    /// Name of the output field in the result schema
    field_name: String,
}

impl LastValueF64DynFactory {
    /// Creates a new `LAST_VALUE` factory for f64 columns.
    #[must_use]
    pub fn new(
        value_column_index: usize,
        timestamp_column_index: usize,
        field_name: impl Into<String>,
    ) -> Self {
        let field_name = field_name.into();
        Self {
            value_column_index,
            timestamp_column_index,
            field_name,
        }
    }
}
2905
2906impl DynAggregatorFactory for LastValueF64DynFactory {
2907    fn create_accumulator(&self) -> Box<dyn DynAccumulator> {
2908        Box::new(LastValueF64DynAccumulator::new(
2909            self.value_column_index,
2910            self.timestamp_column_index,
2911        ))
2912    }
2913
2914    fn result_field(&self) -> Field {
2915        Field::new(&self.field_name, DataType::Float64, true)
2916    }
2917
2918    fn clone_box(&self) -> Box<dyn DynAggregatorFactory> {
2919        Box::new(self.clone())
2920    }
2921
2922    fn type_tag(&self) -> &'static str {
2923        "last_value_f64"
2924    }
2925}
2926
/// `LAST_VALUE` accumulator for f64 columns via [`DynAccumulator`].
#[derive(Debug, Clone)]
pub struct LastValueF64DynAccumulator {
    value_column_index: usize,
    timestamp_column_index: usize,
    value: Option<f64>,
    timestamp: Option<i64>,
}

impl LastValueF64DynAccumulator {
    /// Creates a new `LAST_VALUE` dyn accumulator with no value observed.
    #[must_use]
    pub fn new(value_column_index: usize, timestamp_column_index: usize) -> Self {
        Self {
            value: None,
            timestamp: None,
            value_column_index,
            timestamp_column_index,
        }
    }
}
2948
2949impl DynAccumulator for LastValueF64DynAccumulator {
    fn add_event(&mut self, event: &Event) {
        use arrow_array::cast::AsArray;
        use arrow_array::types::{Float64Type, Int64Type};

        // Both columns must exist; otherwise skip the batch entirely.
        let batch = &event.data;
        if self.value_column_index >= batch.num_columns()
            || self.timestamp_column_index >= batch.num_columns()
        {
            return;
        }

        let val_col = batch.column(self.value_column_index);
        let ts_col = batch.column(self.timestamp_column_index);

        // Type mismatches are treated as "no data" rather than errors.
        let Some(val_array) = val_col.as_primitive_opt::<Float64Type>() else {
            return;
        };
        let Some(ts_array) = ts_col.as_primitive_opt::<Int64Type>() else {
            return;
        };

        for i in 0..batch.num_rows() {
            // Rows with a null value or null timestamp are skipped.
            if val_array.is_null(i) || ts_array.is_null(i) {
                continue;
            }
            let val = val_array.value(i);
            let ts = ts_array.value(i);

            // Keep the value with the greatest timestamp; on a tie the
            // later-processed row wins (`>=`, mirroring FIRST_VALUE's `<`).
            match self.timestamp {
                None => {
                    self.value = Some(val);
                    self.timestamp = Some(ts);
                }
                Some(existing_ts) if ts >= existing_ts => {
                    self.value = Some(val);
                    self.timestamp = Some(ts);
                }
                _ => {}
            }
        }
    }
2991
2992    fn merge_dyn(&mut self, other: &dyn DynAccumulator) {
2993        let data = other.serialize();
2994        if data.len() >= 17 && data[0] == 1 {
2995            let (Some(val_bytes), Some(ts_bytes)) = (
2996                <[u8; 8]>::try_from(&data[1..9]).ok(),
2997                <[u8; 8]>::try_from(&data[9..17]).ok(),
2998            ) else {
2999                return;
3000            };
3001            let other_val = f64::from_le_bytes(val_bytes);
3002            let other_ts = i64::from_le_bytes(ts_bytes);
3003            match self.timestamp {
3004                None => {
3005                    self.value = Some(other_val);
3006                    self.timestamp = Some(other_ts);
3007                }
3008                Some(self_ts) if other_ts >= self_ts => {
3009                    self.value = Some(other_val);
3010                    self.timestamp = Some(other_ts);
3011                }
3012                _ => {}
3013            }
3014        }
3015    }
3016
3017    fn result_scalar(&self) -> ScalarResult {
3018        ScalarResult::OptionalFloat64(self.value)
3019    }
3020
3021    fn is_empty(&self) -> bool {
3022        self.value.is_none()
3023    }
3024
3025    fn clone_box(&self) -> Box<dyn DynAccumulator> {
3026        Box::new(self.clone())
3027    }
3028
3029    fn serialize(&self) -> Vec<u8> {
3030        match (self.value, self.timestamp) {
3031            (Some(v), Some(ts)) => {
3032                let mut buf = Vec::with_capacity(17);
3033                buf.push(1);
3034                buf.extend_from_slice(&v.to_le_bytes());
3035                buf.extend_from_slice(&ts.to_le_bytes());
3036                buf
3037            }
3038            _ => vec![0],
3039        }
3040    }
3041
3042    fn result_field(&self) -> Field {
3043        Field::new("last_value_f64", DataType::Float64, true)
3044    }
3045
3046    fn type_tag(&self) -> &'static str {
3047        "last_value_f64"
3048    }
3049
3050    fn as_any(&self) -> &dyn std::any::Any {
3051        self
3052    }
3053}
3054
3055// ── Composite Aggregator ────────────────────────────────────────────────────
3056
/// Composite aggregator combining multiple [`DynAggregatorFactory`] instances.
///
/// Produces multi-column output: `window_start, window_end, field_0, field_1, ...`
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::window::{
///     CompositeAggregator, CountDynFactory, MaxF64Factory, MinF64Factory,
/// };
///
/// let agg = CompositeAggregator::new(vec![
///     Box::new(CountDynFactory::new("trade_count")),
///     Box::new(MinF64Factory::new(1, "low")),
///     Box::new(MaxF64Factory::new(1, "high")),
/// ]);
/// assert_eq!(agg.num_aggregates(), 3);
/// ```
pub struct CompositeAggregator {
    /// Factories for creating sub-accumulators (one per output aggregate column)
    factories: Vec<Box<dyn DynAggregatorFactory>>,
    /// Cached output schema, built once in the constructor and shared via
    /// `Arc` so `output_schema()` is allocation-free
    cached_schema: SchemaRef,
}
3081
3082impl std::fmt::Debug for CompositeAggregator {
3083    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3084        f.debug_struct("CompositeAggregator")
3085            .field("num_factories", &self.factories.len())
3086            .finish_non_exhaustive()
3087    }
3088}
3089
3090impl CompositeAggregator {
3091    /// Creates a new composite aggregator from a list of factories.
3092    #[must_use]
3093    pub fn new(factories: Vec<Box<dyn DynAggregatorFactory>>) -> Self {
3094        let mut fields = vec![
3095            Field::new("window_start", DataType::Int64, false),
3096            Field::new("window_end", DataType::Int64, false),
3097        ];
3098        fields.extend(factories.iter().map(|f| f.result_field()));
3099        let cached_schema = Arc::new(Schema::new(fields));
3100        Self {
3101            factories,
3102            cached_schema,
3103        }
3104    }
3105
3106    /// Returns the number of sub-aggregates.
3107    #[must_use]
3108    pub fn num_aggregates(&self) -> usize {
3109        self.factories.len()
3110    }
3111
3112    /// Creates a new composite accumulator with all sub-accumulators.
3113    #[must_use]
3114    pub fn create_accumulator(&self) -> CompositeAccumulator {
3115        let accumulators = self
3116            .factories
3117            .iter()
3118            .map(|f| f.create_accumulator())
3119            .collect();
3120        CompositeAccumulator { accumulators }
3121    }
3122
3123    /// Returns the result fields for all sub-aggregates.
3124    #[must_use]
3125    pub fn result_fields(&self) -> Vec<Field> {
3126        self.factories.iter().map(|f| f.result_field()).collect()
3127    }
3128
3129    /// Creates the output schema: `window_start, window_end, [aggregate fields]`.
3130    #[must_use]
3131    pub fn output_schema(&self) -> SchemaRef {
3132        Arc::clone(&self.cached_schema)
3133    }
3134}
3135
3136impl Clone for CompositeAggregator {
3137    fn clone(&self) -> Self {
3138        let factories: Vec<Box<dyn DynAggregatorFactory>> =
3139            self.factories.iter().map(|f| f.clone_box()).collect();
3140        Self {
3141            cached_schema: Arc::clone(&self.cached_schema),
3142            factories,
3143        }
3144    }
3145}
3146
/// Composite accumulator holding multiple [`DynAccumulator`] instances.
///
/// Fans out each event to all sub-accumulators and collects results
/// as a multi-column [`RecordBatch`].
pub struct CompositeAccumulator {
    /// Sub-accumulators, one per aggregate function, in the same order as
    /// the factories that created them (and thus the output schema columns)
    accumulators: Vec<Box<dyn DynAccumulator>>,
}
3155
3156impl std::fmt::Debug for CompositeAccumulator {
3157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3158        f.debug_struct("CompositeAccumulator")
3159            .field("num_accumulators", &self.accumulators.len())
3160            .finish()
3161    }
3162}
3163
3164impl CompositeAccumulator {
3165    /// Adds an event to all sub-accumulators.
3166    pub fn add_event(&mut self, event: &Event) {
3167        for acc in &mut self.accumulators {
3168            acc.add_event(event);
3169        }
3170    }
3171
3172    /// Merges another composite accumulator into this one.
3173    ///
3174    /// # Errors
3175    ///
3176    /// Returns an error if the accumulators have different numbers of
3177    /// sub-accumulators (mismatched window definitions).
3178    pub fn merge(&mut self, other: &Self) -> Result<(), &'static str> {
3179        if self.accumulators.len() != other.accumulators.len() {
3180            return Err("cannot merge composite accumulators with different sizes");
3181        }
3182        for (self_acc, other_acc) in self.accumulators.iter_mut().zip(&other.accumulators) {
3183            self_acc.merge_dyn(other_acc.as_ref());
3184        }
3185        Ok(())
3186    }
3187
3188    /// Returns all results as [`ScalarResult`] values.
3189    #[must_use]
3190    pub fn results(&self) -> Vec<ScalarResult> {
3191        self.accumulators
3192            .iter()
3193            .map(|a| a.result_scalar())
3194            .collect()
3195    }
3196
3197    /// Returns true if all sub-accumulators are empty.
3198    #[must_use]
3199    pub fn is_empty(&self) -> bool {
3200        self.accumulators.iter().all(|a| a.is_empty())
3201    }
3202
3203    /// Serializes all sub-accumulators for checkpointing.
3204    #[must_use]
3205    #[allow(clippy::cast_possible_truncation)] // Wire format uses fixed-width integers
3206    pub fn serialize(&self) -> Vec<u8> {
3207        let mut buf = Vec::new();
3208        // Header: number of accumulators (u32)
3209        let n = self.accumulators.len() as u32;
3210        buf.extend_from_slice(&n.to_le_bytes());
3211        for acc in &self.accumulators {
3212            let tag = acc.type_tag();
3213            let tag_bytes = tag.as_bytes();
3214            // Tag length (u16) + tag + data length (u32) + data
3215            buf.extend_from_slice(&(tag_bytes.len() as u16).to_le_bytes());
3216            buf.extend_from_slice(tag_bytes);
3217            let data = acc.serialize();
3218            buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
3219            buf.extend_from_slice(&data);
3220        }
3221        buf
3222    }
3223
3224    /// Creates a multi-column [`RecordBatch`] from the results.
3225    ///
3226    /// The batch has columns: `window_start, window_end, [aggregate results]`.
3227    ///
3228    /// # Errors
3229    ///
3230    /// Returns `None` if the batch cannot be created.
3231    #[must_use]
3232    pub fn to_record_batch(&self, window_id: &WindowId, schema: &SchemaRef) -> Option<RecordBatch> {
3233        use arrow_array::{Float64Array, UInt64Array};
3234
3235        let mut columns: Vec<Arc<dyn arrow_array::Array>> = vec![
3236            Arc::new(Int64Array::from(vec![window_id.start])),
3237            Arc::new(Int64Array::from(vec![window_id.end])),
3238        ];
3239
3240        for result in self.results() {
3241            let col: Arc<dyn arrow_array::Array> = match result {
3242                ScalarResult::Int64(v) => Arc::new(Int64Array::from(vec![v])),
3243                ScalarResult::Float64(v) => Arc::new(Float64Array::from(vec![v])),
3244                ScalarResult::UInt64(v) => Arc::new(UInt64Array::from(vec![v])),
3245                ScalarResult::OptionalInt64(v) => Arc::new(Int64Array::from(vec![v])),
3246                ScalarResult::OptionalFloat64(v) => Arc::new(Float64Array::from(vec![v])),
3247                ScalarResult::Null => Arc::new(Int64Array::new_null(1)),
3248            };
3249            columns.push(col);
3250        }
3251
3252        RecordBatch::try_new(Arc::clone(schema), columns).ok()
3253    }
3254
3255    /// Returns the number of sub-accumulators.
3256    #[must_use]
3257    pub fn num_accumulators(&self) -> usize {
3258        self.accumulators.len()
3259    }
3260}
3261
3262impl Clone for CompositeAccumulator {
3263    fn clone(&self) -> Self {
3264        Self {
3265            accumulators: self.accumulators.iter().map(|a| a.clone_box()).collect(),
3266        }
3267    }
3268}
3269
/// State key prefix for window accumulators (4 bytes, `b"win:"`).
/// Namespaces accumulator entries within the shared operator state store.
const WINDOW_STATE_PREFIX: &[u8; 4] = b"win:";

/// Total size of window state key: prefix (4) + `WindowId` (16) = 20 bytes.
/// Fixed size allows stack-allocated keys on the hot path (see `state_key`).
const WINDOW_STATE_KEY_SIZE: usize = 4 + 16;
3275
/// Tumbling window operator.
///
/// Processes events through non-overlapping, fixed-size time windows.
/// Events are assigned to windows based on their timestamps, aggregated,
/// and results are emitted based on the configured [`EmitStrategy`].
///
/// # Emit Strategies
///
/// - `OnWatermark` (default): Emit when watermark passes window end
/// - `Periodic`: Emit intermediate results at intervals, final on watermark
/// - `OnUpdate`: Emit after every state update
///
/// # Late Data Handling
///
/// Events that arrive after `window_end + allowed_lateness` are considered late.
/// Their behavior is controlled by [`LateDataConfig`]:
/// - Drop the event (default)
/// - Route to a named side output for separate processing
///
/// # State Management
///
/// Window state is stored in the operator context's state store using
/// prefixed keys:
/// - `win:<window_id>` - Accumulator state
/// - `meta:<window_id>` - Window metadata (registration status, etc.)
///
/// # Watermark Triggering
///
/// Windows are triggered when the watermark advances past `window_end + allowed_lateness`.
/// This ensures late data within the grace period is still processed.
pub struct TumblingWindowOperator<A: Aggregator> {
    /// Window assigner
    assigner: TumblingWindowAssigner,
    /// Aggregator function
    aggregator: A,
    /// Allowed lateness for late data, converted to milliseconds at construction
    allowed_lateness_ms: i64,
    /// Track registered timers to avoid duplicates (one final timer per window)
    registered_windows: FxHashSet<WindowId>,
    /// Track windows with registered periodic timers (used only under
    /// `EmitStrategy::Periodic`)
    periodic_timer_windows: FxHashSet<WindowId>,
    /// Emit strategy for controlling when results are output
    emit_strategy: EmitStrategy,
    /// Late data handling configuration
    late_data_config: LateDataConfig,
    /// Metrics for late data tracking
    late_data_metrics: LateDataMetrics,
    /// Metrics for window close tracking
    window_close_metrics: WindowCloseMetrics,
    /// Operator ID for checkpointing
    operator_id: String,
    /// Cached output schema (avoids allocation on every emit)
    output_schema: SchemaRef,
    /// Last emitted events per window for changelog retraction
    last_emitted: FxHashMap<WindowId, Event>,
    /// Cached Arc for late-data side output name (avoids allocation per late event)
    side_output_arc: Option<Arc<str>>,
    /// Phantom data for accumulator type
    _phantom: PhantomData<A::Acc>,
}
3336
/// Static counter for generating unique operator IDs without allocation.
/// Incremented with `Ordering::Relaxed` in `TumblingWindowOperator::new`;
/// only uniqueness matters, not ordering between threads.
static OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
3339
3340impl<A: Aggregator> TumblingWindowOperator<A>
3341where
3342    A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
3343    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
3344        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
3345{
3346    /// Creates a new tumbling window operator.
3347    ///
3348    /// # Arguments
3349    ///
3350    /// * `assigner` - Window assigner for determining window boundaries
3351    /// * `aggregator` - Aggregation function to apply within windows
3352    /// * `allowed_lateness` - Grace period for late data after window close
3353    /// # Panics
3354    ///
3355    /// Panics if allowed lateness does not fit in i64.
3356    #[must_use]
3357    pub fn new(
3358        assigner: TumblingWindowAssigner,
3359        aggregator: A,
3360        allowed_lateness: Duration,
3361    ) -> Self {
3362        let operator_num = OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
3363        let output_schema = Arc::new(Schema::new(vec![
3364            Field::new("window_start", DataType::Int64, false),
3365            Field::new("window_end", DataType::Int64, false),
3366            Field::new(
3367                "result",
3368                aggregator.output_data_type(),
3369                aggregator.output_nullable(),
3370            ),
3371        ]));
3372        Self {
3373            assigner,
3374            aggregator,
3375            // Ensure lateness fits in i64
3376            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
3377                .expect("Allowed lateness must fit in i64"),
3378            registered_windows: FxHashSet::default(),
3379            periodic_timer_windows: FxHashSet::default(),
3380            emit_strategy: EmitStrategy::default(),
3381            late_data_config: LateDataConfig::default(),
3382            late_data_metrics: LateDataMetrics::new(),
3383            window_close_metrics: WindowCloseMetrics::new(),
3384            operator_id: format!("tumbling_window_{operator_num}"),
3385            output_schema,
3386            last_emitted: FxHashMap::default(),
3387            side_output_arc: None,
3388            _phantom: PhantomData,
3389        }
3390    }
3391
3392    /// Creates a new tumbling window operator with a custom operator ID.
3393    /// # Panics
3394    ///
3395    /// Panics if allowed lateness does not fit in i64.
3396    #[must_use]
3397    pub fn with_id(
3398        assigner: TumblingWindowAssigner,
3399        aggregator: A,
3400        allowed_lateness: Duration,
3401        operator_id: String,
3402    ) -> Self {
3403        let output_schema = Arc::new(Schema::new(vec![
3404            Field::new("window_start", DataType::Int64, false),
3405            Field::new("window_end", DataType::Int64, false),
3406            Field::new(
3407                "result",
3408                aggregator.output_data_type(),
3409                aggregator.output_nullable(),
3410            ),
3411        ]));
3412        Self {
3413            assigner,
3414            aggregator,
3415            // Ensure lateness fits in i64
3416            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
3417                .expect("Allowed lateness must fit in i64"),
3418            registered_windows: FxHashSet::default(),
3419            periodic_timer_windows: FxHashSet::default(),
3420            emit_strategy: EmitStrategy::default(),
3421            late_data_config: LateDataConfig::default(),
3422            late_data_metrics: LateDataMetrics::new(),
3423            window_close_metrics: WindowCloseMetrics::new(),
3424            operator_id,
3425            output_schema,
3426            last_emitted: FxHashMap::default(),
3427            side_output_arc: None,
3428            _phantom: PhantomData,
3429        }
3430    }
3431
3432    /// Sets the emit strategy for this window operator.
3433    ///
3434    /// # Arguments
3435    ///
3436    /// * `strategy` - The emit strategy to use
3437    ///
3438    /// # Example
3439    ///
3440    /// ```rust,no_run
3441    /// use laminar_core::operator::window::{
3442    ///     TumblingWindowAssigner, TumblingWindowOperator, CountAggregator, EmitStrategy,
3443    /// };
3444    /// use std::time::Duration;
3445    ///
3446    /// let assigner = TumblingWindowAssigner::new(Duration::from_secs(60));
3447    /// let mut operator = TumblingWindowOperator::new(
3448    ///     assigner,
3449    ///     CountAggregator::new(),
3450    ///     Duration::from_secs(5),
3451    /// );
3452    ///
3453    /// // Emit every 10 seconds instead of waiting for watermark
3454    /// operator.set_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(10)));
3455    /// ```
3456    pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
3457        self.emit_strategy = strategy;
3458    }
3459
3460    /// Returns the current emit strategy.
3461    #[must_use]
3462    pub fn emit_strategy(&self) -> &EmitStrategy {
3463        &self.emit_strategy
3464    }
3465
3466    /// Sets the late data handling configuration.
3467    ///
3468    /// # Arguments
3469    ///
3470    /// * `config` - The late data configuration to use
3471    ///
3472    /// # Example
3473    ///
3474    /// ```rust,no_run
3475    /// use laminar_core::operator::window::{
3476    ///     TumblingWindowAssigner, TumblingWindowOperator, CountAggregator, LateDataConfig,
3477    /// };
3478    /// use std::time::Duration;
3479    ///
3480    /// let assigner = TumblingWindowAssigner::new(Duration::from_secs(60));
3481    /// let mut operator = TumblingWindowOperator::new(
3482    ///     assigner,
3483    ///     CountAggregator::new(),
3484    ///     Duration::from_secs(5),
3485    /// );
3486    ///
3487    /// // Route late events to a side output
3488    /// operator.set_late_data_config(LateDataConfig::with_side_output("late_events".to_string()));
3489    /// ```
3490    pub fn set_late_data_config(&mut self, config: LateDataConfig) {
3491        self.late_data_config = config;
3492    }
3493
3494    /// Returns the current late data configuration.
3495    #[must_use]
3496    pub fn late_data_config(&self) -> &LateDataConfig {
3497        &self.late_data_config
3498    }
3499
3500    /// Returns the late data metrics.
3501    ///
3502    /// Use this to monitor late data behavior and set up alerts.
3503    #[must_use]
3504    pub fn late_data_metrics(&self) -> &LateDataMetrics {
3505        &self.late_data_metrics
3506    }
3507
3508    /// Resets the late data metrics counters.
3509    pub fn reset_late_data_metrics(&mut self) {
3510        self.late_data_metrics.reset();
3511    }
3512
3513    /// Returns the window close metrics.
3514    ///
3515    /// Use this to monitor window close throughput and watermark lag.
3516    #[must_use]
3517    pub fn window_close_metrics(&self) -> &WindowCloseMetrics {
3518        &self.window_close_metrics
3519    }
3520
3521    /// Resets the window close metrics counters.
3522    pub fn reset_window_close_metrics(&mut self) {
3523        self.window_close_metrics.reset();
3524    }
3525
3526    /// Returns the number of windows currently accumulating events.
3527    #[must_use]
3528    pub fn active_windows_count(&self) -> usize {
3529        self.registered_windows.len()
3530    }
3531
3532    /// Returns the window assigner.
3533    #[must_use]
3534    pub fn assigner(&self) -> &TumblingWindowAssigner {
3535        &self.assigner
3536    }
3537
3538    /// Returns the allowed lateness in milliseconds.
3539    #[must_use]
3540    pub fn allowed_lateness_ms(&self) -> i64 {
3541        self.allowed_lateness_ms
3542    }
3543
3544    /// Generates the state key for a window's accumulator.
3545    ///
3546    /// Returns a stack-allocated fixed-size array to avoid heap allocation
3547    /// on the hot path. This is critical for Ring 0 performance.
3548    #[inline]
3549    fn state_key(window_id: &WindowId) -> [u8; WINDOW_STATE_KEY_SIZE] {
3550        let mut key = [0u8; WINDOW_STATE_KEY_SIZE];
3551        key[..4].copy_from_slice(WINDOW_STATE_PREFIX);
3552        let window_key = window_id.to_key_inline();
3553        key[4..20].copy_from_slice(&window_key);
3554        key
3555    }
3556
3557    /// Gets the accumulator for a window, creating a new one if needed.
3558    fn get_accumulator(&self, window_id: &WindowId, state: &dyn StateStore) -> A::Acc {
3559        let key = Self::state_key(window_id);
3560        state
3561            .get_typed::<A::Acc>(&key)
3562            .ok()
3563            .flatten()
3564            .unwrap_or_else(|| self.aggregator.create_accumulator())
3565    }
3566
3567    /// Stores the accumulator for a window.
3568    fn put_accumulator(
3569        window_id: &WindowId,
3570        acc: &A::Acc,
3571        state: &mut dyn StateStore,
3572    ) -> Result<(), OperatorError> {
3573        let key = Self::state_key(window_id);
3574        state
3575            .put_typed(&key, acc)
3576            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
3577    }
3578
3579    /// Deletes the accumulator for a window.
3580    fn delete_accumulator(
3581        window_id: &WindowId,
3582        state: &mut dyn StateStore,
3583    ) -> Result<(), OperatorError> {
3584        let key = Self::state_key(window_id);
3585        state
3586            .delete(&key)
3587            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
3588    }
3589
3590    /// Checks if an event is late (after window close + allowed lateness).
3591    fn is_late(&self, event_time: i64, watermark: i64) -> bool {
3592        let window_id = self.assigner.assign(event_time);
3593        let cleanup_time = window_id.end + self.allowed_lateness_ms;
3594        watermark >= cleanup_time
3595    }
3596
3597    /// Registers a timer for window triggering if not already registered.
3598    fn maybe_register_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
3599        if !self.registered_windows.contains(&window_id) {
3600            // Register timer at window_end + allowed_lateness
3601            let trigger_time = window_id.end + self.allowed_lateness_ms;
3602            ctx.timers.register_timer(
3603                trigger_time,
3604                Some(window_id.to_key()),
3605                Some(ctx.operator_index),
3606            );
3607            self.registered_windows.insert(window_id);
3608        }
3609    }
3610
3611    /// Registers a periodic timer for intermediate emissions.
3612    ///
3613    /// The timer key uses a special encoding to distinguish from final timers:
3614    /// - Final timers: raw `WindowId` bytes (16 bytes)
3615    /// - Periodic timers: `WindowId` with high bit set in first byte
3616    fn maybe_register_periodic_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
3617        if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
3618            if !self.periodic_timer_windows.contains(&window_id) {
3619                // Register first periodic timer at processing_time + interval
3620                let interval_ms =
3621                    i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
3622                let trigger_time = ctx.processing_time + interval_ms;
3623
3624                // Create a key with high bit set to distinguish from final timers
3625                let key = Self::periodic_timer_key(&window_id);
3626
3627                ctx.timers
3628                    .register_timer(trigger_time, Some(key), Some(ctx.operator_index));
3629                self.periodic_timer_windows.insert(window_id);
3630            }
3631        }
3632    }
3633
3634    /// Creates a periodic timer key from a window ID.
3635    ///
3636    /// Uses the high bit of the first byte as a marker to distinguish
3637    /// periodic timers from final watermark timers.
3638    #[inline]
3639    fn periodic_timer_key(window_id: &WindowId) -> super::TimerKey {
3640        let mut key = window_id.to_key();
3641        // Set the high bit of the first byte to mark as periodic
3642        if !key.is_empty() {
3643            key[0] |= 0x80;
3644        }
3645        key
3646    }
3647
3648    /// Checks if a timer key is for a periodic timer.
3649    #[inline]
3650    fn is_periodic_timer_key(key: &[u8]) -> bool {
3651        !key.is_empty() && (key[0] & 0x80) != 0
3652    }
3653
3654    /// Extracts the window ID from a periodic timer key.
3655    #[inline]
3656    fn window_id_from_periodic_key(key: &[u8]) -> Option<WindowId> {
3657        if key.len() != 16 {
3658            return None;
3659        }
3660        let mut clean_key = [0u8; 16];
3661        clean_key.copy_from_slice(key);
3662        // Clear the high bit to get the original window ID
3663        clean_key[0] &= 0x7F;
3664        WindowId::from_key(&clean_key)
3665    }
3666
3667    /// Creates an intermediate result for a window without cleaning up state.
3668    ///
3669    /// Returns `None` if the window is empty.
3670    /// Creates an intermediate result for a window without cleaning up state.
3671    ///
3672    /// Returns `None` if the window is empty. Uses `ResultToArrow` for
3673    /// type-preserving output.
3674    fn create_intermediate_result(
3675        &self,
3676        window_id: &WindowId,
3677        state: &dyn crate::state::StateStore,
3678    ) -> Option<Event> {
3679        let acc = self.get_accumulator(window_id, state);
3680
3681        if acc.is_empty() {
3682            return None;
3683        }
3684
3685        let result = acc.result();
3686        let result_array = result.to_arrow_array();
3687
3688        let batch = RecordBatch::try_new(
3689            Arc::clone(&self.output_schema),
3690            vec![
3691                Arc::new(Int64Array::from(vec![window_id.start])),
3692                Arc::new(Int64Array::from(vec![window_id.end])),
3693                result_array,
3694            ],
3695        )
3696        .ok()?;
3697
3698        Some(Event::new(window_id.end, batch))
3699    }
3700
3701    /// Handles periodic timer expiration for intermediate emissions.
3702    fn handle_periodic_timer(
3703        &mut self,
3704        window_id: WindowId,
3705        ctx: &mut OperatorContext,
3706    ) -> OutputVec {
3707        let mut output = OutputVec::new();
3708
3709        // Check if window is still valid (not yet closed by watermark)
3710        if !self.registered_windows.contains(&window_id) {
3711            // Window already closed, remove from periodic tracking
3712            self.periodic_timer_windows.remove(&window_id);
3713            return output;
3714        }
3715
3716        // Emit intermediate result
3717        if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
3718            output.push(Output::Event(event));
3719        }
3720
3721        // Schedule next periodic timer if still within window
3722        if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
3723            let interval_ms =
3724                i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
3725            let next_trigger = ctx.processing_time + interval_ms;
3726
3727            // Only schedule if the window hasn't closed yet
3728            let window_close_time = window_id.end + self.allowed_lateness_ms;
3729            if next_trigger < window_close_time {
3730                let key = Self::periodic_timer_key(&window_id);
3731                ctx.timers
3732                    .register_timer(next_trigger, Some(key), Some(ctx.operator_index));
3733            }
3734        }
3735
3736        output
3737    }
3738}
3739
3740impl<A: Aggregator> Operator for TumblingWindowOperator<A>
3741where
3742    A::Acc: 'static
3743        + Archive
3744        + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
3745    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
3746        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
3747{
3748    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
3749        let event_time = event.timestamp;
3750
3751        // Update watermark with the new event and get any emitted watermark
3752        let emitted_watermark = ctx.watermark_generator.on_event(event_time);
3753
3754        // Check if this event is too late (beyond allowed lateness)
3755        // Use the current watermark (not just the newly emitted one) for the check
3756        let current_wm = ctx.watermark_generator.current_watermark();
3757        if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
3758            let mut output = OutputVec::new();
3759
3760            // EMIT FINAL drops late data entirely
3761            if self.emit_strategy.drops_late_data() {
3762                self.late_data_metrics.record_dropped();
3763                return output; // Silently drop - no LateEvent output
3764            }
3765
3766            // Handle late event based on configuration
3767            if let Some(side_output_name) = self.late_data_config.side_output() {
3768                // Route to named side output (cached Arc avoids allocation per late event)
3769                self.late_data_metrics.record_side_output();
3770                let name_arc = self
3771                    .side_output_arc
3772                    .get_or_insert_with(|| Arc::from(side_output_name))
3773                    .clone();
3774                output.push(Output::SideOutput(Box::new(SideOutputData {
3775                    name: name_arc,
3776                    event: event.clone(),
3777                })));
3778            } else {
3779                // No side output configured - emit as LateEvent (may be dropped by downstream)
3780                self.late_data_metrics.record_dropped();
3781                output.push(Output::LateEvent(event.clone()));
3782            }
3783            return output;
3784        }
3785
3786        // Assign event to window
3787        let window_id = self.assigner.assign(event_time);
3788
3789        // Track if state was updated (for OnUpdate and Changelog strategies)
3790        let mut state_updated = false;
3791
3792        // Extract values and update accumulator (handles multi-row batches)
3793        let values = self.aggregator.extract_batch(event);
3794        if !values.is_empty() {
3795            let mut acc = self.get_accumulator(&window_id, ctx.state);
3796            for value in values {
3797                acc.add(value);
3798            }
3799            if let Err(e) = Self::put_accumulator(&window_id, &acc, ctx.state) {
3800                // Log error but don't fail - we'll retry on next event
3801                tracing::error!("Failed to store window state: {e}");
3802            } else {
3803                state_updated = true;
3804            }
3805        }
3806
3807        // Register timer for this window (watermark-based final emission)
3808        self.maybe_register_timer(window_id, ctx);
3809
3810        // OnWindowClose and Final suppress intermediate emissions
3811        // Don't register periodic timers for these strategies
3812        if !self.emit_strategy.suppresses_intermediate() {
3813            self.maybe_register_periodic_timer(window_id, ctx);
3814        }
3815
3816        // Emit watermark update if generated
3817        let mut output = OutputVec::new();
3818        if let Some(wm) = emitted_watermark {
3819            output.push(Output::Watermark(wm.timestamp()));
3820        }
3821
3822        // Handle different emit strategies
3823        if state_updated {
3824            match &self.emit_strategy {
3825                // OnUpdate: emit intermediate result as regular event
3826                EmitStrategy::OnUpdate => {
3827                    if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
3828                        output.push(Output::Event(event));
3829                    }
3830                }
3831                // Changelog: emit retraction for old value, then insert new value
3832                EmitStrategy::Changelog => {
3833                    if let Some(new_event) = self.create_intermediate_result(&window_id, ctx.state)
3834                    {
3835                        // Emit delete for previous value (retraction)
3836                        if let Some(old_event) = self.last_emitted.get(&window_id) {
3837                            let delete =
3838                                ChangelogRecord::delete(old_event.clone(), ctx.processing_time);
3839                            output.push(Output::Changelog(delete));
3840                        }
3841                        // Emit insert for new value
3842                        let insert =
3843                            ChangelogRecord::insert(new_event.clone(), ctx.processing_time);
3844                        output.push(Output::Changelog(insert));
3845                        self.last_emitted.insert(window_id, new_event);
3846                    }
3847                }
3848                // Other strategies: no intermediate emission
3849                EmitStrategy::OnWatermark
3850                | EmitStrategy::Periodic(_)
3851                | EmitStrategy::OnWindowClose
3852                | EmitStrategy::Final => {}
3853            }
3854        }
3855
3856        output
3857    }
3858
3859    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
3860        // Check if this is a periodic timer (high bit set)
3861        if Self::is_periodic_timer_key(&timer.key) {
3862            // OnWindowClose and Final suppress periodic emissions
3863            if self.emit_strategy.suppresses_intermediate() {
3864                // Don't emit, just clean up the periodic timer tracking
3865                if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
3866                    self.periodic_timer_windows.remove(&window_id);
3867                }
3868                return OutputVec::new();
3869            }
3870
3871            if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
3872                return self.handle_periodic_timer(window_id, ctx);
3873            }
3874            return OutputVec::new();
3875        }
3876
3877        // Parse window ID from timer key (final emission timer)
3878        let Some(window_id) = WindowId::from_key(&timer.key) else {
3879            return OutputVec::new();
3880        };
3881
3882        // Get the accumulator
3883        let acc = self.get_accumulator(&window_id, ctx.state);
3884
3885        // Skip empty windows
3886        if acc.is_empty() {
3887            // Clean up state
3888            let _ = Self::delete_accumulator(&window_id, ctx.state);
3889            self.registered_windows.remove(&window_id);
3890            self.periodic_timer_windows.remove(&window_id);
3891            self.last_emitted.remove(&window_id);
3892            return OutputVec::new();
3893        }
3894
3895        // Get the result
3896        let result = acc.result();
3897
3898        // Clean up window state
3899        let _ = Self::delete_accumulator(&window_id, ctx.state);
3900        self.registered_windows.remove(&window_id);
3901        self.periodic_timer_windows.remove(&window_id);
3902        self.last_emitted.remove(&window_id);
3903
3904        // Convert result to Arrow array (preserves native type: Float64 for AVG, etc.)
3905        let result_array = result.to_arrow_array();
3906
3907        // Create output batch using cached schema (avoids ~200ns allocation per emit)
3908        let batch = RecordBatch::try_new(
3909            Arc::clone(&self.output_schema),
3910            vec![
3911                Arc::new(Int64Array::from(vec![window_id.start])),
3912                Arc::new(Int64Array::from(vec![window_id.end])),
3913                result_array,
3914            ],
3915        );
3916
3917        let mut output = OutputVec::new();
3918        match batch {
3919            Ok(data) => {
3920                let event = Event::new(window_id.end, data);
3921
3922                // Record window close metrics
3923                self.window_close_metrics
3924                    .record_close(window_id.end, ctx.processing_time);
3925
3926                // Emit based on strategy
3927                match &self.emit_strategy {
3928                    // Changelog: wrap in changelog record for CDC
3929                    EmitStrategy::Changelog => {
3930                        let record = ChangelogRecord::insert(event, ctx.processing_time);
3931                        output.push(Output::Changelog(record));
3932                    }
3933                    // All other strategies: emit as regular event
3934                    EmitStrategy::OnWatermark
3935                    | EmitStrategy::Periodic(_)
3936                    | EmitStrategy::OnUpdate
3937                    | EmitStrategy::OnWindowClose
3938                    | EmitStrategy::Final => {
3939                        output.push(Output::Event(event));
3940                    }
3941                }
3942            }
3943            Err(e) => {
3944                tracing::error!("Failed to create output batch: {e}");
3945            }
3946        }
3947        output
3948    }
3949
3950    fn checkpoint(&self) -> OperatorState {
3951        // Serialize both registered windows and periodic timer windows using rkyv
3952        let windows: Vec<_> = self.registered_windows.iter().copied().collect();
3953        let periodic_windows: Vec<_> = self.periodic_timer_windows.iter().copied().collect();
3954
3955        // Create a tuple of both sets
3956        let checkpoint_data = (windows, periodic_windows);
3957        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
3958            .map(|v| v.to_vec())
3959            .unwrap_or_default();
3960
3961        OperatorState {
3962            operator_id: self.operator_id.clone(),
3963            data,
3964        }
3965    }
3966
3967    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
3968        if state.operator_id != self.operator_id {
3969            return Err(OperatorError::StateAccessFailed(format!(
3970                "Operator ID mismatch: expected {}, got {}",
3971                self.operator_id, state.operator_id
3972            )));
3973        }
3974
3975        // Try to deserialize as the new format (tuple of two vectors)
3976        if let Ok(archived) =
3977            rkyv::access::<rkyv::Archived<(Vec<WindowId>, Vec<WindowId>)>, RkyvError>(&state.data)
3978        {
3979            if let Ok((windows, periodic_windows)) =
3980                rkyv::deserialize::<(Vec<WindowId>, Vec<WindowId>), RkyvError>(archived)
3981            {
3982                self.registered_windows = windows.into_iter().collect();
3983                self.periodic_timer_windows = periodic_windows.into_iter().collect();
3984                return Ok(());
3985            }
3986        }
3987
3988        // Fall back to old format (single vector) for backwards compatibility
3989        let archived = rkyv::access::<rkyv::Archived<Vec<WindowId>>, RkyvError>(&state.data)
3990            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
3991        let windows: Vec<WindowId> = rkyv::deserialize::<Vec<WindowId>, RkyvError>(archived)
3992            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
3993
3994        self.registered_windows = windows.into_iter().collect();
3995        self.periodic_timer_windows = FxHashSet::default();
3996        Ok(())
3997    }
3998}
3999
4000#[cfg(test)]
4001mod tests;