Skip to main content

laminar_core/operator/
asof_join.rs

1//! # ASOF Join Operators
2//!
3//! Implementation of temporal proximity joins that match events based on
4//! closest timestamp rather than exact equality.
5//!
6//! ASOF joins are essential for financial and time-series applications where
7//! you need to enrich events with the most recent prior data (e.g., enriching
8//! trades with the most recent quote).
9//!
10//! ## Join Directions
11//!
12//! - **Backward**: Match with the most recent prior event (default for finance)
13//! - **Forward**: Match with the next future event
14//! - **Nearest**: Match with the closest event by absolute time difference
15//!
16//! ## Example
17//!
18//! ```rust,no_run
19//! use laminar_core::operator::asof_join::{
20//!     AsofJoinOperator, AsofJoinConfig, AsofDirection, AsofJoinType,
21//! };
22//! use std::time::Duration;
23//!
24//! // Join trades with the most recent quote within 5 seconds
25//! let config = AsofJoinConfig {
26//!     key_column: "symbol".to_string(),
27//!     left_time_column: "trade_time".to_string(),
28//!     right_time_column: "quote_time".to_string(),
29//!     direction: AsofDirection::Backward,
30//!     tolerance: Some(Duration::from_secs(5)),
31//!     join_type: AsofJoinType::Inner,
32//!     operator_id: Some("trade_quote_join".to_string()),
33//! };
34//!
35//! let operator = AsofJoinOperator::new(config);
36//! ```
37//!
38//! ## SQL Syntax (Future)
39//!
40//! ```sql
41//! SELECT t.*, q.bid, q.ask
42//! FROM trades t
43//! ASOF JOIN quotes q
44//!     ON t.symbol = q.symbol
45//!     AND t.trade_time >= q.quote_time
46//!     AND t.trade_time - q.quote_time <= INTERVAL '5' SECOND;
47//! ```
48//!
49//! ## State Management
50//!
51//! Right-side events are stored in per-key `BTreeMap` structures for O(log n)
52//! temporal lookups. State is cleaned up based on watermark progress.
53
54use super::{
55    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
56    TimerKey,
57};
58use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
59use arrow_schema::{DataType, Field, Schema, SchemaRef};
60use rkyv::{
61    rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
62};
63use rustc_hash::FxHashMap;
64use smallvec::SmallVec;
65use std::collections::BTreeMap;
66use std::sync::atomic::{AtomicU64, Ordering};
67use std::sync::Arc;
68use std::time::Duration;
69
/// Temporal direction in which an ASOF join searches for its match.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum AsofDirection {
    /// Pick the latest right event whose timestamp is <= the left timestamp.
    ///
    /// This is the default variant (the usual choice for financial data).
    #[default]
    Backward,
    /// Pick the earliest right event whose timestamp is >= the left timestamp.
    Forward,
    /// Pick the right event with the smallest absolute time difference.
    Nearest,
}
82
/// Join semantics applied to left events that have no ASOF match.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum AsofJoinType {
    /// Inner join: a left event produces output only when it finds a match.
    #[default]
    Inner,
    /// Left outer join: every left event produces output; unmatched ones get
    /// nulls for the right-side columns.
    Left,
}

impl AsofJoinType {
    /// Reports whether left events without a match must still be emitted.
    ///
    /// `true` only for [`AsofJoinType::Left`].
    #[must_use]
    pub fn emits_unmatched(&self) -> bool {
        match self {
            Self::Left => true,
            Self::Inner => false,
        }
    }
}
100
101/// Configuration for an ASOF join operator.
102#[derive(Debug, Clone)]
103pub struct AsofJoinConfig {
104    /// Column name used as the join key (must match in both streams).
105    pub key_column: String,
106    /// Column name for the timestamp in left stream events.
107    pub left_time_column: String,
108    /// Column name for the timestamp in right stream events.
109    pub right_time_column: String,
110    /// Direction for temporal matching.
111    pub direction: AsofDirection,
112    /// Maximum time difference for matching (None = unlimited).
113    pub tolerance: Option<Duration>,
114    /// Type of join to perform.
115    pub join_type: AsofJoinType,
116    /// Operator ID for checkpointing.
117    pub operator_id: Option<String>,
118}
119
120impl AsofJoinConfig {
121    /// Creates a new builder for ASOF join configuration.
122    #[must_use]
123    pub fn builder() -> AsofJoinConfigBuilder {
124        AsofJoinConfigBuilder::default()
125    }
126
127    /// Returns the tolerance in milliseconds, or `i64::MAX` if unlimited.
128    #[must_use]
129    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
130    pub fn tolerance_ms(&self) -> i64 {
131        self.tolerance.map_or(i64::MAX, |d| d.as_millis() as i64)
132    }
133}
134
135/// Builder for [`AsofJoinConfig`].
136#[derive(Debug, Default)]
137pub struct AsofJoinConfigBuilder {
138    key_column: Option<String>,
139    left_time_column: Option<String>,
140    right_time_column: Option<String>,
141    direction: Option<AsofDirection>,
142    tolerance: Option<Duration>,
143    join_type: Option<AsofJoinType>,
144    operator_id: Option<String>,
145}
146
147impl AsofJoinConfigBuilder {
148    /// Sets the join key column name.
149    #[must_use]
150    pub fn key_column(mut self, column: String) -> Self {
151        self.key_column = Some(column);
152        self
153    }
154
155    /// Sets the left timestamp column name.
156    #[must_use]
157    pub fn left_time_column(mut self, column: String) -> Self {
158        self.left_time_column = Some(column);
159        self
160    }
161
162    /// Sets the right timestamp column name.
163    #[must_use]
164    pub fn right_time_column(mut self, column: String) -> Self {
165        self.right_time_column = Some(column);
166        self
167    }
168
169    /// Sets the ASOF direction.
170    #[must_use]
171    pub fn direction(mut self, direction: AsofDirection) -> Self {
172        self.direction = Some(direction);
173        self
174    }
175
176    /// Sets the tolerance for matching.
177    #[must_use]
178    pub fn tolerance(mut self, tolerance: Duration) -> Self {
179        self.tolerance = Some(tolerance);
180        self
181    }
182
183    /// Sets the join type.
184    #[must_use]
185    pub fn join_type(mut self, join_type: AsofJoinType) -> Self {
186        self.join_type = Some(join_type);
187        self
188    }
189
190    /// Sets a custom operator ID.
191    #[must_use]
192    pub fn operator_id(mut self, id: String) -> Self {
193        self.operator_id = Some(id);
194        self
195    }
196
197    /// Builds the configuration.
198    ///
199    /// # Errors
200    ///
201    /// Returns `OperatorError::ConfigError` if required fields
202    /// (`key_column`, `left_time_column`, `right_time_column`) are not set.
203    pub fn build(self) -> Result<AsofJoinConfig, OperatorError> {
204        Ok(AsofJoinConfig {
205            key_column: self
206                .key_column
207                .ok_or_else(|| OperatorError::ConfigError("key_column is required".into()))?,
208            left_time_column: self
209                .left_time_column
210                .ok_or_else(|| OperatorError::ConfigError("left_time_column is required".into()))?,
211            right_time_column: self.right_time_column.ok_or_else(|| {
212                OperatorError::ConfigError("right_time_column is required".into())
213            })?,
214            direction: self.direction.unwrap_or_default(),
215            tolerance: self.tolerance,
216            join_type: self.join_type.unwrap_or_default(),
217            operator_id: self.operator_id,
218        })
219    }
220}
221
/// First byte of every timer key registered by the ASOF join for state
/// cleanup; `on_timer` uses it to recognize its own timers.
const ASOF_TIMER_PREFIX: u8 = 0x50;

