Skip to main content

laminar_core/operator/
stream_join.rs

1//! # Stream-Stream Join Operators
2//!
3//! Implementation of time-bounded joins between two event streams.
4//!
5//! Stream-stream joins match events from two streams based on a join key
6//! and a time bound. Events are matched if they share a key and their
7//! timestamps fall within the specified time window.
8//!
9//! ## Join Types
10//!
11//! - **Inner**: Only emit matched pairs
12//! - **Left**: Emit all left events, with right match if exists
13//! - **Right**: Emit all right events, with left match if exists
14//! - **Full**: Emit all events, with matches where they exist
15//! - **Left Semi**: Emit left rows that have at least one match (left columns only)
16//! - **Left Anti**: Emit left rows with no match within the time bound (left columns only)
17//! - **Right Semi/Anti**: Mirror of left variants
18//!
19//! ## Example
20//!
21//! ```rust,no_run
22//! use laminar_core::operator::stream_join::{
23//!     StreamJoinOperator, JoinType, JoinSide, StreamJoinConfig, JoinRowEncoding,
24//! };
25//! use std::time::Duration;
26//!
27//! // Basic join (backward compatible)
28//! let operator = StreamJoinOperator::new(
29//!     "order_id".to_string(),  // left key column
30//!     "order_id".to_string(),  // right key column
31//!     Duration::from_secs(3600), // 1 hour time bound
32//!     JoinType::Inner,
33//! );
34//!
35//! // Optimized join with CPU-friendly encoding
36//! let config = StreamJoinConfig::builder()
37//!     .left_key_column("order_id")
38//!     .right_key_column("order_id")
39//!     .time_bound(Duration::from_secs(3600))
40//!     .join_type(JoinType::Inner)
41//!     .row_encoding(JoinRowEncoding::CpuFriendly)  // 30-50% faster for memory-resident state
42//!     .asymmetric_compaction(true)                  // Skip compaction on finished sides
43//!     .per_key_tracking(true)                       // Aggressive cleanup for sparse keys
44//!     .build()
45//!     .unwrap();
46//! let optimized_operator = StreamJoinOperator::from_config(config);
47//! ```
48//!
49//! ## SQL Syntax
50//!
51//! ```sql
52//! SELECT o.*, p.status
53//! FROM orders o
54//! JOIN payments p
55//!     ON o.order_id = p.order_id
56//!     AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR;
57//!
58//! -- Session variables for optimization
59//! SET streaming_join_row_encoding = 'cpu_friendly';
60//! SET streaming_join_asymmetric_compaction = true;
61//! ```
62//!
63//! ## State Management
64//!
65//! Events are stored in state with keys formatted as:
66//! - `sjl:<key_hash>:<timestamp>:<event_id>` for left events
67//! - `sjr:<key_hash>:<timestamp>:<event_id>` for right events
68//!
69//! State is automatically cleaned up when watermark passes
70//! `event_timestamp + time_bound`.
71//!
72//! ## Optimizations
73//!
74//! - **CPU-Friendly Encoding**: Inlines primitive values for faster access (30-50% improvement)
75//! - **Asymmetric Compaction**: Skips compaction on finished/idle sides
76//! - **Per-Key Tracking**: Aggressive cleanup for sparse key patterns
77//! - **Build-Side Pruning**: Early pruning based on probe-side watermark
78
79use super::{
80    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
81    TimerKey,
82};
83use crate::state::{StateStore, StateStoreExt};
84use arrow_array::{Array, ArrayRef, Float64Array, Int64Array, RecordBatch, StringArray};
85use arrow_schema::{DataType, Field, Schema, SchemaRef};
86use bytes::Bytes;
87use rkyv::{
88    rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
89};
90use rustc_hash::FxHashMap;
91use smallvec::SmallVec;
92use std::hash::{Hash, Hasher};
93use std::sync::atomic::{AtomicU64, Ordering};
94use std::sync::Arc;
95use std::time::Duration;
96
/// Join type for stream-stream joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JoinType {
    /// Inner join - only emit matched pairs.
    #[default]
    Inner,
    /// Left outer join - emit all left events, with right match if exists.
    Left,
    /// Right outer join - emit all right events, with left match if exists.
    Right,
    /// Full outer join - emit all events, with matches where they exist.
    Full,
    /// Left semi join - emit left rows that have at least one match (left columns only).
    LeftSemi,
    /// Left anti join - emit left rows that have no match (left columns only).
    LeftAnti,
    /// Right semi join - emit right rows that have at least one match (right columns only).
    RightSemi,
    /// Right anti join - emit right rows that have no match (right columns only).
    RightAnti,
}

impl JoinType {
    /// Returns true if unmatched left events should be emitted.
    ///
    /// Exhaustive match so adding a variant forces a decision here.
    #[must_use]
    pub fn emits_unmatched_left(&self) -> bool {
        match self {
            Self::Left | Self::Full | Self::LeftAnti => true,
            Self::Inner | Self::Right | Self::LeftSemi | Self::RightSemi | Self::RightAnti => false,
        }
    }

    /// Returns true if unmatched right events should be emitted.
    #[must_use]
    pub fn emits_unmatched_right(&self) -> bool {
        match self {
            Self::Right | Self::Full | Self::RightAnti => true,
            Self::Inner | Self::Left | Self::LeftSemi | Self::LeftAnti | Self::RightSemi => false,
        }
    }

    /// Returns true if this is a semi join (kept-side columns only, matched rows).
    #[must_use]
    pub fn is_semi(&self) -> bool {
        matches!(self, Self::LeftSemi | Self::RightSemi)
    }

    /// Returns true if this is an anti join (kept-side columns only, unmatched rows).
    #[must_use]
    pub fn is_anti(&self) -> bool {
        matches!(self, Self::LeftAnti | Self::RightAnti)
    }

    /// For semi/anti joins, returns the side whose rows are kept in the output.
    ///
    /// Inner/outer joins keep columns from both sides, so they return `None`.
    #[must_use]
    pub fn kept_side(&self) -> Option<JoinSide> {
        match self {
            Self::LeftSemi | Self::LeftAnti => Some(JoinSide::Left),
            Self::RightSemi | Self::RightAnti => Some(JoinSide::Right),
            Self::Inner | Self::Left | Self::Right | Self::Full => None,
        }
    }
}

/// Identifies which side of the join an event came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinSide {
    /// Left side of the join.
    Left,
    /// Right side of the join.
    Right,
}
163
// Stream Join Optimizations

/// Row encoding strategy for join state.
///
/// Controls how join rows are serialized for storage. The encoding choice
/// affects the tradeoff between memory usage and CPU access speed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JoinRowEncoding {
    /// Compact encoding using Arrow IPC format.
    ///
    /// - Smaller memory footprint
    /// - Higher CPU decode cost per access
    /// - Best when: state exceeds memory, disk spills frequent
    #[default]
    Compact,

    /// CPU-friendly encoding with inlined primitive values.
    ///
    /// - Larger memory footprint (~20-40% more)
    /// - Faster access (~30-50% improvement per `RisingWave` benchmarks)
    /// - Best when: state fits in memory, CPU-bound workloads
    CpuFriendly,
}

impl std::fmt::Display for JoinRowEncoding {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Canonical names match the session-variable values accepted by FromStr.
        let label = match self {
            Self::Compact => "compact",
            Self::CpuFriendly => "cpu_friendly",
        };
        f.write_str(label)
    }
}

impl std::str::FromStr for JoinRowEncoding {
    type Err = String;

    /// Case-insensitive parse; accepts a few spelling variants for the
    /// CPU-friendly encoding.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lower = s.to_lowercase();
        if lower == "compact" {
            Ok(Self::Compact)
        } else if matches!(lower.as_str(), "cpu_friendly" | "cpufriendly" | "cpu-friendly") {
            Ok(Self::CpuFriendly)
        } else {
            Err(format!(
                "Unknown encoding: {s}. Expected 'compact' or 'cpu_friendly'"
            ))
        }
    }
}
210
/// Configuration for stream-stream joins.
///
/// Provides fine-grained control over join behavior and optimizations.
/// Use the builder pattern for convenient construction; `Default` supplies
/// the optimization defaults but leaves the key columns empty (the builder
/// rejects empty key columns at `build` time).
///
/// # Example
///
/// ```rust
/// use laminar_core::operator::stream_join::{StreamJoinConfig, JoinType, JoinRowEncoding};
/// use std::time::Duration;
///
/// let config = StreamJoinConfig::builder()
///     .left_key_column("order_id")
///     .right_key_column("order_id")
///     .time_bound(Duration::from_secs(3600))
///     .join_type(JoinType::Inner)
///     .row_encoding(JoinRowEncoding::CpuFriendly)
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone)]
pub struct StreamJoinConfig {
    /// Left stream key column name.
    pub left_key_column: String,
    /// Right stream key column name.
    pub right_key_column: String,
    /// Time bound for matching (milliseconds).
    pub time_bound_ms: i64,
    /// Type of join to perform.
    pub join_type: JoinType,
    /// Operator ID for checkpointing.
    pub operator_id: Option<String>,

    // Optimizations
    /// Row encoding strategy.
    pub row_encoding: JoinRowEncoding,
    /// Enable asymmetric compaction optimization.
    pub asymmetric_compaction: bool,
    /// Threshold for considering a side "finished" (ms of processing time).
    pub idle_threshold_ms: i64,
    /// Enable per-key cleanup tracking.
    pub per_key_tracking: bool,
    /// Threshold for idle key cleanup (ms of processing time).
    pub key_idle_threshold_ms: i64,
    /// Enable build-side pruning.
    pub build_side_pruning: bool,
    /// Which side to use as build side (None = auto-select based on statistics).
    pub build_side: Option<JoinSide>,
}
260
261impl Default for StreamJoinConfig {
262    fn default() -> Self {
263        Self {
264            left_key_column: String::new(),
265            right_key_column: String::new(),
266            time_bound_ms: 0,
267            join_type: JoinType::Inner,
268            operator_id: None,
269            row_encoding: JoinRowEncoding::Compact,
270            asymmetric_compaction: true,
271            idle_threshold_ms: 60_000, // 1 minute
272            per_key_tracking: true,
273            key_idle_threshold_ms: 300_000, // 5 minutes
274            build_side_pruning: true,
275            build_side: None,
276        }
277    }
278}
279
280impl StreamJoinConfig {
281    /// Creates a new configuration builder.
282    #[must_use]
283    pub fn builder() -> StreamJoinConfigBuilder {
284        StreamJoinConfigBuilder::default()
285    }
286}
287
/// Builder for `StreamJoinConfig`.
///
/// Obtained via [`StreamJoinConfig::builder`]; setters are chainable and
/// `build` validates that both key columns were provided.
#[derive(Debug, Default)]
pub struct StreamJoinConfigBuilder {
    // Accumulated configuration; starts from `StreamJoinConfig::default()`.
    config: StreamJoinConfig,
}
293
294impl StreamJoinConfigBuilder {
295    /// Sets the left stream key column name.
296    #[must_use]
297    pub fn left_key_column(mut self, column: impl Into<String>) -> Self {
298        self.config.left_key_column = column.into();
299        self
300    }
301
302    /// Sets the right stream key column name.
303    #[must_use]
304    pub fn right_key_column(mut self, column: impl Into<String>) -> Self {
305        self.config.right_key_column = column.into();
306        self
307    }
308
309    /// Sets the time bound for matching events.
310    #[must_use]
311    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
312    pub fn time_bound(mut self, duration: Duration) -> Self {
313        self.config.time_bound_ms = duration.as_millis() as i64;
314        self
315    }
316
317    /// Sets the time bound in milliseconds.
318    #[must_use]
319    pub fn time_bound_ms(mut self, ms: i64) -> Self {
320        self.config.time_bound_ms = ms;
321        self
322    }
323
324    /// Sets the join type.
325    #[must_use]
326    pub fn join_type(mut self, join_type: JoinType) -> Self {
327        self.config.join_type = join_type;
328        self
329    }
330
331    /// Sets the operator ID for checkpointing.
332    #[must_use]
333    pub fn operator_id(mut self, id: impl Into<String>) -> Self {
334        self.config.operator_id = Some(id.into());
335        self
336    }
337
338    /// Sets the row encoding strategy.
339    #[must_use]
340    pub fn row_encoding(mut self, encoding: JoinRowEncoding) -> Self {
341        self.config.row_encoding = encoding;
342        self
343    }
344
345    /// Enables or disables asymmetric compaction.
346    #[must_use]
347    pub fn asymmetric_compaction(mut self, enabled: bool) -> Self {
348        self.config.asymmetric_compaction = enabled;
349        self
350    }
351
352    /// Sets the idle threshold for asymmetric compaction.
353    #[must_use]
354    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
355    pub fn idle_threshold(mut self, duration: Duration) -> Self {
356        self.config.idle_threshold_ms = duration.as_millis() as i64;
357        self
358    }
359
360    /// Enables or disables per-key tracking.
361    #[must_use]
362    pub fn per_key_tracking(mut self, enabled: bool) -> Self {
363        self.config.per_key_tracking = enabled;
364        self
365    }
366
367    /// Sets the key idle threshold for cleanup.
368    #[must_use]
369    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
370    pub fn key_idle_threshold(mut self, duration: Duration) -> Self {
371        self.config.key_idle_threshold_ms = duration.as_millis() as i64;
372        self
373    }
374
375    /// Enables or disables build-side pruning.
376    #[must_use]
377    pub fn build_side_pruning(mut self, enabled: bool) -> Self {
378        self.config.build_side_pruning = enabled;
379        self
380    }
381
382    /// Sets which side to use as the build side.
383    #[must_use]
384    pub fn build_side(mut self, side: JoinSide) -> Self {
385        self.config.build_side = Some(side);
386        self
387    }
388
389    /// Builds the configuration.
390    ///
391    /// # Errors
392    ///
393    /// Returns `OperatorError::ConfigError` if key columns are empty.
394    pub fn build(self) -> std::result::Result<StreamJoinConfig, OperatorError> {
395        if self.config.left_key_column.is_empty() {
396            return Err(OperatorError::ConfigError(
397                "left_key_column is required".into(),
398            ));
399        }
400        if self.config.right_key_column.is_empty() {
401            return Err(OperatorError::ConfigError(
402                "right_key_column is required".into(),
403            ));
404        }
405        Ok(self.config)
406    }
407}
408
/// Per-side statistics for asymmetric optimization.
#[derive(Debug, Clone, Default)]
pub struct SideStats {
    /// Total events received on this side.
    pub events_received: u64,
    /// Events in current tracking window (reset when the rate window rolls).
    pub events_this_window: u64,
    /// Last event timestamp (processing time).
    pub last_event_time: i64,
    /// Estimated write rate (events/second).
    pub write_rate: f64,
    /// Window start time for rate calculation.
    window_start: i64,
}

impl SideStats {
    /// Creates new side statistics.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records an event arrival and refreshes the write-rate estimate.
    ///
    /// The rate window rolls over at most once per second of processing time.
    #[allow(clippy::cast_precision_loss)]
    pub fn record_event(&mut self, processing_time: i64) {
        self.events_received += 1;
        self.events_this_window += 1;
        self.last_event_time = processing_time;

        // Update write rate every 1000ms
        if self.window_start == 0 {
            // First event: anchor the rate window.
            self.window_start = processing_time;
        } else {
            let elapsed_ms = processing_time - self.window_start;
            if elapsed_ms >= 1000 {
                // Precision loss is acceptable for rate estimation
                self.write_rate = (self.events_this_window as f64 * 1000.0) / elapsed_ms as f64;
                self.events_this_window = 0;
                self.window_start = processing_time;
            }
        }
    }

