Skip to main content

laminar_sql/datafusion/
proctime_udf.rs

1//! Processing-time UDF for `DataFusion` integration
2//!
3//! Provides a `proctime()` scalar function that returns the current
4//! wall-clock timestamp. Used to declare processing-time watermarks:
5//!
6//! ```sql
7//! CREATE SOURCE events (
8//!     data VARCHAR,
9//!     ts TIMESTAMP,
10//!     WATERMARK FOR ts AS PROCTIME()
11//! );
12//! ```
13
14use std::any::Any;
15use std::hash::{Hash, Hasher};
16use std::time::{SystemTime, UNIX_EPOCH};
17
18use arrow::datatypes::{DataType, TimeUnit};
19use datafusion_common::{Result, ScalarValue};
20use datafusion_expr::{
21    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
22};
23
24/// Scalar UDF that returns the current processing time.
25///
26/// `proctime()` takes no arguments and returns a `TimestampMillisecond`
27/// representing the current wall-clock time. Unlike `watermark()`, this
28/// function is volatile — it returns a different value on each invocation.
29#[derive(Debug)]
30pub struct ProcTimeUdf {
31    signature: Signature,
32}
33
34impl ProcTimeUdf {
35    /// Creates a new processing-time UDF.
36    #[must_use]
37    pub fn new() -> Self {
38        Self {
39            signature: Signature::new(TypeSignature::Nullary, Volatility::Volatile),
40        }
41    }
42}
43
44impl Default for ProcTimeUdf {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl PartialEq for ProcTimeUdf {
51    fn eq(&self, _other: &Self) -> bool {
52        true
53    }
54}
55
56impl Eq for ProcTimeUdf {}
57
58impl Hash for ProcTimeUdf {
59    fn hash<H: Hasher>(&self, state: &mut H) {
60        "proctime".hash(state);
61    }
62}
63
64impl ScalarUDFImpl for ProcTimeUdf {
65    fn as_any(&self) -> &dyn Any {
66        self
67    }
68
69    fn name(&self) -> &'static str {
70        "proctime"
71    }
72
73    fn signature(&self) -> &Signature {
74        &self.signature
75    }
76
77    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
78        Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
79    }
80
81    #[allow(clippy::cast_possible_truncation)]
82    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
83        let now_ms = SystemTime::now()
84            .duration_since(UNIX_EPOCH)
85            .unwrap_or_default()
86            .as_millis() as i64;
87        Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
88            Some(now_ms),
89            None,
90        )))
91    }
92}
93
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::hash_map::DefaultHasher;
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    use arrow::datatypes::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::ScalarUDF;

    /// Minimal zero-argument invocation producing a single row.
    fn make_args() -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: vec![],
            arg_fields: vec![],
            number_rows: 1,
            return_field: Arc::new(Field::new(
                "output",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            )),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Current wall-clock time in epoch milliseconds, computed independently
    /// of the UDF so tests can bracket its result.
    fn wall_clock_millis() -> i64 {
        let elapsed = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch");
        i64::try_from(elapsed.as_millis()).expect("epoch millis overflow i64")
    }

    /// Hashes a UDF instance with the std default hasher.
    fn hash_of(udf: &ProcTimeUdf) -> u64 {
        let mut hasher = DefaultHasher::new();
        udf.hash(&mut hasher);
        hasher.finish()
    }

    #[test]
    fn test_proctime_returns_timestamp() {
        let udf = ProcTimeUdf::new();
        let before = wall_clock_millis();
        let result = udf.invoke_with_args(make_args()).unwrap();
        let after = wall_clock_millis();
        match result {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(v), None)) => {
                // Assumes the wall clock is not stepped backwards mid-test.
                assert!(
                    (before..=after).contains(&v),
                    "proctime {v} outside [{before}, {after}]"
                );
            }
            other => panic!("Expected TimestampMillisecond, got: {other:?}"),
        }
    }

    #[test]
    fn test_proctime_registration() {
        let udf = ScalarUDF::new_from_impl(ProcTimeUdf::new());
        assert_eq!(udf.name(), "proctime");
    }

    #[test]
    fn test_proctime_volatile() {
        let udf = ProcTimeUdf::new();
        assert_eq!(udf.signature().volatility, Volatility::Volatile);
    }

    #[test]
    fn test_proctime_return_type() {
        let udf = ProcTimeUdf::new();
        let rt = udf.return_type(&[]).unwrap();
        assert_eq!(rt, DataType::Timestamp(TimeUnit::Millisecond, None));
    }

    #[test]
    fn test_proctime_instances_equal_and_hash_alike() {
        let a = ProcTimeUdf::new();
        let b = ProcTimeUdf::default();
        assert_eq!(a, b);
        assert_eq!(hash_of(&a), hash_of(&b));
    }
}