1use super::row::{EventRow, FieldType, RowSchema};
11
/// Settings that control how the event-time column is chosen and how far the
/// watermark trails the newest observed event time.
///
/// The `Default` value has no explicit column and a delay of `0`.
#[derive(Clone, Debug, Default)]
pub struct EventTimeConfig {
    /// Explicit name of the event-time column.
    ///
    /// When `None`, `RowEventTimeExtractor::from_schema` falls back to
    /// auto-detection.
    pub column: Option<String>,
    /// Amount subtracted from the maximum observed timestamp to form the
    /// watermark.
    ///
    /// NOTE(review): the `_us` suffix suggests microseconds — confirm against
    /// the producers of the timestamp column.
    pub watermark_delay_us: i64,
}
23
24pub struct RowEventTimeExtractor {
41 field_idx: usize,
43 field_type: FieldType,
45 max_timestamp: i64,
47 delay: i64,
49}
50
51impl RowEventTimeExtractor {
52 #[must_use]
58 pub fn new(field_idx: usize, field_type: FieldType, delay: i64) -> Self {
59 debug_assert!(
60 field_type == FieldType::Int64 || field_type == FieldType::TimestampMicros,
61 "event time field must be Int64 or TimestampMicros, got {field_type:?}"
62 );
63 Self {
64 field_idx,
65 field_type,
66 max_timestamp: i64::MIN,
67 delay,
68 }
69 }
70
71 #[must_use]
80 pub fn from_schema(schema: &RowSchema, config: &EventTimeConfig) -> Option<Self> {
81 let delay = config.watermark_delay_us;
82
83 if let Some(ref name) = config.column {
85 return Self::find_by_name(schema, name, delay);
86 }
87
88 for (idx, layout) in schema.fields().iter().enumerate() {
90 if layout.field_type == FieldType::TimestampMicros {
91 return Some(Self::new(idx, FieldType::TimestampMicros, delay));
92 }
93 }
94
95 for well_known in &["ts", "event_time", "timestamp"] {
97 if let Some(ext) = Self::find_by_name(schema, well_known, delay) {
98 return Some(ext);
99 }
100 }
101
102 None
103 }
104
105 #[inline]
109 pub fn extract(&mut self, row: &EventRow<'_>) -> i64 {
110 if row.is_null(self.field_idx) {
111 return self.max_timestamp;
112 }
113
114 let ts = match self.field_type {
115 FieldType::Int64 | FieldType::TimestampMicros => row.get_i64(self.field_idx),
116 _ => unreachable!("validated in constructor"),
117 };
118
119 if ts > self.max_timestamp {
120 self.max_timestamp = ts;
121 }
122 ts
123 }
124
125 #[inline]
129 #[must_use]
130 pub fn watermark(&self) -> i64 {
131 self.max_timestamp.saturating_sub(self.delay)
132 }
133
134 #[inline]
136 #[must_use]
137 pub fn max_timestamp(&self) -> i64 {
138 self.max_timestamp
139 }
140
141 #[inline]
143 #[must_use]
144 pub fn field_idx(&self) -> usize {
145 self.field_idx
146 }
147
148 #[inline]
150 #[must_use]
151 pub fn field_type(&self) -> FieldType {
152 self.field_type
153 }
154
155 fn find_by_name(schema: &RowSchema, name: &str, delay: i64) -> Option<Self> {
158 let arrow_schema = schema.arrow_schema();
159 let lower = name.to_ascii_lowercase();
160 for (idx, field) in arrow_schema.fields().iter().enumerate() {
161 if field.name().to_ascii_lowercase() == lower {
162 let ft = schema.fields()[idx].field_type;
163 if ft == FieldType::Int64 || ft == FieldType::TimestampMicros {
164 return Some(Self::new(idx, ft, delay));
165 }
166 }
167 }
168 None
169 }
170}
171
172impl std::fmt::Debug for RowEventTimeExtractor {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 f.debug_struct("RowEventTimeExtractor")
175 .field("field_idx", &self.field_idx)
176 .field("field_type", &self.field_type)
177 .field("max_timestamp", &self.max_timestamp)
178 .field("delay", &self.delay)
179 .finish()
180 }
181}
182
#[cfg(test)]
// The literal 3.14 below is arbitrary test data, not an approximation of PI.
#[allow(clippy::approx_constant)]
mod tests {
    use super::*;
    use crate::compiler::row::{MutableEventRow, RowSchema};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use bumpalo::Bump;
    use std::sync::Arc;

