// laminar_core/operator/temporal_join.rs
1//! # Temporal Join Operators
2//!
3//! Join streaming events with versioned tables using point-in-time lookups.
4//! Temporal joins return the table value that was valid at the event's timestamp,
5//! enabling consistent enrichment with time-varying dimension data.
6//!
7//! ## Use Cases
8//!
9//! - Currency rate lookup at transaction time
10//! - Product price lookup at order time
11//! - User tier lookup at event time
12//! - Regulatory compliance (audit trail)
13//!
14//! ## Join Types
15//!
16//! - **Event-Time**: Deterministic lookup based on event timestamp (`FOR SYSTEM_TIME AS OF`)
17//! - **Process-Time**: Non-deterministic lookup based on processing time (latest value)
18//!
19//! ## Table Characteristics
20//!
21//! - **Append-Only**: Only inserts, no updates/deletes. Optimized: no stream-side state needed.
22//! - **Non-Append-Only**: Has updates/deletes. Requires state to track join results for retractions.
23//!
24//! ## Example
25//!
26//! ```rust,no_run
27//! use laminar_core::operator::temporal_join::{
28//!     TemporalJoinOperator, TemporalJoinConfig, TemporalJoinSemantics,
29//!     TableCharacteristics, TemporalJoinType,
30//! };
31//!
32//! // Join orders with currency rates valid at order time
33//! let config = TemporalJoinConfig::builder()
34//!     .stream_key_column("currency".to_string())
35//!     .table_key_column("currency".to_string())
36//!     .table_version_column("valid_from".to_string())
37//!     .semantics(TemporalJoinSemantics::EventTime)
38//!     .table_characteristics(TableCharacteristics::AppendOnly)
39//!     .join_type(TemporalJoinType::Inner)
40//!     .build()
41//!     .unwrap();
42//!
43//! let operator = TemporalJoinOperator::new(config);
44//! ```
45//!
46//! ## SQL Syntax (Future)
47//!
48//! ```sql
49//! -- Event-time temporal join
50//! SELECT o.*, r.rate
51//! FROM orders o
52//! JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time r
53//!     ON o.currency = r.currency;
54//!
55//! -- Process-time temporal join (latest value)
56//! SELECT o.*, c.tier
57//! FROM orders o
58//! JOIN customers FOR SYSTEM_TIME AS OF PROCTIME() c
59//!     ON o.customer_id = c.id;
60//! ```
61
62use super::{
63    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
64    TimerKey,
65};
66use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
67use arrow_schema::{DataType, Field, Schema, SchemaRef};
68use rkyv::{
69    rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
70};
71use rustc_hash::FxHashMap;
72use smallvec::SmallVec;
73use std::collections::BTreeMap;
74use std::sync::atomic::{AtomicU64, Ordering};
75use std::sync::Arc;
76
/// Type of temporal join semantics.
///
/// Selects which table version a stream event is joined against: the version
/// valid at the event's own timestamp, or the latest version at the moment
/// the event is processed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TemporalJoinSemantics {
    /// Event-time: Lookup value valid at event timestamp.
    /// Deterministic (replaying the same events yields the same joins);
    /// requires a versioned table.
    #[default]
    EventTime,

    /// Process-time: Lookup current value at processing time.
    /// Non-deterministic (depends on wall-clock arrival order), simpler
    /// but less predictable.
    ProcessTime,
}
89
/// Table update characteristics.
///
/// Drives the state-management strategy: append-only tables never invalidate
/// past join results, so no stream-side tracking is needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TableCharacteristics {
    /// Append-only: Only inserts, no updates/deletes.
    /// Optimization: No state needed on streaming side.
    #[default]
    AppendOnly,

    /// Non-append-only: Has updates and/or deletes.
    /// Requires state to track which rows were joined, so retractions can be
    /// emitted when a previously-joined version changes.
    NonAppendOnly,
}
102
/// Type of temporal join to perform.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TemporalJoinType {
    /// Inner join - only emit when a match is found.
    #[default]
    Inner,
    /// Left outer join - emit all stream events, with nulls for unmatched.
    Left,
}

impl TemporalJoinType {
    /// Returns true if unmatched stream events should be emitted
    /// (i.e. whether this is an outer join on the stream side).
    #[must_use]
    pub fn emits_unmatched(&self) -> bool {
        match self {
            TemporalJoinType::Left => true,
            TemporalJoinType::Inner => false,
        }
    }
}
120
/// Configuration for a temporal join operator.
///
/// Built via [`TemporalJoinConfig::builder`]; the three column names are
/// mandatory, everything else has a default.
#[derive(Debug, Clone)]
pub struct TemporalJoinConfig {
    /// Column name in the stream to use as lookup key.
    pub stream_key_column: String,
    /// Column name in the table that matches the stream key.
    pub table_key_column: String,
    /// Column name for the version timestamp in the table.
    pub table_version_column: String,
    /// Join semantics (event-time or process-time).
    pub semantics: TemporalJoinSemantics,
    /// Table characteristics (append-only or non-append-only).
    pub table_characteristics: TableCharacteristics,
    /// Type of join to perform.
    pub join_type: TemporalJoinType,
    /// Operator ID for checkpointing. `None` means an ID is auto-generated
    /// from a process-wide counter when the operator is constructed.
    pub operator_id: Option<String>,
    /// Maximum number of versions to retain per key (0 = unlimited).
    pub max_versions_per_key: usize,
}
141
impl TemporalJoinConfig {
    /// Creates a new builder for temporal join configuration.
    ///
    /// Equivalent to `TemporalJoinConfigBuilder::default()`; provided for
    /// discoverability and fluent call chains.
    #[must_use]
    pub fn builder() -> TemporalJoinConfigBuilder {
        TemporalJoinConfigBuilder::default()
    }
}
149
/// Builder for [`TemporalJoinConfig`].
///
/// All fields start unset; `build` rejects missing column names and fills
/// every other field with its default.
#[derive(Debug, Default)]
pub struct TemporalJoinConfigBuilder {
    stream_key_column: Option<String>,
    table_key_column: Option<String>,
    table_version_column: Option<String>,
    semantics: Option<TemporalJoinSemantics>,
    table_characteristics: Option<TableCharacteristics>,
    join_type: Option<TemporalJoinType>,
    operator_id: Option<String>,
    max_versions_per_key: Option<usize>,
}
162
163impl TemporalJoinConfigBuilder {
164    /// Sets the stream key column name.
165    #[must_use]
166    pub fn stream_key_column(mut self, column: String) -> Self {
167        self.stream_key_column = Some(column);
168        self
169    }
170
171    /// Sets the table key column name.
172    #[must_use]
173    pub fn table_key_column(mut self, column: String) -> Self {
174        self.table_key_column = Some(column);
175        self
176    }
177
178    /// Sets the table version column name.
179    #[must_use]
180    pub fn table_version_column(mut self, column: String) -> Self {
181        self.table_version_column = Some(column);
182        self
183    }
184
185    /// Sets the join semantics.
186    #[must_use]
187    pub fn semantics(mut self, semantics: TemporalJoinSemantics) -> Self {
188        self.semantics = Some(semantics);
189        self
190    }
191
192    /// Sets the table characteristics.
193    #[must_use]
194    pub fn table_characteristics(mut self, characteristics: TableCharacteristics) -> Self {
195        self.table_characteristics = Some(characteristics);
196        self
197    }
198
199    /// Sets the join type.
200    #[must_use]
201    pub fn join_type(mut self, join_type: TemporalJoinType) -> Self {
202        self.join_type = Some(join_type);
203        self
204    }
205
206    /// Sets a custom operator ID.
207    #[must_use]
208    pub fn operator_id(mut self, id: String) -> Self {
209        self.operator_id = Some(id);
210        self
211    }
212
213    /// Sets the maximum number of versions to retain per key.
214    #[must_use]
215    pub fn max_versions_per_key(mut self, max: usize) -> Self {
216        self.max_versions_per_key = Some(max);
217        self
218    }
219
220    /// Builds the configuration.
221    ///
222    /// # Errors
223    ///
224    /// Returns `OperatorError::ConfigError` if required fields are not set.
225    pub fn build(self) -> std::result::Result<TemporalJoinConfig, OperatorError> {
226        Ok(TemporalJoinConfig {
227            stream_key_column: self.stream_key_column.ok_or_else(|| {
228                OperatorError::ConfigError("stream_key_column is required".into())
229            })?,
230            table_key_column: self
231                .table_key_column
232                .ok_or_else(|| OperatorError::ConfigError("table_key_column is required".into()))?,
233            table_version_column: self.table_version_column.ok_or_else(|| {
234                OperatorError::ConfigError("table_version_column is required".into())
235            })?,
236            semantics: self.semantics.unwrap_or_default(),
237            table_characteristics: self.table_characteristics.unwrap_or_default(),
238            join_type: self.join_type.unwrap_or_default(),
239            operator_id: self.operator_id,
240            max_versions_per_key: self.max_versions_per_key.unwrap_or(0),
241        })
242    }
243}
244
/// Timer key prefix for version cleanup.
// NOTE(review): not referenced in this portion of the file — presumably
// namespaces `TimerKey`s registered for state cleanup; confirm against the
// rest of the operator implementation.
const TEMPORAL_TIMER_PREFIX: u8 = 0x60;