/// Process-wide sequence used to derive an operator ID (`asof_join_{n}`) when
/// the configuration does not supply one.
static ASOF_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
227
228/// Stack-allocated key buffer for join keys.
229/// 24 bytes covers most string symbol names and all numeric keys.
230type AsofKey = SmallVec<[u8; 24]>;
231
232/// A stored right-side event for ASOF matching.
233///
234/// Stores `Arc<RecordBatch>` directly for zero-copy access on the hot path.
235/// IPC serialization is only used during checkpoint/restore (Ring 1).
236#[derive(Debug, Clone)]
237pub struct AsofRow {
238    /// Event timestamp in milliseconds.
239    pub timestamp: i64,
240    /// The record batch data (zero-copy via Arc).
241    batch: Arc<RecordBatch>,
242}
243
244impl AsofRow {
245    /// Creates a new ASOF row from an event.
246    ///
247    /// Cost: O(1) — just an atomic increment on the Arc.
248    fn new(timestamp: i64, batch: &Arc<RecordBatch>) -> Self {
249        Self {
250            timestamp,
251            batch: Arc::clone(batch),
252        }
253    }
254
255    /// Returns a reference to the record batch.
256    #[must_use]
257    pub fn batch(&self) -> &RecordBatch {
258        &self.batch
259    }
260}
261
262/// Serializable version of [`AsofRow`] for checkpointing.
263/// IPC serialization lives here, not on the hot path.
264#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
265struct SerializableAsofRow {
266    timestamp: i64,
267    data: Vec<u8>,
268}
269
270impl SerializableAsofRow {
271    fn from_row(row: &AsofRow) -> Result<Self, OperatorError> {
272        let data = crate::serialization::serialize_batch_stream(&row.batch)?;
273        Ok(Self {
274            timestamp: row.timestamp,
275            data,
276        })
277    }
278
279    fn to_row(&self) -> Result<AsofRow, OperatorError> {
280        let batch = crate::serialization::deserialize_batch_stream(&self.data)?;
281        Ok(AsofRow {
282            timestamp: self.timestamp,
283            batch: Arc::new(batch),
284        })
285    }
286}
287
288/// Per-key state for ASOF joining.
289///
290/// Uses `BTreeMap` for O(log n) range queries on timestamps.
291#[derive(Debug, Clone, Default)]
292pub struct KeyState {
293    /// Events indexed by timestamp for efficient range queries.
294    /// Multiple events at the same timestamp are stored in a vector.
295    pub events: BTreeMap<i64, SmallVec<[AsofRow; 1]>>,
296    /// Minimum timestamp in this key's state.
297    pub min_timestamp: i64,
298    /// Maximum timestamp in this key's state.
299    pub max_timestamp: i64,
300}
301
302impl KeyState {
303    /// Creates a new empty key state.
304    #[must_use]
305    pub fn new() -> Self {
306        Self {
307            events: BTreeMap::new(),
308            min_timestamp: i64::MAX,
309            max_timestamp: i64::MIN,
310        }
311    }
312
313    /// Inserts an event into the state.
314    pub fn insert(&mut self, row: AsofRow) {
315        let ts = row.timestamp;
316        self.events.entry(ts).or_default().push(row);
317        self.min_timestamp = self.min_timestamp.min(ts);
318        self.max_timestamp = self.max_timestamp.max(ts);
319    }
320
321    /// Returns the number of events in this key's state.
322    #[must_use]
323    pub fn len(&self) -> usize {
324        self.events.values().map(SmallVec::len).sum()
325    }
326
327    /// Returns true if this key has no events.
328    #[must_use]
329    pub fn is_empty(&self) -> bool {
330        self.events.is_empty()
331    }
332
333    /// Removes events with timestamps before the given threshold.
334    pub fn cleanup_before(&mut self, threshold: i64) {
335        self.events = self.events.split_off(&threshold);
336        self.min_timestamp = self.events.keys().next().copied().unwrap_or(i64::MAX);
337    }
338}
339
340/// Serializable version of `KeyState` for checkpointing.
341#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
342struct SerializableKeyState {
343    events: Vec<(i64, Vec<SerializableAsofRow>)>,
344    min_timestamp: i64,
345    max_timestamp: i64,
346}
347
348impl SerializableKeyState {
349    /// Converts a `KeyState` to serializable form for checkpointing.
350    fn from_key_state(state: &KeyState) -> Result<Self, OperatorError> {
351        let events = state
352            .events
353            .iter()
354            .map(|(ts, rows)| {
355                let ser_rows: Result<Vec<_>, _> =
356                    rows.iter().map(SerializableAsofRow::from_row).collect();
357                ser_rows.map(|r| (*ts, r))
358            })
359            .collect::<Result<Vec<_>, _>>()?;
360        Ok(Self {
361            events,
362            min_timestamp: state.min_timestamp,
363            max_timestamp: state.max_timestamp,
364        })
365    }
366
367    /// Converts serialized form back to a `KeyState` during restore.
368    fn to_key_state(&self) -> Result<KeyState, OperatorError> {
369        let mut events = BTreeMap::new();
370        for (ts, rows) in &self.events {
371            let asof_rows: Result<SmallVec<[AsofRow; 1]>, _> =
372                rows.iter().map(SerializableAsofRow::to_row).collect();
373            events.insert(*ts, asof_rows?);
374        }
375        Ok(KeyState {
376            events,
377            min_timestamp: self.min_timestamp,
378            max_timestamp: self.max_timestamp,
379        })
380    }
381}
382
/// Counters describing what an ASOF join operator has done so far.
#[derive(Clone, Debug, Default)]
pub struct AsofJoinMetrics {
    /// Left (probe-side) events processed.
    pub left_events: u64,
    /// Right (build-side) events processed.
    pub right_events: u64,
    /// Left events for which a match was found.
    pub matches: u64,
    /// Left events emitted without a match (left joins only).
    pub unmatched_left: u64,
    /// Matches that fell within the tolerance.
    pub within_tolerance: u64,
    /// Candidate matches rejected for exceeding the tolerance.
    pub outside_tolerance: u64,
    /// Late events dropped.
    pub late_events: u64,
    /// State cleanup operations.
    pub state_cleanups: u64,
}