    /// Builds an Arrow schema of nullable fields from `(name, type)` pairs.
    fn make_schema(fields: Vec<(&str, DataType)>) -> Arc<Schema> {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt)| Field::new(name, dt, true))
                .collect::<Vec<_>>(),
        ))
    }

    // ---- Construction ----

    #[test]
    fn new_with_int64() {
        let ext = RowEventTimeExtractor::new(0, FieldType::Int64, 1000);
        assert_eq!(ext.field_idx(), 0);
        assert_eq!(ext.field_type(), FieldType::Int64);
        assert_eq!(ext.max_timestamp(), i64::MIN);
        // i64::MIN - 1000 saturates instead of wrapping.
        assert_eq!(ext.watermark(), i64::MIN);
    }

    #[test]
    fn new_with_timestamp_micros() {
        let ext = RowEventTimeExtractor::new(1, FieldType::TimestampMicros, 0);
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.field_type(), FieldType::TimestampMicros);
    }

    // ---- Schema-driven detection ----

    #[test]
    fn auto_detect_timestamp_micros() {
        let arrow = make_schema(vec![
            ("x", DataType::Int64),
            (
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
            ),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.field_type(), FieldType::TimestampMicros);
    }

    #[test]
    fn auto_detect_well_known_name_ts() {
        let arrow = make_schema(vec![("value", DataType::Float64), ("ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.field_type(), FieldType::Int64);
    }

    #[test]
    fn auto_detect_well_known_name_event_time() {
        let arrow = make_schema(vec![
            ("event_time", DataType::Int64),
            ("val", DataType::Float64),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 0);
    }

    #[test]
    fn auto_detect_well_known_name_timestamp() {
        let arrow = make_schema(vec![
            ("id", DataType::Int64),
            ("timestamp", DataType::Int64),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
    }

    #[test]
    fn auto_detect_case_insensitive() {
        let arrow = make_schema(vec![("id", DataType::Float64), ("TS", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
    }

    #[test]
    fn auto_detect_none_when_no_match() {
        let arrow = make_schema(vec![("x", DataType::Float64), ("y", DataType::Float64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig::default();
        assert!(RowEventTimeExtractor::from_schema(&rs, &config).is_none());
    }

    #[test]
    fn explicit_column_name() {
        let arrow = make_schema(vec![("x", DataType::Int64), ("my_ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig {
            column: Some("my_ts".to_string()),
            watermark_delay_us: 5000,
        };
        let ext = RowEventTimeExtractor::from_schema(&rs, &config).unwrap();
        assert_eq!(ext.field_idx(), 1);
        assert_eq!(ext.delay, 5000);
    }

    #[test]
    fn explicit_column_not_found() {
        let arrow = make_schema(vec![("x", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig {
            column: Some("nonexistent".to_string()),
            watermark_delay_us: 0,
        };
        assert!(RowEventTimeExtractor::from_schema(&rs, &config).is_none());
    }

    #[test]
    fn explicit_column_wrong_type() {
        let arrow = make_schema(vec![("label", DataType::Utf8)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let config = EventTimeConfig {
            column: Some("label".to_string()),
            watermark_delay_us: 0,
        };
        assert!(RowEventTimeExtractor::from_schema(&rs, &config).is_none());
    }

    // ---- Extraction ----

    #[test]
    fn extract_from_int64() {
        let arrow = make_schema(vec![("ts", DataType::Int64), ("val", DataType::Float64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(0, FieldType::Int64, 0);

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_i64(0, 42_000);
        row.set_f64(1, 1.0);
        let row = row.freeze();

        assert_eq!(ext.extract(&row), 42_000);
        assert_eq!(ext.max_timestamp(), 42_000);
    }

    #[test]
    fn extract_from_timestamp_micros() {
        let arrow = make_schema(vec![
            ("x", DataType::Float64),
            ("ts", DataType::Timestamp(TimeUnit::Microsecond, None)),
        ]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(1, FieldType::TimestampMicros, 0);

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_f64(0, 3.14);
        row.set_i64(1, 1_000_000);
        let row = row.freeze();

        assert_eq!(ext.extract(&row), 1_000_000);
    }

    #[test]
    fn extract_null_returns_max() {
        let arrow = make_schema(vec![("ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(0, FieldType::Int64, 0);

        let arena = Bump::new();
        // First row establishes a maximum of 100.
        let mut row1 = MutableEventRow::new_in(&arena, &rs, 0);
        row1.set_i64(0, 100);
        let row1 = row1.freeze();
        ext.extract(&row1);

        // Second row has a null timestamp: the tracked maximum is returned.
        let mut row2 = MutableEventRow::new_in(&arena, &rs, 0);
        row2.set_null(0, true);
        let row2 = row2.freeze();
        assert_eq!(ext.extract(&row2), 100);
    }

    // ---- Watermark ----

    #[test]
    fn watermark_monotonic_advancement() {
        let arrow = make_schema(vec![("ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(0, FieldType::Int64, 1000);

        let arena = Bump::new();
        let timestamps = [5000_i64, 3000, 7000, 6000, 10_000];
        for &ts in &timestamps {
            let mut row = MutableEventRow::new_in(&arena, &rs, 0);
            row.set_i64(0, ts);
            let row = row.freeze();
            ext.extract(&row);
        }

        assert_eq!(ext.max_timestamp(), 10_000);
        // Watermark = max (10_000) - delay (1000).
        assert_eq!(ext.watermark(), 9_000);
    }

    #[test]
    fn watermark_delay_applied() {
        let arrow = make_schema(vec![("ts", DataType::Int64)]);
        let rs = RowSchema::from_arrow(&arrow).unwrap();
        let mut ext = RowEventTimeExtractor::new(0, FieldType::Int64, 5000);

        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_i64(0, 10_000);
        let row = row.freeze();
        ext.extract(&row);

        assert_eq!(ext.watermark(), 5_000);
    }

    #[test]
    fn watermark_saturates_at_min() {
        let ext = RowEventTimeExtractor::new(0, FieldType::Int64, 1000);
        // No rows seen: i64::MIN - 1000 must saturate, not overflow.
        assert_eq!(ext.watermark(), i64::MIN);
    }

    // ---- Debug ----

    #[test]
    fn debug_format() {
        let ext = RowEventTimeExtractor::new(0, FieldType::Int64, 100);
        let s = format!("{ext:?}");
        assert!(s.contains("RowEventTimeExtractor"));
        assert!(s.contains("field_idx"));
    }
}