    /// Checks if this side is considered "finished" (no recent activity).
    ///
    /// A side is idle once more than `threshold_ms` of processing time has
    /// elapsed since its last event. Sides that never received any events
    /// are not considered idle.
    #[must_use]
    pub fn is_idle(&self, current_time: i64, threshold_ms: i64) -> bool {
        if self.events_received == 0 {
            return false; // Never received events, not idle
        }
        // Bug fix: previously this also required `events_this_window == 0`,
        // but that counter only resets when the rate window rolls over inside
        // `record_event`. A side whose last events never triggered a rollover
        // would therefore be reported as busy forever, defeating idle
        // detection. Time since the last event is the correct signal.
        current_time - self.last_event_time > threshold_ms
    }
}
462
/// Per-key metadata for cleanup tracking.
#[derive(Debug, Clone)]
pub struct KeyMetadata {
    /// Last event timestamp for this key (processing time).
    pub last_activity: i64,
    /// Number of events for this key.
    pub event_count: u64,
    /// Number of state entries for this key.
    pub state_entries: u64,
}

impl KeyMetadata {
    /// Creates metadata for a key that just received its first event.
    #[must_use]
    pub fn new(now_ms: i64) -> Self {
        Self {
            last_activity: now_ms,
            event_count: 1,
            state_entries: 1,
        }
    }

    /// Records another event for this key, bumping both counters.
    pub fn record_event(&mut self, now_ms: i64) {
        self.last_activity = now_ms;
        self.event_count += 1;
        self.state_entries += 1;
    }

    /// Returns whether this key has been inactive for longer than
    /// `threshold_ms` of processing time.
    #[must_use]
    pub fn is_idle(&self, current_time: i64, threshold_ms: i64) -> bool {
        current_time - self.last_activity > threshold_ms
    }

    /// Removes one state entry from the count (called on cleanup).
    /// Saturates at zero rather than underflowing.
    pub fn decrement_entries(&mut self) {
        self.state_entries = self.state_entries.saturating_sub(1);
    }
}
503
/// State key prefixes for join state.
const LEFT_STATE_PREFIX: &[u8; 4] = b"sjl:";
const RIGHT_STATE_PREFIX: &[u8; 4] = b"sjr:";

/// Timer key prefix for left-side cleanup.
const LEFT_TIMER_PREFIX: u8 = 0x10;
/// Timer key prefix for right-side cleanup.
const RIGHT_TIMER_PREFIX: u8 = 0x20;
/// Timer key prefix for unmatched event emission.
const UNMATCHED_TIMER_PREFIX: u8 = 0x30;
/// Prefix for matched-flag keys (separate from `JoinRow` to avoid serde roundtrips).
const MATCHED_FLAG_PREFIX: u8 = 0x40;

/// Builds a matched-flag key: `MATCHED_FLAG_PREFIX` followed by the
/// 28-byte join state key.
#[inline]
fn matched_flag_key(state_key: &[u8]) -> [u8; 29] {
    debug_assert_eq!(state_key.len(), 28, "join state keys are always 28 bytes");
    // Byte 0 carries the prefix; bytes 1..29 mirror the state key.
    std::array::from_fn(|i| if i == 0 { MATCHED_FLAG_PREFIX } else { state_key[i - 1] })
}
526
527/// Mark a join row as matched by writing a 1-byte flag.
528#[inline]
529fn mark_matched(state: &mut dyn StateStore, state_key: &[u8]) {
530    let k = matched_flag_key(state_key);
531    let _ = state.put(&k, Bytes::from_static(&[1]));
532}
533
534/// Check if a join row has been matched.
535#[inline]
536fn is_matched(state: &dyn StateStore, state_key: &[u8]) -> bool {
537    let k = matched_flag_key(state_key);
538    state.get_ref(&k).is_some()
539}
540
/// Static counter for generating unique operator IDs.
static JOIN_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Static counter for generating unique event IDs within an operator.
///
/// NOTE(review): this counter is process-global, so IDs drawn from it are
/// unique across all operators in the process rather than scoped to one
/// operator — confirm the "within an operator" wording against call sites.
static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
546
/// A stored join row containing serialized event data.
///
/// Persisted in the state store under the `sjl:`/`sjr:` key prefixes and
/// archived with `rkyv`.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct JoinRow {
    /// Event timestamp in milliseconds.
    pub timestamp: i64,
    /// Serialized key value (as bytes).
    pub key_value: Vec<u8>,
    /// Serialized record batch data; format is selected by `encoding`.
    pub data: Vec<u8>,
    /// Whether this row has been matched (for outer joins).
    pub matched: bool,
    /// Encoding used for serialization.
    /// 0 = Compact (Arrow IPC), 1 = `CpuFriendly`
    encoding: u8,
}
562
/// Magic bytes for CPU-friendly encoding format.
///
/// Written as the first 4 bytes of every CPU-friendly buffer so
/// `deserialize_batch` can distinguish it from Arrow IPC data.
const CPU_FRIENDLY_MAGIC: [u8; 4] = *b"CPUF";

/// Type tag for CPU-friendly encoding (one byte per column in the wire
/// format). `Null` doubles as the fallback tag for unsupported Arrow types.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CpuFriendlyType {
    Null = 0,
    Int64 = 1,
    Float64 = 2,
    Utf8 = 3,
}
575
576impl JoinRow {
    /// Creates a new join row from an event and extracted key.
    /// Uses compact encoding by default.
    ///
    /// Test-only convenience wrapper around [`Self::with_encoding`].
    #[cfg(test)]
    fn new(timestamp: i64, key_value: Vec<u8>, batch: &RecordBatch) -> Result<Self, OperatorError> {
        Self::with_encoding(timestamp, key_value, batch, JoinRowEncoding::Compact)
    }
583
584    /// Creates a new join row with specified encoding.
585    fn with_encoding(
586        timestamp: i64,
587        key_value: Vec<u8>,
588        batch: &RecordBatch,
589        encoding: JoinRowEncoding,
590    ) -> Result<Self, OperatorError> {
591        let (data, encoding_byte) = match encoding {
592            JoinRowEncoding::Compact => (Self::serialize_compact(batch)?, 0),
593            JoinRowEncoding::CpuFriendly => (Self::serialize_cpu_friendly(batch)?, 1),
594        };
595        Ok(Self {
596            timestamp,
597            key_value,
598            data,
599            matched: false,
600            encoding: encoding_byte,
601        })
602    }
603
    /// Serializes a batch using Arrow IPC stream format (compact encoding).
    fn serialize_compact(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
        Ok(crate::serialization::serialize_batch_stream(batch)?)
    }
607
    /// Serializes using CPU-friendly format.
    ///
    /// Format (all multi-byte integers little-endian):
    /// - Magic (4 bytes): "CPUF"
    /// - Num columns (2 bytes): u16
    /// - Num rows (4 bytes): u32
    /// - For each column:
    ///   - Name length (2 bytes): u16
    ///   - Name bytes (variable)
    ///   - Nullable (1 byte): 0 or 1
    ///   - Type tag (1 byte)
    ///   - Data (depends on type):
    ///     - Int64: validity bitmap + raw i64 values
    ///     - Float64: validity bitmap + raw f64 values
    ///     - Utf8: validity bitmap + offsets (u32) + data bytes
    ///     - Null (unsupported types): no payload at all
    ///
    /// Note: the nullable flag is written BEFORE the type tag — this matches
    /// the read order in `deserialize_cpu_friendly` (the previous doc listed
    /// the two fields in the opposite order).
    #[allow(clippy::cast_possible_truncation)] // Wire format uses u32 for row/column counts and offsets
    fn serialize_cpu_friendly(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
        let schema = batch.schema();
        let num_rows = batch.num_rows();
        let num_cols = batch.num_columns();

        // Estimate capacity: header + per-column metadata + per-cell data.
        let mut buf = Vec::with_capacity(4 + 2 + 4 + num_cols * 64 + num_rows * num_cols * 8);

        // Header
        buf.extend_from_slice(&CPU_FRIENDLY_MAGIC);
        buf.extend_from_slice(&(num_cols as u16).to_le_bytes());
        buf.extend_from_slice(&(num_rows as u32).to_le_bytes());

        // Columns
        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);

            // Column name
            let name_bytes = field.name().as_bytes();
            buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
            buf.extend_from_slice(name_bytes);

            // Nullable flag (must precede the type tag; see format doc above)
            buf.push(u8::from(field.is_nullable()));

            // Type tag and payload
            match field.data_type() {
                DataType::Int64 => {
                    buf.push(CpuFriendlyType::Int64 as u8);
                    Self::write_int64_column(&mut buf, column, num_rows)?;
                }
                DataType::Float64 => {
                    buf.push(CpuFriendlyType::Float64 as u8);
                    Self::write_float64_column(&mut buf, column, num_rows)?;
                }
                DataType::Utf8 => {
                    buf.push(CpuFriendlyType::Utf8 as u8);
                    Self::write_utf8_column(&mut buf, column, num_rows)?;
                }
                _ => {
                    // Fallback: encode as null for unsupported types.
                    // The column's actual data is dropped; the deserializer
                    // reconstructs it as an all-null Int64 column.
                    buf.push(CpuFriendlyType::Null as u8);
                }
            }
        }

        Ok(buf)
    }
672
    /// Writes an Int64 column in CPU-friendly format.
    ///
    /// Payload layout: validity bitmap (1 bit per row, padded to whole bytes
    /// with `0xFF` = "valid", all-ones when the array has no null buffer),
    /// followed by the raw 8-byte values.
    ///
    /// NOTE(review): the validity buffer and values are copied without
    /// applying any array slice offset (`nulls().offset()` is ignored) —
    /// confirm callers only pass unsliced, offset-zero batches.
    ///
    /// NOTE(review): values are written in native byte order while the header
    /// fields are little-endian; presumably fine on LE targets, but worth
    /// confirming if big-endian support ever matters.
    fn write_int64_column(
        buf: &mut Vec<u8>,
        column: &ArrayRef,
        num_rows: usize,
    ) -> Result<(), OperatorError> {
        let arr = column
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| OperatorError::SerializationFailed("Expected Int64Array".into()))?;

        // Validity bitmap (1 bit per row, padded to bytes)
        let validity_bytes = num_rows.div_ceil(8);
        if let Some(nulls) = arr.nulls() {
            // Copy validity buffer
            let buffer = nulls.buffer();
            let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
            buf.extend_from_slice(slice);
            // Pad if needed (0xFF marks trailing rows as valid)
            for _ in slice.len()..validity_bytes {
                buf.push(0xFF);
            }
        } else {
            // All valid
            buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
        }

        // Raw values (8 bytes each)
        // SAFETY: Converting i64 slice to bytes for zero-copy serialization.
        // The pointer cast is safe because we're reinterpreting the same memory.
        let values = arr.values();
        let value_bytes =
            unsafe { std::slice::from_raw_parts(values.as_ptr().cast::<u8>(), values.len() * 8) };
        buf.extend_from_slice(value_bytes);

        Ok(())
    }
710
    /// Writes a Float64 column in CPU-friendly format.
    ///
    /// Same payload layout as `write_int64_column`: validity bitmap
    /// (`0xFF`-padded, all-ones when there is no null buffer) followed by the
    /// raw 8-byte values.
    ///
    /// NOTE(review): as with the Int64 writer, array slice offsets are not
    /// applied and values are written in native byte order — confirm inputs
    /// are unsliced and targets are little-endian.
    fn write_float64_column(
        buf: &mut Vec<u8>,
        column: &ArrayRef,
        num_rows: usize,
    ) -> Result<(), OperatorError> {
        let arr = column
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| OperatorError::SerializationFailed("Expected Float64Array".into()))?;

        // Validity bitmap
        let validity_bytes = num_rows.div_ceil(8);
        if let Some(nulls) = arr.nulls() {
            let buffer = nulls.buffer();
            let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
            buf.extend_from_slice(slice);
            // Pad with "valid" bits if the source buffer is short
            for _ in slice.len()..validity_bytes {
                buf.push(0xFF);
            }
        } else {
            // No null buffer: every row is valid
            buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
        }

        // Raw values (8 bytes each)
        // SAFETY: Converting f64 slice to bytes for zero-copy serialization.
        let values = arr.values();
        let value_bytes =
            unsafe { std::slice::from_raw_parts(values.as_ptr().cast::<u8>(), values.len() * 8) };
        buf.extend_from_slice(value_bytes);

        Ok(())
    }
744
    /// Writes a Utf8 column in CPU-friendly format.
    ///
    /// Payload layout: validity bitmap, then `num_rows + 1` little-endian
    /// `u32` offsets, then the raw string bytes.
    ///
    /// NOTE(review): offsets are written as stored (not rebased to zero) and
    /// the entire values buffer is emitted — self-consistent for unsliced
    /// arrays; confirm behavior for sliced inputs.
    #[allow(clippy::cast_sign_loss)]
    fn write_utf8_column(
        buf: &mut Vec<u8>,
        column: &ArrayRef,
        num_rows: usize,
    ) -> Result<(), OperatorError> {
        let arr = column
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| OperatorError::SerializationFailed("Expected StringArray".into()))?;

        // Validity bitmap
        let validity_bytes = num_rows.div_ceil(8);
        if let Some(nulls) = arr.nulls() {
            let buffer = nulls.buffer();
            let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
            buf.extend_from_slice(slice);
            // Pad with "valid" bits if the source buffer is short
            for _ in slice.len()..validity_bytes {
                buf.push(0xFF);
            }
        } else {
            // No null buffer: every row is valid
            buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
        }

        // Offsets (u32 for each row + 1)
        // Note: Arrow string offsets are always non-negative, so the
        // i32 -> u32 cast cannot lose information (hence the allow above).
        let offsets = arr.offsets();
        for offset in offsets.iter() {
            buf.extend_from_slice(&(*offset as u32).to_le_bytes());
        }

        // String data
        let values = arr.values();
        buf.extend_from_slice(values.as_slice());

        Ok(())
    }
783
784    /// Deserializes a record batch from bytes.
785    fn deserialize_batch(data: &[u8], encoding: u8) -> Result<RecordBatch, OperatorError> {
786        if encoding == 1 && data.starts_with(&CPU_FRIENDLY_MAGIC) {
787            Self::deserialize_cpu_friendly(data)
788        } else {
789            Self::deserialize_compact(data)
790        }
791    }
792
    /// Deserializes a batch from Arrow IPC stream format (compact encoding).
    fn deserialize_compact(data: &[u8]) -> Result<RecordBatch, OperatorError> {
        Ok(crate::serialization::deserialize_batch_stream(data)?)
    }
