Skip to main content

laminar_core/compiler/
event_time.rs

//! Schema-aware event time extraction for compiled queries.
//!
//! [`RowEventTimeExtractor`] reads the event timestamp directly from an
//! [`EventRow`] field, avoiding the overhead of the columnar extraction used
//! by the batch-level [`EventTimeExtractor`](crate::time::EventTimeExtractor).
//!
//! When no timestamp column is present (or the schema has no suitable field),
//! the caller should fall back to using the row index as a monotonic surrogate.
9
10use super::row::{EventRow, FieldType, RowSchema};
11
12// ────────────────────────────── EventTimeConfig ─────────────────────────
13
/// Settings that control how compiled queries derive event time.
#[derive(Clone, Debug, Default)]
pub struct EventTimeConfig {
    /// Name of the column holding the event timestamp.
    ///
    /// Leave as `None` to let the extractor auto-detect a column from the
    /// schema's field types and a small set of well-known column names.
    pub column: Option<String>,
    /// Lag subtracted from the largest observed timestamp to form the watermark.
    ///
    /// Nominally microseconds. It is subtracted from the raw column value, so
    /// for a plain `Int64` column it is expressed in that column's units.
    pub watermark_delay_us: i64,
}
23
24// ────────────────────────────── RowEventTimeExtractor ────────────────────
25
26/// Extracts event timestamps from [`EventRow`] fields.
27///
28/// Supports `Int64` and `TimestampMicros` field types. Tracks the maximum
29/// observed timestamp and computes a watermark with a configurable delay.
30///
31/// # Auto-Detection
32///
33/// When no explicit column is specified, [`from_schema`](Self::from_schema)
34/// scans the schema for:
35/// 1. A `TimestampMicros` field (first one found)
36/// 2. A field named `ts`, `event_time`, or `timestamp` (case-insensitive)
37///
38/// If neither heuristic matches, returns `None` — the caller should use
39/// a monotonic surrogate (row index).
40pub struct RowEventTimeExtractor {
41    /// Index of the timestamp field in the `RowSchema`.
42    field_idx: usize,
43    /// Type of the timestamp field.
44    field_type: FieldType,
45    /// Maximum observed timestamp.
46    max_timestamp: i64,
47    /// Configured watermark delay (microseconds).
48    delay: i64,
49}
50
51impl RowEventTimeExtractor {
52    /// Creates an extractor for a known field index and type.
53    ///
54    /// # Panics
55    ///
56    /// Debug-asserts that `field_type` is `Int64` or `TimestampMicros`.
57    #[must_use]
58    pub fn new(field_idx: usize, field_type: FieldType, delay: i64) -> Self {
59        debug_assert!(
60            field_type == FieldType::Int64 || field_type == FieldType::TimestampMicros,
61            "event time field must be Int64 or TimestampMicros, got {field_type:?}"
62        );
63        Self {
64            field_idx,
65            field_type,
66            max_timestamp: i64::MIN,
67            delay,
68        }
69    }
70
71    /// Attempts to create an extractor by scanning the schema.
72    ///
73    /// Uses the following strategy:
74    /// 1. If `config.column` is `Some(name)`, look up that column exactly.
75    /// 2. Otherwise, pick the first `TimestampMicros` field.
76    /// 3. Otherwise, pick a field named `ts`, `event_time`, or `timestamp`.
77    ///
78    /// Returns `None` if no suitable field is found.
79    #[must_use]
80    pub fn from_schema(schema: &RowSchema, config: &EventTimeConfig) -> Option<Self> {
81        let delay = config.watermark_delay_us;
82
83        // 1. Explicit column name.
84        if let Some(ref name) = config.column {
85            return Self::find_by_name(schema, name, delay);
86        }
87
88        // 2. First TimestampMicros field.
89        for (idx, layout) in schema.fields().iter().enumerate() {
90            if layout.field_type == FieldType::TimestampMicros {
91                return Some(Self::new(idx, FieldType::TimestampMicros, delay));
92            }
93        }
94
95        // 3. Well-known names.
96        for well_known in &["ts", "event_time", "timestamp"] {
97            if let Some(ext) = Self::find_by_name(schema, well_known, delay) {
98                return Some(ext);
99            }
100        }
101
102        None
103    }
104
105    /// Extracts the event time from a row and updates the max timestamp.
106    ///
107    /// Returns the raw timestamp value (microseconds or plain `i64`).
108    #[inline]
109    pub fn extract(&mut self, row: &EventRow<'_>) -> i64 {
110        if row.is_null(self.field_idx) {
111            return self.max_timestamp;
112        }
113
114        let ts = match self.field_type {
115            FieldType::Int64 | FieldType::TimestampMicros => row.get_i64(self.field_idx),
116            _ => unreachable!("validated in constructor"),
117        };
118
119        if ts > self.max_timestamp {
120            self.max_timestamp = ts;
121        }
122        ts
123    }
124
125    /// Returns the current watermark: `max_timestamp - delay`.
126    ///
127    /// Returns `i64::MIN` if no timestamps have been observed.
128    #[inline]
129    #[must_use]
130    pub fn watermark(&self) -> i64 {
131        self.max_timestamp.saturating_sub(self.delay)
132    }
133
134    /// Returns the maximum observed timestamp.
135    #[inline]
136    #[must_use]
137    pub fn max_timestamp(&self) -> i64 {
138        self.max_timestamp
139    }
140
141    /// Returns the field index used for extraction.
142    #[inline]
143    #[must_use]
144    pub fn field_idx(&self) -> usize {
145        self.field_idx
146    }
147
148    /// Returns the field type used for extraction.
149    #[inline]
150    #[must_use]
151    pub fn field_type(&self) -> FieldType {
152        self.field_type
153    }
154
155    // ── internal ─────────────────────────────────────────────────
156
157    fn find_by_name(schema: &RowSchema, name: &str, delay: i64) -> Option<Self> {
158        let arrow_schema = schema.arrow_schema();
159        let lower = name.to_ascii_lowercase();
160        for (idx, field) in arrow_schema.fields().iter().enumerate() {
161            if field.name().to_ascii_lowercase() == lower {
162                let ft = schema.fields()[idx].field_type;
163                if ft == FieldType::Int64 || ft == FieldType::TimestampMicros {
164                    return Some(Self::new(idx, ft, delay));
165                }
166            }
167        }
168        None
169    }
170}
171
172impl std::fmt::Debug for RowEventTimeExtractor {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        f.debug_struct("RowEventTimeExtractor")
175            .field("field_idx", &self.field_idx)
176            .field("field_type", &self.field_type)
177            .field("max_timestamp", &self.max_timestamp)
178            .field("delay", &self.delay)
179            .finish()
180    }
181}
182
#[cfg(test)]
// `3.14` below is an arbitrary payload value, not an approximation of PI.
#[allow(clippy::approx_constant)]
mod tests {
    use super::*;
    use crate::compiler::row::{MutableEventRow, RowSchema};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use bumpalo::Bump;
    use std::sync::Arc;