impl AsofJoinMetrics {
    /// Creates a metrics set with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets every counter back to zero.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
416
417/// ASOF join operator.
418///
419/// Performs temporal proximity joins between two event streams. Left events
420/// probe the right-side state for the closest matching timestamp.
421///
422/// # State Management
423///
424/// Right-side events are stored in memory in per-key `BTreeMap` structures.
425/// State is persisted to the state store on checkpoint and cleaned up
426/// based on watermark progress.
427///
428/// # Performance Characteristics
429///
430/// - Matching: O(log n) per left event (`BTreeMap` range query)
431/// - State size: Bounded by right-side events within tolerance + watermark lag
432/// - Memory: Linear in right-side event count per key
433pub struct AsofJoinOperator {
434    /// Configuration.
435    config: AsofJoinConfig,
436    /// Operator ID.
437    operator_id: String,
438    /// Per-key right-side state.
439    right_state: FxHashMap<AsofKey, KeyState>,
440    /// Current watermark.
441    watermark: i64,
442    /// Metrics.
443    metrics: AsofJoinMetrics,
444    /// Output schema (lazily initialized).
445    output_schema: Option<SchemaRef>,
446    /// Left schema (captured from first left event).
447    left_schema: Option<SchemaRef>,
448    /// Right schema (captured from first right event).
449    right_schema: Option<SchemaRef>,
450    /// Cached column index for join key in left schema.
451    left_key_index: Option<usize>,
452    /// Cached column index for join key in right schema.
453    right_key_index: Option<usize>,
454}
455
456impl AsofJoinOperator {
457    /// Creates a new ASOF join operator.
458    #[must_use]
459    pub fn new(config: AsofJoinConfig) -> Self {
460        let operator_id = config.operator_id.clone().unwrap_or_else(|| {
461            let num = ASOF_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
462            format!("asof_join_{num}")
463        });
464
465        Self {
466            config,
467            operator_id,
468            right_state: FxHashMap::default(),
469            watermark: i64::MIN,
470            metrics: AsofJoinMetrics::new(),
471            output_schema: None,
472            left_schema: None,
473            right_schema: None,
474            left_key_index: None,
475            right_key_index: None,
476        }
477    }
478
479    /// Creates a new ASOF join operator with explicit ID.
480    #[must_use]
481    pub fn with_id(mut config: AsofJoinConfig, operator_id: String) -> Self {
482        config.operator_id = Some(operator_id);
483        Self::new(config)
484    }
485
486    /// Returns the configuration.
487    #[must_use]
488    pub fn config(&self) -> &AsofJoinConfig {
489        &self.config
490    }
491
492    /// Returns the metrics.
493    #[must_use]
494    pub fn metrics(&self) -> &AsofJoinMetrics {
495        &self.metrics
496    }
497
498    /// Resets the metrics.
499    pub fn reset_metrics(&mut self) {
500        self.metrics.reset();
501    }
502
503    /// Returns the current watermark.
504    #[must_use]
505    pub fn watermark(&self) -> i64 {
506        self.watermark
507    }
508
509    /// Returns the total number of right-side events in state.
510    #[must_use]
511    pub fn state_size(&self) -> usize {
512        self.right_state.values().map(KeyState::len).sum()
513    }
514
515    /// Processes a left-side event (probe side).
516    pub fn process_left(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
517        self.metrics.left_events += 1;
518
519        // Capture left schema on first event
520        if self.left_schema.is_none() {
521            self.left_schema = Some(event.data.schema());
522            self.update_output_schema();
523        }
524
525        let mut output = OutputVec::new();
526
527        // Extract join key
528        let Some(key_value) = Self::extract_key(
529            &event.data,
530            &self.config.key_column,
531            &mut self.left_key_index,
532        ) else {
533            return output;
534        };
535
536        // Extract timestamp from left event
537        let left_timestamp = event.timestamp;
538
539        // Find matching right event
540        let match_result = self.find_match(&key_value, left_timestamp);
541
542        match match_result {
543            Some(matched_row) => {
544                self.metrics.matches += 1;
545                self.metrics.within_tolerance += 1;
546
547                // Create joined output
548                if let Some(joined) = self.create_joined_event(event, &matched_row) {
549                    output.push(Output::Event(joined));
550                }
551            }
552            None => {
553                if self.config.join_type.emits_unmatched() {
554                    self.metrics.unmatched_left += 1;
555                    if let Some(unmatched) = self.create_unmatched_event(event) {
556                        output.push(Output::Event(unmatched));
557                    }
558                }
559            }
560        }
561
562        output
563    }
564
565    /// Processes a right-side event (build side).
566    pub fn process_right(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
567        self.metrics.right_events += 1;
568
569        // Capture right schema on first event
570        if self.right_schema.is_none() {
571            self.right_schema = Some(event.data.schema());
572            self.update_output_schema();
573        }
574
575        let output = OutputVec::new();
576
577        // Extract join key
578        let Some(key_value) = Self::extract_key(
579            &event.data,
580            &self.config.key_column,
581            &mut self.right_key_index,
582        ) else {
583            return output;
584        };
585
586        // Check if event is too late
587        if self.watermark > i64::MIN && event.timestamp < self.watermark {
588            self.metrics.late_events += 1;
589            // Still store it as it might match future left events for Backward direction
590        }
591
592        // Create row and store in state (key is stored in HashMap, not in row)
593        let row = AsofRow::new(event.timestamp, &event.data);
594
595        // Calculate cleanup time before borrowing state
596        let cleanup_time = self.calculate_cleanup_time(event.timestamp);
597
598        let key_state = self.right_state.entry(key_value).or_default();
599        key_state.insert(row);
600
601        // Register cleanup timer based on tolerance and direction
602        let timer_key = Self::make_cleanup_timer_key(&key_state.max_timestamp.to_be_bytes());
603        ctx.timers
604            .register_timer(cleanup_time, Some(timer_key), Some(ctx.operator_index));
605
606        output
607    }
608
609    /// Finds a matching right-side event for the given left timestamp.
610    fn find_match(&self, key: &[u8], left_timestamp: i64) -> Option<AsofRow> {
611        let key_state = self.right_state.get(key)?;
612
613        match self.config.direction {
614            AsofDirection::Backward => self.find_backward_match(key_state, left_timestamp),
615            AsofDirection::Forward => self.find_forward_match(key_state, left_timestamp),
616            AsofDirection::Nearest => self.find_nearest_match(key_state, left_timestamp),
617        }
618    }
619
620    /// Finds the most recent prior event (timestamp <= `left_timestamp`).
621    fn find_backward_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
622        // Use range(..=left_timestamp).last() to find the most recent prior event
623        let (ts, rows) = key_state.events.range(..=left_timestamp).next_back()?;
624
625        // Check tolerance
626        let diff = left_timestamp - ts;
627        if diff > self.config.tolerance_ms() {
628            return None;
629        }
630
631        // Return the last row at this timestamp (most recent)
632        rows.last().cloned()
633    }
634
635    /// Finds the next future event (timestamp >= `left_timestamp`).
636    fn find_forward_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
637        // Use range(left_timestamp..).first() to find the next future event
638        let (ts, rows) = key_state.events.range(left_timestamp..).next()?;
639
640        // Check tolerance
641        let diff = ts - left_timestamp;
642        if diff > self.config.tolerance_ms() {
643            return None;
644        }
645
646        // Return the first row at this timestamp
647        rows.first().cloned()
648    }
649
650    /// Finds the closest event by absolute time difference.
651    fn find_nearest_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
652        let before = key_state.events.range(..=left_timestamp).next_back();
653        let after = key_state.events.range(left_timestamp..).next();
654
655        let candidate = match (before, after) {
656            (Some((ts_before, rows_before)), Some((ts_after, rows_after))) => {
657                let diff_before = left_timestamp - ts_before;
658                let diff_after = ts_after - left_timestamp;
659                if diff_before <= diff_after {
660                    Some((diff_before, rows_before.last()?.clone()))
661                } else {
662                    Some((diff_after, rows_after.first()?.clone()))
663                }
664            }
665            (Some((ts, rows)), None) => {
666                let diff = left_timestamp - ts;
667                Some((diff, rows.last()?.clone()))
668            }
669            (None, Some((ts, rows))) => {
670                let diff = ts - left_timestamp;
671                Some((diff, rows.first()?.clone()))
672            }
673            (None, None) => None,
674        };
675
676        let (diff, row) = candidate?;
677
678        // Check tolerance
679        if diff > self.config.tolerance_ms() {
680            return None;
681        }
682
683        Some(row)
684    }
685
686    /// Calculates when state for a given timestamp can be cleaned up.
687    fn calculate_cleanup_time(&self, timestamp: i64) -> i64 {
688        let tolerance_ms = self.config.tolerance_ms();
689        match self.config.direction {
690            // For Backward and Nearest, we need to keep state longer
691            // because future left events may match with these right events
692            AsofDirection::Backward | AsofDirection::Nearest => {
693                if tolerance_ms == i64::MAX {
694                    i64::MAX
695                } else {
696                    timestamp.saturating_add(tolerance_ms)
697                }
698            }
699            // For Forward, we can clean up more aggressively
700            AsofDirection::Forward => timestamp,
701        }
702    }
703
704    /// Handles watermark updates and triggers state cleanup.
705    pub fn on_watermark(&mut self, watermark: i64, _ctx: &mut OperatorContext) -> OutputVec {
706        self.watermark = watermark;
707        self.cleanup_state(watermark);
708        OutputVec::new()
709    }
710
711    /// Cleans up state that can no longer produce matches.
712    fn cleanup_state(&mut self, watermark: i64) {
713        let tolerance_ms = self.config.tolerance_ms();
714
715        let threshold = match self.config.direction {
716            AsofDirection::Backward | AsofDirection::Nearest => {
717                if tolerance_ms == i64::MAX {
718                    i64::MIN // Never clean up
719                } else {
720                    watermark.saturating_sub(tolerance_ms)
721                }
722            }
723            AsofDirection::Forward => watermark,
724        };
725
726        if threshold == i64::MIN {
727            return;
728        }
729
730        let initial_count: usize = self.right_state.values().map(KeyState::len).sum();
731
732        for key_state in self.right_state.values_mut() {
733            key_state.cleanup_before(threshold);
734        }
735
736        // Remove empty key states
737        self.right_state.retain(|_, v| !v.is_empty());
738
739        let final_count: usize = self.right_state.values().map(KeyState::len).sum();
740        if final_count < initial_count {
741            self.metrics.state_cleanups += (initial_count - final_count) as u64;
742        }
743    }
744
745    /// Extracts the join key value from a record batch.
746    ///
747    /// Uses a cached column index to avoid O(n) schema lookups after the first call.
748    fn extract_key(
749        batch: &RecordBatch,
750        column_name: &str,
751        cached_index: &mut Option<usize>,
752    ) -> Option<AsofKey> {
753        let column_index = if let Some(idx) = *cached_index {
754            idx
755        } else {
756            let idx = batch.schema().index_of(column_name).ok()?;
757            *cached_index = Some(idx);
758            idx
759        };
760        let column = batch.column(column_index);
761
762        if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
763            if string_array.is_empty() || string_array.is_null(0) {
764                return None;
765            }
766            return Some(AsofKey::from_slice(string_array.value(0).as_bytes()));
767        }
768
769        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
770            if int_array.is_empty() || int_array.is_null(0) {
771                return None;
772            }
773            return Some(AsofKey::from_slice(&int_array.value(0).to_le_bytes()));
774        }
775
776        None
777    }
778
779    /// Creates a timer key for state cleanup.
780    fn make_cleanup_timer_key(key_suffix: &[u8]) -> TimerKey {
781        let mut key = TimerKey::new();
782        key.push(ASOF_TIMER_PREFIX);
783        key.extend_from_slice(key_suffix);
784        key
785    }
786
787    /// Updates the output schema when both input schemas are known.
788    fn update_output_schema(&mut self) {
789        if let (Some(left), Some(right)) = (&self.left_schema, &self.right_schema) {
790            let mut fields: Vec<Field> =
791                Vec::with_capacity(left.fields().len() + right.fields().len());
792            fields.extend(left.fields().iter().map(|f| f.as_ref().clone()));
793
794            // Add right fields, prefixing duplicates
795            for field in right.fields() {
796                let name = if left.field_with_name(field.name()).is_ok() {
797                    format!("right_{}", field.name())
798                } else {
799                    field.name().clone()
800                };
801                fields.push(Field::new(
802                    name,
803                    field.data_type().clone(),
804                    true, // Nullable for outer joins
805                ));
806            }
807
808            self.output_schema = Some(Arc::new(Schema::new(fields)));
809        }
810    }
811
812    /// Creates a joined event from left event and matched right row.
813    fn create_joined_event(&self, left_event: &Event, right_row: &AsofRow) -> Option<Event> {
814        let schema = self.output_schema.as_ref()?;
815
816        let left_cols = left_event.data.columns();
817        let right_cols = right_row.batch().columns();
818        let mut columns: Vec<ArrayRef> = Vec::with_capacity(left_cols.len() + right_cols.len());
819        columns.extend_from_slice(left_cols);
820        for column in right_cols {
821            columns.push(Arc::clone(column));
822        }
823
824        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
825
826        Some(Event::new(left_event.timestamp, joined_batch))
827    }
828
829    /// Creates an unmatched event for left outer joins (with null right columns).
830    fn create_unmatched_event(&self, left_event: &Event) -> Option<Event> {
831        let schema = self.output_schema.as_ref()?;
832        let right_schema = self.right_schema.as_ref()?;
833
834        let num_rows = left_event.data.num_rows();
835        let left_cols = left_event.data.columns();
836        let mut columns: Vec<ArrayRef> =
837            Vec::with_capacity(left_cols.len() + right_schema.fields().len());
838        columns.extend_from_slice(left_cols);
839
840        // Add null columns for right side
841        for field in right_schema.fields() {
842            columns.push(Self::create_null_array(field.data_type(), num_rows));
843        }
844
845        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
846
847        Some(Event::new(left_event.timestamp, joined_batch))
848    }
849
850    /// Creates a null array of the given type and length.
851    fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
852        match data_type {
853            DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
854            DataType::Float64 => {
855                use arrow_array::Float64Array;
856                Arc::new(Float64Array::from(vec![None; num_rows])) as ArrayRef
857            }
858            // Default to Int64 for numeric and other types
859            _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
860        }
861    }
862}
863
864impl Operator for AsofJoinOperator {
865    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
866        // Default to processing as left event
867        self.process_left(event, ctx)
868    }
869
870    fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
871        // Timers trigger state cleanup
872        if timer.key.first() == Some(&ASOF_TIMER_PREFIX) {
873            self.cleanup_state(timer.timestamp);
874        }
875        OutputVec::new()
876    }
877
878    fn checkpoint(&self) -> OperatorState {
879        let mut keys_dropped: u64 = 0;
880        let mut state_entries: Vec<(Vec<u8>, SerializableKeyState)> =
881            Vec::with_capacity(self.right_state.len());
882        for (k, v) in &self.right_state {
883            match SerializableKeyState::from_key_state(v) {
884                Ok(s) => state_entries.push((k.to_vec(), s)),
885                Err(e) => {
886                    keys_dropped += 1;
887                    tracing::error!(
888                        operator_id = %self.operator_id,
889                        error = %e,
890                        "[LDB-6013] ASOF join key failed serialization during checkpoint — \
891                         this key will be MISSING after recovery"
892                    );
893                }
894            }
895        }
896        if keys_dropped > 0 {
897            tracing::error!(
898                operator_id = %self.operator_id,
899                keys_dropped,
900                keys_total = self.right_state.len(),
901                "[LDB-6013] ASOF join checkpoint lost state — \
902                 {keys_dropped} keys could not be serialized"
903            );
904        }
905
906        let checkpoint_data = (
907            self.watermark,
908            self.metrics.left_events,
909            self.metrics.right_events,
910            self.metrics.matches,
911            self.metrics.unmatched_left,
912            state_entries,
913        );
914
915        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
916            .map(|v| v.to_vec())
917            .unwrap_or_default();
918
919        OperatorState {
920            operator_id: self.operator_id.clone(),
921            data,
922        }
923    }
924
925    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
926        type CheckpointData = (
927            i64,
928            u64,
929            u64,
930            u64,
931            u64,
932            Vec<(Vec<u8>, SerializableKeyState)>,
933        );
934
935        if state.operator_id != self.operator_id {
936            return Err(OperatorError::StateAccessFailed(format!(
937                "Operator ID mismatch: expected {}, got {}",
938                self.operator_id, state.operator_id
939            )));
940        }
941
942        let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
943            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
944        let (watermark, left_events, right_events, matches, unmatched_left, state_entries) =
945            rkyv::deserialize::<CheckpointData, RkyvError>(archived)
946                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
947
948        self.watermark = watermark;
949        self.metrics.left_events = left_events;
950        self.metrics.right_events = right_events;
951        self.metrics.matches = matches;
952        self.metrics.unmatched_left = unmatched_left;
953
954        // Restore state
955        self.right_state.clear();
956        for (key, serializable) in state_entries {
957            let key_state = serializable.to_key_state()?;
958            self.right_state
959                .insert(AsofKey::from_slice(&key), key_state);
960        }
961
962        Ok(())
963    }
964}
965
966#[cfg(test)]
967mod tests {
968    use super::*;
969    use crate::state::{InMemoryStore, StateStore};
970    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
971    use arrow_array::Float64Array;
972    use arrow_schema::{DataType, Field, Schema};
973
974    /// Creates a trade event for testing.
975    fn create_trade_event(timestamp: i64, symbol: &str, price: f64) -> Event {
976        let schema = Arc::new(Schema::new(vec![
977            Field::new("symbol", DataType::Utf8, false),
978            Field::new("price", DataType::Float64, false),
979        ]));
980        let batch = RecordBatch::try_new(
981            schema,
982            vec![
983                Arc::new(StringArray::from(vec![symbol])),
984                Arc::new(Float64Array::from(vec![price])),
985            ],
986        )
987        .unwrap();
988        Event::new(timestamp, batch)
989    }
990
991    /// Creates a quote event for testing.
992    fn create_quote_event(timestamp: i64, symbol: &str, bid: f64, ask: f64) -> Event {
993        let schema = Arc::new(Schema::new(vec![
994            Field::new("symbol", DataType::Utf8, false),
995            Field::new("bid", DataType::Float64, false),
996            Field::new("ask", DataType::Float64, false),
997        ]));
998        let batch = RecordBatch::try_new(
999            schema,
1000            vec![
1001                Arc::new(StringArray::from(vec![symbol])),
1002                Arc::new(Float64Array::from(vec![bid])),
1003                Arc::new(Float64Array::from(vec![ask])),
1004            ],
1005        )
1006        .unwrap();
1007        Event::new(timestamp, batch)
1008    }
1009
1010    fn create_test_context<'a>(
1011        timers: &'a mut TimerService,
1012        state: &'a mut dyn StateStore,
1013        watermark_gen: &'a mut dyn WatermarkGenerator,
1014    ) -> OperatorContext<'a> {
1015        OperatorContext {
1016            event_time: 0,
1017            processing_time: 0,
1018            timers,
1019            state,
1020            watermark_generator: watermark_gen,
1021            operator_index: 0,
1022        }
1023    }
1024
1025    #[test]
1026    fn test_asof_direction_default() {
1027        assert_eq!(AsofDirection::default(), AsofDirection::Backward);
1028    }
1029
1030    #[test]
1031    fn test_asof_join_type_properties() {
1032        assert!(!AsofJoinType::Inner.emits_unmatched());
1033        assert!(AsofJoinType::Left.emits_unmatched());
1034    }
1035
1036    #[test]
1037    fn test_config_builder() {
1038        let config = AsofJoinConfig::builder()
1039            .key_column("symbol".to_string())
1040            .left_time_column("trade_time".to_string())
1041            .right_time_column("quote_time".to_string())
1042            .direction(AsofDirection::Backward)
1043            .tolerance(Duration::from_secs(5))
1044            .join_type(AsofJoinType::Left)
1045            .operator_id("test_op".to_string())
1046            .build()
1047            .unwrap();
1048
1049        assert_eq!(config.key_column, "symbol");
1050        assert_eq!(config.left_time_column, "trade_time");
1051        assert_eq!(config.right_time_column, "quote_time");
1052        assert_eq!(config.direction, AsofDirection::Backward);
1053        assert_eq!(config.tolerance, Some(Duration::from_secs(5)));
1054        assert_eq!(config.join_type, AsofJoinType::Left);
1055        assert_eq!(config.tolerance_ms(), 5000);
1056    }
1057
1058    #[test]
1059    fn test_backward_asof_basic() {
1060        let config = AsofJoinConfig::builder()
1061            .key_column("symbol".to_string())
1062            .left_time_column("trade_time".to_string())
1063            .right_time_column("quote_time".to_string())
1064            .direction(AsofDirection::Backward)
1065            .tolerance(Duration::from_secs(10))
1066            .join_type(AsofJoinType::Inner)
1067            .build()
1068            .unwrap();
1069
1070        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1071
1072        let mut timers = TimerService::new();
1073        let mut state = InMemoryStore::new();
1074        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1075
1076        // Store quote at t=900
1077        let quote = create_quote_event(900, "AAPL", 150.0, 151.0);
1078        {
1079            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1080            operator.process_right(&quote, &mut ctx);
1081        }
1082
1083        // Store quote at t=950
1084        let quote2 = create_quote_event(950, "AAPL", 152.0, 153.0);
1085        {
1086            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1087            operator.process_right(&quote2, &mut ctx);
1088        }
1089
1090        // Trade at t=1000 should match quote at t=950 (most recent prior)
1091        let trade = create_trade_event(1000, "AAPL", 152.5);
1092        let outputs = {
1093            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1094            operator.process_left(&trade, &mut ctx)
1095        };
1096
1097        assert_eq!(
1098            outputs
1099                .iter()
1100                .filter(|o| matches!(o, Output::Event(_)))
1101                .count(),
1102            1
1103        );
1104        assert_eq!(operator.metrics().matches, 1);
1105
1106        // Verify output has both trade and quote columns
1107        if let Some(Output::Event(event)) = outputs.first() {
1108            assert_eq!(event.data.num_columns(), 5); // 2 trade + 3 quote
1109        }
1110    }
1111
1112    #[test]
1113    fn test_forward_asof_basic() {
1114        let config = AsofJoinConfig::builder()
1115            .key_column("symbol".to_string())
1116            .left_time_column("trade_time".to_string())
1117            .right_time_column("quote_time".to_string())
1118            .direction(AsofDirection::Forward)
1119            .tolerance(Duration::from_secs(10))
1120            .join_type(AsofJoinType::Inner)
1121            .build()
1122            .unwrap();
1123
1124        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1125
1126        let mut timers = TimerService::new();
1127        let mut state = InMemoryStore::new();
1128        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1129
1130        // Store quotes after the trade time
1131        let quote1 = create_quote_event(1050, "AAPL", 150.0, 151.0);
1132        let quote2 = create_quote_event(1100, "AAPL", 152.0, 153.0);
1133        {
1134            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1135            operator.process_right(&quote1, &mut ctx);
1136            operator.process_right(&quote2, &mut ctx);
1137        }
1138
1139        // Trade at t=1000 should match quote at t=1050 (next future)
1140        let trade = create_trade_event(1000, "AAPL", 150.5);
1141        let outputs = {
1142            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1143            operator.process_left(&trade, &mut ctx)
1144        };
1145
1146        assert_eq!(
1147            outputs
1148                .iter()
1149                .filter(|o| matches!(o, Output::Event(_)))
1150                .count(),
1151            1
1152        );
1153        assert_eq!(operator.metrics().matches, 1);
1154    }
1155
1156    #[test]
1157    fn test_nearest_asof() {
1158        let config = AsofJoinConfig::builder()
1159            .key_column("symbol".to_string())
1160            .left_time_column("trade_time".to_string())
1161            .right_time_column("quote_time".to_string())
1162            .direction(AsofDirection::Nearest)
1163            .tolerance(Duration::from_secs(10))
1164            .join_type(AsofJoinType::Inner)
1165            .build()
1166            .unwrap();
1167
1168        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1169
1170        let mut timers = TimerService::new();
1171        let mut state = InMemoryStore::new();
1172        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1173
1174        // Store quotes before and after
1175        let quote_before = create_quote_event(990, "AAPL", 150.0, 151.0);
1176        let quote_after = create_quote_event(1020, "AAPL", 152.0, 153.0);
1177        {
1178            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1179            operator.process_right(&quote_before, &mut ctx);
1180            operator.process_right(&quote_after, &mut ctx);
1181        }
1182
1183        // Trade at t=1000 - quote_before is 10ms away, quote_after is 20ms away
1184        // Should match quote_before (closer)
1185        let trade = create_trade_event(1000, "AAPL", 150.5);
1186        let outputs = {
1187            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1188            operator.process_left(&trade, &mut ctx)
1189        };
1190
1191        assert_eq!(
1192            outputs
1193                .iter()
1194                .filter(|o| matches!(o, Output::Event(_)))
1195                .count(),
1196            1
1197        );
1198    }
1199
1200    #[test]
1201    fn test_tolerance_exceeded() {
1202        let config = AsofJoinConfig::builder()
1203            .key_column("symbol".to_string())
1204            .left_time_column("trade_time".to_string())
1205            .right_time_column("quote_time".to_string())
1206            .direction(AsofDirection::Backward)
1207            .tolerance(Duration::from_millis(50)) // 50ms tolerance
1208            .join_type(AsofJoinType::Inner)
1209            .build()
1210            .unwrap();
1211
1212        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1213
1214        let mut timers = TimerService::new();
1215        let mut state = InMemoryStore::new();
1216        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1217
1218        // Store quote at t=900
1219        let quote = create_quote_event(900, "AAPL", 150.0, 151.0);
1220        {
1221            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1222            operator.process_right(&quote, &mut ctx);
1223        }
1224
1225        // Trade at t=1000 - 100ms after quote, exceeds 50ms tolerance
1226        let trade = create_trade_event(1000, "AAPL", 150.5);
1227        let outputs = {
1228            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1229            operator.process_left(&trade, &mut ctx)
1230        };
1231
1232        // No match due to tolerance exceeded
1233        assert_eq!(outputs.len(), 0);
1234        assert_eq!(operator.metrics().matches, 0);
1235    }
1236
1237    #[test]
1238    fn test_tolerance_within() {
1239        let config = AsofJoinConfig::builder()
1240            .key_column("symbol".to_string())
1241            .left_time_column("trade_time".to_string())
1242            .right_time_column("quote_time".to_string())
1243            .direction(AsofDirection::Backward)
1244            .tolerance(Duration::from_millis(100)) // 100ms tolerance
1245            .join_type(AsofJoinType::Inner)
1246            .build()
1247            .unwrap();
1248
1249        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1250
1251        let mut timers = TimerService::new();
1252        let mut state = InMemoryStore::new();
1253        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1254
1255        // Store quote at t=950
1256        let quote = create_quote_event(950, "AAPL", 150.0, 151.0);
1257        {
1258            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1259            operator.process_right(&quote, &mut ctx);
1260        }
1261
1262        // Trade at t=1000 - 50ms after quote, within 100ms tolerance
1263        let trade = create_trade_event(1000, "AAPL", 150.5);
1264        let outputs = {
1265            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1266            operator.process_left(&trade, &mut ctx)
1267        };
1268
1269        assert_eq!(
1270            outputs
1271                .iter()
1272                .filter(|o| matches!(o, Output::Event(_)))
1273                .count(),
1274            1
1275        );
1276        assert_eq!(operator.metrics().within_tolerance, 1);
1277    }
1278
1279    #[test]
1280    fn test_no_match_empty_state() {
1281        let config = AsofJoinConfig::builder()
1282            .key_column("symbol".to_string())
1283            .left_time_column("trade_time".to_string())
1284            .right_time_column("quote_time".to_string())
1285            .direction(AsofDirection::Backward)
1286            .join_type(AsofJoinType::Inner)
1287            .build()
1288            .unwrap();
1289
1290        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1291
1292        let mut timers = TimerService::new();
1293        let mut state = InMemoryStore::new();
1294        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1295
1296        // Trade with no quotes in state
1297        let trade = create_trade_event(1000, "AAPL", 150.5);
1298        let outputs = {
1299            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1300            operator.process_left(&trade, &mut ctx)
1301        };
1302
1303        assert_eq!(outputs.len(), 0);
1304        assert_eq!(operator.metrics().matches, 0);
1305    }
1306
1307    #[test]
1308    fn test_multiple_keys() {
1309        let config = AsofJoinConfig::builder()
1310            .key_column("symbol".to_string())
1311            .left_time_column("trade_time".to_string())
1312            .right_time_column("quote_time".to_string())
1313            .direction(AsofDirection::Backward)
1314            .tolerance(Duration::from_secs(10))
1315            .join_type(AsofJoinType::Inner)
1316            .build()
1317            .unwrap();
1318
1319        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1320
1321        let mut timers = TimerService::new();
1322        let mut state = InMemoryStore::new();
1323        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1324
1325        // Store quotes for different symbols
1326        {
1327            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1328            operator.process_right(&create_quote_event(950, "AAPL", 150.0, 151.0), &mut ctx);
1329            operator.process_right(&create_quote_event(960, "GOOG", 2800.0, 2801.0), &mut ctx);
1330        }
1331
1332        // Trade for AAPL should match AAPL quote, not GOOG
1333        let trade = create_trade_event(1000, "AAPL", 150.5);
1334        let outputs = {
1335            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1336            operator.process_left(&trade, &mut ctx)
1337        };
1338
1339        assert_eq!(
1340            outputs
1341                .iter()
1342                .filter(|o| matches!(o, Output::Event(_)))
1343                .count(),
1344            1
1345        );
1346
1347        // Trade for GOOG should match GOOG quote
1348        let trade2 = create_trade_event(1000, "GOOG", 2800.5);
1349        let outputs2 = {
1350            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1351            operator.process_left(&trade2, &mut ctx)
1352        };
1353
1354        assert_eq!(
1355            outputs2
1356                .iter()
1357                .filter(|o| matches!(o, Output::Event(_)))
1358                .count(),
1359            1
1360        );
1361        assert_eq!(operator.metrics().matches, 2);
1362    }
1363
1364    #[test]
1365    fn test_multiple_events_same_timestamp() {
1366        let config = AsofJoinConfig::builder()
1367            .key_column("symbol".to_string())
1368            .left_time_column("trade_time".to_string())
1369            .right_time_column("quote_time".to_string())
1370            .direction(AsofDirection::Backward)
1371            .tolerance(Duration::from_secs(10))
1372            .join_type(AsofJoinType::Inner)
1373            .build()
1374            .unwrap();
1375
1376        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1377
1378        let mut timers = TimerService::new();
1379        let mut state = InMemoryStore::new();
1380        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1381
1382        // Store multiple quotes at the same timestamp
1383        {
1384            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1385            operator.process_right(&create_quote_event(950, "AAPL", 150.0, 151.0), &mut ctx);
1386            operator.process_right(&create_quote_event(950, "AAPL", 150.5, 151.5), &mut ctx);
1387            // Same ts
1388        }
1389
1390        // Trade should match (last quote at that timestamp for Backward)
1391        let trade = create_trade_event(1000, "AAPL", 150.5);
1392        let outputs = {
1393            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1394            operator.process_left(&trade, &mut ctx)
1395        };
1396
1397        assert_eq!(
1398            outputs
1399                .iter()
1400                .filter(|o| matches!(o, Output::Event(_)))
1401                .count(),
1402            1
1403        );
1404    }
1405
1406    #[test]
1407    fn test_left_outer_join() {
1408        let config = AsofJoinConfig::builder()
1409            .key_column("symbol".to_string())
1410            .left_time_column("trade_time".to_string())
1411            .right_time_column("quote_time".to_string())
1412            .direction(AsofDirection::Backward)
1413            .tolerance(Duration::from_millis(50))
1414            .join_type(AsofJoinType::Left) // Left outer join
1415            .build()
1416            .unwrap();
1417
1418        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1419
1420        let mut timers = TimerService::new();
1421        let mut state = InMemoryStore::new();
1422        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1423
1424        // First, process a matched event to establish right schema
1425        {
1426            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1427            operator.process_right(&create_quote_event(990, "AAPL", 150.0, 151.0), &mut ctx);
1428        }
1429        {
1430            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1431            operator.process_left(&create_trade_event(1000, "AAPL", 150.5), &mut ctx);
1432        }
1433
1434        // Now process trade with no matching quote (different symbol)
1435        let trade = create_trade_event(2000, "GOOG", 2800.5);
1436        let outputs = {
1437            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1438            operator.process_left(&trade, &mut ctx)
1439        };
1440
1441        // Left join should emit with nulls
1442        assert_eq!(
1443            outputs
1444                .iter()
1445                .filter(|o| matches!(o, Output::Event(_)))
1446                .count(),
1447            1
1448        );
1449        assert_eq!(operator.metrics().unmatched_left, 1);
1450
1451        if let Some(Output::Event(event)) = outputs.first() {
1452            // Should have both left and right columns (with nulls)
1453            assert_eq!(event.data.num_columns(), 5);
1454        }
1455    }
1456
1457    #[test]
1458    fn test_inner_join_no_output() {
1459        let config = AsofJoinConfig::builder()
1460            .key_column("symbol".to_string())
1461            .left_time_column("trade_time".to_string())
1462            .right_time_column("quote_time".to_string())
1463            .direction(AsofDirection::Backward)
1464            .tolerance(Duration::from_millis(50))
1465            .join_type(AsofJoinType::Inner)
1466            .build()
1467            .unwrap();
1468
1469        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1470
1471        let mut timers = TimerService::new();
1472        let mut state = InMemoryStore::new();
1473        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1474
1475        // Trade with no matching quote
1476        let trade = create_trade_event(1000, "AAPL", 150.5);
1477        let outputs = {
1478            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1479            operator.process_left(&trade, &mut ctx)
1480        };
1481
1482        // Inner join should emit nothing when no match
1483        assert_eq!(outputs.len(), 0);
1484    }
1485
1486    #[test]
1487    fn test_state_cleanup() {
1488        let config = AsofJoinConfig::builder()
1489            .key_column("symbol".to_string())
1490            .left_time_column("trade_time".to_string())
1491            .right_time_column("quote_time".to_string())
1492            .direction(AsofDirection::Backward)
1493            .tolerance(Duration::from_millis(100))
1494            .join_type(AsofJoinType::Inner)
1495            .build()
1496            .unwrap();
1497
1498        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1499
1500        let mut timers = TimerService::new();
1501        let mut state = InMemoryStore::new();
1502        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1503
1504        // Store quotes
1505        {
1506            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1507            operator.process_right(&create_quote_event(900, "AAPL", 150.0, 151.0), &mut ctx);
1508            operator.process_right(&create_quote_event(950, "AAPL", 152.0, 153.0), &mut ctx);
1509        }
1510
1511        assert_eq!(operator.state_size(), 2);
1512
1513        // Advance watermark past old quotes
1514        {
1515            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1516            operator.on_watermark(1100, &mut ctx);
1517        }
1518
1519        // Old quotes should be cleaned up
1520        assert!(operator.state_size() < 2 || operator.metrics().state_cleanups > 0);
1521    }
1522
1523    #[test]
1524    fn test_late_event_still_stored() {
1525        let config = AsofJoinConfig::builder()
1526            .key_column("symbol".to_string())
1527            .left_time_column("trade_time".to_string())
1528            .right_time_column("quote_time".to_string())
1529            .direction(AsofDirection::Backward)
1530            .tolerance(Duration::from_secs(10))
1531            .join_type(AsofJoinType::Inner)
1532            .build()
1533            .unwrap();
1534
1535        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1536
1537        let mut timers = TimerService::new();
1538        let mut state = InMemoryStore::new();
1539        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1540
1541        // Set watermark
1542        operator.watermark = 1000;
1543
1544        // Process late quote
1545        let quote = create_quote_event(500, "AAPL", 150.0, 151.0);
1546        {
1547            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1548            operator.process_right(&quote, &mut ctx);
1549        }
1550
1551        assert_eq!(operator.metrics().late_events, 1);
1552        // Late events are still stored (for Backward joins)
1553        assert_eq!(operator.state_size(), 1);
1554    }
1555
1556    #[test]
1557    fn test_checkpoint_restore() {
1558        let config = AsofJoinConfig::builder()
1559            .key_column("symbol".to_string())
1560            .left_time_column("trade_time".to_string())
1561            .right_time_column("quote_time".to_string())
1562            .direction(AsofDirection::Backward)
1563            .tolerance(Duration::from_secs(10))
1564            .join_type(AsofJoinType::Inner)
1565            .build()
1566            .unwrap();
1567
1568        let mut operator = AsofJoinOperator::with_id(config.clone(), "test_asof".to_string());
1569
1570        let mut timers = TimerService::new();
1571        let mut state = InMemoryStore::new();
1572        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1573
1574        // Add some state
1575        {
1576            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1577            operator.process_right(&create_quote_event(900, "AAPL", 150.0, 151.0), &mut ctx);
1578            operator.process_right(&create_quote_event(950, "AAPL", 152.0, 153.0), &mut ctx);
1579        }
1580
1581        // Record metrics
1582        operator.metrics.left_events = 10;
1583        operator.metrics.matches = 5;
1584        operator.watermark = 800;
1585
1586        // Checkpoint
1587        let checkpoint = operator.checkpoint();
1588
1589        // Restore to new operator
1590        let mut restored = AsofJoinOperator::with_id(config, "test_asof".to_string());
1591        restored.restore(checkpoint).unwrap();
1592
1593        // Verify state restored
1594        assert_eq!(restored.metrics().left_events, 10);
1595        assert_eq!(restored.metrics().matches, 5);
1596        assert_eq!(restored.watermark(), 800);
1597        assert_eq!(restored.state_size(), 2);
1598    }
1599
1600    #[test]
1601    fn test_schema_composition() {
1602        let config = AsofJoinConfig::builder()
1603            .key_column("symbol".to_string())
1604            .left_time_column("trade_time".to_string())
1605            .right_time_column("quote_time".to_string())
1606            .direction(AsofDirection::Backward)
1607            .tolerance(Duration::from_secs(10))
1608            .join_type(AsofJoinType::Inner)
1609            .build()
1610            .unwrap();
1611
1612        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1613
1614        let mut timers = TimerService::new();
1615        let mut state = InMemoryStore::new();
1616        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1617
1618        // Process right to capture schema
1619        {
1620            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1621            operator.process_right(&create_quote_event(950, "AAPL", 150.0, 151.0), &mut ctx);
1622        }
1623
1624        // Process left to capture schema and produce output
1625        let trade = create_trade_event(1000, "AAPL", 150.5);
1626        let outputs = {
1627            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1628            operator.process_left(&trade, &mut ctx)
1629        };
1630
1631        assert_eq!(outputs.len(), 1);
1632
1633        if let Some(Output::Event(event)) = outputs.first() {
1634            let schema = event.data.schema();
1635
1636            // Check left columns (trade)
1637            assert!(schema.field_with_name("price").is_ok());
1638
1639            // Check right columns (quote) - symbol is duplicated, so prefixed
1640            assert!(schema.field_with_name("right_symbol").is_ok());
1641            assert!(schema.field_with_name("bid").is_ok());
1642            assert!(schema.field_with_name("ask").is_ok());
1643        }
1644    }
1645
1646    #[test]
1647    fn test_metrics_tracking() {
1648        let config = AsofJoinConfig::builder()
1649            .key_column("symbol".to_string())
1650            .left_time_column("trade_time".to_string())
1651            .right_time_column("quote_time".to_string())
1652            .direction(AsofDirection::Backward)
1653            .tolerance(Duration::from_secs(10))
1654            .join_type(AsofJoinType::Inner)
1655            .build()
1656            .unwrap();
1657
1658        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1659
1660        let mut timers = TimerService::new();
1661        let mut state = InMemoryStore::new();
1662        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1663
1664        // Process some events
1665        {
1666            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1667            operator.process_right(&create_quote_event(900, "AAPL", 150.0, 151.0), &mut ctx);
1668            operator.process_right(&create_quote_event(950, "AAPL", 152.0, 153.0), &mut ctx);
1669        }
1670
1671        {
1672            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1673            operator.process_left(&create_trade_event(1000, "AAPL", 150.5), &mut ctx);
1674            operator.process_left(&create_trade_event(1100, "AAPL", 151.5), &mut ctx);
1675        }
1676
1677        assert_eq!(operator.metrics().right_events, 2);
1678        assert_eq!(operator.metrics().left_events, 2);
1679        assert_eq!(operator.metrics().matches, 2);
1680        assert_eq!(operator.metrics().within_tolerance, 2);
1681    }
1682
1683    #[test]
1684    fn test_key_state_operations() {
1685        let mut key_state = KeyState::new();
1686        assert!(key_state.is_empty());
1687
1688        // Insert some rows
1689        let empty_batch = Arc::new(RecordBatch::new_empty(Arc::new(Schema::empty())));
1690        let row1 = AsofRow::new(100, &empty_batch);
1691        let row2 = AsofRow::new(200, &empty_batch);
1692
1693        key_state.insert(row1);
1694        key_state.insert(row2);
1695
1696        assert_eq!(key_state.len(), 2);
1697        assert_eq!(key_state.min_timestamp, 100);
1698        assert_eq!(key_state.max_timestamp, 200);
1699
1700        // Cleanup before 150
1701        key_state.cleanup_before(150);
1702        assert_eq!(key_state.len(), 1);
1703        assert_eq!(key_state.min_timestamp, 200);
1704    }
1705
1706    #[test]
1707    fn test_asof_row_serialization() {
1708        let schema = Arc::new(Schema::new(vec![
1709            Field::new("symbol", DataType::Utf8, false),
1710            Field::new("value", DataType::Float64, false),
1711        ]));
1712        let batch = Arc::new(
1713            RecordBatch::try_new(
1714                schema,
1715                vec![
1716                    Arc::new(StringArray::from(vec!["AAPL"])),
1717                    Arc::new(Float64Array::from(vec![150.5])),
1718                ],
1719            )
1720            .unwrap(),
1721        );
1722
1723        let row = AsofRow::new(1000, &batch);
1724
1725        // Verify round-trip through serializable form (checkpoint path)
1726        let serializable = SerializableAsofRow::from_row(&row).unwrap();
1727        let restored = serializable.to_row().unwrap();
1728        assert_eq!(restored.batch().num_rows(), 1);
1729        assert_eq!(restored.batch().num_columns(), 2);
1730        assert_eq!(restored.timestamp, 1000);
1731    }
1732
1733    #[test]
1734    fn test_metrics_reset() {
1735        let mut metrics = AsofJoinMetrics::new();
1736        metrics.left_events = 100;
1737        metrics.matches = 50;
1738
1739        metrics.reset();
1740
1741        assert_eq!(metrics.left_events, 0);
1742        assert_eq!(metrics.matches, 0);
1743    }
1744
1745    #[test]
1746    fn test_unlimited_tolerance() {
1747        let config = AsofJoinConfig::builder()
1748            .key_column("symbol".to_string())
1749            .left_time_column("trade_time".to_string())
1750            .right_time_column("quote_time".to_string())
1751            .direction(AsofDirection::Backward)
1752            // No tolerance set - unlimited
1753            .join_type(AsofJoinType::Inner)
1754            .build()
1755            .unwrap();
1756
1757        assert_eq!(config.tolerance_ms(), i64::MAX);
1758
1759        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1760
1761        let mut timers = TimerService::new();
1762        let mut state = InMemoryStore::new();
1763        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1764
1765        // Store very old quote
1766        {
1767            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1768            operator.process_right(&create_quote_event(100, "AAPL", 150.0, 151.0), &mut ctx);
1769        }
1770
1771        // Trade much later should still match with unlimited tolerance
1772        let trade = create_trade_event(1_000_000, "AAPL", 150.5);
1773        let outputs = {
1774            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1775            operator.process_left(&trade, &mut ctx)
1776        };
1777
1778        assert_eq!(
1779            outputs
1780                .iter()
1781                .filter(|o| matches!(o, Output::Event(_)))
1782                .count(),
1783            1
1784        );
1785    }
1786}