796
    /// Deserializes from CPU-friendly format.
    ///
    /// Mirrors `serialize_cpu_friendly`: a 10-byte header (magic, u16 column
    /// count, u32 row count) followed by per-column name, nullable flag,
    /// type tag, and payload. Every read is bounds-checked so truncated
    /// buffers yield `SerializationFailed` instead of panicking. Columns
    /// tagged `Null` (unsupported types) carry no payload and are
    /// reconstructed as all-null Int64 columns.
    fn deserialize_cpu_friendly(data: &[u8]) -> Result<RecordBatch, OperatorError> {
        if data.len() < 10 {
            return Err(OperatorError::SerializationFailed(
                "Buffer too short".into(),
            ));
        }

        // Parse header (bytes 0..4 are the magic, already checked by caller)
        let num_cols = u16::from_le_bytes([data[4], data[5]]) as usize;
        let num_rows = u32::from_le_bytes([data[6], data[7], data[8], data[9]]) as usize;

        let mut offset = 10;
        let mut fields = Vec::with_capacity(num_cols);
        let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_cols);

        for _ in 0..num_cols {
            if offset + 2 > data.len() {
                return Err(OperatorError::SerializationFailed(
                    "Truncated column header".into(),
                ));
            }

            // Read column name (lossy UTF-8: names were written as raw bytes)
            let name_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
            offset += 2;

            if offset + name_len > data.len() {
                return Err(OperatorError::SerializationFailed(
                    "Truncated column name".into(),
                ));
            }
            let name = String::from_utf8_lossy(&data[offset..offset + name_len]).to_string();
            offset += name_len;

            if offset + 2 > data.len() {
                return Err(OperatorError::SerializationFailed(
                    "Truncated type info".into(),
                ));
            }

            // Read nullable and type — in that order, matching the writer
            let nullable = data[offset] != 0;
            offset += 1;
            let type_tag = data[offset];
            offset += 1;

            // Read column data based on type
            let validity_bytes = num_rows.div_ceil(8);

            match type_tag {
                t if t == CpuFriendlyType::Int64 as u8 => {
                    let (arr, new_offset) =
                        Self::read_int64_column(data, offset, num_rows, validity_bytes)?;
                    offset = new_offset;
                    fields.push(Field::new(&name, DataType::Int64, nullable));
                    columns.push(Arc::new(arr));
                }
                t if t == CpuFriendlyType::Float64 as u8 => {
                    let (arr, new_offset) =
                        Self::read_float64_column(data, offset, num_rows, validity_bytes)?;
                    offset = new_offset;
                    fields.push(Field::new(&name, DataType::Float64, nullable));
                    columns.push(Arc::new(arr));
                }
                t if t == CpuFriendlyType::Utf8 as u8 => {
                    let (arr, new_offset) =
                        Self::read_utf8_column(data, offset, num_rows, validity_bytes)?;
                    offset = new_offset;
                    fields.push(Field::new(&name, DataType::Utf8, nullable));
                    columns.push(Arc::new(arr));
                }
                _ => {
                    // Null/unsupported type: no payload to consume; the
                    // original type is lost and the column becomes a
                    // nullable Int64 filled with nulls.
                    fields.push(Field::new(&name, DataType::Int64, true));
                    columns.push(Arc::new(Int64Array::from(vec![None; num_rows])));
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, columns)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
    }
881
882    /// Reads an Int64 column from CPU-friendly format.
883    fn read_int64_column(
884        data: &[u8],
885        offset: usize,
886        num_rows: usize,
887        validity_bytes: usize,
888    ) -> Result<(Int64Array, usize), OperatorError> {
889        let mut pos = offset;
890
891        // Skip validity bitmap (we don't reconstruct it for simplicity)
892        if pos + validity_bytes > data.len() {
893            return Err(OperatorError::SerializationFailed(
894                "Truncated validity".into(),
895            ));
896        }
897        pos += validity_bytes;
898
899        // Read values
900        let values_bytes = num_rows * 8;
901        if pos + values_bytes > data.len() {
902            return Err(OperatorError::SerializationFailed(
903                "Truncated int64 values".into(),
904            ));
905        }
906
907        let mut values = Vec::with_capacity(num_rows);
908        for i in 0..num_rows {
909            let start = pos + i * 8;
910            let bytes = [
911                data[start],
912                data[start + 1],
913                data[start + 2],
914                data[start + 3],
915                data[start + 4],
916                data[start + 5],
917                data[start + 6],
918                data[start + 7],
919            ];
920            values.push(i64::from_le_bytes(bytes));
921        }
922        pos += values_bytes;
923
924        Ok((Int64Array::from(values), pos))
925    }
926
927    /// Reads a Float64 column from CPU-friendly format.
928    fn read_float64_column(
929        data: &[u8],
930        offset: usize,
931        num_rows: usize,
932        validity_bytes: usize,
933    ) -> Result<(Float64Array, usize), OperatorError> {
934        let mut pos = offset;
935
936        // Skip validity bitmap
937        if pos + validity_bytes > data.len() {
938            return Err(OperatorError::SerializationFailed(
939                "Truncated validity".into(),
940            ));
941        }
942        pos += validity_bytes;
943
944        // Read values
945        let values_bytes = num_rows * 8;
946        if pos + values_bytes > data.len() {
947            return Err(OperatorError::SerializationFailed(
948                "Truncated float64 values".into(),
949            ));
950        }
951
952        let mut values = Vec::with_capacity(num_rows);
953        for i in 0..num_rows {
954            let start = pos + i * 8;
955            let bytes = [
956                data[start],
957                data[start + 1],
958                data[start + 2],
959                data[start + 3],
960                data[start + 4],
961                data[start + 5],
962                data[start + 6],
963                data[start + 7],
964            ];
965            values.push(f64::from_le_bytes(bytes));
966        }
967        pos += values_bytes;
968
969        Ok((Float64Array::from(values), pos))
970    }
971
972    /// Reads a Utf8 column from CPU-friendly format.
973    fn read_utf8_column(
974        data: &[u8],
975        offset: usize,
976        num_rows: usize,
977        validity_bytes: usize,
978    ) -> Result<(StringArray, usize), OperatorError> {
979        let mut pos = offset;
980
981        // Skip validity bitmap
982        if pos + validity_bytes > data.len() {
983            return Err(OperatorError::SerializationFailed(
984                "Truncated validity".into(),
985            ));
986        }
987        pos += validity_bytes;
988
989        // Read offsets
990        let offsets_bytes = (num_rows + 1) * 4;
991        if pos + offsets_bytes > data.len() {
992            return Err(OperatorError::SerializationFailed(
993                "Truncated offsets".into(),
994            ));
995        }
996
997        let mut offsets = Vec::with_capacity(num_rows + 1);
998        for i in 0..=num_rows {
999            let start = pos + i * 4;
1000            let bytes = [
1001                data[start],
1002                data[start + 1],
1003                data[start + 2],
1004                data[start + 3],
1005            ];
1006            offsets.push(u32::from_le_bytes(bytes) as usize);
1007        }
1008        pos += offsets_bytes;
1009
1010        // Calculate data length and read string data
1011        let data_len = offsets.last().copied().unwrap_or(0);
1012        if pos + data_len > data.len() {
1013            return Err(OperatorError::SerializationFailed(
1014                "Truncated string data".into(),
1015            ));
1016        }
1017
1018        let string_data = &data[pos..pos + data_len];
1019        pos += data_len;
1020
1021        // Build strings
1022        let mut strings = Vec::with_capacity(num_rows);
1023        for i in 0..num_rows {
1024            let start = offsets[i];
1025            let end = offsets[i + 1];
1026            let s = String::from_utf8_lossy(&string_data[start..end]).to_string();
1027            strings.push(s);
1028        }
1029
1030        Ok((StringArray::from(strings), pos))
1031    }
1032
    /// Converts this join row back to a record batch.
    ///
    /// Dispatches on the stored `encoding` tag, so a row round-trips through
    /// whichever format it was created with.
    ///
    /// # Errors
    ///
    /// Returns `OperatorError::SerializationFailed` if the batch data is invalid.
    pub fn to_batch(&self) -> Result<RecordBatch, OperatorError> {
        Self::deserialize_batch(&self.data, self.encoding)
    }
1041
1042    /// Returns the encoding used for this row.
1043    #[must_use]
1044    pub fn encoding(&self) -> JoinRowEncoding {
1045        if self.encoding == 1 {
1046            JoinRowEncoding::CpuFriendly
1047        } else {
1048            JoinRowEncoding::Compact
1049        }
1050    }
1051}
1052
/// Metrics for tracking join operations.
///
/// All fields are plain cumulative `u64` counters (no atomics), except
/// `tracked_keys`, which is a point-in-time gauge. Counters accumulate
/// until [`JoinMetrics::reset`] is called.
#[derive(Debug, Clone, Default)]
pub struct JoinMetrics {
    /// Number of left events processed.
    pub left_events: u64,
    /// Number of right events processed.
    pub right_events: u64,
    /// Number of join matches produced.
    pub matches: u64,
    /// Number of unmatched left events emitted (left/full joins).
    pub unmatched_left: u64,
    /// Number of unmatched right events emitted (right/full joins).
    pub unmatched_right: u64,
    /// Number of late events dropped (event time + time bound fell behind
    /// the watermark).
    pub late_events: u64,
    /// Number of state entries cleaned up.
    pub state_cleanups: u64,

    // Optimization Metrics
    /// Rows encoded with CPU-friendly format.
    pub cpu_friendly_encodes: u64,
    /// Rows encoded with compact format.
    pub compact_encodes: u64,
    /// Compactions skipped due to asymmetric optimization.
    pub asymmetric_skips: u64,
    /// Idle keys cleaned up.
    pub idle_key_cleanups: u64,
    /// Build-side entries pruned early.
    pub build_side_prunes: u64,
    /// Current number of tracked keys (gauge, not cumulative; maintained
    /// only when per-key tracking is enabled).
    pub tracked_keys: u64,
}
1085
1086impl JoinMetrics {
1087    /// Creates new metrics.
1088    #[must_use]
1089    pub fn new() -> Self {
1090        Self::default()
1091    }
1092
1093    /// Resets all counters.
1094    pub fn reset(&mut self) {
1095        *self = Self::default();
1096    }
1097}
1098
/// Stream-stream join operator.
///
/// Joins events from two streams based on a key column and time bound.
/// Events are matched if they share a key value and their timestamps
/// are within the specified time window.
///
/// # State Management
///
/// Events from both sides are stored in state until they can no longer
/// produce matches (watermark passes `timestamp + time_bound`). State
/// is automatically cleaned up via timers.
///
/// # Performance Considerations
///
/// - State grows linearly with the number of events within the time window
/// - For high-cardinality joins, consider using shorter time bounds
/// - Inner joins use less state than outer joins (no unmatched tracking)
///
/// # Optimizations
///
/// - **CPU-Friendly Encoding**: Use `JoinRowEncoding::CpuFriendly` for 30-50% faster access
/// - **Asymmetric Compaction**: Automatically skips compaction on finished/idle sides
/// - **Per-Key Tracking**: Aggressive cleanup for sparse key patterns
/// - **Build-Side Pruning**: Early pruning based on probe-side watermark progress
pub struct StreamJoinOperator {
    /// Left stream key column name.
    left_key_column: String,
    /// Right stream key column name.
    right_key_column: String,
    /// Time bound for matching, in milliseconds (events match if within this duration).
    time_bound_ms: i64,
    /// Type of join to perform.
    join_type: JoinType,
    /// Operator ID for checkpointing.
    operator_id: String,
    /// Metrics for monitoring.
    metrics: JoinMetrics,
    /// Output schema (lazily initialized once both input schemas are known).
    output_schema: Option<SchemaRef>,
    /// Left schema (captured from first left event).
    left_schema: Option<SchemaRef>,
    /// Right schema (captured from first right event).
    right_schema: Option<SchemaRef>,

