1use std::any::Any;
24use std::hash::{Hash, Hasher};
25use std::ops::RangeInclusive;
26use std::sync::Arc;
27
28use arrow::datatypes::{DataType, TimeUnit};
29use arrow_array::{ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray};
30use datafusion_common::{DataFusionError, Result, ScalarValue};
31use datafusion_expr::{
32 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
33};
34use laminar_core::time::cast_to_millis_array;
35
/// Generates a window scalar UDF type together with its `ScalarUDFImpl` wiring.
///
/// Invocation shape: `TypeName, "sql_name", min..=max, |args| body`, optionally
/// preceded by attributes (including doc comments), which are forwarded to the
/// generated struct.
///
/// The generated type:
/// - stores only a [`Signature`] built by `variadic_signature` from the arity range;
/// - reports `Timestamp(Microsecond, None)` as its return type;
/// - checks the argument count with `check_arity` before evaluating `body`, where
///   the identifier named between the bars is bound to the call's argument vector.
macro_rules! window_udf {
    (
        $(#[$meta:meta])*
        $udf:ident, $name:literal, $arg_count:expr, |$inputs:ident| $eval:expr
    ) => {
        $(#[$meta])*
        #[derive(Debug)]
        pub struct $udf {
            signature: Signature,
        }

        impl $udf {
            /// Creates the UDF with its fixed, immutable signature.
            #[must_use]
            pub fn new() -> Self {
                let signature = variadic_signature($arg_count);
                Self { signature }
            }
        }

        impl Default for $udf {
            fn default() -> Self {
                Self::new()
            }
        }

        // The only field is the signature derived from the macro arguments, so
        // equality is unconditional and the hash is keyed on the SQL name alone.
        impl PartialEq for $udf {
            fn eq(&self, _other: &Self) -> bool {
                true
            }
        }

        impl Eq for $udf {}

        impl Hash for $udf {
            fn hash<H: Hasher>(&self, hasher: &mut H) {
                $name.hash(hasher);
            }
        }

        impl ScalarUDFImpl for $udf {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &'static str {
                $name
            }

            fn signature(&self) -> &Signature {
                &self.signature
            }

            fn return_type(&self, _: &[DataType]) -> Result<DataType> {
                Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
            }

            fn invoke_with_args(&self, call: ScalarFunctionArgs) -> Result<ColumnarValue> {
                let $inputs = call.args;
                // Reject a wrong argument count before the body indexes into the vector.
                check_arity(&$inputs, $arg_count, $name)?;
                $eval
            }
        }
    };
}
91
92window_udf!(
95 TumbleWindowStart, "tumble", 2..=3,
98 |args| {
99 let interval_ms = positive_interval(&args[1], "tumble", "interval")?;
100 let offset_ms = optional_offset(&args, 2)?;
101 into_us_columnar(&args[0], |ts| tumble_start_ms(ts, interval_ms, offset_ms))
102 }
103);
104
105window_udf!(
106 TumbleWindowEnd, "tumble_end", 2..=3,
109 |args| {
110 let interval_ms = positive_interval(&args[1], "tumble_end", "interval")?;
111 let offset_ms = optional_offset(&args, 2)?;
112 into_us_columnar(&args[0], |ts| {
113 tumble_start_ms(ts, interval_ms, offset_ms).saturating_add(interval_ms)
114 })
115 }
116);
117
118window_udf!(
119 HopWindowStart, "hop", 3..=4,
123 |args| {
124 let slide_ms = positive_interval(&args[1], "hop", "slide")?;
125 let size_ms = positive_interval(&args[2], "hop", "size")?;
126 let offset_ms = optional_offset(&args, 3)?;
127 into_us_columnar(&args[0], |ts| hop_start_ms(ts, slide_ms, size_ms, offset_ms))
128 }
129);
130
131window_udf!(
132 HopWindowEnd, "hop_end", 3..=4,
135 |args| {
136 let slide_ms = positive_interval(&args[1], "hop_end", "slide")?;
137 let size_ms = positive_interval(&args[2], "hop_end", "size")?;
138 let offset_ms = optional_offset(&args, 3)?;
139 into_us_columnar(&args[0], |ts| {
140 hop_start_ms(ts, slide_ms, size_ms, offset_ms).saturating_add(size_ms)
141 })
142 }
143);
144
145window_udf!(
146 SessionWindowStart, "session", 2..=2,
151 |args| into_us_columnar(&args[0], |ts| ts)
152);
153
154window_udf!(
155 CumulateWindowStart, "cumulate", 3..=3,
159 |args| {
160 let (_step_ms, size_ms) = cumulate_intervals(&args, "cumulate")?;
161 into_us_columnar(&args[0], |ts| tumble_start_ms(ts, size_ms, 0))
162 }
163);
164
165window_udf!(
166 CumulateWindowEnd, "cumulate_end", 3..=3,
169 |args| {
170 let (_step_ms, size_ms) = cumulate_intervals(&args, "cumulate_end")?;
171 into_us_columnar(&args[0], |ts| {
172 tumble_start_ms(ts, size_ms, 0).saturating_add(size_ms)
173 })
174 }
175);
176
/// Returns the start (in milliseconds) of the tumbling window containing `ts`.
///
/// Window starts are the values `offset_ms + k * interval_ms` for integer `k`;
/// the result is the largest such value that is `<= ts`. The Euclidean remainder
/// keeps this correct when `ts - offset_ms` is negative.
///
/// The arithmetic is carried out in `i128` so that extreme `ts` / `offset_ms`
/// combinations cannot overflow (which would panic in debug builds and wrap in
/// release builds). A start that does not fit in `i64` is clamped to the `i64`
/// range, in line with the saturating arithmetic the callers apply afterwards.
///
/// # Panics
///
/// Panics if `interval_ms` is zero (remainder by zero).
// The `as` cast below is lossless: the value is clamped to the i64 range first.
#[allow(clippy::cast_possible_truncation)]
fn tumble_start_ms(ts: i64, interval_ms: i64, offset_ms: i64) -> i64 {
    let interval = i128::from(interval_ms);
    let offset = i128::from(offset_ms);

    // Shift into offset-relative coordinates, floor to the interval, shift back.
    let adj = i128::from(ts) - offset;
    let start = adj - adj.rem_euclid(interval) + offset;

    start.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
}
186
/// Returns the hopping-window start (in milliseconds) selected for `ts`.
///
/// Window starts are the values `offset_ms + k * slide_ms` for integer `k`; the
/// result is the smallest such value strictly greater than `ts - size_ms`. When
/// `slide_ms <= size_ms` that is the earliest window `[start, start + size_ms)`
/// containing `ts`.
///
/// The arithmetic is carried out in `i128` so that extreme inputs cannot overflow
/// (which would panic in debug builds and wrap in release builds). A start that
/// does not fit in `i64` is clamped to the `i64` range, in line with the
/// saturating arithmetic the callers apply afterwards.
///
/// # Panics
///
/// Panics if `slide_ms` is zero (remainder by zero).
// The `as` cast below is lossless: the value is clamped to the i64 range first.
#[allow(clippy::cast_possible_truncation)]
fn hop_start_ms(ts: i64, slide_ms: i64, size_ms: i64, offset_ms: i64) -> i64 {
    let slide = i128::from(slide_ms);
    let offset = i128::from(offset_ms);

    // Offset-relative position one slide past `ts - size`; flooring it to the
    // slide grid gives the first aligned start after `ts - size`.
    let adj = i128::from(ts) - i128::from(size_ms) + slide - offset;
    let start = adj - adj.rem_euclid(slide) + offset;

    start.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
}
193
194fn variadic_signature(arity: RangeInclusive<usize>) -> Signature {
199 let mut arities: Vec<TypeSignature> = arity.map(TypeSignature::Any).collect();
200 let inner = if arities.len() == 1 {
201 arities.pop().unwrap()
202 } else {
203 TypeSignature::OneOf(arities)
204 };
205 Signature::new(inner, Volatility::Immutable)
206}
207
208fn check_arity(args: &[ColumnarValue], arity: RangeInclusive<usize>, fn_name: &str) -> Result<()> {
209 if arity.contains(&args.len()) {
210 return Ok(());
211 }
212 let expected = if arity.start() == arity.end() {
213 format!("exactly {}", arity.start())
214 } else {
215 format!("{}-{}", arity.start(), arity.end())
216 };
217 Err(DataFusionError::Plan(format!(
218 "{}() requires {} arguments, got {}",
219 fn_name,
220 expected,
221 args.len()
222 )))
223}
224
225fn positive_interval(value: &ColumnarValue, fn_name: &str, arg_name: &str) -> Result<i64> {
226 let ms = extract_interval_ms(value)?;
227 if ms <= 0 {
228 return Err(DataFusionError::Plan(format!(
229 "{fn_name}() {arg_name} must be positive"
230 )));
231 }
232 Ok(ms)
233}
234
235fn optional_offset(args: &[ColumnarValue], idx: usize) -> Result<i64> {
236 match args.get(idx) {
237 Some(value) => extract_interval_ms(value),
238 None => Ok(0),
239 }
240}
241
242fn cumulate_intervals(args: &[ColumnarValue], fn_name: &str) -> Result<(i64, i64)> {
243 let step_ms = positive_interval(&args[1], fn_name, "step")?;
244 let size_ms = positive_interval(&args[2], fn_name, "size")?;
245 if step_ms > size_ms {
246 return Err(DataFusionError::Plan(format!(
247 "{fn_name}() step must not exceed size"
248 )));
249 }
250 if size_ms % step_ms != 0 {
251 return Err(DataFusionError::Plan(format!(
252 "{fn_name}() size must be evenly divisible by step"
253 )));
254 }
255 Ok((step_ms, size_ms))
256}
257
258fn extract_interval_ms(value: &ColumnarValue) -> Result<i64> {
261 match value {
262 ColumnarValue::Scalar(scalar) => scalar_interval_to_ms(scalar),
263 ColumnarValue::Array(_) => Err(DataFusionError::NotImplemented(
264 "Array interval arguments not supported for window functions".to_string(),
265 )),
266 }
267}
268
269fn scalar_interval_to_ms(scalar: &ScalarValue) -> Result<i64> {
270 match scalar {
271 ScalarValue::IntervalDayTime(Some(v)) => {
272 Ok(i64::from(v.days) * 86_400_000 + i64::from(v.milliseconds))
273 }
274 ScalarValue::IntervalMonthDayNano(Some(v)) => {
275 if v.months != 0 {
276 return Err(DataFusionError::NotImplemented(
277 "Month-based intervals not supported for window functions \
278 (use days/hours/minutes/seconds)"
279 .to_string(),
280 ));
281 }
282 Ok(i64::from(v.days) * 86_400_000 + v.nanoseconds / 1_000_000)
283 }
284 ScalarValue::IntervalYearMonth(_) => Err(DataFusionError::NotImplemented(
285 "Year-month intervals not supported for window functions".to_string(),
286 )),
287 ScalarValue::Int64(Some(ms)) => Ok(*ms),
288 _ => Err(DataFusionError::Plan(format!(
289 "Expected interval argument for window function, got: {scalar:?}"
290 ))),
291 }
292}
293
294fn into_us_columnar(
300 value: &ColumnarValue,
301 transform: impl Fn(i64) -> i64,
302) -> Result<ColumnarValue> {
303 match value {
304 ColumnarValue::Array(array) => {
305 let input = to_millis_array(array)?;
306 let result: TimestampMicrosecondArray = input
307 .iter()
308 .map(|opt| opt.map(|ms| transform(ms).saturating_mul(1000)))
309 .collect();
310 Ok(ColumnarValue::Array(Arc::new(result)))
311 }
312 ColumnarValue::Scalar(scalar) => {
313 let result =
314 scalar_to_timestamp_ms(scalar)?.map(|ms| transform(ms).saturating_mul(1000));
315 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
316 result, None,
317 )))
318 }
319 }
320}
321
322fn to_millis_array(array: &ArrayRef) -> Result<TimestampMillisecondArray> {
323 cast_to_millis_array(array.as_ref()).map_err(|e| DataFusionError::Plan(e.to_string()))
324}
325
326fn scalar_to_timestamp_ms(scalar: &ScalarValue) -> Result<Option<i64>> {
327 match scalar {
328 ScalarValue::TimestampMillisecond(v, _) | ScalarValue::Int64(v) => Ok(*v),
329 ScalarValue::TimestampMicrosecond(v, _) => Ok(v.map(|v| v / 1_000)),
330 ScalarValue::TimestampNanosecond(v, _) => Ok(v.map(|v| v / 1_000_000)),
331 ScalarValue::TimestampSecond(v, _) => Ok(v.map(|v| v * 1_000)),
332 _ => Err(DataFusionError::Plan(format!(
333 "Expected timestamp argument for window function, got: {scalar:?}"
334 ))),
335 }
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano, TimestampMicrosecondType};
    use arrow_array::cast::AsArray;
    use arrow_array::{Array, TimestampNanosecondArray};
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::ScalarUDF;

    // ---- Fixtures -------------------------------------------------------

    /// Scalar day-time interval argument.
    fn interval_dt(days: i32, ms: i32) -> ColumnarValue {
        let interval = IntervalDayTime::new(days, ms);
        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(interval)))
    }

    /// Scalar millisecond timestamp argument (no time zone).
    fn ts_ms(ms: Option<i64>) -> ColumnarValue {
        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(ms, None))
    }

    /// Array argument of millisecond timestamps.
    fn ms_array(values: Vec<Option<i64>>) -> ColumnarValue {
        ColumnarValue::Array(Arc::new(TimestampMillisecondArray::from(values)))
    }

    /// Array argument of nanosecond timestamps.
    fn ns_array(values: Vec<Option<i64>>) -> ColumnarValue {
        ColumnarValue::Array(Arc::new(TimestampNanosecondArray::from(values)))
    }

    /// Unwraps a scalar microsecond-timestamp result.
    fn expect_ts_us(result: ColumnarValue) -> Option<i64> {
        match result {
            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, _)) => v,
            other => panic!("Expected TimestampMicrosecond scalar, got: {other:?}"),
        }
    }

    /// Reads a microsecond-timestamp array into a vector of optional values.
    fn array_values_us(arr: &dyn Array) -> Vec<Option<i64>> {
        arr.as_primitive::<TimestampMicrosecondType>()
            .iter()
            .collect()
    }

    /// Wraps raw arguments in the structure `invoke_with_args` expects.
    fn make_args(args: Vec<ColumnarValue>, rows: usize) -> ScalarFunctionArgs {
        let return_type = DataType::Timestamp(TimeUnit::Microsecond, None);
        ScalarFunctionArgs {
            args,
            arg_fields: vec![],
            number_rows: rows,
            return_field: Arc::new(Field::new("output", return_type, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Invokes `udf` with the given arguments and row count.
    fn invoke(
        udf: &dyn ScalarUDFImpl,
        args: Vec<ColumnarValue>,
        rows: usize,
    ) -> Result<ColumnarValue> {
        udf.invoke_with_args(make_args(args, rows))
    }

    /// Invokes `udf` on a single row and returns the scalar microsecond result.
    fn scalar_us(udf: &dyn ScalarUDFImpl, args: Vec<ColumnarValue>) -> Option<i64> {
        expect_ts_us(invoke(udf, args, 1).unwrap())
    }

    /// Invokes `udf` and returns the array result as microsecond values.
    fn array_us(
        udf: &dyn ScalarUDFImpl,
        args: Vec<ColumnarValue>,
        rows: usize,
    ) -> Vec<Option<i64>> {
        match invoke(udf, args, rows).unwrap() {
            ColumnarValue::Array(arr) => array_values_us(&arr),
            ColumnarValue::Scalar(_) => panic!("Expected array result"),
        }
    }

    /// Name reported by the UDF once wrapped in a `ScalarUDF`.
    fn registered_name(udf: impl ScalarUDFImpl + 'static) -> String {
        ScalarUDF::new_from_impl(udf).name().to_string()
    }

    /// One instance of every window UDF defined in this file.
    fn all_udfs() -> Vec<Box<dyn ScalarUDFImpl>> {
        vec![
            Box::new(TumbleWindowStart::new()),
            Box::new(TumbleWindowEnd::new()),
            Box::new(HopWindowStart::new()),
            Box::new(HopWindowEnd::new()),
            Box::new(SessionWindowStart::new()),
            Box::new(CumulateWindowStart::new()),
            Box::new(CumulateWindowEnd::new()),
        ]
    }

    // ---- tumble ---------------------------------------------------------

    #[test]
    fn test_tumble_basic() {
        // 420 s with a 300 s interval falls in the window starting at 300 s.
        let start = scalar_us(
            &TumbleWindowStart::new(),
            vec![ts_ms(Some(420_000)), interval_dt(0, 300_000)],
        );
        assert_eq!(start, Some(300_000_000));
    }

    #[test]
    fn test_tumble_exact_boundary() {
        let start = scalar_us(
            &TumbleWindowStart::new(),
            vec![ts_ms(Some(300_000)), interval_dt(0, 300_000)],
        );
        assert_eq!(start, Some(300_000_000));
    }

    #[test]
    fn test_tumble_zero_timestamp() {
        let start = scalar_us(
            &TumbleWindowStart::new(),
            vec![ts_ms(Some(0)), interval_dt(0, 300_000)],
        );
        assert_eq!(start, Some(0));
    }

    #[test]
    fn test_tumble_null_handling() {
        let start = scalar_us(
            &TumbleWindowStart::new(),
            vec![ts_ms(None), interval_dt(0, 300_000)],
        );
        assert_eq!(start, None);
    }

    #[test]
    fn test_tumble_array_input() {
        let ts = ms_array(vec![
            Some(0),
            Some(150_000),
            Some(300_000),
            Some(420_000),
            None,
        ]);
        let starts = array_us(
            &TumbleWindowStart::new(),
            vec![ts, interval_dt(0, 300_000)],
            5,
        );
        assert_eq!(
            starts,
            vec![Some(0), Some(0), Some(300_000_000), Some(300_000_000), None]
        );
    }

    #[test]
    fn test_tumble_array_input_nanosecond() {
        // Same instants as the millisecond case, expressed in nanoseconds.
        let ts = ns_array(vec![
            Some(0),
            Some(150_000_000_000),
            Some(300_000_000_000),
            Some(420_000_000_000),
            None,
        ]);
        let starts = array_us(
            &TumbleWindowStart::new(),
            vec![ts, interval_dt(0, 300_000)],
            5,
        );
        assert_eq!(
            starts,
            vec![Some(0), Some(0), Some(300_000_000), Some(300_000_000), None]
        );
    }

    #[test]
    fn test_hop_array_input_nanosecond() {
        let ts = ns_array(vec![Some(420_000_000_000)]);
        let starts = array_us(
            &HopWindowStart::new(),
            vec![ts, interval_dt(0, 300_000), interval_dt(0, 600_000)],
            1,
        );
        assert_eq!(starts, vec![Some(0)]);
    }

    #[test]
    fn test_tumble_month_day_nano_interval() {
        // One hour expressed in nanoseconds; 1.5 h lands in the window starting at 1 h.
        let one_hour = IntervalMonthDayNano::new(0, 0, 3_600_000_000_000);
        let interval = ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(one_hour)));
        let start = scalar_us(
            &TumbleWindowStart::new(),
            vec![ts_ms(Some(5_400_000)), interval],
        );
        assert_eq!(start, Some(3_600_000_000));
    }

    #[test]
    fn test_tumble_rejects_zero_interval() {
        let outcome = invoke(
            &TumbleWindowStart::new(),
            vec![ts_ms(Some(1000)), interval_dt(0, 0)],
            1,
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_tumble_rejects_wrong_arg_count() {
        let outcome = invoke(&TumbleWindowStart::new(), vec![ts_ms(Some(1000))], 1);
        assert!(outcome.is_err());
    }

    // ---- tumble_end -----------------------------------------------------

    #[test]
    fn test_tumble_end_basic() {
        let end = scalar_us(
            &TumbleWindowEnd::new(),
            vec![ts_ms(Some(420_000)), interval_dt(0, 300_000)],
        );
        assert_eq!(end, Some(600_000_000));
    }

    #[test]
    fn test_tumble_end_at_boundary() {
        let end = scalar_us(
            &TumbleWindowEnd::new(),
            vec![ts_ms(Some(300_000)), interval_dt(0, 300_000)],
        );
        assert_eq!(end, Some(600_000_000));
    }

    #[test]
    fn test_tumble_end_null_propagates() {
        let end = scalar_us(
            &TumbleWindowEnd::new(),
            vec![ts_ms(None), interval_dt(0, 300_000)],
        );
        assert_eq!(end, None);
    }

    #[test]
    fn test_tumble_end_array_input() {
        let ts = ms_array(vec![
            Some(0),
            Some(150_000),
            Some(300_000),
            Some(420_000),
            None,
        ]);
        let ends = array_us(
            &TumbleWindowEnd::new(),
            vec![ts, interval_dt(0, 300_000)],
            5,
        );
        assert_eq!(
            ends,
            vec![
                Some(300_000_000),
                Some(300_000_000),
                Some(600_000_000),
                Some(600_000_000),
                None,
            ]
        );
    }

    #[test]
    fn test_tumble_end_array_input_nanosecond() {
        let ts = ns_array(vec![Some(420_000_000_000)]);
        let ends = array_us(
            &TumbleWindowEnd::new(),
            vec![ts, interval_dt(0, 300_000)],
            1,
        );
        assert_eq!(ends, vec![Some(600_000_000)]);
    }

    #[test]
    fn test_tumble_end_rejects_zero_interval() {
        let outcome = invoke(
            &TumbleWindowEnd::new(),
            vec![ts_ms(Some(1000)), interval_dt(0, 0)],
            1,
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_tumble_end_rejects_wrong_arg_count() {
        let outcome = invoke(&TumbleWindowEnd::new(), vec![ts_ms(Some(1000))], 1);
        assert!(outcome.is_err());
    }

    // ---- hop / hop_end --------------------------------------------------

    #[test]
    fn test_hop_basic() {
        // slide = 300 s, size = 600 s.
        let start = scalar_us(
            &HopWindowStart::new(),
            vec![
                ts_ms(Some(420_000)),
                interval_dt(0, 300_000),
                interval_dt(0, 600_000),
            ],
        );
        assert_eq!(start, Some(0));
    }

    #[test]
    fn test_hop_at_boundary() {
        let start = scalar_us(
            &HopWindowStart::new(),
            vec![
                ts_ms(Some(300_000)),
                interval_dt(0, 300_000),
                interval_dt(0, 600_000),
            ],
        );
        assert_eq!(start, Some(0));
    }

    #[test]
    fn test_hop_rejects_wrong_arg_count() {
        let outcome = invoke(
            &HopWindowStart::new(),
            vec![ts_ms(Some(1000)), interval_dt(0, 300_000)],
            1,
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_hop_end_basic() {
        let end = scalar_us(
            &HopWindowEnd::new(),
            vec![
                ts_ms(Some(420_000)),
                interval_dt(0, 300_000),
                interval_dt(0, 600_000),
            ],
        );
        assert_eq!(end, Some(600_000_000));
    }

    #[test]
    fn test_hop_end_rejects_wrong_arg_count() {
        let outcome = invoke(
            &HopWindowEnd::new(),
            vec![ts_ms(Some(1000)), interval_dt(0, 300_000)],
            1,
        );
        assert!(outcome.is_err());
    }

    // ---- session --------------------------------------------------------

    #[test]
    fn test_session_passthrough_scalar() {
        let out = scalar_us(
            &SessionWindowStart::new(),
            vec![ts_ms(Some(42_000)), interval_dt(0, 60_000)],
        );
        assert_eq!(out, Some(42_000_000));
    }

    #[test]
    fn test_session_passthrough_null() {
        let out = scalar_us(
            &SessionWindowStart::new(),
            vec![ts_ms(None), interval_dt(0, 60_000)],
        );
        assert_eq!(out, None);
    }

    // ---- cumulate / cumulate_end ---------------------------------------

    #[test]
    fn test_cumulate_basic() {
        // step = 60 s, size = 300 s.
        let start = scalar_us(
            &CumulateWindowStart::new(),
            vec![
                ts_ms(Some(30_000)),
                interval_dt(0, 60_000),
                interval_dt(0, 300_000),
            ],
        );
        assert_eq!(start, Some(0));
    }

    #[test]
    fn test_cumulate_second_epoch() {
        let start = scalar_us(
            &CumulateWindowStart::new(),
            vec![
                ts_ms(Some(350_000)),
                interval_dt(0, 60_000),
                interval_dt(0, 300_000),
            ],
        );
        assert_eq!(start, Some(300_000_000));
    }

    #[test]
    fn test_cumulate_rejects_step_exceeds_size() {
        let outcome = invoke(
            &CumulateWindowStart::new(),
            vec![
                ts_ms(Some(1000)),
                interval_dt(0, 600_000),
                interval_dt(0, 300_000),
            ],
            1,
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_cumulate_rejects_not_divisible() {
        let outcome = invoke(
            &CumulateWindowStart::new(),
            vec![
                ts_ms(Some(1000)),
                interval_dt(0, 70_000),
                interval_dt(0, 300_000),
            ],
            1,
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_cumulate_rejects_wrong_arg_count() {
        let outcome = invoke(
            &CumulateWindowStart::new(),
            vec![ts_ms(Some(1000)), interval_dt(0, 60_000)],
            1,
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_cumulate_end_basic() {
        let end = scalar_us(
            &CumulateWindowEnd::new(),
            vec![
                ts_ms(Some(30_000)),
                interval_dt(0, 60_000),
                interval_dt(0, 300_000),
            ],
        );
        assert_eq!(end, Some(300_000_000));
    }

    #[test]
    fn test_cumulate_end_rejects_step_exceeds_size() {
        let outcome = invoke(
            &CumulateWindowEnd::new(),
            vec![
                ts_ms(Some(1000)),
                interval_dt(0, 600_000),
                interval_dt(0, 300_000),
            ],
            1,
        );
        assert!(outcome.is_err());
    }

    // ---- UDF metadata ---------------------------------------------------

    #[test]
    fn test_udf_registration() {
        assert_eq!(registered_name(TumbleWindowStart::new()), "tumble");
        assert_eq!(registered_name(TumbleWindowEnd::new()), "tumble_end");
        assert_eq!(registered_name(HopWindowStart::new()), "hop");
        assert_eq!(registered_name(HopWindowEnd::new()), "hop_end");
        assert_eq!(registered_name(SessionWindowStart::new()), "session");
        assert_eq!(registered_name(CumulateWindowStart::new()), "cumulate");
        assert_eq!(registered_name(CumulateWindowEnd::new()), "cumulate_end");
    }

    #[test]
    fn test_udf_signatures_immutable() {
        for udf in all_udfs() {
            assert_eq!(udf.signature().volatility, Volatility::Immutable);
        }
    }

    #[test]
    fn test_return_types_microsecond() {
        let target = DataType::Timestamp(TimeUnit::Microsecond, None);
        for udf in all_udfs() {
            assert_eq!(udf.return_type(&[]).unwrap(), target);
        }
    }
}