/// Static counter for generating unique operator IDs.
/// Incremented with relaxed ordering — uniqueness is all that matters.
static TEMPORAL_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
250
/// A stored table row for temporal joining.
///
/// Holds the row as opaque serialized bytes (via `crate::serialization`) so
/// it can be kept in state and checkpointed with rkyv.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct TableRow {
    /// Version timestamp (when this row became valid).
    pub version_timestamp: i64,
    /// Serialized key value.
    pub key_value: Vec<u8>,
    /// Serialized record batch data.
    pub data: Vec<u8>,
}
261
262impl TableRow {
263    /// Creates a new table row from a record batch.
264    fn new(
265        version_timestamp: i64,
266        key_value: Vec<u8>,
267        batch: &RecordBatch,
268    ) -> Result<Self, OperatorError> {
269        let data = Self::serialize_batch(batch)?;
270        Ok(Self {
271            version_timestamp,
272            key_value,
273            data,
274        })
275    }
276
277    fn serialize_batch(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
278        Ok(crate::serialization::serialize_batch_stream(batch)?)
279    }
280
281    fn deserialize_batch(data: &[u8]) -> Result<RecordBatch, OperatorError> {
282        Ok(crate::serialization::deserialize_batch_stream(data)?)
283    }
284
285    /// Converts this row back to a record batch.
286    ///
287    /// # Errors
288    ///
289    /// Returns `OperatorError::SerializationFailed` if the batch data is invalid.
290    pub fn to_batch(&self) -> Result<RecordBatch, OperatorError> {
291        Self::deserialize_batch(&self.data)
292    }
293}
294
/// Record of a joined event for retraction tracking (non-append-only tables).
///
/// Stored per key so that when a table version is updated or deleted, the
/// previously-emitted join results can be retracted and/or re-emitted.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct JoinedEventRecord {
    /// Original stream event timestamp.
    pub event_timestamp: i64,
    /// Serialized stream event data.
    pub event_data: Vec<u8>,
    /// Table row version that was joined.
    pub table_version: i64,
    /// The key value used for joining.
    pub key_value: Vec<u8>,
}
307
/// Per-key versioned table state.
///
/// `min_version`/`max_version` are cached bounds over the map keys; when the
/// state is empty they hold the sentinels `i64::MAX`/`i64::MIN` respectively
/// (see `new`), so the first insert establishes correct bounds via min/max.
#[derive(Debug, Clone, Default)]
pub struct VersionedKeyState {
    /// Rows indexed by version timestamp.
    /// Multiple rows at the same timestamp stored in a vector.
    pub versions: BTreeMap<i64, SmallVec<[TableRow; 1]>>,
    /// Minimum version timestamp.
    pub min_version: i64,
    /// Maximum version timestamp.
    pub max_version: i64,
}
319
320impl VersionedKeyState {
321    /// Creates a new empty key state.
322    #[must_use]
323    pub fn new() -> Self {
324        Self {
325            versions: BTreeMap::new(),
326            min_version: i64::MAX,
327            max_version: i64::MIN,
328        }
329    }
330
331    /// Inserts a table row.
332    pub fn insert(&mut self, row: TableRow) {
333        let version = row.version_timestamp;
334        self.versions.entry(version).or_default().push(row);
335        self.min_version = self.min_version.min(version);
336        self.max_version = self.max_version.max(version);
337    }
338
339    /// Returns the number of rows in this key's state.
340    #[must_use]
341    pub fn len(&self) -> usize {
342        self.versions.values().map(SmallVec::len).sum()
343    }
344
345    /// Returns true if this key has no rows.
346    #[must_use]
347    pub fn is_empty(&self) -> bool {
348        self.versions.is_empty()
349    }
350
351    /// Finds the row valid at the given timestamp.
352    /// Returns the row with the largest `version_ts` <= `event_ts`.
353    #[must_use]
354    pub fn lookup_at_time(&self, timestamp: i64) -> Option<&TableRow> {
355        let (_, rows) = self.versions.range(..=timestamp).next_back()?;
356        rows.last()
357    }
358
359    /// Finds the latest row (for process-time lookups).
360    #[must_use]
361    pub fn lookup_latest(&self) -> Option<&TableRow> {
362        let (_, rows) = self.versions.iter().next_back()?;
363        rows.last()
364    }
365
366    /// Removes versions before the given timestamp.
367    pub fn cleanup_before(&mut self, threshold: i64) {
368        self.versions = self.versions.split_off(&threshold);
369        self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
370    }
371
372    /// Removes a specific version (for non-append-only table deletes).
373    pub fn remove_version(&mut self, version: i64) -> Option<SmallVec<[TableRow; 1]>> {
374        let removed = self.versions.remove(&version);
375        if removed.is_some() {
376            self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
377            self.max_version = self
378                .versions
379                .keys()
380                .next_back()
381                .copied()
382                .unwrap_or(i64::MIN);
383        }
384        removed
385    }
386
387    /// Limits the number of versions (keeps most recent).
388    pub fn limit_versions(&mut self, max_versions: usize) {
389        if max_versions == 0 || self.versions.len() <= max_versions {
390            return;
391        }
392
393        let to_remove = self.versions.len() - max_versions;
394        // Use split_off to avoid intermediate Vec allocation.
395        // BTreeMap is sorted ascending, so nth(to_remove) gives the first key to keep.
396        if let Some(&split_key) = self.versions.keys().nth(to_remove) {
397            self.versions = self.versions.split_off(&split_key);
398        }
399        self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
400    }
401}
402
/// Serializable version of `VersionedKeyState` for checkpointing.
///
/// The `BTreeMap`/`SmallVec` index is flattened into plain `Vec`s so the
/// whole structure can be archived with rkyv.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
struct SerializableVersionedKeyState {
    // (version_timestamp, rows) pairs, in ascending key order as produced
    // by BTreeMap iteration.
    versions: Vec<(i64, Vec<TableRow>)>,
    min_version: i64,
    max_version: i64,
}
410
411impl From<&VersionedKeyState> for SerializableVersionedKeyState {
412    fn from(state: &VersionedKeyState) -> Self {
413        Self {
414            versions: state
415                .versions
416                .iter()
417                .map(|(ts, rows)| (*ts, rows.to_vec()))
418                .collect(),
419            min_version: state.min_version,
420            max_version: state.max_version,
421        }
422    }
423}
424
425impl From<SerializableVersionedKeyState> for VersionedKeyState {
426    fn from(state: SerializableVersionedKeyState) -> Self {
427        let mut versions = BTreeMap::new();
428        for (ts, rows) in state.versions {
429            versions.insert(ts, SmallVec::from_vec(rows));
430        }
431        Self {
432            versions,
433            min_version: state.min_version,
434            max_version: state.max_version,
435        }
436    }
437}
438
/// Table change event for non-append-only tables.
///
/// Consumed by `TemporalJoinOperator::process_table_change`; updates and
/// deletes may trigger retractions of previously-emitted join results.
#[derive(Debug, Clone)]
pub enum TableChange {
    /// Insert a new row.
    Insert(TableRow),
    /// Update an existing row (provides old and new versions).
    Update {
        /// The old row being replaced.
        old: TableRow,
        /// The new row.
        new: TableRow,
    },
    /// Delete a row.
    Delete(TableRow),
}
454
/// Metrics for tracking temporal join operations.
#[derive(Debug, Clone, Default)]
pub struct TemporalJoinMetrics {
    /// Number of stream events processed.
    pub stream_events: u64,
    /// Number of table inserts processed.
    pub table_inserts: u64,
    /// Number of table updates processed.
    pub table_updates: u64,
    /// Number of table deletes processed.
    pub table_deletes: u64,
    /// Number of successful matches.
    pub matches: u64,
    /// Number of unmatched stream events.
    pub unmatched: u64,
    /// Number of retractions emitted.
    pub retractions: u64,
    /// Number of state cleanup operations.
    pub state_cleanups: u64,
}