    // Optimization Fields
    /// Row encoding strategy.
    row_encoding: JoinRowEncoding,
    /// Enable asymmetric compaction.
    asymmetric_compaction: bool,
    /// Idle threshold for asymmetric compaction (ms).
    idle_threshold_ms: i64,
    /// Enable per-key tracking.
    per_key_tracking: bool,
    /// Key idle threshold (ms).
    key_idle_threshold_ms: i64,
    /// Enable build-side pruning.
    build_side_pruning: bool,
    /// Configured build side; `None` means it is chosen heuristically at runtime.
    build_side: Option<JoinSide>,
    /// Left-side statistics.
    left_stats: SideStats,
    /// Right-side statistics.
    right_stats: SideStats,
    /// Per-key metadata (`key_hash` -> metadata); populated only when
    /// per-key tracking is enabled.
    key_metadata: FxHashMap<u64, KeyMetadata>,
    /// Left-side watermark; `i64::MIN` until the first watermark is observed.
    left_watermark: i64,
    /// Right-side watermark; `i64::MIN` until the first watermark is observed.
    right_watermark: i64,
    /// Reusable buffer for `prune_build_side` to avoid per-call allocation.
    prune_buffer: Vec<Bytes>,
    /// Cached column index for left key — resolved on first left event.
    left_key_index: Option<usize>,
    /// Cached column index for right key — resolved on first right event.
    right_key_index: Option<usize>,
}
1175
1176impl StreamJoinOperator {
1177    /// Creates a new stream join operator.
1178    ///
1179    /// # Arguments
1180    ///
1181    /// * `left_key_column` - Name of the key column in left stream events
1182    /// * `right_key_column` - Name of the key column in right stream events
1183    /// * `time_bound` - Maximum time difference for matching events
1184    /// * `join_type` - Type of join to perform
1185    #[must_use]
1186    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
1187    pub fn new(
1188        left_key_column: String,
1189        right_key_column: String,
1190        time_bound: Duration,
1191        join_type: JoinType,
1192    ) -> Self {
1193        let operator_num = JOIN_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
1194        Self {
1195            left_key_column,
1196            right_key_column,
1197            time_bound_ms: time_bound.as_millis() as i64,
1198            join_type,
1199            operator_id: format!("stream_join_{operator_num}"),
1200            metrics: JoinMetrics::new(),
1201            output_schema: None,
1202            left_schema: None,
1203            right_schema: None,
1204            // Default optimizations
1205            row_encoding: JoinRowEncoding::Compact,
1206            asymmetric_compaction: true,
1207            idle_threshold_ms: 60_000,
1208            per_key_tracking: true,
1209            key_idle_threshold_ms: 300_000,
1210            build_side_pruning: true,
1211            build_side: None,
1212            left_stats: SideStats::new(),
1213            right_stats: SideStats::new(),
1214            key_metadata: FxHashMap::default(),
1215            left_watermark: i64::MIN,
1216            right_watermark: i64::MIN,
1217            prune_buffer: Vec::with_capacity(100),
1218            left_key_index: None,
1219            right_key_index: None,
1220        }
1221    }
1222
    /// Creates a new stream join operator with a custom operator ID.
    ///
    /// Uses the same optimization defaults as `new`: compact row encoding,
    /// asymmetric compaction (60 s idle threshold), per-key tracking
    /// (300 s key idle threshold), and build-side pruning with an
    /// auto-selected build side.
    ///
    /// # Arguments
    ///
    /// * `left_key_column` - Name of the key column in left stream events
    /// * `right_key_column` - Name of the key column in right stream events
    /// * `time_bound` - Maximum time difference for matching events
    /// * `join_type` - Type of join to perform
    /// * `operator_id` - Stable identifier used for checkpointing
    #[must_use]
    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
    pub fn with_id(
        left_key_column: String,
        right_key_column: String,
        time_bound: Duration,
        join_type: JoinType,
        operator_id: String,
    ) -> Self {
        Self {
            left_key_column,
            right_key_column,
            time_bound_ms: time_bound.as_millis() as i64,
            join_type,
            operator_id,
            metrics: JoinMetrics::new(),
            output_schema: None,
            left_schema: None,
            right_schema: None,
            // Default optimizations
            row_encoding: JoinRowEncoding::Compact,
            asymmetric_compaction: true,
            idle_threshold_ms: 60_000,
            per_key_tracking: true,
            key_idle_threshold_ms: 300_000,
            build_side_pruning: true,
            build_side: None,
            left_stats: SideStats::new(),
            right_stats: SideStats::new(),
            key_metadata: FxHashMap::default(),
            // i64::MIN sentinels: no watermark observed yet on either side.
            left_watermark: i64::MIN,
            right_watermark: i64::MIN,
            prune_buffer: Vec::with_capacity(100),
            left_key_index: None,
            right_key_index: None,
        }
    }
1261
1262    /// Creates a new stream join operator from configuration.
1263    ///
1264    /// This is the recommended constructor for production use, allowing
1265    /// fine-grained control over optimization settings.
1266    ///
1267    /// # Example
1268    ///
1269    /// ```rust
1270    /// use laminar_core::operator::stream_join::{
1271    ///     StreamJoinOperator, StreamJoinConfig, JoinType, JoinRowEncoding,
1272    /// };
1273    /// use std::time::Duration;
1274    ///
1275    /// let config = StreamJoinConfig::builder()
1276    ///     .left_key_column("order_id")
1277    ///     .right_key_column("order_id")
1278    ///     .time_bound(Duration::from_secs(3600))
1279    ///     .join_type(JoinType::Inner)
1280    ///     .row_encoding(JoinRowEncoding::CpuFriendly)
1281    ///     .build()
1282    ///     .unwrap();
1283    ///
1284    /// let operator = StreamJoinOperator::from_config(config);
1285    /// ```
1286    #[must_use]
1287    pub fn from_config(config: StreamJoinConfig) -> Self {
1288        let operator_id = config.operator_id.unwrap_or_else(|| {
1289            let num = JOIN_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
1290            format!("stream_join_{num}")
1291        });
1292
1293        Self {
1294            left_key_column: config.left_key_column,
1295            right_key_column: config.right_key_column,
1296            time_bound_ms: config.time_bound_ms,
1297            join_type: config.join_type,
1298            operator_id,
1299            metrics: JoinMetrics::new(),
1300            output_schema: None,
1301            left_schema: None,
1302            right_schema: None,
1303            row_encoding: config.row_encoding,
1304            asymmetric_compaction: config.asymmetric_compaction,
1305            idle_threshold_ms: config.idle_threshold_ms,
1306            per_key_tracking: config.per_key_tracking,
1307            key_idle_threshold_ms: config.key_idle_threshold_ms,
1308            build_side_pruning: config.build_side_pruning,
1309            build_side: config.build_side,
1310            left_stats: SideStats::new(),
1311            right_stats: SideStats::new(),
1312            key_metadata: FxHashMap::default(),
1313            left_watermark: i64::MIN,
1314            right_watermark: i64::MIN,
1315            prune_buffer: Vec::with_capacity(100),
1316            left_key_index: None,
1317            right_key_index: None,
1318        }
1319    }
1320
    /// Returns the join type.
    #[must_use]
    pub fn join_type(&self) -> JoinType {
        self.join_type
    }

    /// Returns the time bound in milliseconds (maximum timestamp difference
    /// for two events to be matched).
    #[must_use]
    pub fn time_bound_ms(&self) -> i64 {
        self.time_bound_ms
    }

    /// Returns a reference to the accumulated join metrics.
    #[must_use]
    pub fn metrics(&self) -> &JoinMetrics {
        &self.metrics
    }

    /// Resets all metric counters to zero.
    pub fn reset_metrics(&mut self) {
        self.metrics.reset();
    }

    /// Returns the row encoding strategy used for stored join rows.
    #[must_use]
    pub fn row_encoding(&self) -> JoinRowEncoding {
        self.row_encoding
    }

    /// Returns whether asymmetric compaction is enabled.
    #[must_use]
    pub fn asymmetric_compaction_enabled(&self) -> bool {
        self.asymmetric_compaction
    }

    /// Returns whether per-key tracking is enabled.
    #[must_use]
    pub fn per_key_tracking_enabled(&self) -> bool {
        self.per_key_tracking
    }

    /// Returns the left-side statistics.
    #[must_use]
    pub fn left_stats(&self) -> &SideStats {
        &self.left_stats
    }

    /// Returns the right-side statistics.
    #[must_use]
    pub fn right_stats(&self) -> &SideStats {
        &self.right_stats
    }

    /// Returns the number of tracked keys (entries in the per-key metadata
    /// map; only populated when per-key tracking is enabled).
    #[must_use]
    pub fn tracked_key_count(&self) -> usize {
        self.key_metadata.len()
    }
1379
1380    /// Checks if a side is considered "finished" (idle).
1381    #[must_use]
1382    pub fn is_side_idle(&self, side: JoinSide, current_time: i64) -> bool {
1383        match side {
1384            JoinSide::Left => self
1385                .left_stats
1386                .is_idle(current_time, self.idle_threshold_ms),
1387            JoinSide::Right => self
1388                .right_stats
1389                .is_idle(current_time, self.idle_threshold_ms),
1390        }
1391    }
1392
1393    /// Determines the effective build side based on configuration or heuristics.
1394    #[must_use]
1395    pub fn effective_build_side(&self) -> JoinSide {
1396        // Use configured build side if set
1397        if let Some(side) = self.build_side {
1398            return side;
1399        }
1400
1401        // Auto-select based on statistics: smaller side is typically better as build
1402        if self.left_stats.events_received < self.right_stats.events_received {
1403            JoinSide::Left
1404        } else {
1405            JoinSide::Right
1406        }
1407    }
1408
    /// Processes an event from either the left or right side.
    ///
    /// This is the main entry point for the join operator. Call this with
    /// the appropriate `JoinSide` to indicate which stream the event came from.
    ///
    /// Returns the outputs produced by this event: joined events, late-event
    /// notifications, and possibly a watermark.
    pub fn process_side(
        &mut self,
        event: &Event,
        side: JoinSide,
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        // Dispatch to the side-specific wrapper (metrics + schema capture).
        match side {
            JoinSide::Left => self.process_left(event, ctx),
            JoinSide::Right => self.process_right(event, ctx),
        }
    }
1424
    /// Processes a left-side event.
    ///
    /// Bumps left-side metrics and activity statistics, captures the left
    /// schema from the first event seen (needed to derive the joined output
    /// schema), then delegates to the shared `process_event` path.
    fn process_left(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        self.metrics.left_events += 1;

        // Track side statistics for asymmetric compaction
        self.left_stats.record_event(ctx.processing_time);

        // Capture left schema on first event
        if self.left_schema.is_none() {
            self.left_schema = Some(event.data.schema());
            self.update_output_schema();
        }

        self.process_event(event, JoinSide::Left, ctx)
    }
1440
    /// Processes a right-side event.
    ///
    /// Mirror of `process_left`: bumps right-side metrics and activity
    /// statistics, captures the right schema from the first event seen,
    /// then delegates to the shared `process_event` path.
    fn process_right(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        self.metrics.right_events += 1;

        // Track side statistics for asymmetric compaction
        self.right_stats.record_event(ctx.processing_time);

        // Capture right schema on first event
        if self.right_schema.is_none() {
            self.right_schema = Some(event.data.schema());
            self.update_output_schema();
        }

        self.process_event(event, JoinSide::Right, ctx)
    }
1456
1457    /// Updates the output schema when both input schemas are known.
1458    fn update_output_schema(&mut self) {
1459        if let (Some(left), Some(right)) = (&self.left_schema, &self.right_schema) {
1460            // Semi/anti joins output only the kept side's columns
1461            match self.join_type.kept_side() {
1462                Some(JoinSide::Left) => {
1463                    self.output_schema = Some(Arc::clone(left));
1464                    return;
1465                }
1466                Some(JoinSide::Right) => {
1467                    self.output_schema = Some(Arc::clone(right));
1468                    return;
1469                }
1470                None => {}
1471            }
1472
1473            let mut fields: Vec<Field> =
1474                Vec::with_capacity(left.fields().len() + right.fields().len());
1475            fields.extend(left.fields().iter().map(|f| f.as_ref().clone()));
1476
1477            // Add right fields, prefixing duplicates
1478            for field in right.fields() {
1479                let name = if left.field_with_name(field.name()).is_ok() {
1480                    format!("right_{}", field.name())
1481                } else {
1482                    field.name().clone()
1483                };
1484                fields.push(Field::new(
1485                    name,
1486                    field.data_type().clone(),
1487                    true, // Nullable for outer joins
1488                ));
1489            }
1490
1491            self.output_schema = Some(Arc::new(Schema::new(fields)));
1492        }
1493    }
1494
    /// Core event processing logic shared by both sides.
    ///
    /// Pipeline, in order:
    /// 1. Advance the watermark generator and this side's watermark.
    /// 2. Drop the event as late if `event_time + time_bound` is behind the
    ///    current watermark.
    /// 3. Extract and hash the join key; update per-key metadata.
    /// 4. Encode the row, store it in state, and register a cleanup timer
    ///    (plus an unmatched-emission timer for outer joins) at
    ///    `event_time + time_bound`.
    /// 5. Optionally prune the build side, then probe the opposite side and
    ///    emit match results.
    /// 6. Append any watermark generated by this event.
    ///
    /// Events whose key cannot be extracted, or whose row fails to encode
    /// or persist, are silently skipped (only the side counter was bumped).
    fn process_event(
        &mut self,
        event: &Event,
        side: JoinSide,
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        let mut output = OutputVec::new();
        let event_time = event.timestamp;

        // Update watermark
        let emitted_watermark = ctx.watermark_generator.on_event(event_time);
        // NOTE(review): the early returns below (late event, missing key,
        // encode/persist failure) drop `emitted_watermark` — confirm that
        // losing the watermark on those paths is intentional.

        // Track per-side watermarks for build-side pruning using actual watermark
        let current_wm = ctx.watermark_generator.current_watermark();
        match side {
            JoinSide::Left => self.left_watermark = self.left_watermark.max(current_wm),
            JoinSide::Right => self.right_watermark = self.right_watermark.max(current_wm),
        }

        // Check if event is too late
        // (a watermark of i64::MIN means none has been observed yet)
        if current_wm > i64::MIN && event_time + self.time_bound_ms < current_wm {
            self.metrics.late_events += 1;
            output.push(Output::LateEvent(event.clone()));
            return output;
        }

        // Extract join key
        let (key_column, cached_index) = match side {
            JoinSide::Left => (&self.left_key_column, &mut self.left_key_index),
            JoinSide::Right => (&self.right_key_column, &mut self.right_key_index),
        };
        let Some(key_value) = Self::extract_key(&event.data, key_column, cached_index) else {
            // Can't extract key, skip this event
            return output;
        };

        // Compute key hash for per-key tracking
        // (FxHasher: fast, not DoS-resistant — used only for internal grouping)
        let key_hash = {
            let mut hasher = rustc_hash::FxHasher::default();
            key_value.hash(&mut hasher);
            hasher.finish()
        };

        // Track per-key metadata
        if self.per_key_tracking {
            self.key_metadata
                .entry(key_hash)
                .and_modify(|meta| meta.record_event(ctx.processing_time))
                .or_insert_with(|| KeyMetadata::new(ctx.processing_time));
            self.metrics.tracked_keys = self.key_metadata.len() as u64;
        }

        // Create join row with configured encoding
        let join_row = match JoinRow::with_encoding(
            event_time,
            key_value.to_vec(),
            &event.data,
            self.row_encoding,
        ) {
            Ok(row) => {
                // Track encoding metrics
                match self.row_encoding {
                    JoinRowEncoding::Compact => self.metrics.compact_encodes += 1,
                    JoinRowEncoding::CpuFriendly => self.metrics.cpu_friendly_encodes += 1,
                }
                row
            }
            Err(_) => return output,
        };

        // Store the event in state
        let state_key = Self::make_state_key(side, &key_value, event_time);
        if ctx.state.put_typed(&state_key, &join_row).is_err() {
            return output;
        }

        // Register cleanup timer
        let cleanup_time = event_time + self.time_bound_ms;
        let timer_key = Self::make_timer_key(side, &state_key);
        ctx.timers
            .register_timer(cleanup_time, Some(timer_key), Some(ctx.operator_index));

        // For outer joins, register unmatched emission timer
        if (side == JoinSide::Left && self.join_type.emits_unmatched_left())
            || (side == JoinSide::Right && self.join_type.emits_unmatched_right())
        {
            let unmatched_timer_key = Self::make_unmatched_timer_key(side, &state_key);
            ctx.timers.register_timer(
                cleanup_time,
                Some(unmatched_timer_key),
                Some(ctx.operator_index),
            );
        }

        // Build-side pruning - prune entries that can no longer produce matches
        if self.build_side_pruning {
            self.prune_build_side(side, ctx);
        }

        // Probe the opposite side for matches
        let matches = self.probe_opposite_side(side, &key_value, event_time, ctx.state);

        self.process_matches(
            matches,
            side,
            event,
            event_time,
            &join_row,
            &state_key,
            ctx,
            &mut output,
        );

        // Emit watermark if generated
        if let Some(wm) = emitted_watermark {
            output.push(Output::Watermark(wm.timestamp()));
        }

        output
    }
1616
    /// Processes matched rows according to the join type (semi, anti, or standard).
    ///
    /// Behavior per join family:
    /// - **Semi**: emits the kept side's row at most once (on its first
    ///   match), using matched-flags in state to dedupe across events.
    /// - **Anti**: emits nothing here; it only marks both rows as matched so
    ///   the unmatched-emission timer later suppresses them.
    /// - **Standard (inner/outer)**: emits one concatenated row per match;
    ///   matched-flags are written only when an outer side needs them.
    ///
    /// `state_key` is the state key of the row just inserted for `event`;
    /// each entry in `matches` carries the opposite side's state key and row.
    #[allow(clippy::too_many_arguments)]
    fn process_matches(
        &mut self,
        matches: SmallVec<[([u8; 28], JoinRow); 4]>,
        side: JoinSide,
        event: &Event,
        event_time: i64,
        join_row: &JoinRow,
        state_key: &[u8],
        ctx: &mut OperatorContext,
        output: &mut OutputVec,
    ) {
        if self.join_type.is_semi() {
            // Semi join: emit the kept side's row on first match only
            // (invariant: semi join types always define a kept side).
            let kept_side = self.join_type.kept_side().unwrap();
            for (other_row_key, other_row) in matches {
                self.metrics.matches += 1;
                let was_other_matched = is_matched(&*ctx.state, &other_row_key);
                mark_matched(&mut *ctx.state, &other_row_key);

                let was_our_matched = is_matched(&*ctx.state, state_key);
                mark_matched(&mut *ctx.state, state_key);

                if side == kept_side && !was_our_matched {
                    output.push(Output::Event(Event::new(
                        event_time,
                        RecordBatch::clone(&event.data),
                    )));
                    break; // One match is enough for semi
                } else if side != kept_side && !was_our_matched {
                    // Probe side arrived: emit each kept-side row that has
                    // not already been emitted by an earlier match.
                    if let Ok(batch) = other_row.to_batch() {
                        output.push(Output::Event(Event::new(other_row.timestamp, batch)));
                    }
                }
            }
        } else if self.join_type.is_anti() {
            // Anti join: mark rows as matched (suppresses unmatched timer emission)
            for (other_row_key, _other_row) in matches {
                self.metrics.matches += 1;
                mark_matched(&mut *ctx.state, &other_row_key);
                mark_matched(&mut *ctx.state, state_key);
            }
        } else {
            // Standard join: emit concatenated left+right rows
            let needs_matched_flag =
                self.join_type.emits_unmatched_left() || self.join_type.emits_unmatched_right();
            for (other_row_key, other_row) in matches {
                self.metrics.matches += 1;

                // Mark both sides as matched (only needed for outer joins)
                if needs_matched_flag {
                    mark_matched(&mut *ctx.state, &other_row_key);
                    mark_matched(&mut *ctx.state, state_key);
                }

                // Create joined output; the joined row's timestamp is the
                // later of the two input timestamps.
                if let Some(joined_event) = self.create_joined_event(
                    side,
                    join_row,
                    &other_row,
                    std::cmp::max(event_time, other_row.timestamp),
                ) {
                    output.push(Output::Event(joined_event));
                }
            }
        }
    }
1685
    /// Prunes build-side entries that cannot produce future matches.
    ///
    /// An entry can be pruned if its `timestamp + time_bound < probe_side_watermark`,
    /// meaning the probe side has advanced beyond any possible match window.
    ///
    /// Pruning only runs when the current event came from the probe side, is
    /// currently implemented for inner joins only (other join types rely on
    /// cleanup/unmatched timers), and removes at most 100 entries per call to
    /// bound per-event latency.
    fn prune_build_side(&mut self, current_side: JoinSide, ctx: &mut OperatorContext) {
        let build_side = self.effective_build_side();

        // Only prune when processing from probe side
        if current_side == build_side {
            return;
        }

        // Get probe side watermark (the watermark of the opposite stream)
        let probe_watermark = match build_side {
            JoinSide::Left => self.right_watermark,
            JoinSide::Right => self.left_watermark,
        };

        // i64::MIN means no watermark has been observed on the probe side yet
        if probe_watermark == i64::MIN {
            return;
        }

        // Calculate prune threshold
        let prune_threshold = probe_watermark - self.time_bound_ms;
        if prune_threshold == i64::MIN {
            return;
        }

        // For inner joins, we can prune more aggressively
        if self.join_type == JoinType::Inner {
            let prefix = match build_side {
                JoinSide::Left => LEFT_STATE_PREFIX,
                JoinSide::Right => RIGHT_STATE_PREFIX,
            };

            // Reuse prune_buffer to avoid per-call allocation
            self.prune_buffer.clear();
            // Copy to a local so the scan loop does not need to borrow `self`
            let time_bound = self.time_bound_ms;
            for (key, value) in ctx.state.prefix_scan(prefix) {
                if self.prune_buffer.len() >= 100 {
                    break; // Limit per-event pruning to avoid latency spikes
                }
                // Try to get timestamp from the key (bytes 12-20)
                if key.len() >= 20 {
                    if let Ok(ts_bytes) = <[u8; 8]>::try_from(&key[12..20]) {
                        let timestamp = i64::from_be_bytes(ts_bytes);
                        if timestamp + time_bound < prune_threshold {
                            // Also verify via deserialization — the stored row's
                            // timestamp is authoritative over the key bytes
                            if let Ok(row) =
                                rkyv::access::<rkyv::Archived<JoinRow>, RkyvError>(&value)
                                    .and_then(rkyv::deserialize::<JoinRow, RkyvError>)
                            {
                                if row.timestamp + time_bound < prune_threshold {
                                    self.prune_buffer.push(key);
                                }
                            }
                        }
                    }
                }
            }

            // Delete the collected keys along with their matched-flag companions
            for key in &self.prune_buffer {
                if ctx.state.delete(key).is_ok() {
                    let _ = ctx.state.delete(&matched_flag_key(key));
                    self.metrics.build_side_prunes += 1;
                }
            }
        }
    }
1755
1756    /// Scans for idle keys and cleans them up aggressively.
1757    ///
1758    /// Called periodically (e.g., on timer) to identify keys with no recent
1759    /// activity and remove their state entries.
1760    pub fn scan_idle_keys(&mut self, ctx: &mut OperatorContext) {
1761        if !self.per_key_tracking {
1762            return;
1763        }
1764
1765        let threshold = ctx.processing_time - self.key_idle_threshold_ms;
1766
1767        // Find idle keys
1768        let idle_keys: Vec<u64> = self
1769            .key_metadata
1770            .iter()
1771            .filter(|(_, meta)| meta.last_activity < threshold && meta.state_entries == 0)
1772            .map(|(k, _)| *k)
1773            .collect();
1774
1775        // Remove idle key metadata
1776        for key_hash in idle_keys {
1777            self.key_metadata.remove(&key_hash);
1778            self.metrics.idle_key_cleanups += 1;
1779        }
1780
1781        self.metrics.tracked_keys = self.key_metadata.len() as u64;
1782    }
1783
1784    /// Checks if compaction should be skipped for a side due to asymmetric optimization.
1785    #[must_use]
1786    pub fn should_skip_compaction(&self, side: JoinSide, current_time: i64) -> bool {
1787        if !self.asymmetric_compaction {
1788            return false;
1789        }
1790
1791        // Skip compaction if side is idle
1792        let is_idle = match side {
1793            JoinSide::Left => self
1794                .left_stats
1795                .is_idle(current_time, self.idle_threshold_ms),
1796            JoinSide::Right => self
1797                .right_stats
1798                .is_idle(current_time, self.idle_threshold_ms),
1799        };
1800
1801        if is_idle {
1802            // Note: metrics update happens in the caller
1803            true
1804        } else {
1805            false
1806        }
1807    }
1808
1809    /// Extracts the join key value from a record batch.
1810    ///
1811    /// Caches the column index on first call to avoid per-event schema lookups.
1812    fn extract_key(
1813        batch: &RecordBatch,
1814        column_name: &str,
1815        cached_index: &mut Option<usize>,
1816    ) -> Option<SmallVec<[u8; 16]>> {
1817        let column_index = if let Some(idx) = *cached_index {
1818            idx
1819        } else {
1820            let idx = batch.schema().index_of(column_name).ok()?;
1821            *cached_index = Some(idx);
1822            idx
1823        };
1824        let column = batch.column(column_index);
1825
1826        // Handle different column types
1827        if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
1828            if string_array.is_empty() || string_array.is_null(0) {
1829                return None;
1830            }
1831            return Some(SmallVec::from_slice(string_array.value(0).as_bytes()));
1832        }
1833
1834        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
1835            if int_array.is_empty() || int_array.is_null(0) {
1836                return None;
1837            }
1838            return Some(SmallVec::from_slice(&int_array.value(0).to_le_bytes()));
1839        }
1840
1841        // For other types, use the raw bytes if available
1842        // This is a fallback - in practice, keys should be string or integer
1843        None
1844    }
1845
1846    /// Creates a state key for storing a join row.
1847    #[allow(clippy::cast_sign_loss)]
1848    fn make_state_key(side: JoinSide, key_value: &[u8], timestamp: i64) -> [u8; 28] {
1849        let prefix = match side {
1850            JoinSide::Left => LEFT_STATE_PREFIX,
1851            JoinSide::Right => RIGHT_STATE_PREFIX,
1852        };
1853
1854        let event_id = EVENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
1855
1856        // Key format: prefix (4) + key_hash (8) + timestamp (8) + event_id (8) = 28 bytes
1857        let mut key = [0u8; 28];
1858        key[..4].copy_from_slice(prefix);
1859
1860        // Use FxHash for the key value
1861        let key_hash = {
1862            let mut hasher = rustc_hash::FxHasher::default();
1863            key_value.hash(&mut hasher);
1864            hasher.finish()
1865        };
1866        key[4..12].copy_from_slice(&key_hash.to_be_bytes());
1867        key[12..20].copy_from_slice(&timestamp.to_be_bytes());
1868        key[20..28].copy_from_slice(&event_id.to_be_bytes());
1869
1870        key
1871    }
1872
1873    /// Creates a timer key for cleanup.
1874    fn make_timer_key(side: JoinSide, state_key: &[u8]) -> TimerKey {
1875        let prefix = match side {
1876            JoinSide::Left => LEFT_TIMER_PREFIX,
1877            JoinSide::Right => RIGHT_TIMER_PREFIX,
1878        };
1879
1880        let mut key = TimerKey::new();
1881        key.push(prefix);
1882        key.extend_from_slice(state_key);
1883        key
1884    }
1885
1886    /// Creates a timer key for unmatched event emission.
1887    fn make_unmatched_timer_key(side: JoinSide, state_key: &[u8]) -> TimerKey {
1888        let side_byte = match side {
1889            JoinSide::Left => 0x01,
1890            JoinSide::Right => 0x02,
1891        };
1892
1893        let mut key = TimerKey::new();
1894        key.push(UNMATCHED_TIMER_PREFIX);
1895        key.push(side_byte);
1896        key.extend_from_slice(state_key);
1897        key
1898    }
1899
1900    /// Probes the opposite side for matching events.
1901    fn probe_opposite_side(
1902        &self,
1903        current_side: JoinSide,
1904        key_value: &[u8],
1905        timestamp: i64,
1906        state: &dyn StateStore,
1907    ) -> SmallVec<[([u8; 28], JoinRow); 4]> {
1908        let mut matches = SmallVec::new();
1909
1910        let prefix = match current_side {
1911            JoinSide::Left => RIGHT_STATE_PREFIX,
1912            JoinSide::Right => LEFT_STATE_PREFIX,
1913        };
1914
1915        // Build prefix for scanning: prefix + key_hash
1916        let key_hash = {
1917            let mut hasher = rustc_hash::FxHasher::default();
1918            key_value.hash(&mut hasher);
1919            hasher.finish()
1920        };
1921        let mut scan_prefix = [0u8; 12];
1922        scan_prefix[..4].copy_from_slice(prefix);
1923        scan_prefix[4..12].copy_from_slice(&key_hash.to_be_bytes());
1924
1925        // Scan for matching keys
1926        for (state_key, value) in state.prefix_scan(&scan_prefix) {
1927            // Deserialize the join row
1928            let Ok(row) = rkyv::access::<rkyv::Archived<JoinRow>, RkyvError>(&value)
1929                .and_then(rkyv::deserialize::<JoinRow, RkyvError>)
1930            else {
1931                continue;
1932            };
1933
1934            // Check if timestamps are within time bound
1935            let time_diff = (timestamp - row.timestamp).abs();
1936            if time_diff <= self.time_bound_ms {
1937                // Verify key matches (in case of hash collision)
1938                if row.key_value == key_value {
1939                    let mut key_buf = [0u8; 28];
1940                    if state_key.len() == 28 {
1941                        key_buf.copy_from_slice(&state_key);
1942                    }
1943                    matches.push((key_buf, row));
1944                }
1945            }
1946        }
1947
1948        matches
1949    }
1950
1951    /// Creates a joined event from two matching rows.
1952    fn create_joined_event(
1953        &self,
1954        current_side: JoinSide,
1955        current_row: &JoinRow,
1956        other_row: &JoinRow,
1957        output_timestamp: i64,
1958    ) -> Option<Event> {
1959        let (left_row, right_row) = match current_side {
1960            JoinSide::Left => (current_row, other_row),
1961            JoinSide::Right => (other_row, current_row),
1962        };
1963
1964        let left_batch = left_row.to_batch().ok()?;
1965        let right_batch = right_row.to_batch().ok()?;
1966
1967        let joined_batch = self.concat_batches(&left_batch, &right_batch)?;
1968
1969        Some(Event::new(output_timestamp, joined_batch))
1970    }
1971
1972    /// Concatenates two batches horizontally.
1973    fn concat_batches(&self, left: &RecordBatch, right: &RecordBatch) -> Option<RecordBatch> {
1974        let schema = self.output_schema.as_ref()?;
1975
1976        let mut columns: Vec<ArrayRef> =
1977            Vec::with_capacity(left.num_columns() + right.num_columns());
1978        columns.extend(left.columns().iter().cloned());
1979        columns.extend(right.columns().iter().cloned());
1980
1981        RecordBatch::try_new(Arc::clone(schema), columns).ok()
1982    }
1983
1984    /// Creates an unmatched event for outer joins.
1985    fn create_unmatched_event(&self, side: JoinSide, row: &JoinRow) -> Option<Event> {
1986        let batch = row.to_batch().ok()?;
1987        let schema = self.output_schema.as_ref()?;
1988
1989        let num_rows = batch.num_rows();
1990        let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1991
1992        match side {
1993            JoinSide::Left => {
1994                // Left columns are populated, right columns are null
1995                columns.extend(batch.columns().iter().cloned());
1996
1997                // Add null columns for right side
1998                if let Some(right_schema) = &self.right_schema {
1999                    for field in right_schema.fields() {
2000                        columns.push(Self::create_null_array(field.data_type(), num_rows));
2001                    }
2002                }
2003            }
2004            JoinSide::Right => {
2005                // Left columns are null, right columns are populated
2006                if let Some(left_schema) = &self.left_schema {
2007                    for field in left_schema.fields() {
2008                        columns.push(Self::create_null_array(field.data_type(), num_rows));
2009                    }
2010                }
2011
2012                columns.extend(batch.columns().iter().cloned());
2013            }
2014        }
2015
2016        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
2017
2018        Some(Event::new(row.timestamp, joined_batch))
2019    }
2020
2021    /// Creates a null array of the given type and length.
2022    fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
2023        match data_type {
2024            DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
2025            // Default to Int64 for all other types (add more specific cases as needed)
2026            _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
2027        }
2028    }
2029
2030    /// Handles cleanup timer expiration.
2031    fn handle_cleanup_timer(
2032        &mut self,
2033        side: JoinSide,
2034        state_key: &[u8],
2035        ctx: &mut OperatorContext,
2036    ) -> OutputVec {
2037        let output = OutputVec::new();
2038
2039        // Check asymmetric compaction - skip if side is idle
2040        if self.should_skip_compaction(side, ctx.processing_time) {
2041            self.metrics.asymmetric_skips += 1;
2042            // Don't skip the actual cleanup, but could be used for compaction
2043        }
2044
2045        // Update per-key metadata before deleting
2046        if self.per_key_tracking && state_key.len() >= 12 {
2047            // Extract key hash from state key (bytes 4-12)
2048            if let Ok(hash_bytes) = state_key[4..12].try_into() {
2049                let key_hash = u64::from_be_bytes(hash_bytes);
2050                if let Some(meta) = self.key_metadata.get_mut(&key_hash) {
2051                    meta.decrement_entries();
2052                }
2053            }
2054        }
2055
2056        // For outer/anti joins, defer state deletion to the unmatched timer handler
2057        // so the row is still readable when the unmatched timer fires.
2058        let defer_cleanup = match side {
2059            JoinSide::Left => self.join_type.emits_unmatched_left(),
2060            JoinSide::Right => self.join_type.emits_unmatched_right(),
2061        };
2062        if !defer_cleanup && ctx.state.delete(state_key).is_ok() {
2063            let _ = ctx.state.delete(&matched_flag_key(state_key));
2064            self.metrics.state_cleanups += 1;
2065        }
2066
2067        output
2068    }
2069
2070    /// Handles unmatched timer expiration for outer/anti joins.
2071    fn handle_unmatched_timer(
2072        &mut self,
2073        side: JoinSide,
2074        state_key: &[u8],
2075        ctx: &mut OperatorContext,
2076    ) -> OutputVec {
2077        let mut output = OutputVec::new();
2078
2079        // Get the join row
2080        let Ok(Some(row)) = ctx.state.get_typed::<JoinRow>(state_key) else {
2081            return output;
2082        };
2083
2084        // Only emit if not matched (check separate flag key, not JoinRow field)
2085        if !is_matched(&*ctx.state, state_key) {
2086            match side {
2087                JoinSide::Left if self.join_type.emits_unmatched_left() => {
2088                    self.metrics.unmatched_left += 1;
2089                    if self.join_type.is_anti() {
2090                        // Anti join: emit just the kept side's data
2091                        if let Ok(batch) = row.to_batch() {
2092                            output.push(Output::Event(Event::new(row.timestamp, batch)));
2093                        }
2094                    } else if let Some(event) = self.create_unmatched_event(side, &row) {
2095                        output.push(Output::Event(event));
2096                    }
2097                }
2098                JoinSide::Right if self.join_type.emits_unmatched_right() => {
2099                    self.metrics.unmatched_right += 1;
2100                    if self.join_type.is_anti() {
2101                        // Anti join: emit just the kept side's data
2102                        if let Ok(batch) = row.to_batch() {
2103                            output.push(Output::Event(Event::new(row.timestamp, batch)));
2104                        }
2105                    } else if let Some(event) = self.create_unmatched_event(side, &row) {
2106                        output.push(Output::Event(event));
2107                    }
2108                }
2109                _ => {}
2110            }
2111        }
2112
2113        // Clean up state (deferred from cleanup timer for outer/anti joins)
2114        if ctx.state.delete(state_key).is_ok() {
2115            let _ = ctx.state.delete(&matched_flag_key(state_key));
2116            self.metrics.state_cleanups += 1;
2117        }
2118
2119        output
2120    }
2121
2122    /// Parses a timer key to determine its type and extract the state key.
2123    fn parse_timer_key(key: &[u8]) -> Option<(TimerKeyType, JoinSide, Vec<u8>)> {
2124        if key.is_empty() {
2125            return None;
2126        }
2127
2128        match key[0] {
2129            LEFT_TIMER_PREFIX => {
2130                let state_key = key[1..].to_vec();
2131                Some((TimerKeyType::Cleanup, JoinSide::Left, state_key))
2132            }
2133            RIGHT_TIMER_PREFIX => {
2134                let state_key = key[1..].to_vec();
2135                Some((TimerKeyType::Cleanup, JoinSide::Right, state_key))
2136            }
2137            UNMATCHED_TIMER_PREFIX => {
2138                if key.len() < 2 {
2139                    return None;
2140                }
2141                let side = match key[1] {
2142                    0x01 => JoinSide::Left,
2143                    0x02 => JoinSide::Right,
2144                    _ => return None,
2145                };
2146                let state_key = key[2..].to_vec();
2147                Some((TimerKeyType::Unmatched, side, state_key))
2148            }
2149            _ => None,
2150        }
2151    }
2152}
2153
/// Type of timer key.
///
/// Decoded from the leading byte(s) of a timer key; see `parse_timer_key`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerKeyType {
    /// Cleanup timer - delete state entry.
    Cleanup,
    /// Unmatched timer - emit unmatched event for outer joins.
    Unmatched,
}
2162
impl Operator for StreamJoinOperator {
    /// Processes an event, treating it as a left-side event.
    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        // Default to left side - in practice, users should call process_side directly
        self.process_left(event, ctx)
    }

    /// Dispatches an expired timer to the cleanup or unmatched handler.
    ///
    /// Timers whose keys don't parse (unknown prefix or side byte) are
    /// silently ignored.
    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
        let Some((timer_type, side, state_key)) = Self::parse_timer_key(&timer.key) else {
            return OutputVec::new();
        };

        match timer_type {
            TimerKeyType::Cleanup => self.handle_cleanup_timer(side, &state_key, ctx),
            TimerKeyType::Unmatched => self.handle_unmatched_timer(side, &state_key, ctx),
        }
    }

    /// Serializes configuration, metrics, watermarks, and side stats.
    ///
    /// Join rows themselves are not part of this payload — only the counters
    /// and configuration captured below.
    fn checkpoint(&self) -> OperatorState {
        // Checkpoint the metrics, configuration, and optimization state
        // Use nested tuples to stay within rkyv's tuple size limit (max 12)
        // Format: (config, core_metrics, optimization_metrics, side_stats)
        let checkpoint_data = (
            // Part 1: Configuration (3 elements)
            (
                self.left_key_column.clone(),
                self.right_key_column.clone(),
                self.time_bound_ms,
            ),
            // Part 2: Core metrics (3 elements)
            (
                self.metrics.left_events,
                self.metrics.right_events,
                self.metrics.matches,
            ),
            // Part 3: optimization metrics (5 elements)
            (
                self.metrics.cpu_friendly_encodes,
                self.metrics.compact_encodes,
                self.metrics.asymmetric_skips,
                self.metrics.idle_key_cleanups,
                self.metrics.build_side_prunes,
            ),
            // Part 4: side stats (4 elements)
            (
                self.left_stats.events_received,
                self.right_stats.events_received,
                self.left_watermark,
                self.right_watermark,
            ),
        );

        // On serialization failure, degrade to an empty payload rather than
        // panicking; restore handles undecodable data via its error path.
        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
            .map(|v| v.to_vec())
            .unwrap_or_default();

        OperatorState {
            operator_id: self.operator_id.clone(),
            data,
        }
    }

    /// Restores metrics and watermarks from a checkpoint.
    ///
    /// Tries the current nested-tuple format first, then falls back to the
    /// legacy flat-tuple format (which carries core metrics only).
    ///
    /// # Errors
    ///
    /// Returns `StateAccessFailed` on operator-id mismatch, or
    /// `SerializationFailed` when neither format deserializes.
    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
        // Extended checkpoint data type with optimization fields using nested tuples
        type CheckpointData = (
            (String, String, i64),     // config
            (u64, u64, u64),           // core metrics
            (u64, u64, u64, u64, u64), // optimization metrics
            (u64, u64, i64, i64),      // side stats
        );
        // Legacy checkpoint type for backward compatibility
        type LegacyCheckpointData = (String, String, i64, u64, u64, u64);

        if state.operator_id != self.operator_id {
            return Err(OperatorError::StateAccessFailed(format!(
                "Operator ID mismatch: expected {}, got {}",
                self.operator_id, state.operator_id
            )));
        }

        // Try to restore full optimization checkpoint first
        if let Ok(archived) = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
        {
            if let Ok(data) = rkyv::deserialize::<CheckpointData, RkyvError>(archived) {
                // Config is ignored on restore — the operator keeps its
                // constructor-supplied configuration.
                let (
                    _config,
                    (left_events, right_events, matches),
                    (
                        cpu_friendly_encodes,
                        compact_encodes,
                        asymmetric_skips,
                        idle_key_cleanups,
                        build_side_prunes,
                    ),
                    (left_received, right_received, left_wm, right_wm),
                ) = data;

                self.metrics.left_events = left_events;
                self.metrics.right_events = right_events;
                self.metrics.matches = matches;
                self.metrics.cpu_friendly_encodes = cpu_friendly_encodes;
                self.metrics.compact_encodes = compact_encodes;
                self.metrics.asymmetric_skips = asymmetric_skips;
                self.metrics.idle_key_cleanups = idle_key_cleanups;
                self.metrics.build_side_prunes = build_side_prunes;
                self.left_stats.events_received = left_received;
                self.right_stats.events_received = right_received;
                self.left_watermark = left_wm;
                self.right_watermark = right_wm;

                return Ok(());
            }
        }

        // Fall back to legacy checkpoint format (core metrics only; no
        // optimization metrics, side stats, or watermarks)
        let archived = rkyv::access::<rkyv::Archived<LegacyCheckpointData>, RkyvError>(&state.data)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        let (_, _, _, left_events, right_events, matches) =
            rkyv::deserialize::<LegacyCheckpointData, RkyvError>(archived)
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;

        self.metrics.left_events = left_events;
        self.metrics.right_events = right_events;
        self.metrics.matches = matches;

        Ok(())
    }
}
2290
2291#[cfg(test)]
2292#[allow(clippy::cast_possible_wrap)]
2293mod tests {
2294    use super::*;
2295    use crate::state::InMemoryStore;
2296    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
2297    use arrow_array::{Int64Array, StringArray};
2298    use arrow_schema::{DataType, Field, Schema};
2299
2300    fn create_order_event(timestamp: i64, order_id: &str, amount: i64) -> Event {
2301        let schema = Arc::new(Schema::new(vec![
2302            Field::new("order_id", DataType::Utf8, false),
2303            Field::new("amount", DataType::Int64, false),
2304        ]));
2305        let batch = RecordBatch::try_new(
2306            schema,
2307            vec![
2308                Arc::new(StringArray::from(vec![order_id])),
2309                Arc::new(Int64Array::from(vec![amount])),
2310            ],
2311        )
2312        .unwrap();
2313        Event::new(timestamp, batch)
2314    }
2315
2316    fn create_payment_event(timestamp: i64, order_id: &str, status: &str) -> Event {
2317        let schema = Arc::new(Schema::new(vec![
2318            Field::new("order_id", DataType::Utf8, false),
2319            Field::new("status", DataType::Utf8, false),
2320        ]));
2321        let batch = RecordBatch::try_new(
2322            schema,
2323            vec![
2324                Arc::new(StringArray::from(vec![order_id])),
2325                Arc::new(StringArray::from(vec![status])),
2326            ],
2327        )
2328        .unwrap();
2329        Event::new(timestamp, batch)
2330    }
2331
    /// Builds an `OperatorContext` over borrowed test fixtures, with both
    /// event time and processing time fixed at zero.
    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn StateStore,
        watermark_gen: &'a mut dyn WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }
2346
2347    #[test]
2348    fn test_join_type_properties() {
2349        assert!(!JoinType::Inner.emits_unmatched_left());
2350        assert!(!JoinType::Inner.emits_unmatched_right());
2351
2352        assert!(JoinType::Left.emits_unmatched_left());
2353        assert!(!JoinType::Left.emits_unmatched_right());
2354
2355        assert!(!JoinType::Right.emits_unmatched_left());
2356        assert!(JoinType::Right.emits_unmatched_right());
2357
2358        assert!(JoinType::Full.emits_unmatched_left());
2359        assert!(JoinType::Full.emits_unmatched_right());
2360
2361        // Semi joins: never emit unmatched
2362        assert!(!JoinType::LeftSemi.emits_unmatched_left());
2363        assert!(!JoinType::LeftSemi.emits_unmatched_right());
2364        assert!(!JoinType::RightSemi.emits_unmatched_left());
2365        assert!(!JoinType::RightSemi.emits_unmatched_right());
2366
2367        // Anti joins: emit unmatched for the kept side only
2368        assert!(JoinType::LeftAnti.emits_unmatched_left());
2369        assert!(!JoinType::LeftAnti.emits_unmatched_right());
2370        assert!(!JoinType::RightAnti.emits_unmatched_left());
2371        assert!(JoinType::RightAnti.emits_unmatched_right());
2372
2373        // Semi/Anti helpers
2374        assert!(JoinType::LeftSemi.is_semi());
2375        assert!(JoinType::RightSemi.is_semi());
2376        assert!(!JoinType::LeftAnti.is_semi());
2377        assert!(JoinType::LeftAnti.is_anti());
2378        assert!(JoinType::RightAnti.is_anti());
2379        assert!(!JoinType::Inner.is_semi());
2380        assert!(!JoinType::Inner.is_anti());
2381
2382        // Kept side
2383        assert_eq!(JoinType::LeftSemi.kept_side(), Some(JoinSide::Left));
2384        assert_eq!(JoinType::LeftAnti.kept_side(), Some(JoinSide::Left));
2385        assert_eq!(JoinType::RightSemi.kept_side(), Some(JoinSide::Right));
2386        assert_eq!(JoinType::RightAnti.kept_side(), Some(JoinSide::Right));
2387        assert_eq!(JoinType::Inner.kept_side(), None);
2388    }
2389
    // Left-semi join contract: emit the left row (left columns only) exactly
    // once, on the first matching right event — never on insert, never on a
    // duplicate match.
    #[test]
    fn test_left_semi_join_only_emits_matched() {
        let mut operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::LeftSemi,
            "test_semi".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Left event (order) — no match yet
        let order = create_order_event(1000, "order_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&order, JoinSide::Left, &mut ctx);
            let events: Vec<_> = outputs
                .iter()
                .filter(|o| matches!(o, Output::Event(_)))
                .collect();
            assert_eq!(events.len(), 0, "semi join should not emit without match");
        }

        // Right event (payment) — match found, should emit left row
        let payment = create_payment_event(2000, "order_1", "paid");
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&payment, JoinSide::Right, &mut ctx)
        };
        let events: Vec<_> = outputs
            .iter()
            .filter_map(|o| match o {
                Output::Event(e) => Some(e),
                _ => None,
            })
            .collect();
        assert_eq!(events.len(), 1, "semi join should emit left row on match");
        // Output should only have left columns (order_id, amount) — NOT concatenated
        assert_eq!(events[0].data.num_columns(), 2);
        assert_eq!(events[0].data.schema().field(0).name(), "order_id");
        assert_eq!(events[0].data.schema().field(1).name(), "amount");

        // Second right match for same left — should NOT emit again
        let payment2 = create_payment_event(2500, "order_1", "refunded");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&payment2, JoinSide::Right, &mut ctx);
            let events: Vec<_> = outputs
                .iter()
                .filter(|o| matches!(o, Output::Event(_)))
                .collect();
            assert_eq!(events.len(), 0, "semi join should not emit duplicate");
        }
    }
