Skip to main content

laminar_sql/datafusion/
window_udf.rs

1//! Scalar UDFs that compute window boundary timestamps for streaming
2//! `GROUP BY TUMBLE(...)` style queries.
3//!
4//! Pairs `tumble`/`tumble_end`, `hop`/`hop_end`, `cumulate`/`cumulate_end`
5//! must both appear in `GROUP BY`: DataFusion treats them as independent
6//! group keys, even though within a fixed interval they are functionally
7//! redundant. `session(ts, gap)` is a passthrough — real session
8//! boundaries are computed by Ring 0 operators.
9//!
10//! ```sql
11//! SELECT tumble(ts, INTERVAL '1' MINUTE)     AS window_start,
12//!        tumble_end(ts, INTERVAL '1' MINUTE) AS window_end, ...
13//! FROM ev
14//! GROUP BY tumble(ts, INTERVAL '1' MINUTE),
15//!          tumble_end(ts, INTERVAL '1' MINUTE), ...
16//! ```
17//!
18//! Math runs in milliseconds (matching the Ring 0 watermark). The SQL
19//! boundary returns `Timestamp(Microsecond, None)` so lakehouse sinks
20//! (Iceberg, Delta, Parquet) accept the columns directly — Iceberg
21//! rejects `Timestamp(Millisecond)`.
22
23use std::any::Any;
24use std::hash::{Hash, Hasher};
25use std::ops::RangeInclusive;
26use std::sync::Arc;
27
28use arrow::datatypes::{DataType, TimeUnit};
29use arrow_array::{ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray};
30use datafusion_common::{DataFusionError, Result, ScalarValue};
31use datafusion_expr::{
32    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
33};
34use laminar_core::time::cast_to_millis_array;
35
36// ─── window_udf! macro ──────────────────────────────────────────────────────
37//
38// Generates the trait boilerplate (Default + PartialEq + Eq + Hash +
39// ScalarUDFImpl) for a zero-state singleton UDF that returns a
40// `Timestamp(Microsecond)`. Each UDF declaration below is just SQL name,
41// arity, and a body closure.
42
macro_rules! window_udf {
    (
        $(#[$attr:meta])*
        $type:ident, $sql_name:literal, $arity:expr, |$args:ident| $body:expr
    ) => {
        $(#[$attr])*
        #[derive(Debug)]
        pub struct $type {
            signature: Signature,
        }

        impl $type {
            /// Creates the UDF together with its arity-only signature.
            #[must_use]
            pub fn new() -> Self {
                let signature = variadic_signature($arity);
                Self { signature }
            }
        }

        impl Default for $type {
            fn default() -> Self {
                Self::new()
            }
        }

        // The only field is derived from the macro arguments, so every
        // instance of a given type is interchangeable with any other.
        impl PartialEq for $type {
            fn eq(&self, _other: &Self) -> bool {
                true
            }
        }

        impl Eq for $type {}

        // Hash on the SQL name alone, consistent with `eq` above.
        impl Hash for $type {
            fn hash<H: Hasher>(&self, hasher: &mut H) {
                $sql_name.hash(hasher);
            }
        }

        impl ScalarUDFImpl for $type {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &'static str {
                $sql_name
            }

            fn signature(&self) -> &Signature {
                &self.signature
            }

            fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
                Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
            }

            fn invoke_with_args(&self, call: ScalarFunctionArgs) -> Result<ColumnarValue> {
                // Bind the arguments under the caller-chosen identifier so
                // the body expression can refer to them. The arity is
                // re-checked here before the body indexes into them.
                let $args = call.args;
                check_arity(&$args, $arity, $sql_name)?;
                $body
            }
        }
    };
}
91
92// ─── UDF declarations ───────────────────────────────────────────────────────
93
94window_udf!(
95    /// `tumble(ts, interval [, offset])` — start of the non-overlapping
96    /// window containing `ts`. Returns `Timestamp(Microsecond, None)`.
97    TumbleWindowStart, "tumble", 2..=3,
98    |args| {
99        let interval_ms = positive_interval(&args[1], "tumble", "interval")?;
100        let offset_ms = optional_offset(&args, 2)?;
101        into_us_columnar(&args[0], |ts| tumble_start_ms(ts, interval_ms, offset_ms))
102    }
103);
104
105window_udf!(
106    /// `tumble_end(ts, interval [, offset])` — exclusive upper bound of
107    /// the tumble window containing `ts` (i.e. `tumble(...) + interval`).
108    TumbleWindowEnd, "tumble_end", 2..=3,
109    |args| {
110        let interval_ms = positive_interval(&args[1], "tumble_end", "interval")?;
111        let offset_ms = optional_offset(&args, 2)?;
112        into_us_columnar(&args[0], |ts| {
113            tumble_start_ms(ts, interval_ms, offset_ms).saturating_add(interval_ms)
114        })
115    }
116);
117
118window_udf!(
119    /// `hop(ts, slide, size [, offset])` — earliest sliding window
120    /// (size `size`, sliding by `slide`) that contains `ts`. Full
121    /// multi-window assignment is handled by Ring 0.
122    HopWindowStart, "hop", 3..=4,
123    |args| {
124        let slide_ms = positive_interval(&args[1], "hop", "slide")?;
125        let size_ms = positive_interval(&args[2], "hop", "size")?;
126        let offset_ms = optional_offset(&args, 3)?;
127        into_us_columnar(&args[0], |ts| hop_start_ms(ts, slide_ms, size_ms, offset_ms))
128    }
129);
130
131window_udf!(
132    /// `hop_end(ts, slide, size [, offset])` — end of the earliest
133    /// sliding window containing `ts` (i.e. `hop(...) + size`).
134    HopWindowEnd, "hop_end", 3..=4,
135    |args| {
136        let slide_ms = positive_interval(&args[1], "hop_end", "slide")?;
137        let size_ms = positive_interval(&args[2], "hop_end", "size")?;
138        let offset_ms = optional_offset(&args, 3)?;
139        into_us_columnar(&args[0], |ts| {
140            hop_start_ms(ts, slide_ms, size_ms, offset_ms).saturating_add(size_ms)
141        })
142    }
143);
144
145window_udf!(
146    /// `session(ts, gap)` — passthrough that converts `ts` to
147    /// microsecond resolution. Real session start/end are data-dependent
148    /// and computed by Ring 0; this UDF exists so `GROUP BY session(ts,
149    /// gap)` parses. There is no `session_end` UDF for the same reason.
150    SessionWindowStart, "session", 2..=2,
151    |args| into_us_columnar(&args[0], |ts| ts)
152);
153
154window_udf!(
155    /// `cumulate(ts, step, size)` — epoch start (size-aligned bucket)
156    /// containing `ts`. Per-step cumulating boundaries (which depend on
157    /// data) are exposed by Ring 0.
158    CumulateWindowStart, "cumulate", 3..=3,
159    |args| {
160        let (_step_ms, size_ms) = cumulate_intervals(&args, "cumulate")?;
161        into_us_columnar(&args[0], |ts| tumble_start_ms(ts, size_ms, 0))
162    }
163);
164
165window_udf!(
166    /// `cumulate_end(ts, step, size)` — exclusive upper bound of the
167    /// epoch (size-aligned bucket) containing `ts`.
168    CumulateWindowEnd, "cumulate_end", 3..=3,
169    |args| {
170        let (_step_ms, size_ms) = cumulate_intervals(&args, "cumulate_end")?;
171        into_us_columnar(&args[0], |ts| {
172            tumble_start_ms(ts, size_ms, 0).saturating_add(size_ms)
173        })
174    }
175);
176
177// ─── Math helpers ──────────────────────────────────────────────────────────
178//
179// All in milliseconds. Pure i64 → i64.
180
/// Tumble window start: `floor((ts - offset) / interval) * interval + offset`.
///
/// `rem_euclid` gives floor semantics, so pre-epoch (negative)
/// timestamps align to the window that actually contains them.
///
/// `interval_ms` must be positive; callers validate this before calling
/// (a zero interval would panic on the remainder).
///
/// Every step saturates at the `i64` bounds rather than overflowing, so
/// extreme inputs yield a clamped value instead of a debug-build panic or
/// a silently wrapped result in release builds.
fn tumble_start_ms(ts: i64, interval_ms: i64, offset_ms: i64) -> i64 {
    // Shift into the offset-free grid, floor to the grid, shift back.
    let adj = ts.saturating_sub(offset_ms);
    adj.saturating_sub(adj.rem_euclid(interval_ms))
        .saturating_add(offset_ms)
}
186
/// Earliest start of a hopping window of `size` (sliding by `slide`)
/// that contains `ts`.
///
/// Window starts lie on the grid `offset + k * slide`. The result is the
/// smallest grid point strictly greater than `ts - size`, computed as
/// `floor((ts - size + slide - offset) / slide) * slide + offset`.
/// When `slide > size` the windows leave gaps, and a `ts` inside a gap
/// maps to the start of the next window.
///
/// `slide_ms` must be positive; callers validate this before calling.
///
/// Every step saturates at the `i64` bounds rather than overflowing, so
/// extreme inputs yield a clamped value instead of a debug-build panic or
/// a silently wrapped result in release builds.
fn hop_start_ms(ts: i64, slide_ms: i64, size_ms: i64, offset_ms: i64) -> i64 {
    let adj = ts
        .saturating_sub(size_ms)
        .saturating_add(slide_ms)
        .saturating_sub(offset_ms);
    adj.saturating_sub(adj.rem_euclid(slide_ms))
        .saturating_add(offset_ms)
}
193
194// ─── Argument parsing helpers ──────────────────────────────────────────────
195
196/// Builds `Signature::any(N)` or `Signature::one_of([Any(N), …])` for a
197/// scalar UDF whose arity covers a contiguous range.
198fn variadic_signature(arity: RangeInclusive<usize>) -> Signature {
199    let mut arities: Vec<TypeSignature> = arity.map(TypeSignature::Any).collect();
200    let inner = if arities.len() == 1 {
201        arities.pop().unwrap()
202    } else {
203        TypeSignature::OneOf(arities)
204    };
205    Signature::new(inner, Volatility::Immutable)
206}
207
208fn check_arity(args: &[ColumnarValue], arity: RangeInclusive<usize>, fn_name: &str) -> Result<()> {
209    if arity.contains(&args.len()) {
210        return Ok(());
211    }
212    let expected = if arity.start() == arity.end() {
213        format!("exactly {}", arity.start())
214    } else {
215        format!("{}-{}", arity.start(), arity.end())
216    };
217    Err(DataFusionError::Plan(format!(
218        "{}() requires {} arguments, got {}",
219        fn_name,
220        expected,
221        args.len()
222    )))
223}
224
225fn positive_interval(value: &ColumnarValue, fn_name: &str, arg_name: &str) -> Result<i64> {
226    let ms = extract_interval_ms(value)?;
227    if ms <= 0 {
228        return Err(DataFusionError::Plan(format!(
229            "{fn_name}() {arg_name} must be positive"
230        )));
231    }
232    Ok(ms)
233}
234
235fn optional_offset(args: &[ColumnarValue], idx: usize) -> Result<i64> {
236    match args.get(idx) {
237        Some(value) => extract_interval_ms(value),
238        None => Ok(0),
239    }
240}
241
242fn cumulate_intervals(args: &[ColumnarValue], fn_name: &str) -> Result<(i64, i64)> {
243    let step_ms = positive_interval(&args[1], fn_name, "step")?;
244    let size_ms = positive_interval(&args[2], fn_name, "size")?;
245    if step_ms > size_ms {
246        return Err(DataFusionError::Plan(format!(
247            "{fn_name}() step must not exceed size"
248        )));
249    }
250    if size_ms % step_ms != 0 {
251        return Err(DataFusionError::Plan(format!(
252            "{fn_name}() size must be evenly divisible by step"
253        )));
254    }
255    Ok((step_ms, size_ms))
256}
257
258/// Extracts a scalar interval in milliseconds. Array intervals are
259/// rejected — per-row window sizes are not a valid streaming pattern.
260fn extract_interval_ms(value: &ColumnarValue) -> Result<i64> {
261    match value {
262        ColumnarValue::Scalar(scalar) => scalar_interval_to_ms(scalar),
263        ColumnarValue::Array(_) => Err(DataFusionError::NotImplemented(
264            "Array interval arguments not supported for window functions".to_string(),
265        )),
266    }
267}
268
269fn scalar_interval_to_ms(scalar: &ScalarValue) -> Result<i64> {
270    match scalar {
271        ScalarValue::IntervalDayTime(Some(v)) => {
272            Ok(i64::from(v.days) * 86_400_000 + i64::from(v.milliseconds))
273        }
274        ScalarValue::IntervalMonthDayNano(Some(v)) => {
275            if v.months != 0 {
276                return Err(DataFusionError::NotImplemented(
277                    "Month-based intervals not supported for window functions \
278                     (use days/hours/minutes/seconds)"
279                        .to_string(),
280                ));
281            }
282            Ok(i64::from(v.days) * 86_400_000 + v.nanoseconds / 1_000_000)
283        }
284        ScalarValue::IntervalYearMonth(_) => Err(DataFusionError::NotImplemented(
285            "Year-month intervals not supported for window functions".to_string(),
286        )),
287        ScalarValue::Int64(Some(ms)) => Ok(*ms),
288        _ => Err(DataFusionError::Plan(format!(
289            "Expected interval argument for window function, got: {scalar:?}"
290        ))),
291    }
292}
293
294// ─── Dispatch helper ───────────────────────────────────────────────────────
295
296/// Applies `transform` (ms → ms) to every non-null input timestamp and
297/// returns a `Timestamp(Microsecond)` result. One allocation per call;
298/// null inputs propagate.
299fn into_us_columnar(
300    value: &ColumnarValue,
301    transform: impl Fn(i64) -> i64,
302) -> Result<ColumnarValue> {
303    match value {
304        ColumnarValue::Array(array) => {
305            let input = to_millis_array(array)?;
306            let result: TimestampMicrosecondArray = input
307                .iter()
308                .map(|opt| opt.map(|ms| transform(ms).saturating_mul(1000)))
309                .collect();
310            Ok(ColumnarValue::Array(Arc::new(result)))
311        }
312        ColumnarValue::Scalar(scalar) => {
313            let result =
314                scalar_to_timestamp_ms(scalar)?.map(|ms| transform(ms).saturating_mul(1000));
315            Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
316                result, None,
317            )))
318        }
319    }
320}
321
322fn to_millis_array(array: &ArrayRef) -> Result<TimestampMillisecondArray> {
323    cast_to_millis_array(array.as_ref()).map_err(|e| DataFusionError::Plan(e.to_string()))
324}
325
326fn scalar_to_timestamp_ms(scalar: &ScalarValue) -> Result<Option<i64>> {
327    match scalar {
328        ScalarValue::TimestampMillisecond(v, _) | ScalarValue::Int64(v) => Ok(*v),
329        ScalarValue::TimestampMicrosecond(v, _) => Ok(v.map(|v| v / 1_000)),
330        ScalarValue::TimestampNanosecond(v, _) => Ok(v.map(|v| v / 1_000_000)),
331        ScalarValue::TimestampSecond(v, _) => Ok(v.map(|v| v * 1_000)),
332        _ => Err(DataFusionError::Plan(format!(
333            "Expected timestamp argument for window function, got: {scalar:?}"
334        ))),
335    }
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano, TimestampMicrosecondType};
    use arrow_array::cast::AsArray;
    use arrow_array::{Array, TimestampNanosecondArray};
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::ScalarUDF;

    // ── Fixtures ────────────────────────────────────────────────────────

    /// Scalar `IntervalDayTime` argument.
    fn interval_dt(days: i32, ms: i32) -> ColumnarValue {
        let interval = IntervalDayTime::new(days, ms);
        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(interval)))
    }

    /// Scalar `Timestamp(Millisecond)` argument; `None` is SQL NULL.
    fn ts_ms(ms: Option<i64>) -> ColumnarValue {
        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(ms, None))
    }

    /// Array `Timestamp(Millisecond)` argument.
    fn ts_ms_array(values: Vec<Option<i64>>) -> ColumnarValue {
        ColumnarValue::Array(Arc::new(TimestampMillisecondArray::from(values)))
    }

    /// Array `Timestamp(Nanosecond)` argument.
    fn ts_ns_array(values: Vec<Option<i64>>) -> ColumnarValue {
        ColumnarValue::Array(Arc::new(TimestampNanosecondArray::from(values)))
    }

    fn make_args(args: Vec<ColumnarValue>, rows: usize) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args,
            arg_fields: vec![],
            number_rows: rows,
            return_field: Arc::new(Field::new(
                "output",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            )),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Calls `udf` directly. `number_rows` mirrors the length of the
    /// first array argument, or 1 when every argument is a scalar.
    fn invoke(udf: &dyn ScalarUDFImpl, args: Vec<ColumnarValue>) -> Result<ColumnarValue> {
        let rows = args
            .iter()
            .find_map(|arg| match arg {
                ColumnarValue::Array(array) => Some(array.len()),
                ColumnarValue::Scalar(_) => None,
            })
            .unwrap_or(1);
        udf.invoke_with_args(make_args(args, rows))
    }

    /// Returns the microsecond value of a UDF scalar result, or panics
    /// if the result isn't a `TimestampMicrosecond` scalar.
    fn expect_ts_us(result: ColumnarValue) -> Option<i64> {
        match result {
            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, _)) => v,
            other => panic!("Expected TimestampMicrosecond scalar, got: {other:?}"),
        }
    }

    /// Returns the per-row microsecond values of a UDF array result, or
    /// panics if the result is a scalar.
    fn expect_array_us(result: ColumnarValue) -> Vec<Option<i64>> {
        match result {
            ColumnarValue::Array(arr) => arr
                .as_primitive::<TimestampMicrosecondType>()
                .iter()
                .collect(),
            ColumnarValue::Scalar(_) => panic!("Expected array result"),
        }
    }

    /// Invokes `udf`, requiring success and a scalar µs result.
    fn scalar_us(udf: &dyn ScalarUDFImpl, args: Vec<ColumnarValue>) -> Option<i64> {
        expect_ts_us(invoke(udf, args).unwrap())
    }

    /// Invokes `udf`, requiring success and an array µs result.
    fn array_us(udf: &dyn ScalarUDFImpl, args: Vec<ColumnarValue>) -> Vec<Option<i64>> {
        expect_array_us(invoke(udf, args).unwrap())
    }

    /// One instance of every UDF declared in this module.
    fn all_udfs() -> Vec<Box<dyn ScalarUDFImpl>> {
        vec![
            Box::new(TumbleWindowStart::new()),
            Box::new(TumbleWindowEnd::new()),
            Box::new(HopWindowStart::new()),
            Box::new(HopWindowEnd::new()),
            Box::new(SessionWindowStart::new()),
            Box::new(CumulateWindowStart::new()),
            Box::new(CumulateWindowEnd::new()),
        ]
    }

    // ── Tumble ──────────────────────────────────────────────────────────

    #[test]
    fn test_tumble_basic() {
        // 5-minute interval, ts=7 minutes (420 000 ms) → bucket start =
        // 5 minutes = 300 000 ms = 300 000 000 µs.
        let args = vec![ts_ms(Some(420_000)), interval_dt(0, 300_000)];
        assert_eq!(
            scalar_us(&TumbleWindowStart::new(), args),
            Some(300_000_000)
        );
    }

    #[test]
    fn test_tumble_exact_boundary() {
        let args = vec![ts_ms(Some(300_000)), interval_dt(0, 300_000)];
        assert_eq!(
            scalar_us(&TumbleWindowStart::new(), args),
            Some(300_000_000)
        );
    }

    #[test]
    fn test_tumble_zero_timestamp() {
        let args = vec![ts_ms(Some(0)), interval_dt(0, 300_000)];
        assert_eq!(scalar_us(&TumbleWindowStart::new(), args), Some(0));
    }

    #[test]
    fn test_tumble_pre_epoch_timestamp() {
        // ts = -1 ms lies in [-5 min, 0), so the start must floor to
        // -5 min rather than round toward zero.
        let args = vec![ts_ms(Some(-1)), interval_dt(0, 300_000)];
        assert_eq!(
            scalar_us(&TumbleWindowStart::new(), args),
            Some(-300_000_000)
        );
    }

    #[test]
    fn test_tumble_with_offset() {
        // 5-minute windows shifted by 1 minute: ts=7 min falls into
        // [6 min, 11 min) → start = 360 000 ms = 360 000 000 µs.
        let args = vec![
            ts_ms(Some(420_000)),
            interval_dt(0, 300_000),
            interval_dt(0, 60_000),
        ];
        assert_eq!(
            scalar_us(&TumbleWindowStart::new(), args),
            Some(360_000_000)
        );
    }

    #[test]
    fn test_tumble_null_handling() {
        let args = vec![ts_ms(None), interval_dt(0, 300_000)];
        assert_eq!(scalar_us(&TumbleWindowStart::new(), args), None);
    }

    #[test]
    fn test_tumble_array_input() {
        let ts = ts_ms_array(vec![
            Some(0),
            Some(150_000),
            Some(300_000),
            Some(420_000),
            None,
        ]);
        assert_eq!(
            array_us(&TumbleWindowStart::new(), vec![ts, interval_dt(0, 300_000)]),
            vec![Some(0), Some(0), Some(300_000_000), Some(300_000_000), None]
        );
    }

    /// Regression: TUMBLE over a `Timestamp(Nanosecond)` column must
    /// take the `to_millis_array` path (any precision in, milliseconds
    /// out) rather than failing on the array fast path.
    #[test]
    fn test_tumble_array_input_nanosecond() {
        let ts = ts_ns_array(vec![
            Some(0),
            Some(150_000_000_000),
            Some(300_000_000_000),
            Some(420_000_000_000),
            None,
        ]);
        assert_eq!(
            array_us(&TumbleWindowStart::new(), vec![ts, interval_dt(0, 300_000)]),
            vec![Some(0), Some(0), Some(300_000_000), Some(300_000_000), None]
        );
    }

    #[test]
    fn test_tumble_month_day_nano_interval() {
        // 1 hour as IntervalMonthDayNano (3 600 s in nanoseconds);
        // ts = 90 minutes (5 400 000 ms) → bucket start = 1 hour =
        // 3 600 000 ms = 3 600 000 000 µs.
        let one_hour = ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
            IntervalMonthDayNano::new(0, 0, 3_600_000_000_000),
        )));
        let args = vec![ts_ms(Some(5_400_000)), one_hour];
        assert_eq!(
            scalar_us(&TumbleWindowStart::new(), args),
            Some(3_600_000_000)
        );
    }

    #[test]
    fn test_tumble_rejects_zero_interval() {
        let args = vec![ts_ms(Some(1000)), interval_dt(0, 0)];
        assert!(invoke(&TumbleWindowStart::new(), args).is_err());
    }

    #[test]
    fn test_tumble_rejects_wrong_arg_count() {
        let args = vec![ts_ms(Some(1000))];
        assert!(invoke(&TumbleWindowStart::new(), args).is_err());
    }

    // ── Tumble end ──────────────────────────────────────────────────────

    #[test]
    fn test_tumble_end_basic() {
        // 5-minute interval, ts=7 min → window [5, 10) min → end = 10
        // min = 600 000 ms = 600 000 000 µs.
        let args = vec![ts_ms(Some(420_000)), interval_dt(0, 300_000)];
        assert_eq!(scalar_us(&TumbleWindowEnd::new(), args), Some(600_000_000));
    }

    #[test]
    fn test_tumble_end_at_boundary() {
        let args = vec![ts_ms(Some(300_000)), interval_dt(0, 300_000)];
        assert_eq!(scalar_us(&TumbleWindowEnd::new(), args), Some(600_000_000));
    }

    #[test]
    fn test_tumble_end_null_propagates() {
        let args = vec![ts_ms(None), interval_dt(0, 300_000)];
        assert_eq!(scalar_us(&TumbleWindowEnd::new(), args), None);
    }

    #[test]
    fn test_tumble_end_array_input() {
        let ts = ts_ms_array(vec![
            Some(0),
            Some(150_000),
            Some(300_000),
            Some(420_000),
            None,
        ]);
        assert_eq!(
            array_us(&TumbleWindowEnd::new(), vec![ts, interval_dt(0, 300_000)]),
            vec![
                Some(300_000_000),
                Some(300_000_000),
                Some(600_000_000),
                Some(600_000_000),
                None,
            ]
        );
    }

    #[test]
    fn test_tumble_end_array_input_nanosecond() {
        let ts = ts_ns_array(vec![Some(420_000_000_000)]);
        assert_eq!(
            array_us(&TumbleWindowEnd::new(), vec![ts, interval_dt(0, 300_000)]),
            vec![Some(600_000_000)]
        );
    }

    #[test]
    fn test_tumble_end_rejects_zero_interval() {
        let args = vec![ts_ms(Some(1000)), interval_dt(0, 0)];
        assert!(invoke(&TumbleWindowEnd::new(), args).is_err());
    }

    #[test]
    fn test_tumble_end_rejects_wrong_arg_count() {
        let args = vec![ts_ms(Some(1000))];
        assert!(invoke(&TumbleWindowEnd::new(), args).is_err());
    }

    // ── Hop ─────────────────────────────────────────────────────────────

    #[test]
    fn test_hop_basic() {
        // slide=5 min, size=10 min, ts=7 min. Window starts are multiples
        // of the slide: [-5, 5) ends before ts, while [0, 10) and
        // [5, 15) both contain it, so the earliest start is 0.
        let args = vec![
            ts_ms(Some(420_000)),
            interval_dt(0, 300_000),
            interval_dt(0, 600_000),
        ];
        assert_eq!(scalar_us(&HopWindowStart::new(), args), Some(0));
    }

    #[test]
    fn test_hop_at_boundary() {
        // ts=5 min sits exactly on the exclusive end of [-5, 5), so the
        // earliest containing window is still [0, 10).
        let args = vec![
            ts_ms(Some(300_000)),
            interval_dt(0, 300_000),
            interval_dt(0, 600_000),
        ];
        assert_eq!(scalar_us(&HopWindowStart::new(), args), Some(0));
    }

    /// Regression: HOP over `Timestamp(Nanosecond)` — same shape as the
    /// tumble nanosecond regression.
    #[test]
    fn test_hop_array_input_nanosecond() {
        let ts = ts_ns_array(vec![Some(420_000_000_000)]);
        let args = vec![ts, interval_dt(0, 300_000), interval_dt(0, 600_000)];
        assert_eq!(array_us(&HopWindowStart::new(), args), vec![Some(0)]);
    }

    #[test]
    fn test_hop_rejects_wrong_arg_count() {
        let args = vec![ts_ms(Some(1000)), interval_dt(0, 300_000)];
        assert!(invoke(&HopWindowStart::new(), args).is_err());
    }

    // ── Hop end ─────────────────────────────────────────────────────────

    #[test]
    fn test_hop_end_basic() {
        // slide=5 min, size=10 min, ts=7 min → earliest start=0,
        // end=10 min = 600 000 000 µs.
        let args = vec![
            ts_ms(Some(420_000)),
            interval_dt(0, 300_000),
            interval_dt(0, 600_000),
        ];
        assert_eq!(scalar_us(&HopWindowEnd::new(), args), Some(600_000_000));
    }

    #[test]
    fn test_hop_end_rejects_wrong_arg_count() {
        let args = vec![ts_ms(Some(1000)), interval_dt(0, 300_000)];
        assert!(invoke(&HopWindowEnd::new(), args).is_err());
    }

    // ── Session ─────────────────────────────────────────────────────────

    #[test]
    fn test_session_passthrough_scalar() {
        let args = vec![ts_ms(Some(42_000)), interval_dt(0, 60_000)];
        assert_eq!(scalar_us(&SessionWindowStart::new(), args), Some(42_000_000));
    }

    #[test]
    fn test_session_passthrough_null() {
        let args = vec![ts_ms(None), interval_dt(0, 60_000)];
        assert_eq!(scalar_us(&SessionWindowStart::new(), args), None);
    }

    // ── Cumulate ────────────────────────────────────────────────────────

    #[test]
    fn test_cumulate_basic() {
        // step=1 min, size=5 min, ts=30 s → epoch start = 0.
        let args = vec![
            ts_ms(Some(30_000)),
            interval_dt(0, 60_000),
            interval_dt(0, 300_000),
        ];
        assert_eq!(scalar_us(&CumulateWindowStart::new(), args), Some(0));
    }

    #[test]
    fn test_cumulate_second_epoch() {
        // ts=350 s → epoch start = 5 min = 300 000 000 µs.
        let args = vec![
            ts_ms(Some(350_000)),
            interval_dt(0, 60_000),
            interval_dt(0, 300_000),
        ];
        assert_eq!(
            scalar_us(&CumulateWindowStart::new(), args),
            Some(300_000_000)
        );
    }

    #[test]
    fn test_cumulate_rejects_step_exceeds_size() {
        let args = vec![
            ts_ms(Some(1000)),
            interval_dt(0, 600_000),
            interval_dt(0, 300_000),
        ];
        assert!(invoke(&CumulateWindowStart::new(), args).is_err());
    }

    #[test]
    fn test_cumulate_rejects_not_divisible() {
        let args = vec![
            ts_ms(Some(1000)),
            interval_dt(0, 70_000),
            interval_dt(0, 300_000),
        ];
        assert!(invoke(&CumulateWindowStart::new(), args).is_err());
    }

    #[test]
    fn test_cumulate_rejects_wrong_arg_count() {
        let args = vec![ts_ms(Some(1000)), interval_dt(0, 60_000)];
        assert!(invoke(&CumulateWindowStart::new(), args).is_err());
    }

    // ── Cumulate end ────────────────────────────────────────────────────

    #[test]
    fn test_cumulate_end_basic() {
        // ts=30 s → epoch=[0, 5 min) → end = 5 min = 300 000 000 µs.
        let args = vec![
            ts_ms(Some(30_000)),
            interval_dt(0, 60_000),
            interval_dt(0, 300_000),
        ];
        assert_eq!(
            scalar_us(&CumulateWindowEnd::new(), args),
            Some(300_000_000)
        );
    }

    #[test]
    fn test_cumulate_end_rejects_step_exceeds_size() {
        let args = vec![
            ts_ms(Some(1000)),
            interval_dt(0, 600_000),
            interval_dt(0, 300_000),
        ];
        assert!(invoke(&CumulateWindowEnd::new(), args).is_err());
    }

    // ── Registration / signature ────────────────────────────────────────

    #[test]
    fn test_udf_registration() {
        let registered = [
            (ScalarUDF::new_from_impl(TumbleWindowStart::new()), "tumble"),
            (
                ScalarUDF::new_from_impl(TumbleWindowEnd::new()),
                "tumble_end",
            ),
            (ScalarUDF::new_from_impl(HopWindowStart::new()), "hop"),
            (ScalarUDF::new_from_impl(HopWindowEnd::new()), "hop_end"),
            (
                ScalarUDF::new_from_impl(SessionWindowStart::new()),
                "session",
            ),
            (
                ScalarUDF::new_from_impl(CumulateWindowStart::new()),
                "cumulate",
            ),
            (
                ScalarUDF::new_from_impl(CumulateWindowEnd::new()),
                "cumulate_end",
            ),
        ];
        for (udf, expected) in registered {
            assert_eq!(udf.name(), expected);
        }
    }

    #[test]
    fn test_udf_signatures_immutable() {
        for udf in all_udfs() {
            assert_eq!(udf.signature().volatility, Volatility::Immutable);
        }
    }

    #[test]
    fn test_return_types_microsecond() {
        let target = DataType::Timestamp(TimeUnit::Microsecond, None);
        for udf in all_udfs() {
            assert_eq!(udf.return_type(&[]).unwrap(), target);
        }
    }
}