impl TemporalJoinMetrics {
    /// Creates new metrics with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Resets all counters back to zero.
    pub fn reset(&mut self) {
        *self = TemporalJoinMetrics::default();
    }
}
488
/// Temporal join operator.
///
/// Performs point-in-time lookups against a versioned table. Supports both
/// append-only tables (optimized, no stream-side state) and non-append-only
/// tables (with retraction support).
pub struct TemporalJoinOperator {
    /// Configuration.
    config: TemporalJoinConfig,
    /// Operator ID (from config, or auto-generated in `new`).
    operator_id: String,
    /// Versioned table state: serialized key bytes -> versions.
    table_state: FxHashMap<Vec<u8>, VersionedKeyState>,
    /// Stream event state for retraction tracking (non-append-only only).
    /// key -> list of joined event records.
    stream_state: FxHashMap<Vec<u8>, Vec<JoinedEventRecord>>,
    /// Current watermark (`i64::MIN` until the first `on_watermark`).
    watermark: i64,
    /// Metrics.
    metrics: TemporalJoinMetrics,
    /// Output schema (lazily initialized once both sides' schemas are known).
    output_schema: Option<SchemaRef>,
    /// Stream schema, captured from the first stream event.
    stream_schema: Option<SchemaRef>,
    /// Table schema, captured from the first table row.
    table_schema: Option<SchemaRef>,
    /// Cached column index for stream key — resolved on first stream event.
    stream_key_index: Option<usize>,
    /// Cached column index for table key — resolved on first table event.
    table_key_index: Option<usize>,
    /// Cached column index for table timestamp — resolved on first table event.
    table_ts_index: Option<usize>,
}
521
522impl TemporalJoinOperator {
523    /// Creates a new temporal join operator.
524    #[must_use]
525    pub fn new(config: TemporalJoinConfig) -> Self {
526        let operator_id = config.operator_id.clone().unwrap_or_else(|| {
527            let num = TEMPORAL_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
528            format!("temporal_join_{num}")
529        });
530
531        Self {
532            config,
533            operator_id,
534            table_state: FxHashMap::default(),
535            stream_state: FxHashMap::default(),
536            watermark: i64::MIN,
537            metrics: TemporalJoinMetrics::new(),
538            output_schema: None,
539            stream_schema: None,
540            table_schema: None,
541            stream_key_index: None,
542            table_key_index: None,
543            table_ts_index: None,
544        }
545    }
546
547    /// Creates a new temporal join operator with explicit ID.
548    #[must_use]
549    pub fn with_id(mut config: TemporalJoinConfig, operator_id: String) -> Self {
550        config.operator_id = Some(operator_id);
551        Self::new(config)
552    }
553
    /// Returns a reference to the operator's configuration.
    #[must_use]
    pub fn config(&self) -> &TemporalJoinConfig {
        &self.config
    }
559
    /// Returns the metrics counters accumulated since the last reset.
    #[must_use]
    pub fn metrics(&self) -> &TemporalJoinMetrics {
        &self.metrics
    }
565
    /// Resets all metrics counters to zero.
    pub fn reset_metrics(&mut self) {
        self.metrics.reset();
    }
570
    /// Returns the current watermark (`i64::MIN` before the first update).
    #[must_use]
    pub fn watermark(&self) -> i64 {
        self.watermark
    }
576
    /// Returns the total number of table rows in state, summed over all keys.
    #[must_use]
    pub fn table_state_size(&self) -> usize {
        self.table_state.values().map(VersionedKeyState::len).sum()
    }
582
    /// Returns the total number of tracked stream events (non-append-only only;
    /// always 0 for append-only tables, which keep no stream-side state).
    #[must_use]
    pub fn stream_state_size(&self) -> usize {
        self.stream_state.values().map(Vec::len).sum()
    }
588
    /// Processes a stream event (probe side).
    ///
    /// Resolves the join key from the configured stream column, performs a
    /// point-in-time lookup (event-time) or latest-value lookup
    /// (process-time), and emits the joined result. Events whose key cannot
    /// be extracted are dropped silently (empty output).
    pub fn process_stream(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        self.metrics.stream_events += 1;

        // Capture stream schema from the first event (feeds output schema).
        if self.stream_schema.is_none() {
            self.stream_schema = Some(event.data.schema());
            self.update_output_schema();
        }

        let mut output = OutputVec::new();

        // Extract join key; a missing column or null key drops the event.
        let Some(key_value) = Self::extract_key(
            &event.data,
            &self.config.stream_key_column,
            &mut self.stream_key_index,
        ) else {
            return output;
        };

        // Determine lookup timestamp: event time is deterministic,
        // process time uses whatever the context reports now.
        let lookup_ts = match self.config.semantics {
            TemporalJoinSemantics::EventTime => event.timestamp,
            TemporalJoinSemantics::ProcessTime => ctx.processing_time,
        };

        // Find matching table row
        if let Some(table_row) = self.lookup_table(&key_value, lookup_ts) {
            self.metrics.matches += 1;

            // Track join for potential retraction (non-append-only tables may
            // later update/delete the version this event joined against).
            // A serialization failure silently skips tracking for this event.
            if self.config.table_characteristics == TableCharacteristics::NonAppendOnly {
                if let Ok(event_data) = TableRow::serialize_batch(&event.data) {
                    let record = JoinedEventRecord {
                        event_timestamp: event.timestamp,
                        event_data,
                        table_version: table_row.version_timestamp,
                        key_value: key_value.clone(),
                    };
                    self.stream_state.entry(key_value).or_default().push(record);
                }
            }

            // Create joined output
            if let Some(joined) = self.create_joined_event(event, &table_row) {
                output.push(Output::Event(joined));
            }
        } else {
            self.metrics.unmatched += 1;
            // Left joins still emit the stream side (nulls for unmatched).
            if self.config.join_type.emits_unmatched() {
                if let Some(unmatched) = self.create_unmatched_event(event) {
                    output.push(Output::Event(unmatched));
                }
            }
        }

        output
    }
648
    /// Processes a table insert (append-only or non-append-only).
    ///
    /// Stores the row in the versioned table state under its key, versioned
    /// by the configured version column (falling back to the event timestamp
    /// when that column is absent). Produces no output: table-side events
    /// only update build-side state.
    pub fn process_table_insert(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
        self.metrics.table_inserts += 1;

        // Capture table schema from the first row (feeds output schema).
        if self.table_schema.is_none() {
            self.table_schema = Some(event.data.schema());
            self.update_output_schema();
        }

        // Extract key and version; rows without a usable key are dropped.
        let Some(key_value) = Self::extract_key(
            &event.data,
            &self.config.table_key_column,
            &mut self.table_key_index,
        ) else {
            return OutputVec::new();
        };

        let version_ts = Self::extract_timestamp(
            &event.data,
            &self.config.table_version_column,
            &mut self.table_ts_index,
        )
        .unwrap_or(event.timestamp);

        // Create and store table row; serialization failure drops the row.
        let Ok(row) = TableRow::new(version_ts, key_value.clone(), &event.data) else {
            return OutputVec::new();
        };

        let key_state = self.table_state.entry(key_value).or_default();
        key_state.insert(row);

        // Limit versions if configured (oldest versions are evicted first).
        if self.config.max_versions_per_key > 0 {
            key_state.limit_versions(self.config.max_versions_per_key);
        }

        OutputVec::new()
    }
690
    /// Processes a table change (non-append-only tables).
    /// Returns retractions and new join results for affected stream events.
    ///
    /// For append-only tables only `Insert` changes are honored; `Update`
    /// and `Delete` are silently ignored, since append-only semantics
    /// forbid them.
    pub fn process_table_change(
        &mut self,
        change: &TableChange,
        _ctx: &mut OperatorContext,
    ) -> OutputVec {
        if self.config.table_characteristics != TableCharacteristics::NonAppendOnly {
            // For append-only tables, only inserts are valid.
            // NOTE(review): `table_inserts` is not incremented on this path,
            // unlike the NonAppendOnly Insert arm below — confirm intended.
            if let TableChange::Insert(row) = change {
                let key_state = self.table_state.entry(row.key_value.clone()).or_default();
                key_state.insert(row.clone());
            }
            return OutputVec::new();
        }

        let mut output = OutputVec::new();

        match change {
            TableChange::Insert(row) => {
                self.metrics.table_inserts += 1;
                let key_state = self.table_state.entry(row.key_value.clone()).or_default();
                key_state.insert(row.clone());
            }
            TableChange::Update { old, new } => {
                self.metrics.table_updates += 1;

                // Emit retractions for events that joined with old version
                self.emit_retractions_for_version(
                    &old.key_value,
                    old.version_timestamp,
                    &mut output,
                );

                // Update table state: drop the old version, insert the new.
                // `old` and `new` may carry different keys; each is applied
                // to its own key's state.
                if let Some(key_state) = self.table_state.get_mut(&old.key_value) {
                    key_state.remove_version(old.version_timestamp);
                }
                let key_state = self.table_state.entry(new.key_value.clone()).or_default();
                key_state.insert(new.clone());

                // Re-emit joins for affected events with new version
                self.rejoin_affected_events(&new.key_value, new.version_timestamp, &mut output);
            }
            TableChange::Delete(row) => {
                self.metrics.table_deletes += 1;

                // Emit retractions for events that joined with deleted version
                self.emit_retractions_for_version(
                    &row.key_value,
                    row.version_timestamp,
                    &mut output,
                );

                // Remove from table state
                if let Some(key_state) = self.table_state.get_mut(&row.key_value) {
                    key_state.remove_version(row.version_timestamp);
                }
            }
        }

        output
    }
754
755    /// Emits retractions for all stream events that joined with a specific table version.
756    fn emit_retractions_for_version(&mut self, key: &[u8], version: i64, output: &mut OutputVec) {
757        let Some(records) = self.stream_state.get(key) else {
758            return;
759        };
760
761        for record in records {
762            if record.table_version == version {
763                // Emit retraction (reconstruct the joined event and mark as retraction)
764                if let Ok(event_batch) = TableRow::deserialize_batch(&record.event_data) {
765                    let event = Event::new(record.event_timestamp, event_batch);
766
767                    // Look up the old table row to reconstruct the join
768                    if let Some(key_state) = self.table_state.get(key) {
769                        if let Some((_, rows)) = key_state.versions.get_key_value(&version) {
770                            if let Some(table_row) = rows.last() {
771                                if let Some(joined) = self.create_joined_event(&event, table_row) {
772                                    // Emit as late event with retraction semantic
773                                    // In a full implementation, this would be Output::Retraction
774                                    output.push(Output::LateEvent(joined));
775                                    self.metrics.retractions += 1;
776                                }
777                            }
778                        }
779                    }
780                }
781            }
782        }
783    }
784
    /// Re-joins affected stream events with a new table version.
    ///
    /// Used after a table `Update`: every tracked stream event whose
    /// point-in-time lookup now resolves to `new_version` gets a fresh
    /// joined result emitted, and its tracked version is updated so future
    /// retractions reference the version actually joined.
    fn rejoin_affected_events(&mut self, key: &[u8], new_version: i64, output: &mut OutputVec) {
        // Phase 1: collect events that need re-joining. Copying out
        // (timestamp, serialized event) pairs lets the immutable borrows of
        // `stream_state`/`table_state` end before phase 3 mutates state.
        let events_to_rejoin: Vec<(i64, Vec<u8>)> = {
            let Some(records) = self.stream_state.get(key) else {
                return;
            };
            let Some(key_state) = self.table_state.get(key) else {
                return;
            };

            records
                .iter()
                .filter_map(|record| {
                    // An event is affected only if its point-in-time lookup
                    // now lands exactly on the new version.
                    let lookup_ts = record.event_timestamp;
                    if let Some(new_row) = key_state.lookup_at_time(lookup_ts) {
                        if new_row.version_timestamp == new_version {
                            return Some((record.event_timestamp, record.event_data.clone()));
                        }
                    }
                    None
                })
                .collect()
        };

        // Phase 2: emit the rejoined events (undecodable records are skipped).
        if let Some(key_state) = self.table_state.get(key) {
            for (event_ts, event_data) in &events_to_rejoin {
                if let Ok(event_batch) = TableRow::deserialize_batch(event_data) {
                    let event = Event::new(*event_ts, event_batch);
                    if let Some(new_row) = key_state.lookup_at_time(*event_ts) {
                        if let Some(joined) = self.create_joined_event(&event, new_row) {
                            output.push(Output::Event(joined));
                        }
                    }
                }
            }
        }

        // Phase 3: update tracked versions for the rejoined events.
        if let Some(records) = self.stream_state.get_mut(key) {
            for record in records.iter_mut() {
                if events_to_rejoin
                    .iter()
                    .any(|(ts, _)| *ts == record.event_timestamp)
                {
                    record.table_version = new_version;
                }
            }
        }
    }
836
837    /// Looks up a table row for the given key and timestamp.
838    fn lookup_table(&self, key: &[u8], timestamp: i64) -> Option<TableRow> {
839        let key_state = self.table_state.get(key)?;
840
841        match self.config.semantics {
842            TemporalJoinSemantics::EventTime => key_state.lookup_at_time(timestamp).cloned(),
843            TemporalJoinSemantics::ProcessTime => key_state.lookup_latest().cloned(),
844        }
845    }
846
847    /// Handles watermark updates and triggers cleanup.
848    pub fn on_watermark(&mut self, watermark: i64, _ctx: &mut OperatorContext) -> OutputVec {
849        self.watermark = watermark;
850
851        // Cleanup old stream state for non-append-only
852        if self.config.table_characteristics == TableCharacteristics::NonAppendOnly {
853            self.cleanup_stream_state(watermark);
854        }
855
856        OutputVec::new()
857    }
858
859    /// Cleans up stream state for events that can no longer receive retractions.
860    fn cleanup_stream_state(&mut self, watermark: i64) {
861        let initial_count: usize = self.stream_state.values().map(Vec::len).sum();
862
863        for records in self.stream_state.values_mut() {
864            records.retain(|r| r.event_timestamp >= watermark);
865        }
866        self.stream_state.retain(|_, v| !v.is_empty());
867
868        let final_count: usize = self.stream_state.values().map(Vec::len).sum();
869        if final_count < initial_count {
870            self.metrics.state_cleanups += (initial_count - final_count) as u64;
871        }
872    }
873
874    /// Extracts the key value from a record batch.
875    ///
876    /// Caches the column index on first call to avoid per-event schema lookups.
877    fn extract_key(
878        batch: &RecordBatch,
879        column_name: &str,
880        cached_index: &mut Option<usize>,
881    ) -> Option<Vec<u8>> {
882        let column_index = if let Some(idx) = *cached_index {
883            idx
884        } else {
885            let idx = batch.schema().index_of(column_name).ok()?;
886            *cached_index = Some(idx);
887            idx
888        };
889        let column = batch.column(column_index);
890
891        if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
892            if string_array.is_empty() || string_array.is_null(0) {
893                return None;
894            }
895            return Some(string_array.value(0).as_bytes().to_vec());
896        }
897
898        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
899            if int_array.is_empty() || int_array.is_null(0) {
900                return None;
901            }
902            return Some(int_array.value(0).to_le_bytes().to_vec());
903        }
904
905        None
906    }
907
908    /// Extracts a timestamp value from a record batch.
909    ///
910    /// Caches the column index on first call to avoid per-event schema lookups.
911    fn extract_timestamp(
912        batch: &RecordBatch,
913        column_name: &str,
914        cached_index: &mut Option<usize>,
915    ) -> Option<i64> {
916        let column_index = if let Some(idx) = *cached_index {
917            idx
918        } else {
919            let idx = batch.schema().index_of(column_name).ok()?;
920            *cached_index = Some(idx);
921            idx
922        };
923        let column = batch.column(column_index);
924
925        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
926            if int_array.is_empty() || int_array.is_null(0) {
927                return None;
928            }
929            return Some(int_array.value(0));
930        }
931
932        None
933    }
934
935    /// Creates a timer key for cleanup.
936    /// Reserved for future use when timer-based cleanup is implemented.
937    #[allow(dead_code)]
938    fn make_cleanup_timer_key(key_suffix: &[u8]) -> TimerKey {
939        let mut key = TimerKey::new();
940        key.push(TEMPORAL_TIMER_PREFIX);
941        key.extend_from_slice(key_suffix);
942        key
943    }
944
945    /// Updates the output schema when both input schemas are known.
946    fn update_output_schema(&mut self) {
947        if let (Some(stream), Some(table)) = (&self.stream_schema, &self.table_schema) {
948            let mut fields: Vec<Field> =
949                Vec::with_capacity(stream.fields().len() + table.fields().len());
950            fields.extend(stream.fields().iter().map(|f| f.as_ref().clone()));
951
952            // Add table fields, prefixing duplicates
953            for field in table.fields() {
954                let name = if stream.field_with_name(field.name()).is_ok() {
955                    format!("table_{}", field.name())
956                } else {
957                    field.name().clone()
958                };
959                fields.push(Field::new(
960                    name,
961                    field.data_type().clone(),
962                    true, // Nullable for outer joins
963                ));
964            }
965
966            self.output_schema = Some(Arc::new(Schema::new(fields)));
967        }
968    }
969
970    /// Creates a joined event from stream event and table row.
971    fn create_joined_event(&self, stream_event: &Event, table_row: &TableRow) -> Option<Event> {
972        let schema = self.output_schema.as_ref()?;
973        let table_batch = table_row.to_batch().ok()?;
974
975        let stream_cols = stream_event.data.columns();
976        let table_cols = table_batch.columns();
977        let mut columns: Vec<ArrayRef> = Vec::with_capacity(stream_cols.len() + table_cols.len());
978        columns.extend_from_slice(stream_cols);
979        for column in table_cols {
980            columns.push(Arc::clone(column));
981        }
982
983        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
984
985        Some(Event::new(stream_event.timestamp, joined_batch))
986    }
987
988    /// Creates an unmatched event for left outer joins (with null table columns).
989    fn create_unmatched_event(&self, stream_event: &Event) -> Option<Event> {
990        let schema = self.output_schema.as_ref()?;
991        let table_schema = self.table_schema.as_ref()?;
992
993        let num_rows = stream_event.data.num_rows();
994        let stream_cols = stream_event.data.columns();
995        let mut columns: Vec<ArrayRef> =
996            Vec::with_capacity(stream_cols.len() + table_schema.fields().len());
997        columns.extend_from_slice(stream_cols);
998
999        // Add null columns for table side
1000        for field in table_schema.fields() {
1001            columns.push(Self::create_null_array(field.data_type(), num_rows));
1002        }
1003
1004        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
1005
1006        Some(Event::new(stream_event.timestamp, joined_batch))
1007    }
1008
1009    /// Creates a null array of the given type and length.
1010    fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
1011        match data_type {
1012            DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
1013            DataType::Float64 => {
1014                use arrow_array::Float64Array;
1015                Arc::new(Float64Array::from(vec![None; num_rows])) as ArrayRef
1016            }
1017            _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
1018        }
1019    }
1020}
1021
impl Operator for TemporalJoinOperator {
    /// Routes a generic incoming event to the stream side of the join.
    ///
    /// Table-side changes are fed through the dedicated
    /// `process_table_insert` / `process_table_change` entry points instead.
    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        // Default to processing as stream event
        self.process_stream(event, ctx)
    }

    /// Handles a fired timer; timers tagged with the temporal-join prefix
    /// byte trigger stream-state cleanup up to the timer's timestamp.
    /// Timer handling never emits output.
    fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
        if timer.key.first() == Some(&TEMPORAL_TIMER_PREFIX) {
            // Cleanup triggered
            self.cleanup_stream_state(timer.timestamp);
        }
        OutputVec::new()
    }

    /// Snapshots the watermark, metric counters, table versions, and
    /// stream-side join records into an rkyv-serialized `OperatorState`.
    ///
    /// NOTE(review): a serialization failure yields empty `data`
    /// (`unwrap_or_default`), which would only surface later as a restore
    /// error — confirm this silent-failure mode is intended.
    fn checkpoint(&self) -> OperatorState {
        // Serialize table state
        let table_entries: Vec<(Vec<u8>, SerializableVersionedKeyState)> = self
            .table_state
            .iter()
            .map(|(k, v)| (k.clone(), v.into()))
            .collect();

        // Serialize stream state (for non-append-only)
        let stream_entries: Vec<(Vec<u8>, Vec<JoinedEventRecord>)> = self
            .stream_state
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        // Tuple layout must stay in sync with `CheckpointData` in `restore`.
        let checkpoint_data = (
            self.watermark,
            self.metrics.stream_events,
            self.metrics.table_inserts,
            self.metrics.matches,
            self.metrics.unmatched,
            self.metrics.retractions,
            table_entries,
            stream_entries,
        );

        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
            .map(|v| v.to_vec())
            .unwrap_or_default();

        OperatorState {
            operator_id: self.operator_id.clone(),
            data,
        }
    }

    /// Rebuilds the operator's state from a checkpoint produced by
    /// [`Self::checkpoint`], replacing all current state.
    ///
    /// # Errors
    ///
    /// Returns `StateAccessFailed` when the checkpoint belongs to a different
    /// operator id, and `SerializationFailed` when the rkyv payload cannot be
    /// validated or deserialized.
    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
        // Must mirror the tuple serialized in `checkpoint`.
        type CheckpointData = (
            i64,
            u64,
            u64,
            u64,
            u64,
            u64,
            Vec<(Vec<u8>, SerializableVersionedKeyState)>,
            Vec<(Vec<u8>, Vec<JoinedEventRecord>)>,
        );

        if state.operator_id != self.operator_id {
            return Err(OperatorError::StateAccessFailed(format!(
                "Operator ID mismatch: expected {}, got {}",
                self.operator_id, state.operator_id
            )));
        }

        // Validate the archived bytes first, then deserialize into owned data.
        let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        let (
            watermark,
            stream_events,
            table_inserts,
            matches,
            unmatched,
            retractions,
            table_entries,
            stream_entries,
        ) = rkyv::deserialize::<CheckpointData, RkyvError>(archived)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;

        self.watermark = watermark;
        self.metrics.stream_events = stream_events;
        self.metrics.table_inserts = table_inserts;
        self.metrics.matches = matches;
        self.metrics.unmatched = unmatched;
        self.metrics.retractions = retractions;

        // Restore table state
        self.table_state.clear();
        for (key, serializable) in table_entries {
            self.table_state.insert(key, serializable.into());
        }

        // Restore stream state
        self.stream_state.clear();
        for (key, records) in stream_entries {
            self.stream_state.insert(key, records);
        }

        Ok(())
    }
}
1127
1128#[cfg(test)]
1129#[allow(clippy::cast_precision_loss)]
1130#[allow(clippy::unnecessary_to_owned)]
1131mod tests {
1132    use super::*;
1133    use crate::state::{InMemoryStore, StateStore};
1134    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
1135    use arrow_array::Float64Array;
1136    use arrow_schema::{DataType, Field, Schema};
1137
1138    /// Creates an order event for testing.
1139    fn create_order_event(timestamp: i64, currency: &str, amount: f64) -> Event {
1140        let schema = Arc::new(Schema::new(vec![
1141            Field::new("currency", DataType::Utf8, false),
1142            Field::new("amount", DataType::Float64, false),
1143        ]));
1144        let batch = RecordBatch::try_new(
1145            schema,
1146            vec![
1147                Arc::new(StringArray::from(vec![currency])),
1148                Arc::new(Float64Array::from(vec![amount])),
1149            ],
1150        )
1151        .unwrap();
1152        Event::new(timestamp, batch)
1153    }
1154
1155    /// Creates a currency rate event for testing.
1156    fn create_rate_event(timestamp: i64, currency: &str, rate: f64, valid_from: i64) -> Event {
1157        let schema = Arc::new(Schema::new(vec![
1158            Field::new("currency", DataType::Utf8, false),
1159            Field::new("rate", DataType::Float64, false),
1160            Field::new("valid_from", DataType::Int64, false),
1161        ]));
1162        let batch = RecordBatch::try_new(
1163            schema,
1164            vec![
1165                Arc::new(StringArray::from(vec![currency])),
1166                Arc::new(Float64Array::from(vec![rate])),
1167                Arc::new(Int64Array::from(vec![valid_from])),
1168            ],
1169        )
1170        .unwrap();
1171        Event::new(timestamp, batch)
1172    }
1173
    /// Builds an `OperatorContext` over the given mutable services.
    ///
    /// Event time and processing time both start at zero; tests that need a
    /// specific processing time override `ctx.processing_time` directly.
    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn StateStore,
        watermark_gen: &'a mut dyn WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }
1188
1189    #[test]
1190    fn test_temporal_join_semantics_default() {
1191        assert_eq!(
1192            TemporalJoinSemantics::default(),
1193            TemporalJoinSemantics::EventTime
1194        );
1195    }
1196
1197    #[test]
1198    fn test_table_characteristics_default() {
1199        assert_eq!(
1200            TableCharacteristics::default(),
1201            TableCharacteristics::AppendOnly
1202        );
1203    }
1204
1205    #[test]
1206    fn test_temporal_join_type_properties() {
1207        assert!(!TemporalJoinType::Inner.emits_unmatched());
1208        assert!(TemporalJoinType::Left.emits_unmatched());
1209    }
1210
    #[test]
    fn test_config_builder() {
        // Exercise every builder setter and verify each value lands on the
        // built config unchanged.
        let config = TemporalJoinConfig::builder()
            .stream_key_column("currency".to_string())
            .table_key_column("currency".to_string())
            .table_version_column("valid_from".to_string())
            .semantics(TemporalJoinSemantics::EventTime)
            .table_characteristics(TableCharacteristics::AppendOnly)
            .join_type(TemporalJoinType::Left)
            .max_versions_per_key(100)
            .operator_id("test_temporal".to_string())
            .build()
            .unwrap();

        assert_eq!(config.stream_key_column, "currency");
        assert_eq!(config.table_key_column, "currency");
        assert_eq!(config.table_version_column, "valid_from");
        assert_eq!(config.semantics, TemporalJoinSemantics::EventTime);
        assert_eq!(
            config.table_characteristics,
            TableCharacteristics::AppendOnly
        );
        assert_eq!(config.join_type, TemporalJoinType::Left);
        assert_eq!(config.max_versions_per_key, 100);
    }
1236
1237    #[test]
1238    fn test_event_time_temporal_join_basic() {
1239        let config = TemporalJoinConfig::builder()
1240            .stream_key_column("currency".to_string())
1241            .table_key_column("currency".to_string())
1242            .table_version_column("valid_from".to_string())
1243            .semantics(TemporalJoinSemantics::EventTime)
1244            .join_type(TemporalJoinType::Inner)
1245            .build()
1246            .unwrap();
1247
1248        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1249
1250        let mut timers = TimerService::new();
1251        let mut state = InMemoryStore::new();
1252        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1253
1254        // Insert rate versions
1255        // Rate 1.1 valid from t=500
1256        let rate1 = create_rate_event(500, "USD", 1.1, 500);
1257        {
1258            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1259            operator.process_table_insert(&rate1, &mut ctx);
1260        }
1261
1262        // Rate 1.2 valid from t=800
1263        let rate2 = create_rate_event(800, "USD", 1.2, 800);
1264        {
1265            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1266            operator.process_table_insert(&rate2, &mut ctx);
1267        }
1268
1269        // Rate 1.3 valid from t=1200
1270        let rate3 = create_rate_event(1200, "USD", 1.3, 1200);
1271        {
1272            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1273            operator.process_table_insert(&rate3, &mut ctx);
1274        }
1275
1276        // Order at t=1000 should join with rate 1.2 (valid from t=800)
1277        let order = create_order_event(1000, "USD", 100.0);
1278        let outputs = {
1279            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1280            operator.process_stream(&order, &mut ctx)
1281        };
1282
1283        assert_eq!(
1284            outputs
1285                .iter()
1286                .filter(|o| matches!(o, Output::Event(_)))
1287                .count(),
1288            1
1289        );
1290        assert_eq!(operator.metrics().matches, 1);
1291
1292        // Verify output has both order and rate columns
1293        if let Some(Output::Event(event)) = outputs.first() {
1294            assert_eq!(event.data.num_columns(), 5); // 2 order + 3 rate
1295        }
1296    }
1297
1298    #[test]
1299    fn test_event_time_multiple_versions() {
1300        let config = TemporalJoinConfig::builder()
1301            .stream_key_column("currency".to_string())
1302            .table_key_column("currency".to_string())
1303            .table_version_column("valid_from".to_string())
1304            .semantics(TemporalJoinSemantics::EventTime)
1305            .join_type(TemporalJoinType::Inner)
1306            .build()
1307            .unwrap();
1308
1309        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1310
1311        let mut timers = TimerService::new();
1312        let mut state = InMemoryStore::new();
1313        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1314
1315        // Insert multiple rate versions
1316        {
1317            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1318            operator.process_table_insert(&create_rate_event(100, "USD", 1.0, 100), &mut ctx);
1319            operator.process_table_insert(&create_rate_event(200, "USD", 1.1, 200), &mut ctx);
1320            operator.process_table_insert(&create_rate_event(300, "USD", 1.2, 300), &mut ctx);
1321        }
1322
1323        // Order at t=150 should join with rate 1.0 (valid from t=100)
1324        let order1 = create_order_event(150, "USD", 100.0);
1325        let outputs1 = {
1326            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1327            operator.process_stream(&order1, &mut ctx)
1328        };
1329        assert_eq!(
1330            outputs1
1331                .iter()
1332                .filter(|o| matches!(o, Output::Event(_)))
1333                .count(),
1334            1
1335        );
1336
1337        // Order at t=250 should join with rate 1.1 (valid from t=200)
1338        let order2 = create_order_event(250, "USD", 100.0);
1339        let outputs2 = {
1340            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1341            operator.process_stream(&order2, &mut ctx)
1342        };
1343        assert_eq!(
1344            outputs2
1345                .iter()
1346                .filter(|o| matches!(o, Output::Event(_)))
1347                .count(),
1348            1
1349        );
1350
1351        // Order at t=350 should join with rate 1.2 (valid from t=300)
1352        let order3 = create_order_event(350, "USD", 100.0);
1353        let outputs3 = {
1354            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1355            operator.process_stream(&order3, &mut ctx)
1356        };
1357        assert_eq!(
1358            outputs3
1359                .iter()
1360                .filter(|o| matches!(o, Output::Event(_)))
1361                .count(),
1362            1
1363        );
1364
1365        assert_eq!(operator.metrics().matches, 3);
1366    }
1367
1368    #[test]
1369    fn test_no_match_before_first_version() {
1370        let config = TemporalJoinConfig::builder()
1371            .stream_key_column("currency".to_string())
1372            .table_key_column("currency".to_string())
1373            .table_version_column("valid_from".to_string())
1374            .semantics(TemporalJoinSemantics::EventTime)
1375            .join_type(TemporalJoinType::Inner)
1376            .build()
1377            .unwrap();
1378
1379        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1380
1381        let mut timers = TimerService::new();
1382        let mut state = InMemoryStore::new();
1383        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1384
1385        // Insert rate valid from t=500
1386        {
1387            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1388            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1389        }
1390
1391        // Order at t=400 (before any rate is valid)
1392        let order = create_order_event(400, "USD", 100.0);
1393        let outputs = {
1394            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1395            operator.process_stream(&order, &mut ctx)
1396        };
1397
1398        // Inner join - no output
1399        assert_eq!(outputs.len(), 0);
1400        assert_eq!(operator.metrics().unmatched, 1);
1401    }
1402
1403    #[test]
1404    fn test_left_join_no_match() {
1405        let config = TemporalJoinConfig::builder()
1406            .stream_key_column("currency".to_string())
1407            .table_key_column("currency".to_string())
1408            .table_version_column("valid_from".to_string())
1409            .semantics(TemporalJoinSemantics::EventTime)
1410            .join_type(TemporalJoinType::Left)
1411            .build()
1412            .unwrap();
1413
1414        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1415
1416        let mut timers = TimerService::new();
1417        let mut state = InMemoryStore::new();
1418        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1419
1420        // First, establish schemas by processing a matched event
1421        {
1422            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1423            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1424        }
1425        {
1426            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1427            operator.process_stream(&create_order_event(600, "USD", 100.0), &mut ctx);
1428        }
1429
1430        // Order for different currency (no matching rate)
1431        let order = create_order_event(700, "EUR", 100.0);
1432        let outputs = {
1433            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1434            operator.process_stream(&order, &mut ctx)
1435        };
1436
1437        // Left join should emit with nulls
1438        assert_eq!(
1439            outputs
1440                .iter()
1441                .filter(|o| matches!(o, Output::Event(_)))
1442                .count(),
1443            1
1444        );
1445        assert_eq!(operator.metrics().unmatched, 1);
1446
1447        if let Some(Output::Event(event)) = outputs.first() {
1448            assert_eq!(event.data.num_columns(), 5); // Order cols + null rate cols
1449        }
1450    }
1451
1452    #[test]
1453    fn test_process_time_semantics() {
1454        let config = TemporalJoinConfig::builder()
1455            .stream_key_column("currency".to_string())
1456            .table_key_column("currency".to_string())
1457            .table_version_column("valid_from".to_string())
1458            .semantics(TemporalJoinSemantics::ProcessTime) // Process time
1459            .join_type(TemporalJoinType::Inner)
1460            .build()
1461            .unwrap();
1462
1463        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1464
1465        let mut timers = TimerService::new();
1466        let mut state = InMemoryStore::new();
1467        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1468
1469        // Insert rate versions
1470        {
1471            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1472            operator.process_table_insert(&create_rate_event(100, "USD", 1.0, 100), &mut ctx);
1473            operator.process_table_insert(&create_rate_event(200, "USD", 1.1, 200), &mut ctx);
1474            operator.process_table_insert(&create_rate_event(300, "USD", 1.2, 300), &mut ctx);
1475        }
1476
1477        // Process-time lookup always gets latest version
1478        // Order at t=150 should still get rate 1.2 (latest)
1479        let order = create_order_event(150, "USD", 100.0);
1480        let outputs = {
1481            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1482            ctx.processing_time = 1000; // Processing time doesn't matter for latest
1483            operator.process_stream(&order, &mut ctx)
1484        };
1485
1486        assert_eq!(
1487            outputs
1488                .iter()
1489                .filter(|o| matches!(o, Output::Event(_)))
1490                .count(),
1491            1
1492        );
1493    }
1494
1495    #[test]
1496    fn test_append_only_no_stream_state() {
1497        let config = TemporalJoinConfig::builder()
1498            .stream_key_column("currency".to_string())
1499            .table_key_column("currency".to_string())
1500            .table_version_column("valid_from".to_string())
1501            .table_characteristics(TableCharacteristics::AppendOnly)
1502            .build()
1503            .unwrap();
1504
1505        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1506
1507        let mut timers = TimerService::new();
1508        let mut state = InMemoryStore::new();
1509        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1510
1511        // Insert rate
1512        {
1513            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1514            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1515        }
1516
1517        // Process order
1518        let order = create_order_event(600, "USD", 100.0);
1519        {
1520            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1521            operator.process_stream(&order, &mut ctx);
1522        }
1523
1524        // Append-only should not track stream events
1525        assert_eq!(operator.stream_state_size(), 0);
1526    }
1527
1528    #[test]
1529    fn test_non_append_only_tracks_stream_state() {
1530        let config = TemporalJoinConfig::builder()
1531            .stream_key_column("currency".to_string())
1532            .table_key_column("currency".to_string())
1533            .table_version_column("valid_from".to_string())
1534            .table_characteristics(TableCharacteristics::NonAppendOnly)
1535            .build()
1536            .unwrap();
1537
1538        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1539
1540        let mut timers = TimerService::new();
1541        let mut state = InMemoryStore::new();
1542        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1543
1544        // Insert rate
1545        {
1546            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1547            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1548        }
1549
1550        // Process order
1551        let order = create_order_event(600, "USD", 100.0);
1552        {
1553            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1554            operator.process_stream(&order, &mut ctx);
1555        }
1556
1557        // Non-append-only should track stream events
1558        assert_eq!(operator.stream_state_size(), 1);
1559    }
1560
    #[test]
    fn test_table_delete_emits_retraction() {
        // Non-append-only tables track joined stream events so a later table
        // delete can retract previously emitted join results.
        let config = TemporalJoinConfig::builder()
            .stream_key_column("currency".to_string())
            .table_key_column("currency".to_string())
            .table_version_column("valid_from".to_string())
            .table_characteristics(TableCharacteristics::NonAppendOnly)
            .build()
            .unwrap();

        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Insert rate and process order
        let rate = create_rate_event(500, "USD", 1.1, 500);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_table_insert(&rate, &mut ctx);
        }

        let order = create_order_event(600, "USD", 100.0);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_stream(&order, &mut ctx);
        }

        // Create table row for delete
        let table_row = TableRow::new(500, b"USD".to_vec(), &rate.data).unwrap();

        // Delete the rate
        let change = TableChange::Delete(table_row);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_table_change(&change, &mut ctx)
        };

        // Should emit retraction
        assert_eq!(operator.metrics().table_deletes, 1);
        // The retraction may surface either via the metrics counter or as a
        // late-event output, depending on how the operator reports it.
        assert!(
            operator.metrics().retractions >= 1
                || outputs.iter().any(|o| matches!(o, Output::LateEvent(_)))
        );
    }