2447
2448    #[test]
2449    fn test_left_anti_join_only_emits_unmatched() {
2450        let mut operator = StreamJoinOperator::with_id(
2451            "order_id".to_string(),
2452            "order_id".to_string(),
2453            Duration::from_secs(3600),
2454            JoinType::LeftAnti,
2455            "test_anti".to_string(),
2456        );
2457
2458        let mut timers = TimerService::new();
2459        let mut state = InMemoryStore::new();
2460        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2461
2462        // Left event for a key that WILL have a match (should NOT be emitted)
2463        let matched_order = create_order_event(1000, "order_1", 100);
2464        {
2465            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2466            let outputs = operator.process_side(&matched_order, JoinSide::Left, &mut ctx);
2467            let events: Vec<_> = outputs
2468                .iter()
2469                .filter(|o| matches!(o, Output::Event(_)))
2470                .collect();
2471            assert_eq!(events.len(), 0, "anti join should not emit on insert");
2472        }
2473
2474        // Left event for a key that will NOT have a match (should be emitted at timer)
2475        let unmatched_order = create_order_event(1100, "order_2", 200);
2476        {
2477            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2478            let outputs = operator.process_side(&unmatched_order, JoinSide::Left, &mut ctx);
2479            let events: Vec<_> = outputs
2480                .iter()
2481                .filter(|o| matches!(o, Output::Event(_)))
2482                .collect();
2483            assert_eq!(events.len(), 0, "anti join should not emit on insert");
2484        }
2485
2486        // Right event matches order_1 — suppresses anti emission
2487        let payment = create_payment_event(2000, "order_1", "paid");
2488        {
2489            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2490            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2491            let events: Vec<_> = outputs
2492                .iter()
2493                .filter(|o| matches!(o, Output::Event(_)))
2494                .collect();
2495            assert_eq!(events.len(), 0, "anti join should never emit on match");
2496        }
2497
2498        // Fire timers — order_2 should be emitted (unmatched), order_1 should NOT
2499        let pending_timers = timers.poll_timers(i64::MAX);
2500        let mut anti_outputs = 0;
2501        for timer_reg in pending_timers {
2502            let timer = Timer {
2503                key: timer_reg.key.unwrap_or_default(),
2504                timestamp: timer_reg.timestamp,
2505            };
2506            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2507            let outputs = operator.on_timer(timer, &mut ctx);
2508            for out in &outputs {
2509                if let Output::Event(e) = out {
2510                    anti_outputs += 1;
2511                    // Should only have left columns
2512                    assert_eq!(e.data.num_columns(), 2);
2513                    assert_eq!(e.data.schema().field(0).name(), "order_id");
2514                }
2515            }
2516        }
2517        assert!(
2518            anti_outputs >= 1,
2519            "anti join should emit unmatched left rows on timer"
2520        );
2521    }
2522
2523    #[test]
2524    fn test_join_operator_creation() {
2525        let operator = StreamJoinOperator::new(
2526            "order_id".to_string(),
2527            "order_id".to_string(),
2528            Duration::from_secs(3600),
2529            JoinType::Inner,
2530        );
2531
2532        assert_eq!(operator.join_type(), JoinType::Inner);
2533        assert_eq!(operator.time_bound_ms(), 3_600_000);
2534    }
2535
2536    #[test]
2537    fn test_join_operator_with_id() {
2538        let operator = StreamJoinOperator::with_id(
2539            "order_id".to_string(),
2540            "order_id".to_string(),
2541            Duration::from_secs(3600),
2542            JoinType::Left,
2543            "test_join".to_string(),
2544        );
2545
2546        assert_eq!(operator.operator_id, "test_join");
2547        assert_eq!(operator.join_type(), JoinType::Left);
2548    }
2549
2550    #[test]
2551    fn test_inner_join_basic() {
2552        let mut operator = StreamJoinOperator::with_id(
2553            "order_id".to_string(),
2554            "order_id".to_string(),
2555            Duration::from_secs(3600),
2556            JoinType::Inner,
2557            "test_join".to_string(),
2558        );
2559
2560        let mut timers = TimerService::new();
2561        let mut state = InMemoryStore::new();
2562        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2563
2564        // Process left event (order)
2565        let order = create_order_event(1000, "order_1", 100);
2566        {
2567            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2568            let outputs = operator.process_side(&order, JoinSide::Left, &mut ctx);
2569            // No match yet, should produce no output
2570            assert!(
2571                outputs
2572                    .iter()
2573                    .filter(|o| matches!(o, Output::Event(_)))
2574                    .count()
2575                    == 0
2576            );
2577        }
2578
2579        // Process right event (payment) - should produce a match
2580        let payment = create_payment_event(2000, "order_1", "paid");
2581        {
2582            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2583            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2584            // Should have one match
2585            assert_eq!(
2586                outputs
2587                    .iter()
2588                    .filter(|o| matches!(o, Output::Event(_)))
2589                    .count(),
2590                1
2591            );
2592        }
2593
2594        assert_eq!(operator.metrics().matches, 1);
2595        assert_eq!(operator.metrics().left_events, 1);
2596        assert_eq!(operator.metrics().right_events, 1);
2597    }
2598
2599    #[test]
2600    fn test_inner_join_no_match_different_key() {
2601        let mut operator = StreamJoinOperator::with_id(
2602            "order_id".to_string(),
2603            "order_id".to_string(),
2604            Duration::from_secs(3600),
2605            JoinType::Inner,
2606            "test_join".to_string(),
2607        );
2608
2609        let mut timers = TimerService::new();
2610        let mut state = InMemoryStore::new();
2611        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2612
2613        // Process left event
2614        let order = create_order_event(1000, "order_1", 100);
2615        {
2616            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2617            operator.process_side(&order, JoinSide::Left, &mut ctx);
2618        }
2619
2620        // Process right event with different key
2621        let payment = create_payment_event(2000, "order_2", "paid");
2622        {
2623            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2624            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2625            // No match due to different key
2626            assert_eq!(
2627                outputs
2628                    .iter()
2629                    .filter(|o| matches!(o, Output::Event(_)))
2630                    .count(),
2631                0
2632            );
2633        }
2634
2635        assert_eq!(operator.metrics().matches, 0);
2636    }
2637
2638    #[test]
2639    fn test_inner_join_no_match_outside_time_bound() {
2640        let mut operator = StreamJoinOperator::with_id(
2641            "order_id".to_string(),
2642            "order_id".to_string(),
2643            Duration::from_secs(1), // 1 second time bound
2644            JoinType::Inner,
2645            "test_join".to_string(),
2646        );
2647
2648        let mut timers = TimerService::new();
2649        let mut state = InMemoryStore::new();
2650        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2651
2652        // Process left event at t=1000
2653        let order = create_order_event(1000, "order_1", 100);
2654        {
2655            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2656            operator.process_side(&order, JoinSide::Left, &mut ctx);
2657        }
2658
2659        // Process right event at t=5000 (4 seconds later, outside 1s bound)
2660        let payment = create_payment_event(5000, "order_1", "paid");
2661        {
2662            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2663            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2664            // No match due to time bound
2665            assert_eq!(
2666                outputs
2667                    .iter()
2668                    .filter(|o| matches!(o, Output::Event(_)))
2669                    .count(),
2670                0
2671            );
2672        }
2673
2674        assert_eq!(operator.metrics().matches, 0);
2675    }
2676
2677    #[test]
2678    fn test_join_multiple_matches() {
2679        let mut operator = StreamJoinOperator::with_id(
2680            "order_id".to_string(),
2681            "order_id".to_string(),
2682            Duration::from_secs(3600),
2683            JoinType::Inner,
2684            "test_join".to_string(),
2685        );
2686
2687        let mut timers = TimerService::new();
2688        let mut state = InMemoryStore::new();
2689        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2690
2691        // Process two left events with same key
2692        for ts in [1000, 2000] {
2693            let order = create_order_event(ts, "order_1", 100);
2694            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2695            operator.process_side(&order, JoinSide::Left, &mut ctx);
2696        }
2697
2698        // Process right event - should match both
2699        let payment = create_payment_event(1500, "order_1", "paid");
2700        {
2701            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2702            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2703            // Should have two matches
2704            assert_eq!(
2705                outputs
2706                    .iter()
2707                    .filter(|o| matches!(o, Output::Event(_)))
2708                    .count(),
2709                2
2710            );
2711        }
2712
2713        assert_eq!(operator.metrics().matches, 2);
2714    }
2715
2716    #[test]
2717    fn test_join_late_event() {
2718        let mut operator = StreamJoinOperator::with_id(
2719            "order_id".to_string(),
2720            "order_id".to_string(),
2721            Duration::from_secs(1),
2722            JoinType::Inner,
2723            "test_join".to_string(),
2724        );
2725
2726        let mut timers = TimerService::new();
2727        let mut state = InMemoryStore::new();
2728        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
2729
2730        // Advance watermark significantly
2731        let future_order = create_order_event(10000, "order_2", 200);
2732        {
2733            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2734            operator.process_side(&future_order, JoinSide::Left, &mut ctx);
2735        }
2736
2737        // Process very late event
2738        let late_payment = create_payment_event(100, "order_1", "paid");
2739        {
2740            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2741            let outputs = operator.process_side(&late_payment, JoinSide::Right, &mut ctx);
2742            // Should be marked as late
2743            assert!(outputs.iter().any(|o| matches!(o, Output::LateEvent(_))));
2744        }
2745
2746        assert_eq!(operator.metrics().late_events, 1);
2747    }
2748
2749    #[test]
2750    fn test_join_row_serialization() {
2751        let schema = Arc::new(Schema::new(vec![
2752            Field::new("id", DataType::Utf8, false),
2753            Field::new("value", DataType::Int64, false),
2754        ]));
2755        let batch = RecordBatch::try_new(
2756            schema,
2757            vec![
2758                Arc::new(StringArray::from(vec!["test"])),
2759                Arc::new(Int64Array::from(vec![42])),
2760            ],
2761        )
2762        .unwrap();
2763
2764        let row = JoinRow::new(1000, b"key".to_vec(), &batch).unwrap();
2765
2766        // Verify we can deserialize back
2767        let restored_batch = row.to_batch().unwrap();
2768        assert_eq!(restored_batch.num_rows(), 1);
2769        assert_eq!(restored_batch.num_columns(), 2);
2770    }
2771
2772    #[test]
2773    fn test_cleanup_timer() {
2774        let mut operator = StreamJoinOperator::with_id(
2775            "order_id".to_string(),
2776            "order_id".to_string(),
2777            Duration::from_secs(1),
2778            JoinType::Inner,
2779            "test_join".to_string(),
2780        );
2781
2782        let mut timers = TimerService::new();
2783        let mut state = InMemoryStore::new();
2784        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2785
2786        // Process an event
2787        let order = create_order_event(1000, "order_1", 100);
2788        {
2789            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2790            operator.process_side(&order, JoinSide::Left, &mut ctx);
2791        }
2792
2793        // State should have one entry
2794        assert!(state.len() > 0);
2795        let initial_state_len = state.len();
2796
2797        // Get the registered timers and fire them
2798        let registered_timers = timers.poll_timers(2001); // After cleanup time
2799        assert!(!registered_timers.is_empty());
2800
2801        // Fire the cleanup timer - convert TimerRegistration to Timer
2802        for timer_reg in registered_timers {
2803            let timer = Timer {
2804                key: timer_reg.key.unwrap_or_default(),
2805                timestamp: timer_reg.timestamp,
2806            };
2807            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2808            operator.on_timer(timer, &mut ctx);
2809        }
2810
2811        // Verify cleanup happened (state decreased)
2812        assert!(state.len() < initial_state_len || operator.metrics().state_cleanups > 0);
2813    }
2814
2815    #[test]
2816    fn test_checkpoint_restore() {
2817        let mut operator = StreamJoinOperator::with_id(
2818            "order_id".to_string(),
2819            "order_id".to_string(),
2820            Duration::from_secs(3600),
2821            JoinType::Inner,
2822            "test_join".to_string(),
2823        );
2824
2825        // Simulate some activity
2826        operator.metrics.left_events = 10;
2827        operator.metrics.right_events = 5;
2828        operator.metrics.matches = 3;
2829
2830        // Checkpoint
2831        let checkpoint = operator.checkpoint();
2832
2833        // Create new operator and restore
2834        let mut restored = StreamJoinOperator::with_id(
2835            "order_id".to_string(),
2836            "order_id".to_string(),
2837            Duration::from_secs(3600),
2838            JoinType::Inner,
2839            "test_join".to_string(),
2840        );
2841
2842        restored.restore(checkpoint).unwrap();
2843
2844        assert_eq!(restored.metrics().left_events, 10);
2845        assert_eq!(restored.metrics().right_events, 5);
2846        assert_eq!(restored.metrics().matches, 3);
2847    }
2848
2849    #[test]
2850    fn test_metrics_reset() {
2851        let mut operator = StreamJoinOperator::new(
2852            "order_id".to_string(),
2853            "order_id".to_string(),
2854            Duration::from_secs(3600),
2855            JoinType::Inner,
2856        );
2857
2858        operator.metrics.left_events = 10;
2859        operator.metrics.matches = 5;
2860
2861        operator.reset_metrics();
2862
2863        assert_eq!(operator.metrics().left_events, 0);
2864        assert_eq!(operator.metrics().matches, 0);
2865    }
2866
2867    #[test]
2868    fn test_bidirectional_join() {
2869        let mut operator = StreamJoinOperator::with_id(
2870            "order_id".to_string(),
2871            "order_id".to_string(),
2872            Duration::from_secs(3600),
2873            JoinType::Inner,
2874            "test_join".to_string(),
2875        );
2876
2877        let mut timers = TimerService::new();
2878        let mut state = InMemoryStore::new();
2879        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2880
2881        // Right event arrives first
2882        let payment = create_payment_event(1000, "order_1", "paid");
2883        {
2884            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2885            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2886            assert_eq!(
2887                outputs
2888                    .iter()
2889                    .filter(|o| matches!(o, Output::Event(_)))
2890                    .count(),
2891                0
2892            );
2893        }
2894
2895        // Left event arrives and matches
2896        let order = create_order_event(1500, "order_1", 100);
2897        {
2898            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2899            let outputs = operator.process_side(&order, JoinSide::Left, &mut ctx);
2900            assert_eq!(
2901                outputs
2902                    .iter()
2903                    .filter(|o| matches!(o, Output::Event(_)))
2904                    .count(),
2905                1
2906            );
2907        }
2908
2909        assert_eq!(operator.metrics().matches, 1);
2910    }
2911
2912    #[test]
2913    fn test_integer_key_join() {
2914        fn create_int_key_event(timestamp: i64, key: i64, value: i64) -> Event {
2915            let schema = Arc::new(Schema::new(vec![
2916                Field::new("key", DataType::Int64, false),
2917                Field::new("value", DataType::Int64, false),
2918            ]));
2919            let batch = RecordBatch::try_new(
2920                schema,
2921                vec![
2922                    Arc::new(Int64Array::from(vec![key])),
2923                    Arc::new(Int64Array::from(vec![value])),
2924                ],
2925            )
2926            .unwrap();
2927            Event::new(timestamp, batch)
2928        }
2929
2930        let mut operator = StreamJoinOperator::with_id(
2931            "key".to_string(),
2932            "key".to_string(),
2933            Duration::from_secs(3600),
2934            JoinType::Inner,
2935            "test_join".to_string(),
2936        );
2937
2938        let mut timers = TimerService::new();
2939        let mut state = InMemoryStore::new();
2940        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2941
2942        // Process left with integer key
2943        let left = create_int_key_event(1000, 42, 100);
2944        {
2945            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2946            operator.process_side(&left, JoinSide::Left, &mut ctx);
2947        }
2948
2949        // Process right with same integer key
2950        let right = create_int_key_event(1500, 42, 200);
2951        {
2952            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2953            let outputs = operator.process_side(&right, JoinSide::Right, &mut ctx);
2954            assert_eq!(
2955                outputs
2956                    .iter()
2957                    .filter(|o| matches!(o, Output::Event(_)))
2958                    .count(),
2959                1
2960            );
2961        }
2962
2963        assert_eq!(operator.metrics().matches, 1);
2964    }
2965
2966    // Stream Join Optimization Tests
2967
2968    #[test]
2969    fn test_f057_join_row_encoding_enum() {
2970        assert_eq!(JoinRowEncoding::default(), JoinRowEncoding::Compact);
2971        assert_eq!(format!("{}", JoinRowEncoding::Compact), "compact");
2972        assert_eq!(format!("{}", JoinRowEncoding::CpuFriendly), "cpu_friendly");
2973
2974        assert_eq!(
2975            "compact".parse::<JoinRowEncoding>().unwrap(),
2976            JoinRowEncoding::Compact
2977        );
2978        assert_eq!(
2979            "cpu_friendly".parse::<JoinRowEncoding>().unwrap(),
2980            JoinRowEncoding::CpuFriendly
2981        );
2982        assert_eq!(
2983            "cpu-friendly".parse::<JoinRowEncoding>().unwrap(),
2984            JoinRowEncoding::CpuFriendly
2985        );
2986        assert!("invalid".parse::<JoinRowEncoding>().is_err());
2987    }
2988
2989    #[test]
2990    fn test_f057_config_builder() {
2991        let config = StreamJoinConfig::builder()
2992            .left_key_column("order_id")
2993            .right_key_column("payment_id")
2994            .time_bound(Duration::from_secs(3600))
2995            .join_type(JoinType::Left)
2996            .operator_id("test_join")
2997            .row_encoding(JoinRowEncoding::CpuFriendly)
2998            .asymmetric_compaction(true)
2999            .idle_threshold(Duration::from_secs(120))
3000            .per_key_tracking(true)
3001            .key_idle_threshold(Duration::from_secs(600))
3002            .build_side_pruning(true)
3003            .build_side(JoinSide::Left)
3004            .build()
3005            .unwrap();
3006
3007        assert_eq!(config.left_key_column, "order_id");
3008        assert_eq!(config.right_key_column, "payment_id");
3009        assert_eq!(config.time_bound_ms, 3_600_000);
3010        assert_eq!(config.join_type, JoinType::Left);
3011        assert_eq!(config.operator_id, Some("test_join".to_string()));
3012        assert_eq!(config.row_encoding, JoinRowEncoding::CpuFriendly);
3013        assert!(config.asymmetric_compaction);
3014        assert_eq!(config.idle_threshold_ms, 120_000);
3015        assert!(config.per_key_tracking);
3016        assert_eq!(config.key_idle_threshold_ms, 600_000);
3017        assert!(config.build_side_pruning);
3018        assert_eq!(config.build_side, Some(JoinSide::Left));
3019    }
3020
3021    #[test]
3022    fn test_f057_from_config() {
3023        let config = StreamJoinConfig::builder()
3024            .left_key_column("key")
3025            .right_key_column("key")
3026            .time_bound(Duration::from_secs(60))
3027            .join_type(JoinType::Inner)
3028            .row_encoding(JoinRowEncoding::CpuFriendly)
3029            .build()
3030            .unwrap();
3031
3032        let operator = StreamJoinOperator::from_config(config);
3033
3034        assert_eq!(operator.row_encoding(), JoinRowEncoding::CpuFriendly);
3035        assert!(operator.asymmetric_compaction_enabled());
3036        assert!(operator.per_key_tracking_enabled());
3037    }
3038
3039    #[test]
3040    fn test_f057_cpu_friendly_encoding_roundtrip() {
3041        let schema = Arc::new(Schema::new(vec![
3042            Field::new("id", DataType::Utf8, false),
3043            Field::new("value", DataType::Int64, false),
3044            Field::new("price", DataType::Float64, false),
3045        ]));
3046        let batch = RecordBatch::try_new(
3047            schema,
3048            vec![
3049                Arc::new(StringArray::from(vec!["test_key"])),
3050                Arc::new(Int64Array::from(vec![42])),
3051                Arc::new(Float64Array::from(vec![99.99])),
3052            ],
3053        )
3054        .unwrap();
3055
3056        // Test CPU-friendly encoding
3057        let row =
3058            JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::CpuFriendly)
3059                .unwrap();
3060        assert_eq!(row.encoding(), JoinRowEncoding::CpuFriendly);
3061
3062        // Verify roundtrip
3063        let restored = row.to_batch().unwrap();
3064        assert_eq!(restored.num_rows(), 1);
3065        assert_eq!(restored.num_columns(), 3);
3066
3067        // Verify values
3068        let id_col = restored
3069            .column(0)
3070            .as_any()
3071            .downcast_ref::<StringArray>()
3072            .unwrap();
3073        assert_eq!(id_col.value(0), "test_key");
3074
3075        let value_col = restored
3076            .column(1)
3077            .as_any()
3078            .downcast_ref::<Int64Array>()
3079            .unwrap();
3080        assert_eq!(value_col.value(0), 42);
3081
3082        let price_col = restored
3083            .column(2)
3084            .as_any()
3085            .downcast_ref::<Float64Array>()
3086            .unwrap();
3087        assert!((price_col.value(0) - 99.99).abs() < 0.001);
3088    }
3089
3090    #[test]
3091    fn test_f057_compact_encoding_still_works() {
3092        let schema = Arc::new(Schema::new(vec![
3093            Field::new("id", DataType::Utf8, false),
3094            Field::new("value", DataType::Int64, false),
3095        ]));
3096        let batch = RecordBatch::try_new(
3097            schema,
3098            vec![
3099                Arc::new(StringArray::from(vec!["test"])),
3100                Arc::new(Int64Array::from(vec![100])),
3101            ],
3102        )
3103        .unwrap();
3104
3105        let row = JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::Compact)
3106            .unwrap();
3107        assert_eq!(row.encoding(), JoinRowEncoding::Compact);
3108
3109        let restored = row.to_batch().unwrap();
3110        assert_eq!(restored.num_rows(), 1);
3111    }
3112
3113    #[test]
3114    fn test_f057_side_stats_tracking() {
3115        let mut stats = SideStats::new();
3116        assert_eq!(stats.events_received, 0);
3117        assert!(!stats.is_idle(1000, 60_000)); // No events yet, not idle
3118
3119        // Record events
3120        stats.record_event(1000);
3121        assert_eq!(stats.events_received, 1);
3122        assert_eq!(stats.last_event_time, 1000);
3123
3124        stats.record_event(2000);
3125        assert_eq!(stats.events_received, 2);
3126        assert_eq!(stats.last_event_time, 2000);
3127
3128        // Check idle detection
3129        assert!(!stats.is_idle(2000, 60_000)); // Just received event
3130        assert!(!stats.is_idle(50_000, 60_000)); // Within threshold
3131
3132        // After threshold with no new events in window
3133        stats.events_this_window = 0;
3134        assert!(stats.is_idle(100_000, 60_000)); // Past threshold
3135    }
3136
3137    #[test]
3138    fn test_f057_key_metadata_tracking() {
3139        let mut meta = KeyMetadata::new(1000);
3140        assert_eq!(meta.last_activity, 1000);
3141        assert_eq!(meta.event_count, 1);
3142        assert_eq!(meta.state_entries, 1);
3143
3144        meta.record_event(2000);
3145        assert_eq!(meta.last_activity, 2000);
3146        assert_eq!(meta.event_count, 2);
3147        assert_eq!(meta.state_entries, 2);
3148
3149        meta.decrement_entries();
3150        assert_eq!(meta.state_entries, 1);
3151
3152        assert!(!meta.is_idle(2000, 60_000));
3153        assert!(meta.is_idle(100_000, 60_000));
3154    }
3155
3156    #[test]
3157    fn test_f057_per_key_tracking_in_operator() {
3158        let config = StreamJoinConfig::builder()
3159            .left_key_column("order_id")
3160            .right_key_column("order_id")
3161            .time_bound(Duration::from_secs(3600))
3162            .join_type(JoinType::Inner)
3163            .per_key_tracking(true)
3164            .build()
3165            .unwrap();
3166
3167        let mut operator = StreamJoinOperator::from_config(config);
3168        let mut timers = TimerService::new();
3169        let mut state = InMemoryStore::new();
3170        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
3171
3172        // Process events with different keys
3173        for (i, key) in ["order_1", "order_2", "order_3"].iter().enumerate() {
3174            let event = create_order_event(1000 + i as i64 * 100, key, 100);
3175            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3176            operator.process_side(&event, JoinSide::Left, &mut ctx);
3177        }
3178
3179        // Verify key tracking
3180        assert_eq!(operator.tracked_key_count(), 3);
3181        assert_eq!(operator.metrics().tracked_keys, 3);
3182    }
3183
3184    #[test]
3185    fn test_f057_encoding_metrics() {
3186        // Test compact encoding
3187        let mut compact_op = StreamJoinOperator::from_config(
3188            StreamJoinConfig::builder()
3189                .left_key_column("order_id")
3190                .right_key_column("order_id")
3191                .time_bound(Duration::from_secs(3600))
3192                .join_type(JoinType::Inner)
3193                .row_encoding(JoinRowEncoding::Compact)
3194                .build()
3195                .unwrap(),
3196        );
3197
3198        let mut timers = TimerService::new();
3199        let mut state = InMemoryStore::new();
3200        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
3201
3202        let event = create_order_event(1000, "order_1", 100);
3203        {
3204            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3205            compact_op.process_side(&event, JoinSide::Left, &mut ctx);
3206        }
3207        assert_eq!(compact_op.metrics().compact_encodes, 1);
3208        assert_eq!(compact_op.metrics().cpu_friendly_encodes, 0);
3209
3210        // Test CPU-friendly encoding
3211        let mut cpu_op = StreamJoinOperator::from_config(
3212            StreamJoinConfig::builder()
3213                .left_key_column("order_id")
3214                .right_key_column("order_id")
3215                .time_bound(Duration::from_secs(3600))
3216                .join_type(JoinType::Inner)
3217                .row_encoding(JoinRowEncoding::CpuFriendly)
3218                .build()
3219                .unwrap(),
3220        );
3221
3222        let mut state2 = InMemoryStore::new();
3223        {
3224            let mut ctx = create_test_context(&mut timers, &mut state2, &mut watermark_gen);
3225            cpu_op.process_side(&event, JoinSide::Left, &mut ctx);
3226        }
3227        assert_eq!(cpu_op.metrics().cpu_friendly_encodes, 1);
3228        assert_eq!(cpu_op.metrics().compact_encodes, 0);
3229    }
3230
3231    #[test]
3232    fn test_f057_asymmetric_compaction_detection() {
3233        let config = StreamJoinConfig::builder()
3234            .left_key_column("order_id")
3235            .right_key_column("order_id")
3236            .time_bound(Duration::from_secs(60))
3237            .join_type(JoinType::Inner)
3238            .asymmetric_compaction(true)
3239            .idle_threshold(Duration::from_secs(10)) // 10 seconds
3240            .build()
3241            .unwrap();
3242
3243        let mut operator = StreamJoinOperator::from_config(config);
3244        let mut timers = TimerService::new();
3245        let mut state = InMemoryStore::new();
3246        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
3247
3248        // Process left events
3249        for i in 0..5 {
3250            let event = create_order_event(1000 + i * 100, "order_1", 100);
3251            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3252            ctx.processing_time = 1000 + i * 100;
3253            operator.process_side(&event, JoinSide::Left, &mut ctx);
3254        }
3255
3256        // Left side is not idle (just processed events)
3257        assert!(!operator.is_side_idle(JoinSide::Left, 1500));
3258
3259        // Right side has no events - but is_idle returns false when no events received
3260        assert!(!operator.is_side_idle(JoinSide::Right, 1500));
3261
3262        // Simulate time passing with no left events
3263        operator.left_stats.events_this_window = 0;
3264        assert!(operator.is_side_idle(JoinSide::Left, 100_000));
3265    }
3266
3267    #[test]
3268    fn test_f057_effective_build_side_selection() {
3269        // Test configured build side
3270        let config = StreamJoinConfig::builder()
3271            .left_key_column("key")
3272            .right_key_column("key")
3273            .time_bound(Duration::from_secs(60))
3274            .join_type(JoinType::Inner)
3275            .build_side(JoinSide::Right)
3276            .build()
3277            .unwrap();
3278
3279        let operator = StreamJoinOperator::from_config(config);
3280        assert_eq!(operator.effective_build_side(), JoinSide::Right);
3281
3282        // Test auto-selection (smaller side)
3283        let config2 = StreamJoinConfig::builder()
3284            .left_key_column("key")
3285            .right_key_column("key")
3286            .time_bound(Duration::from_secs(60))
3287            .join_type(JoinType::Inner)
3288            .build()
3289            .unwrap();
3290
3291        let mut operator2 = StreamJoinOperator::from_config(config2);
3292        operator2.left_stats.events_received = 100;
3293        operator2.right_stats.events_received = 1000;
3294
3295        // Left is smaller, so should be build side
3296        assert_eq!(operator2.effective_build_side(), JoinSide::Left);
3297    }
3298
3299    #[test]
3300    fn test_f057_join_with_cpu_friendly_encoding() {
3301        let config = StreamJoinConfig::builder()
3302            .left_key_column("order_id")
3303            .right_key_column("order_id")
3304            .time_bound(Duration::from_secs(3600))
3305            .join_type(JoinType::Inner)
3306            .row_encoding(JoinRowEncoding::CpuFriendly)
3307            .build()
3308            .unwrap();
3309
3310        let mut operator = StreamJoinOperator::from_config(config);
3311        let mut timers = TimerService::new();
3312        let mut state = InMemoryStore::new();
3313        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
3314
3315        // Process left event
3316        let order = create_order_event(1000, "order_1", 100);
3317        {
3318            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3319            operator.process_side(&order, JoinSide::Left, &mut ctx);
3320        }
3321
3322        // Process right event - should produce a match
3323        let payment = create_payment_event(2000, "order_1", "paid");
3324        {
3325            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
3326            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
3327            assert_eq!(
3328                outputs
3329                    .iter()
3330                    .filter(|o| matches!(o, Output::Event(_)))
3331                    .count(),
3332                1
3333            );
3334        }
3335
3336        assert_eq!(operator.metrics().matches, 1);
3337        assert_eq!(operator.metrics().cpu_friendly_encodes, 2); // Both events encoded
3338    }
3339
3340    #[test]
3341    fn test_f057_checkpoint_restore_with_optimization_state() {
3342        let config = StreamJoinConfig::builder()
3343            .left_key_column("key")
3344            .right_key_column("key")
3345            .time_bound(Duration::from_secs(60))
3346            .join_type(JoinType::Inner)
3347            .operator_id("test_join")
3348            .build()
3349            .unwrap();
3350
3351        let mut operator = StreamJoinOperator::from_config(config);
3352
3353        // Simulate activity
3354        operator.metrics.left_events = 100;
3355        operator.metrics.right_events = 50;
3356        operator.metrics.matches = 25;
3357        operator.metrics.cpu_friendly_encodes = 10;
3358        operator.metrics.compact_encodes = 140;
3359        operator.metrics.asymmetric_skips = 5;
3360        operator.metrics.idle_key_cleanups = 3;
3361        operator.metrics.build_side_prunes = 2;
3362        operator.left_stats.events_received = 100;
3363        operator.right_stats.events_received = 50;
3364        operator.left_watermark = 5000;
3365        operator.right_watermark = 4000;
3366
3367        // Checkpoint
3368        let checkpoint = operator.checkpoint();
3369
3370        // Restore
3371        let config2 = StreamJoinConfig::builder()
3372            .left_key_column("key")
3373            .right_key_column("key")
3374            .time_bound(Duration::from_secs(60))
3375            .join_type(JoinType::Inner)
3376            .operator_id("test_join")
3377            .build()
3378            .unwrap();
3379
3380        let mut restored = StreamJoinOperator::from_config(config2);
3381        restored.restore(checkpoint).unwrap();
3382
3383        // Verify optimization state was restored
3384        assert_eq!(restored.metrics().left_events, 100);
3385        assert_eq!(restored.metrics().right_events, 50);
3386        assert_eq!(restored.metrics().matches, 25);
3387        assert_eq!(restored.metrics().cpu_friendly_encodes, 10);
3388        assert_eq!(restored.metrics().compact_encodes, 140);
3389        assert_eq!(restored.metrics().asymmetric_skips, 5);
3390        assert_eq!(restored.metrics().idle_key_cleanups, 3);
3391        assert_eq!(restored.metrics().build_side_prunes, 2);
3392        assert_eq!(restored.left_stats.events_received, 100);
3393        assert_eq!(restored.right_stats.events_received, 50);
3394        assert_eq!(restored.left_watermark, 5000);
3395        assert_eq!(restored.right_watermark, 4000);
3396    }
3397
3398    #[test]
3399    fn test_f057_should_skip_compaction() {
3400        let config = StreamJoinConfig::builder()
3401            .left_key_column("key")
3402            .right_key_column("key")
3403            .time_bound(Duration::from_secs(60))
3404            .join_type(JoinType::Inner)
3405            .asymmetric_compaction(true)
3406            .idle_threshold(Duration::from_secs(10))
3407            .build()
3408            .unwrap();
3409
3410        let mut operator = StreamJoinOperator::from_config(config);
3411
3412        // Record some left events
3413        operator.left_stats.record_event(1000);
3414        operator.left_stats.events_this_window = 0; // Simulate window rollover
3415
3416        // Should skip compaction for idle left side
3417        assert!(operator.should_skip_compaction(JoinSide::Left, 100_000));
3418
3419        // Should not skip when asymmetric compaction is disabled
3420        operator.asymmetric_compaction = false;
3421        assert!(!operator.should_skip_compaction(JoinSide::Left, 100_000));
3422    }
3423
3424    #[test]
3425    fn test_f057_multiple_rows_cpu_friendly() {
3426        // Test with multiple values in arrays
3427        let schema = Arc::new(Schema::new(vec![
3428            Field::new("id", DataType::Int64, false),
3429            Field::new("name", DataType::Utf8, false),
3430        ]));
3431
3432        // Single row (typical for streaming)
3433        let batch = RecordBatch::try_new(
3434            schema.clone(),
3435            vec![
3436                Arc::new(Int64Array::from(vec![1])),
3437                Arc::new(StringArray::from(vec!["Alice"])),
3438            ],
3439        )
3440        .unwrap();
3441
3442        let row =
3443            JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::CpuFriendly)
3444                .unwrap();
3445        let restored = row.to_batch().unwrap();
3446
3447        let id_col = restored
3448            .column(0)
3449            .as_any()
3450            .downcast_ref::<Int64Array>()
3451            .unwrap();
3452        let name_col = restored
3453            .column(1)
3454            .as_any()
3455            .downcast_ref::<StringArray>()
3456            .unwrap();
3457
3458        assert_eq!(id_col.value(0), 1);
3459        assert_eq!(name_col.value(0), "Alice");
3460    }
3461}