    /// Builds a nullable Arrow schema from `(name, type)` pairs.
    fn make_schema(fields: Vec<(&str, DataType)>) -> Arc<Schema> {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt)| Field::new(name, dt, true))
                .collect::<Vec<_>>(),
        ))
    }

    // ── Constructor tests ────────────────────────────────────────

    #[test]
    fn new_with_int64() {
        let ext = RowEventTimeExtractor::new(0, FieldType::Int64, 1000);
        assert_eq!(ext.field_idx(), 0);
        assert_eq!(ext.field_type(), FieldType::Int64);
        assert_eq!(ext.max_timestamp(), i64::MIN);
        assert_eq!(ext.watermark(), i64::MIN); // MIN - 1000 saturates
    }

    #[test]
    fn new_with_timestamp_micros() {
        let ext = RowEventTimeExtractor::new(1, FieldType::TimestampMicros, 0);
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.field_type(), FieldType::TimestampMicros);
    }

    // ── Auto-detection tests ────────────────────────────────────

    #[test]
    fn auto_detect_timestamp_micros() {
        let arrow = make_schema(vec![
            ("x", DataType::Int64),
            (
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
            ),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.field_type(), FieldType::TimestampMicros);
    }

    #[test]
    fn auto_detect_prefers_timestamp_micros_over_well_known_name() {
        let arrow = make_schema(vec![
            ("ts", DataType::Int64),
            (
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
            ),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.field_type(), FieldType::TimestampMicros);
    }

    #[test]
    fn auto_detect_well_known_name_ts() {
        let arrow = make_schema(vec![("value", DataType::Float64), ("ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.field_type(), FieldType::Int64);
    }

    #[test]
    fn auto_detect_well_known_name_event_time() {
        let arrow = make_schema(vec![
            ("event_time", DataType::Int64),
            ("val", DataType::Float64),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 0);
    }

    #[test]
    fn auto_detect_well_known_name_timestamp() {
        let arrow = make_schema(vec![
            ("id", DataType::Int64),
            ("timestamp", DataType::Int64),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
    }

    #[test]
    fn auto_detect_well_known_names_follow_name_priority() {
        // `ts` outranks `timestamp` even though it appears later in the schema.
        let arrow = make_schema(vec![
            ("timestamp", DataType::Int64),
            ("ts", DataType::Int64),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
    }

    #[test]
    fn auto_detect_skips_well_known_name_with_wrong_type() {
        let arrow = make_schema(vec![
            ("ts", DataType::Utf8),
            ("event_time", DataType::Int64),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.field_type(), FieldType::Int64);
    }

    #[test]
    fn auto_detect_case_insensitive() {
        let arrow = make_schema(vec![("id", DataType::Float64), ("TS", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
    }

    #[test]
    fn auto_detect_none_when_no_match() {
        let arrow = make_schema(vec![("x", DataType::Float64), ("y", DataType::Float64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        assert!(RowEventTimeExtractor::from_schema(&rs, &config).is_none());
    }

    #[test]
    fn explicit_column_name() {
        let arrow = make_schema(vec![("x", DataType::Int64), ("my_ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig {
            column: Some("my_ts".to_string()),
            watermark_delay_us: 5000,
        };
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.delay, 5000);
    }

    #[test]
    fn explicit_column_name_is_case_insensitive() {
        let arrow = make_schema(vec![("x", DataType::Int64), ("My_Ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig {
            column: Some("MY_TS".to_string()),
            watermark_delay_us: 0,
        };
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
    }

    #[test]
    fn explicit_column_not_found() {
        let arrow = make_schema(vec![("x", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig {
            column: Some("nonexistent".to_string()),
            watermark_delay_us: 0,
        };
        assert!(RowEventTimeExtractor::from_schema(&rs, &config).is_none());
    }

    #[test]
    fn explicit_column_does_not_fall_back_to_auto_detection() {
        let arrow = make_schema(vec![
            ("ts", DataType::Timestamp(TimeUnit::Microsecond, None)),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig {
            column: Some("missing".to_string()),
            watermark_delay_us: 0,
        };
        assert!(RowEventTimeExtractor::from_schema(&rs, &config).is_none());
    }

    #[test]
    fn explicit_column_wrong_type() {
        let arrow = make_schema(vec![("label", DataType::Utf8)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig {
            column: Some("label".to_string()),
            watermark_delay_us: 0,
        };
        assert!(RowEventTimeExtractor::from_schema(&rs, &config).is_none());
    }

    // ── Extraction tests ─────────────────────────────────────────

    #[test]
    fn extract_from_int64() {
        let arrow = make_schema(vec![("ts", DataType::Int64), ("val", DataType::Float64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(0, FieldType::Int64, 0);

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_i64(0, 42_000);
        row.set_f64(1, 1.0);
        let row = row.freeze();

        assert_eq!(ext.extract(&row), 42_000);
        assert_eq!(ext.max_timestamp(), 42_000);
    }

    #[test]
    fn extract_from_timestamp_micros() {
        let arrow = make_schema(vec![
            ("x", DataType::Float64),
            ("ts", DataType::Timestamp(TimeUnit::Microsecond, None)),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(1, FieldType::TimestampMicros, 0);

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_f64(0, 3.14);
        row.set_i64(1, 1_000_000);
        let row = row.freeze();

        assert_eq!(ext.extract(&row), 1_000_000);
    }

    #[test]
    fn extract_null_returns_max() {
        let arrow = make_schema(vec![("ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(0, FieldType::Int64, 0);

        // First: set a non-null value.
        let arena = Bump::new();
        let mut row1 = MutableEventRow::new_in(&arena, &rs, 0);
        row1.set_i64(0, 100);
        let row1 = row1.freeze();
        ext.extract(&row1);

        // Second: null field — should return current max (100).
        let mut row2 = MutableEventRow::new_in(&arena, &rs, 0);
        row2.set_null(0, true);
        let row2 = row2.freeze();
        assert_eq!(ext.extract(&row2), 100);
    }

    #[test]
    fn extract_null_before_any_value_returns_min() {
        let arrow = make_schema(vec![("ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(0, FieldType::Int64, 0);

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_null(0, true);
        let row = row.freeze();

        assert_eq!(ext.extract(&row), i64::MIN);
        assert_eq!(ext.max_timestamp(), i64::MIN);
        assert_eq!(ext.watermark(), i64::MIN);
    }

    // ── Watermark tests ──────────────────────────────────────────

    #[test]
    fn watermark_monotonic_advancement() {
        let arrow = make_schema(vec![("ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(0, FieldType::Int64, 1000);

        let arena = Bump::new();
        let timestamps = [5000_i64, 3000, 7000, 6000, 10_000];
        for &ts in &timestamps {
            let mut row = MutableEventRow::new_in(&arena, &rs, 0);
            row.set_i64(0, ts);
            let row = row.freeze();
            ext.extract(&row);
        }

        // max = 10_000, delay = 1000 → watermark = 9000
        assert_eq!(ext.max_timestamp(), 10_000);
        assert_eq!(ext.watermark(), 9_000);
    }

    #[test]
    fn watermark_delay_applied() {
        let arrow = make_schema(vec![("ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(0, FieldType::Int64, 5000);

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_i64(0, 10_000);
        let row = row.freeze();
        ext.extract(&row);

        assert_eq!(ext.watermark(), 5_000);
    }

    #[test]
    fn watermark_saturates_at_min() {
        let ext = RowEventTimeExtractor::new(0, FieldType::Int64, 1000);
        // No rows observed → max = i64::MIN → watermark saturates at i64::MIN
        assert_eq!(ext.watermark(), i64::MIN);
    }

    // ── Debug ────────────────────────────────────────────────────

    #[test]
    fn debug_format() {
        let ext = RowEventTimeExtractor::new(0, FieldType::Int64, 100);
        let s = format!("{ext:?}");
        assert!(s.contains("RowEventTimeExtractor"));
        assert!(s.contains("field_idx"));
    }
}
432}