1607
1608    #[test]
1609    fn test_multiple_keys() {
1610        let config = TemporalJoinConfig::builder()
1611            .stream_key_column("currency".to_string())
1612            .table_key_column("currency".to_string())
1613            .table_version_column("valid_from".to_string())
1614            .build()
1615            .unwrap();
1616
1617        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1618
1619        let mut timers = TimerService::new();
1620        let mut state = InMemoryStore::new();
1621        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1622
1623        // Insert rates for different currencies
1624        {
1625            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1626            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1627            operator.process_table_insert(&create_rate_event(500, "EUR", 0.9, 500), &mut ctx);
1628            operator.process_table_insert(&create_rate_event(500, "GBP", 0.8, 500), &mut ctx);
1629        }
1630
1631        // Orders for different currencies
1632        {
1633            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1634            let outputs1 =
1635                operator.process_stream(&create_order_event(600, "USD", 100.0), &mut ctx);
1636            assert_eq!(
1637                outputs1
1638                    .iter()
1639                    .filter(|o| matches!(o, Output::Event(_)))
1640                    .count(),
1641                1
1642            );
1643        }
1644
1645        {
1646            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1647            let outputs2 =
1648                operator.process_stream(&create_order_event(600, "EUR", 100.0), &mut ctx);
1649            assert_eq!(
1650                outputs2
1651                    .iter()
1652                    .filter(|o| matches!(o, Output::Event(_)))
1653                    .count(),
1654                1
1655            );
1656        }
1657
1658        assert_eq!(operator.metrics().matches, 2);
1659    }
1660
1661    #[test]
1662    fn test_max_versions_per_key() {
1663        let config = TemporalJoinConfig::builder()
1664            .stream_key_column("currency".to_string())
1665            .table_key_column("currency".to_string())
1666            .table_version_column("valid_from".to_string())
1667            .max_versions_per_key(2) // Only keep 2 versions
1668            .build()
1669            .unwrap();
1670
1671        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1672
1673        let mut timers = TimerService::new();
1674        let mut state = InMemoryStore::new();
1675        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1676
1677        // Insert 5 versions
1678        for i in 0..5 {
1679            let rate =
1680                create_rate_event(100 * (i + 1), "USD", 1.0 + (i as f64) * 0.1, 100 * (i + 1));
1681            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1682            operator.process_table_insert(&rate, &mut ctx);
1683        }
1684
1685        // Should only have 2 versions (most recent)
1686        assert_eq!(operator.table_state_size(), 2);
1687
1688        // Should have versions for t=400 and t=500 only
1689        let key_state = operator.table_state.get(&b"USD".to_vec()).unwrap();
1690        assert!(key_state.lookup_at_time(400).is_some());
1691        assert!(key_state.lookup_at_time(500).is_some());
1692        // Earlier versions should be gone
1693        assert!(key_state.lookup_at_time(100).is_none());
1694    }
1695
1696    #[test]
1697    fn test_checkpoint_restore() {
1698        let config = TemporalJoinConfig::builder()
1699            .stream_key_column("currency".to_string())
1700            .table_key_column("currency".to_string())
1701            .table_version_column("valid_from".to_string())
1702            .table_characteristics(TableCharacteristics::NonAppendOnly)
1703            .build()
1704            .unwrap();
1705
1706        let mut operator =
1707            TemporalJoinOperator::with_id(config.clone(), "test_temporal".to_string());
1708
1709        let mut timers = TimerService::new();
1710        let mut state = InMemoryStore::new();
1711        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1712
1713        // Add some state
1714        {
1715            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1716            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1717            operator.process_table_insert(&create_rate_event(600, "USD", 1.2, 600), &mut ctx);
1718        }
1719
1720        {
1721            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1722            operator.process_stream(&create_order_event(550, "USD", 100.0), &mut ctx);
1723        }
1724
1725        operator.watermark = 500;
1726        operator.metrics.matches = 10;
1727        operator.metrics.retractions = 2;
1728
1729        // Checkpoint
1730        let checkpoint = operator.checkpoint();
1731
1732        // Restore to new operator
1733        let mut restored = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1734        restored.restore(checkpoint).unwrap();
1735
1736        // Verify state restored
1737        assert_eq!(restored.watermark(), 500);
1738        assert_eq!(restored.metrics().matches, 10);
1739        assert_eq!(restored.metrics().retractions, 2);
1740        assert_eq!(restored.table_state_size(), 2);
1741        assert_eq!(restored.stream_state_size(), 1);
1742    }
1743
1744    #[test]
1745    fn test_schema_composition() {
1746        let config = TemporalJoinConfig::builder()
1747            .stream_key_column("currency".to_string())
1748            .table_key_column("currency".to_string())
1749            .table_version_column("valid_from".to_string())
1750            .build()
1751            .unwrap();
1752
1753        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1754
1755        let mut timers = TimerService::new();
1756        let mut state = InMemoryStore::new();
1757        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1758
1759        // Process table to capture schema
1760        {
1761            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1762            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1763        }
1764
1765        // Process stream to capture schema and produce output
1766        let order = create_order_event(600, "USD", 100.0);
1767        let outputs = {
1768            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1769            operator.process_stream(&order, &mut ctx)
1770        };
1771
1772        assert_eq!(outputs.len(), 1);
1773
1774        if let Some(Output::Event(event)) = outputs.first() {
1775            let schema = event.data.schema();
1776
1777            // Check stream columns (order)
1778            assert!(schema.field_with_name("amount").is_ok());
1779
1780            // Check table columns (rate) - currency is duplicated so prefixed
1781            assert!(schema.field_with_name("table_currency").is_ok());
1782            assert!(schema.field_with_name("rate").is_ok());
1783            assert!(schema.field_with_name("valid_from").is_ok());
1784        }
1785    }
1786
1787    #[test]
1788    fn test_metrics_tracking() {
1789        let config = TemporalJoinConfig::builder()
1790            .stream_key_column("currency".to_string())
1791            .table_key_column("currency".to_string())
1792            .table_version_column("valid_from".to_string())
1793            .build()
1794            .unwrap();
1795
1796        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1797
1798        let mut timers = TimerService::new();
1799        let mut state = InMemoryStore::new();
1800        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1801
1802        // Process events
1803        {
1804            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1805            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1806            operator.process_table_insert(&create_rate_event(600, "USD", 1.2, 600), &mut ctx);
1807        }
1808
1809        {
1810            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1811            operator.process_stream(&create_order_event(550, "USD", 100.0), &mut ctx);
1812            operator.process_stream(&create_order_event(650, "USD", 200.0), &mut ctx);
1813        }
1814
1815        assert_eq!(operator.metrics().table_inserts, 2);
1816        assert_eq!(operator.metrics().stream_events, 2);
1817        assert_eq!(operator.metrics().matches, 2);
1818    }
1819
1820    #[test]
1821    fn test_versioned_key_state_operations() {
1822        let mut key_state = VersionedKeyState::new();
1823        assert!(key_state.is_empty());
1824
1825        // Insert some rows
1826        let row1 = TableRow {
1827            version_timestamp: 100,
1828            key_value: b"test".to_vec(),
1829            data: vec![],
1830        };
1831        let row2 = TableRow {
1832            version_timestamp: 200,
1833            key_value: b"test".to_vec(),
1834            data: vec![],
1835        };
1836        let row3 = TableRow {
1837            version_timestamp: 300,
1838            key_value: b"test".to_vec(),
1839            data: vec![],
1840        };
1841
1842        key_state.insert(row1);
1843        key_state.insert(row2);
1844        key_state.insert(row3);
1845
1846        assert_eq!(key_state.len(), 3);
1847        assert_eq!(key_state.min_version, 100);
1848        assert_eq!(key_state.max_version, 300);
1849
1850        // Lookup at different times
1851        assert!(key_state.lookup_at_time(50).is_none()); // Before first version
1852        assert_eq!(
1853            key_state.lookup_at_time(150).unwrap().version_timestamp,
1854            100
1855        );
1856        assert_eq!(
1857            key_state.lookup_at_time(250).unwrap().version_timestamp,
1858            200
1859        );
1860        assert_eq!(
1861            key_state.lookup_at_time(350).unwrap().version_timestamp,
1862            300
1863        );
1864
1865        // Latest lookup
1866        assert_eq!(key_state.lookup_latest().unwrap().version_timestamp, 300);
1867
1868        // Cleanup before 200
1869        key_state.cleanup_before(200);
1870        assert_eq!(key_state.len(), 2);
1871        assert_eq!(key_state.min_version, 200);
1872
1873        // Remove specific version
1874        key_state.remove_version(200);
1875        assert_eq!(key_state.len(), 1);
1876    }
1877
1878    #[test]
1879    fn test_version_limiting() {
1880        let mut key_state = VersionedKeyState::new();
1881
1882        // Insert 10 versions
1883        for i in 0..10 {
1884            key_state.insert(TableRow {
1885                version_timestamp: 100 * (i + 1),
1886                key_value: b"test".to_vec(),
1887                data: vec![],
1888            });
1889        }
1890
1891        assert_eq!(key_state.len(), 10);
1892
1893        // Limit to 3 versions
1894        key_state.limit_versions(3);
1895        assert_eq!(key_state.len(), 3);
1896
1897        // Should have versions 800, 900, 1000
1898        assert!(key_state.lookup_at_time(700).is_none());
1899        assert!(key_state.lookup_at_time(800).is_some());
1900    }
1901
1902    #[test]
1903    fn test_metrics_reset() {
1904        let mut metrics = TemporalJoinMetrics::new();
1905        metrics.stream_events = 100;
1906        metrics.matches = 50;
1907        metrics.retractions = 5;
1908
1909        metrics.reset();
1910
1911        assert_eq!(metrics.stream_events, 0);
1912        assert_eq!(metrics.matches, 0);
1913        assert_eq!(metrics.retractions, 0);
1914    }
1915
1916    #[test]
1917    fn test_table_row_serialization() {
1918        let schema = Arc::new(Schema::new(vec![
1919            Field::new("currency", DataType::Utf8, false),
1920            Field::new("rate", DataType::Float64, false),
1921        ]));
1922        let batch = RecordBatch::try_new(
1923            schema,
1924            vec![
1925                Arc::new(StringArray::from(vec!["USD"])),
1926                Arc::new(Float64Array::from(vec![1.25])),
1927            ],
1928        )
1929        .unwrap();
1930
1931        let row = TableRow::new(1000, b"USD".to_vec(), &batch).unwrap();
1932
1933        // Verify round-trip
1934        let restored_batch = row.to_batch().unwrap();
1935        assert_eq!(restored_batch.num_rows(), 1);
1936        assert_eq!(restored_batch.num_columns(), 2);
1937    }
1938
1939    #[test]
1940    fn test_stream_state_cleanup() {
1941        let config = TemporalJoinConfig::builder()
1942            .stream_key_column("currency".to_string())
1943            .table_key_column("currency".to_string())
1944            .table_version_column("valid_from".to_string())
1945            .table_characteristics(TableCharacteristics::NonAppendOnly)
1946            .build()
1947            .unwrap();
1948
1949        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1950
1951        let mut timers = TimerService::new();
1952        let mut state = InMemoryStore::new();
1953        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1954
1955        // Add state
1956        {
1957            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1958            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1959        }
1960
1961        // Process some orders at different timestamps
1962        for i in 0..5 {
1963            let order = create_order_event(600 + i * 100, "USD", 100.0);
1964            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1965            operator.process_stream(&order, &mut ctx);
1966        }
1967
1968        assert_eq!(operator.stream_state_size(), 5);
1969
1970        // Advance watermark past some events
1971        {
1972            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1973            operator.on_watermark(900, &mut ctx);
1974        }
1975
1976        // Old events should be cleaned up
1977        assert!(operator.stream_state_size() < 5);
1978        assert!(operator.metrics().state_cleanups > 0);
1979    }
1980}