laminar_sql/datafusion/lookup_join_exec.rs

//! Physical execution plan for lookup joins.
//!
//! Bridges `LookupJoinNode` (logical) to a hash-probe executor that
//! joins streaming input against a pre-indexed lookup table snapshot.
//!
//! ## Data flow
//!
//! ```text
//! Stream input ──► LookupJoinExec ──► Output (stream + lookup columns)
//!                       │
//!                  HashIndex probe
//!                       │
//!                  LookupSnapshot (pre-indexed RecordBatch)
//! ```

use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;

use arrow::compute::take;
use arrow::row::{RowConverter, SortField};
use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::execution::{SendableRecordBatchStream, SessionState, TaskContext};
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Expr;
use futures::StreamExt;
use laminar_core::lookup::foyer_cache::FoyerMemoryCache;
use laminar_core::lookup::source::LookupSourceDyn;
use parking_lot::RwLock;
use tokio::sync::Semaphore;

use super::lookup_join::{LookupJoinNode, LookupJoinType};

// ── Registry ─────────────────────────────────────────────────────

/// Thread-safe registry of lookup table entries (snapshot, partial,
/// or versioned).
///
/// The db layer populates this when `CREATE LOOKUP TABLE` executes;
/// the [`LookupJoinExtensionPlanner`] reads it at physical plan time.
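///
/// # Example
///
/// A minimal usage sketch (`users_batch` is a placeholder; the
/// `RecordBatch` construction is elided):
///
/// ```ignore
/// let registry = LookupTableRegistry::new();
/// registry.register("dim_users", LookupSnapshot {
///     batch: users_batch,
///     key_columns: vec!["user_id".into()],
/// });
/// // Names are lowercased on both register and lookup.
/// assert!(registry.get("DIM_USERS").is_some());
/// ```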
#[derive(Default)]
pub struct LookupTableRegistry {
    tables: RwLock<HashMap<String, RegisteredLookup>>,
}

/// A registered lookup table entry — snapshot, partial (on-demand), or
/// versioned (temporal join with version history).
pub enum RegisteredLookup {
    /// Full snapshot: all rows pre-loaded in a single batch.
    Snapshot(Arc<LookupSnapshot>),
    /// Partial (on-demand): bounded foyer cache with S3-FIFO eviction.
    Partial(Arc<PartialLookupState>),
    /// Versioned: all versions of all keys for temporal joins.
    Versioned(Arc<VersionedLookupState>),
}

/// Point-in-time snapshot of a lookup table for join execution.
pub struct LookupSnapshot {
    /// All rows concatenated into a single batch.
    pub batch: RecordBatch,
    /// Primary key column names used to build the hash index.
    pub key_columns: Vec<String>,
}

/// State for a versioned (temporal) lookup table.
///
/// Holds all versions of all keys in a single `RecordBatch`, plus a
/// pre-built `VersionedIndex` for efficient point-in-time lookups.
/// The index is built once at registration time and rebuilt only when
/// the table is updated via CDC.
pub struct VersionedLookupState {
    /// All rows (all versions) concatenated into a single batch.
    pub batch: RecordBatch,
    /// Pre-built versioned index (built at registration time, not per-cycle).
    pub index: Arc<VersionedIndex>,
    /// Primary key column names for the equi-join.
    pub key_columns: Vec<String>,
    /// Column containing the version timestamp in the table.
    pub version_column: String,
    /// Stream-side column name for event time (the AS OF column).
    pub stream_time_column: String,
    /// Maximum versions to retain per key. `usize::MAX` = unbounded.
    pub max_versions_per_key: usize,
}

/// State for a partial (on-demand) lookup table.
pub struct PartialLookupState {
    /// Bounded foyer memory cache with S3-FIFO eviction.
    pub foyer_cache: Arc<FoyerMemoryCache>,
    /// Schema of the lookup table.
    pub schema: SchemaRef,
    /// Key column names for row encoding.
    pub key_columns: Vec<String>,
    /// `SortField` descriptors for key encoding via `RowConverter`.
    pub key_sort_fields: Vec<SortField>,
    /// Async source for cache miss fallback (None = cache-only mode).
    pub source: Option<Arc<dyn LookupSourceDyn>>,
    /// Limits concurrent source queries to avoid overloading the source.
    pub fetch_semaphore: Arc<Semaphore>,
}

impl LookupTableRegistry {
    /// Creates an empty registry.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers or replaces a lookup table snapshot.
    pub fn register(&self, name: &str, snapshot: LookupSnapshot) {
        self.tables.write().insert(
            name.to_lowercase(),
            RegisteredLookup::Snapshot(Arc::new(snapshot)),
        );
    }

    /// Registers or replaces a partial (on-demand) lookup table.
    pub fn register_partial(&self, name: &str, state: PartialLookupState) {
        self.tables.write().insert(
            name.to_lowercase(),
            RegisteredLookup::Partial(Arc::new(state)),
        );
    }

    /// Registers or replaces a versioned (temporal) lookup table.
    pub fn register_versioned(&self, name: &str, state: VersionedLookupState) {
        self.tables.write().insert(
            name.to_lowercase(),
            RegisteredLookup::Versioned(Arc::new(state)),
        );
    }

    /// Removes a lookup table from the registry.
    pub fn unregister(&self, name: &str) {
        self.tables.write().remove(&name.to_lowercase());
    }

    /// Returns the current snapshot for a table, if registered as a snapshot.
    #[must_use]
    pub fn get(&self, name: &str) -> Option<Arc<LookupSnapshot>> {
        let tables = self.tables.read();
        match tables.get(&name.to_lowercase())? {
            RegisteredLookup::Snapshot(s) => Some(Arc::clone(s)),
            RegisteredLookup::Partial(_) | RegisteredLookup::Versioned(_) => None,
        }
    }

    /// Returns the registered lookup entry (snapshot, partial, or versioned).
    #[must_use]
    pub fn get_entry(&self, name: &str) -> Option<RegisteredLookup> {
        let tables = self.tables.read();
        tables.get(&name.to_lowercase()).map(|e| match e {
            RegisteredLookup::Snapshot(s) => RegisteredLookup::Snapshot(Arc::clone(s)),
            RegisteredLookup::Partial(p) => RegisteredLookup::Partial(Arc::clone(p)),
            RegisteredLookup::Versioned(v) => RegisteredLookup::Versioned(Arc::clone(v)),
        })
    }
}

// ── Hash Index ───────────────────────────────────────────────────

/// Pre-built hash index mapping encoded key bytes to row indices.
struct HashIndex {
    map: HashMap<Box<[u8]>, Vec<u32>>,
}

impl HashIndex {
    /// Builds an index over `key_indices` columns in `batch`.
    ///
    /// Uses Arrow's `RowConverter` for binary-comparable key encoding
    /// so any Arrow data type is handled without manual serialization.
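    ///
    /// A probe sketch (assuming `converter` was built over the same
    /// key fields, so probe keys encode identically):
    ///
    /// ```ignore
    /// let index = HashIndex::build(&lookup_batch, &[0])?;
    /// let probe_rows = converter.convert_columns(&probe_key_cols)?;
    /// let matches = index.probe(probe_rows.row(0).as_ref()); // Option<&[u32]>
    /// ```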
    fn build(batch: &RecordBatch, key_indices: &[usize]) -> Result<Self> {
        if batch.num_rows() == 0 {
            return Ok(Self {
                map: HashMap::new(),
            });
        }

        let sort_fields: Vec<SortField> = key_indices
            .iter()
            .map(|&i| SortField::new(batch.schema().field(i).data_type().clone()))
            .collect();
        let converter = RowConverter::new(sort_fields)?;

        let key_cols: Vec<_> = key_indices
            .iter()
            .map(|&i| batch.column(i).clone())
            .collect();
        let rows = converter.convert_columns(&key_cols)?;

        let num_rows = batch.num_rows();
        let mut map: HashMap<Box<[u8]>, Vec<u32>> = HashMap::with_capacity(num_rows);
        #[allow(clippy::cast_possible_truncation)] // batch row count fits u32
        for i in 0..num_rows {
            map.entry(Box::from(rows.row(i).as_ref()))
                .or_default()
                .push(i as u32);
        }

        Ok(Self { map })
    }

    fn probe(&self, key: &[u8]) -> Option<&[u32]> {
        self.map.get(key).map(Vec::as_slice)
    }
}

// ── Versioned Index ──────────────────────────────────────────────

/// Pre-built versioned index mapping encoded key bytes to a BTreeMap
/// of version timestamps to row indices. Supports point-in-time lookups
/// via `probe_at_time` for temporal joins.
#[derive(Default)]
pub struct VersionedIndex {
    map: HashMap<Box<[u8]>, BTreeMap<i64, Vec<u32>>>,
}

impl VersionedIndex {
    /// Builds a versioned index over `key_indices` and `version_col_idx`
    /// columns in `batch`.
    ///
    /// Uses Arrow's `RowConverter` for binary-comparable key encoding.
    /// Null keys and null version timestamps are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if key encoding or timestamp extraction fails.
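    ///
    /// # Example
    ///
    /// A sketch of the point-in-time semantics (batch construction
    /// elided; the key is in column 0 and the version timestamp in
    /// column 1):
    ///
    /// ```ignore
    /// let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX)?;
    /// // If key "k" has versions at t = 10 and t = 20, a probe at
    /// // t = 15 resolves to the t = 10 row.
    /// ```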
    pub fn build(
        batch: &RecordBatch,
        key_indices: &[usize],
        version_col_idx: usize,
        max_versions_per_key: usize,
    ) -> Result<Self> {
        if batch.num_rows() == 0 {
            return Ok(Self {
                map: HashMap::new(),
            });
        }

        let sort_fields: Vec<SortField> = key_indices
            .iter()
            .map(|&i| SortField::new(batch.schema().field(i).data_type().clone()))
            .collect();
        let converter = RowConverter::new(sort_fields)?;

        let key_cols: Vec<_> = key_indices
            .iter()
            .map(|&i| batch.column(i).clone())
            .collect();
        let rows = converter.convert_columns(&key_cols)?;

        let timestamps = extract_i64_timestamps(batch.column(version_col_idx))?;

        let num_rows = batch.num_rows();
        let mut map: HashMap<Box<[u8]>, BTreeMap<i64, Vec<u32>>> = HashMap::with_capacity(num_rows);
        #[allow(clippy::cast_possible_truncation)]
        for (i, ts_opt) in timestamps.iter().enumerate() {
            // Skip rows with null keys or null version timestamps.
            let Some(version_ts) = ts_opt else { continue };
            if key_cols.iter().any(|c| c.is_null(i)) {
                continue;
            }
            let key = Box::from(rows.row(i).as_ref());
            map.entry(key)
                .or_default()
                .entry(*version_ts)
                .or_default()
                .push(i as u32);
        }

        // GC: keep only the N most recent versions per key.
        if max_versions_per_key < usize::MAX {
            for versions in map.values_mut() {
                while versions.len() > max_versions_per_key {
                    versions.pop_first();
                }
            }
        }

        Ok(Self { map })
    }

    /// Finds the row index for the latest version `<= event_ts` for the
    /// given key. Returns the last row index at that version.
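    ///
    /// For example, if a key has versions at `t = 10` and `t = 20`,
    /// probing at `event_ts = 15` returns the `t = 10` row, and probing
    /// at `event_ts = 5` returns `None`.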
    fn probe_at_time(&self, key: &[u8], event_ts: i64) -> Option<u32> {
        let versions = self.map.get(key)?;
        let (_, indices) = versions.range(..=event_ts).next_back()?;
        indices.last().copied()
    }
}

/// Extracts `Option<i64>` timestamp values from an Arrow array column.
///
/// Returns `None` for null entries (callers must handle nulls explicitly).
/// Supports `Int64`, all `Timestamp` variants (scaled to milliseconds),
/// and `Float64` (truncated to `i64`).
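///
/// A scaling sketch (hypothetical values, shown for the microsecond case):
///
/// ```ignore
/// let col = TimestampMicrosecondArray::from(vec![Some(1_500_000), None]);
/// // Microseconds divide by 1000, so this yields [Some(1500), None].
/// let ms = extract_i64_timestamps(&col)?;
/// ```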
fn extract_i64_timestamps(col: &dyn arrow_array::Array) -> Result<Vec<Option<i64>>> {
    use arrow_array::{
        Float64Array, Int64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
        TimestampNanosecondArray, TimestampSecondArray,
    };
    use arrow_schema::{DataType, TimeUnit};

    let n = col.len();
    let mut out = Vec::with_capacity(n);
    macro_rules! extract_typed {
        ($arr_type:ty, $scale:expr) => {{
            let arr = col.as_any().downcast_ref::<$arr_type>().ok_or_else(|| {
                DataFusionError::Internal(concat!("expected ", stringify!($arr_type)).into())
            })?;
            for i in 0..n {
                out.push(if col.is_null(i) {
                    None
                } else {
                    Some(arr.value(i) * $scale)
                });
            }
        }};
    }

    match col.data_type() {
        DataType::Int64 => extract_typed!(Int64Array, 1),
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            extract_typed!(TimestampMillisecondArray, 1);
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let arr = col
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .ok_or_else(|| {
                    DataFusionError::Internal("expected TimestampMicrosecondArray".into())
                })?;
            for i in 0..n {
                out.push(if col.is_null(i) {
                    None
                } else {
                    Some(arr.value(i) / 1000)
                });
            }
        }
        DataType::Timestamp(TimeUnit::Second, _) => {
            extract_typed!(TimestampSecondArray, 1000);
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let arr = col
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .ok_or_else(|| {
                    DataFusionError::Internal("expected TimestampNanosecondArray".into())
                })?;
            for i in 0..n {
                out.push(if col.is_null(i) {
                    None
                } else {
                    Some(arr.value(i) / 1_000_000)
                });
            }
        }
        DataType::Float64 => {
            let arr = col
                .as_any()
                .downcast_ref::<Float64Array>()
                .ok_or_else(|| DataFusionError::Internal("expected Float64Array".into()))?;
            #[allow(clippy::cast_possible_truncation)]
            for i in 0..n {
                out.push(if col.is_null(i) {
                    None
                } else {
                    Some(arr.value(i) as i64)
                });
            }
        }
        other => {
            return Err(DataFusionError::Plan(format!(
                "unsupported timestamp type for temporal join: {other:?}"
            )));
        }
    }

    Ok(out)
}

// ── Physical Execution Plan ──────────────────────────────────────

/// Physical plan that hash-probes a pre-indexed lookup table for
/// each batch from the streaming input.
pub struct LookupJoinExec {
    input: Arc<dyn ExecutionPlan>,
    index: Arc<HashIndex>,
    lookup_batch: Arc<RecordBatch>,
    stream_key_indices: Vec<usize>,
    join_type: LookupJoinType,
    schema: SchemaRef,
    properties: PlanProperties,
    /// Prebuilt `RowConverter` for encoding probe keys. Shared across
    /// every `execute()` call so we don't rebuild per-type encoders on
    /// every cycle of a cached physical plan.
    converter: Arc<RowConverter>,
    stream_field_count: usize,
}

impl LookupJoinExec {
    /// Creates a new lookup join executor.
    ///
    /// `stream_key_indices` and `lookup_key_indices` must be the same
    /// length and correspond pairwise (stream key 0 matches lookup key 0).
    ///
    /// # Errors
    ///
    /// Returns an error if the hash index cannot be built (e.g., unsupported key type).
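    ///
    /// # Example
    ///
    /// A construction sketch (`input`, `lookup_batch`, and
    /// `output_schema` are assumed to exist; the output schema is the
    /// stream fields followed by the lookup fields):
    ///
    /// ```ignore
    /// let exec = LookupJoinExec::try_new(
    ///     input,                 // Arc<dyn ExecutionPlan>
    ///     lookup_batch,          // RecordBatch snapshot of the table
    ///     vec![0],               // stream-side key column indices
    ///     vec![0],               // lookup-side key column indices
    ///     LookupJoinType::Inner,
    ///     output_schema,
    /// )?;
    /// ```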
    #[allow(clippy::needless_pass_by_value)] // lookup_batch is moved into Arc
    pub fn try_new(
        input: Arc<dyn ExecutionPlan>,
        lookup_batch: RecordBatch,
        stream_key_indices: Vec<usize>,
        lookup_key_indices: Vec<usize>,
        join_type: LookupJoinType,
        output_schema: SchemaRef,
    ) -> Result<Self> {
        let index = HashIndex::build(&lookup_batch, &lookup_key_indices)?;

        let key_sort_fields: Vec<SortField> = lookup_key_indices
            .iter()
            .map(|&i| SortField::new(lookup_batch.schema().field(i).data_type().clone()))
            .collect();
        let converter = Arc::new(RowConverter::new(key_sort_fields)?);

        // Left outer joins produce NULLs for non-matching lookup rows,
        // so force all lookup columns nullable in the output schema.
        let output_schema = if join_type == LookupJoinType::LeftOuter {
            let stream_count = input.schema().fields().len();
            let mut fields = output_schema.fields().to_vec();
            for f in &mut fields[stream_count..] {
                if !f.is_nullable() {
                    *f = Arc::new(f.as_ref().clone().with_nullable(true));
                }
            }
            Arc::new(Schema::new_with_metadata(
                fields,
                output_schema.metadata().clone(),
            ))
        } else {
            output_schema
        };

        let properties = PlanProperties::new(
            EquivalenceProperties::new(Arc::clone(&output_schema)),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            },
        );

        let stream_field_count = input.schema().fields().len();

        Ok(Self {
            input,
            index: Arc::new(index),
            lookup_batch: Arc::new(lookup_batch),
            stream_key_indices,
            join_type,
            schema: output_schema,
            properties,
            converter,
            stream_field_count,
        })
    }
}

impl Debug for LookupJoinExec {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("LookupJoinExec")
            .field("join_type", &self.join_type)
            .field("stream_keys", &self.stream_key_indices)
            .field("lookup_rows", &self.lookup_batch.num_rows())
            .finish_non_exhaustive()
    }
}

impl DisplayAs for LookupJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "LookupJoinExec: type={}, stream_keys={:?}, lookup_rows={}",
                    self.join_type,
                    self.stream_key_indices,
                    self.lookup_batch.num_rows(),
                )
            }
            DisplayFormatType::TreeRender => write!(f, "LookupJoinExec"),
        }
    }
}

impl ExecutionPlan for LookupJoinExec {
    fn name(&self) -> &'static str {
        "LookupJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(DataFusionError::Plan(
                "LookupJoinExec requires exactly one child".into(),
            ));
        }
        Ok(Arc::new(Self {
            input: children.swap_remove(0),
            index: Arc::clone(&self.index),
            lookup_batch: Arc::clone(&self.lookup_batch),
            stream_key_indices: self.stream_key_indices.clone(),
            join_type: self.join_type,
            schema: Arc::clone(&self.schema),
            properties: self.properties.clone(),
            converter: Arc::clone(&self.converter),
            stream_field_count: self.stream_field_count,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let converter = Arc::clone(&self.converter);
        let index = Arc::clone(&self.index);
        let lookup_batch = Arc::clone(&self.lookup_batch);
        let stream_key_indices = self.stream_key_indices.clone();
        let join_type = self.join_type;
        let schema = self.schema();
        let stream_field_count = self.stream_field_count;

        let output = input_stream.map(move |result| {
            let batch = result?;
            if batch.num_rows() == 0 {
                return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
            }
            probe_batch(
                &batch,
                &converter,
                &index,
                &lookup_batch,
                &stream_key_indices,
                join_type,
                &schema,
                stream_field_count,
            )
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            output,
        )))
    }
}

impl datafusion::physical_plan::ExecutionPlanProperties for LookupJoinExec {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties.output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        Boundedness::Unbounded {
            requires_infinite_memory: false,
        }
    }

    fn pipeline_behavior(&self) -> EmissionType {
        EmissionType::Incremental
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties.equivalence_properties()
    }
}

// ── Probe Logic ──────────────────────────────────────────────────

/// Probes the hash index for each row in `stream_batch` and builds
/// the joined output batch.
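///
/// For example, with stream key values `[1, 2, NULL]` and an index of
/// `{1 => [r0, r1]}`: an inner join emits stream row 0 twice (once per
/// match), while a left-outer join additionally emits rows 1 and 2
/// paired with NULL lookup columns.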
#[allow(clippy::too_many_arguments)]
fn probe_batch(
    stream_batch: &RecordBatch,
    converter: &RowConverter,
    index: &HashIndex,
    lookup_batch: &RecordBatch,
    stream_key_indices: &[usize],
    join_type: LookupJoinType,
    output_schema: &SchemaRef,
    stream_field_count: usize,
) -> Result<RecordBatch> {
    let key_cols: Vec<_> = stream_key_indices
        .iter()
        .map(|&i| stream_batch.column(i).clone())
        .collect();
    let rows = converter.convert_columns(&key_cols)?;

    let num_rows = stream_batch.num_rows();
    let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
    let mut lookup_indices: Vec<Option<u32>> = Vec::with_capacity(num_rows);

    #[allow(clippy::cast_possible_truncation)] // batch row count fits u32
    for row in 0..num_rows {
        // SQL semantics: NULL != NULL, so rows with any null key never match.
        if key_cols.iter().any(|c| c.is_null(row)) {
            if join_type == LookupJoinType::LeftOuter {
                stream_indices.push(row as u32);
                lookup_indices.push(None);
            }
            continue;
        }

        let key = rows.row(row);
        match index.probe(key.as_ref()) {
            Some(matches) => {
                for &lookup_row in matches {
                    stream_indices.push(row as u32);
                    lookup_indices.push(Some(lookup_row));
                }
            }
            None if join_type == LookupJoinType::LeftOuter => {
                stream_indices.push(row as u32);
                lookup_indices.push(None);
            }
            None => {}
        }
    }

    if stream_indices.is_empty() {
        return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
    }

    // Gather stream-side columns
    let take_stream = UInt32Array::from(stream_indices);
    let mut columns = Vec::with_capacity(output_schema.fields().len());

    for col in stream_batch.columns() {
        columns.push(take(col.as_ref(), &take_stream, None)?);
    }

    // Gather lookup-side columns (None → null in output)
    let take_lookup: UInt32Array = lookup_indices.into_iter().collect();
    for col in lookup_batch.columns() {
        columns.push(take(col.as_ref(), &take_lookup, None)?);
    }

    debug_assert_eq!(
        columns.len(),
        stream_field_count + lookup_batch.num_columns(),
        "output column count mismatch"
    );

    Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
}

// ── Versioned Lookup Join Exec ────────────────────────────────────

/// Physical plan that probes a versioned (temporal) index for each
/// batch from the streaming input. For each stream row, finds the
/// table row with the latest version timestamp `<= event_ts`.
pub struct VersionedLookupJoinExec {
    input: Arc<dyn ExecutionPlan>,
    index: Arc<VersionedIndex>,
    table_batch: Arc<RecordBatch>,
    stream_key_indices: Vec<usize>,
    stream_time_col_idx: usize,
    join_type: LookupJoinType,
    schema: SchemaRef,
    properties: PlanProperties,
    /// Prebuilt `RowConverter` — built once at planning time and reused
    /// across every `execute()` call. Previously rebuilt per cycle.
    converter: Arc<RowConverter>,
    stream_field_count: usize,
}

impl VersionedLookupJoinExec {
    /// Creates a new versioned lookup join executor.
    ///
    /// The `index` should be pre-built via `VersionedIndex::build()` and
    /// cached in `VersionedLookupState`. The index is only rebuilt when
    /// the table data changes (CDC update), not per execution cycle.
    ///
    /// # Errors
    ///
    /// Returns an error if the output schema cannot be constructed.
    #[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)]
    pub fn try_new(
        input: Arc<dyn ExecutionPlan>,
        table_batch: RecordBatch,
        index: Arc<VersionedIndex>,
        stream_key_indices: Vec<usize>,
        stream_time_col_idx: usize,
        join_type: LookupJoinType,
        output_schema: SchemaRef,
        key_sort_fields: Vec<SortField>,
    ) -> Result<Self> {
        let output_schema = if join_type == LookupJoinType::LeftOuter {
            let stream_count = input.schema().fields().len();
            let mut fields = output_schema.fields().to_vec();
            for f in &mut fields[stream_count..] {
                if !f.is_nullable() {
                    *f = Arc::new(f.as_ref().clone().with_nullable(true));
                }
            }
            Arc::new(Schema::new_with_metadata(
                fields,
                output_schema.metadata().clone(),
            ))
        } else {
            output_schema
        };

        let properties = PlanProperties::new(
            EquivalenceProperties::new(Arc::clone(&output_schema)),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            },
        );

        let stream_field_count = input.schema().fields().len();
        let converter = Arc::new(RowConverter::new(key_sort_fields)?);

        Ok(Self {
            input,
            index,
            table_batch: Arc::new(table_batch),
            stream_key_indices,
            stream_time_col_idx,
            join_type,
            schema: output_schema,
            properties,
            converter,
            stream_field_count,
        })
    }
}

impl Debug for VersionedLookupJoinExec {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("VersionedLookupJoinExec")
            .field("join_type", &self.join_type)
            .field("stream_keys", &self.stream_key_indices)
            .field("table_rows", &self.table_batch.num_rows())
            .finish_non_exhaustive()
    }
}

impl DisplayAs for VersionedLookupJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "VersionedLookupJoinExec: type={}, stream_keys={:?}, table_rows={}",
                    self.join_type,
                    self.stream_key_indices,
                    self.table_batch.num_rows(),
                )
            }
            DisplayFormatType::TreeRender => write!(f, "VersionedLookupJoinExec"),
        }
    }
}

impl ExecutionPlan for VersionedLookupJoinExec {
    fn name(&self) -> &'static str {
        "VersionedLookupJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(DataFusionError::Plan(
                "VersionedLookupJoinExec requires exactly one child".into(),
            ));
        }
        Ok(Arc::new(Self {
            input: children.swap_remove(0),
            index: Arc::clone(&self.index),
            table_batch: Arc::clone(&self.table_batch),
            stream_key_indices: self.stream_key_indices.clone(),
            stream_time_col_idx: self.stream_time_col_idx,
            join_type: self.join_type,
            schema: Arc::clone(&self.schema),
            properties: self.properties.clone(),
            converter: Arc::clone(&self.converter),
            stream_field_count: self.stream_field_count,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let converter = Arc::clone(&self.converter);
        let index = Arc::clone(&self.index);
        let table_batch = Arc::clone(&self.table_batch);
        let stream_key_indices = self.stream_key_indices.clone();
        let stream_time_col_idx = self.stream_time_col_idx;
        let join_type = self.join_type;
        let schema = self.schema();
        let stream_field_count = self.stream_field_count;

        let output = input_stream.map(move |result| {
            let batch = result?;
            if batch.num_rows() == 0 {
                return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
            }
            probe_versioned_batch(
                &batch,
                &converter,
                &index,
                &table_batch,
                &stream_key_indices,
                stream_time_col_idx,
                join_type,
                &schema,
                stream_field_count,
            )
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            output,
        )))
    }
}

impl datafusion::physical_plan::ExecutionPlanProperties for VersionedLookupJoinExec {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties.output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        Boundedness::Unbounded {
            requires_infinite_memory: false,
        }
    }

    fn pipeline_behavior(&self) -> EmissionType {
        EmissionType::Incremental
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties.equivalence_properties()
    }
}

/// Probes the versioned index for each row in `stream_batch`, finding
/// the table row with the latest version `<= event_ts`.
#[allow(clippy::too_many_arguments)]
fn probe_versioned_batch(
    stream_batch: &RecordBatch,
    converter: &RowConverter,
    index: &VersionedIndex,
    table_batch: &RecordBatch,
    stream_key_indices: &[usize],
    stream_time_col_idx: usize,
    join_type: LookupJoinType,
    output_schema: &SchemaRef,
    stream_field_count: usize,
) -> Result<RecordBatch> {
    let key_cols: Vec<_> = stream_key_indices
        .iter()
        .map(|&i| stream_batch.column(i).clone())
        .collect();
    let rows = converter.convert_columns(&key_cols)?;
    let event_timestamps =
        extract_i64_timestamps(stream_batch.column(stream_time_col_idx).as_ref())?;

    let num_rows = stream_batch.num_rows();
    let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
    let mut lookup_indices: Vec<Option<u32>> = Vec::with_capacity(num_rows);

    #[allow(clippy::cast_possible_truncation)]
    for (row, event_ts_opt) in event_timestamps.iter().enumerate() {
        // Null keys or null event timestamps cannot match.
        if key_cols.iter().any(|c| c.is_null(row)) || event_ts_opt.is_none() {
            if join_type == LookupJoinType::LeftOuter {
                stream_indices.push(row as u32);
                lookup_indices.push(None);
            }
            continue;
        }

        let key = rows.row(row);
        let event_ts = event_ts_opt.unwrap();
        match index.probe_at_time(key.as_ref(), event_ts) {
            Some(table_row_idx) => {
                stream_indices.push(row as u32);
                lookup_indices.push(Some(table_row_idx));
            }
            None if join_type == LookupJoinType::LeftOuter => {
                stream_indices.push(row as u32);
                lookup_indices.push(None);
            }
            None => {}
        }
    }

    if stream_indices.is_empty() {
        return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
    }

    let take_stream = UInt32Array::from(stream_indices);
    let mut columns = Vec::with_capacity(output_schema.fields().len());

    for col in stream_batch.columns() {
        columns.push(take(col.as_ref(), &take_stream, None)?);
    }

    let take_lookup: UInt32Array = lookup_indices.into_iter().collect();
    for col in table_batch.columns() {
        columns.push(take(col.as_ref(), &take_lookup, None)?);
    }

    debug_assert_eq!(
        columns.len(),
        stream_field_count + table_batch.num_columns(),
        "output column count mismatch"
    );

    Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
}

// ── Partial Lookup Join Exec ──────────────────────────────────────

/// Physical plan that probes a bounded foyer cache per key for each
/// batch from the streaming input. Used for on-demand/partial tables
/// where the full dataset does not fit in memory.
pub struct PartialLookupJoinExec {
    input: Arc<dyn ExecutionPlan>,
    foyer_cache: Arc<FoyerMemoryCache>,
    stream_key_indices: Vec<usize>,
    join_type: LookupJoinType,
    schema: SchemaRef,
    properties: PlanProperties,
    /// Prebuilt `RowConverter` — built once at planning time, reused on
    /// every `execute()`. Previously rebuilt per cycle.
    converter: Arc<RowConverter>,
    stream_field_count: usize,
    lookup_schema: SchemaRef,
    source: Option<Arc<dyn LookupSourceDyn>>,
    fetch_semaphore: Arc<Semaphore>,
}

impl PartialLookupJoinExec {
    /// Creates a new partial lookup join executor.
    ///
    /// # Errors
    ///
    /// Returns an error if the output schema cannot be constructed.
    pub fn try_new(
        input: Arc<dyn ExecutionPlan>,
        foyer_cache: Arc<FoyerMemoryCache>,
        stream_key_indices: Vec<usize>,
        key_sort_fields: Vec<SortField>,
        join_type: LookupJoinType,
        lookup_schema: SchemaRef,
        output_schema: SchemaRef,
    ) -> Result<Self> {
        Self::try_new_with_source(
            input,
            foyer_cache,
            stream_key_indices,
            key_sort_fields,
            join_type,
            lookup_schema,
            output_schema,
            None,
            Arc::new(Semaphore::new(64)),
        )
    }

    /// Creates a new partial lookup join executor with optional source fallback.
    ///
    /// # Errors
    ///
    /// Returns an error if the output schema cannot be constructed.
    #[allow(clippy::too_many_arguments)]
    pub fn try_new_with_source(
        input: Arc<dyn ExecutionPlan>,
        foyer_cache: Arc<FoyerMemoryCache>,
        stream_key_indices: Vec<usize>,
        key_sort_fields: Vec<SortField>,
        join_type: LookupJoinType,
        lookup_schema: SchemaRef,
        output_schema: SchemaRef,
        source: Option<Arc<dyn LookupSourceDyn>>,
        fetch_semaphore: Arc<Semaphore>,
    ) -> Result<Self> {
        let output_schema = if join_type == LookupJoinType::LeftOuter {
            let stream_count = input.schema().fields().len();
            let mut fields = output_schema.fields().to_vec();
            for f in &mut fields[stream_count..] {
                if !f.is_nullable() {
                    *f = Arc::new(f.as_ref().clone().with_nullable(true));
                }
            }
            Arc::new(Schema::new_with_metadata(
                fields,
                output_schema.metadata().clone(),
            ))
        } else {
            output_schema
        };

        let properties = PlanProperties::new(
            EquivalenceProperties::new(Arc::clone(&output_schema)),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            },
        );

        let stream_field_count = input.schema().fields().len();
        let converter = Arc::new(RowConverter::new(key_sort_fields)?);

        Ok(Self {
            input,
            foyer_cache,
            stream_key_indices,
            join_type,
            schema: output_schema,
            properties,
            converter,
            stream_field_count,
            lookup_schema,
            source,
            fetch_semaphore,
        })
    }
}

impl Debug for PartialLookupJoinExec {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartialLookupJoinExec")
            .field("join_type", &self.join_type)
            .field("stream_keys", &self.stream_key_indices)
            .field("cache_table_id", &self.foyer_cache.table_id())
            .finish_non_exhaustive()
    }
}

impl DisplayAs for PartialLookupJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "PartialLookupJoinExec: type={}, stream_keys={:?}, cache_entries={}",
                    self.join_type,
                    self.stream_key_indices,
                    self.foyer_cache.len(),
                )
            }
            DisplayFormatType::TreeRender => write!(f, "PartialLookupJoinExec"),
        }
    }
}

impl ExecutionPlan for PartialLookupJoinExec {
    fn name(&self) -> &'static str {
        "PartialLookupJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(DataFusionError::Plan(
                "PartialLookupJoinExec requires exactly one child".into(),
            ));
        }
        Ok(Arc::new(Self {
            input: children.swap_remove(0),
            foyer_cache: Arc::clone(&self.foyer_cache),
            stream_key_indices: self.stream_key_indices.clone(),
            join_type: self.join_type,
            schema: Arc::clone(&self.schema),
            properties: self.properties.clone(),
            converter: Arc::clone(&self.converter),
            stream_field_count: self.stream_field_count,
            lookup_schema: Arc::clone(&self.lookup_schema),
            source: self.source.clone(),
            fetch_semaphore: Arc::clone(&self.fetch_semaphore),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let converter = Arc::clone(&self.converter);
        let foyer_cache = Arc::clone(&self.foyer_cache);
        let stream_key_indices = self.stream_key_indices.clone();
        let join_type = self.join_type;
        let schema = self.schema();
        let stream_field_count = self.stream_field_count;
        let lookup_schema = Arc::clone(&self.lookup_schema);
        let source = self.source.clone();
        let fetch_semaphore = Arc::clone(&self.fetch_semaphore);

        let output = input_stream.then(move |result| {
            let foyer_cache = Arc::clone(&foyer_cache);
            let converter = Arc::clone(&converter);
            let stream_key_indices = stream_key_indices.clone();
            let schema = Arc::clone(&schema);
            let lookup_schema = Arc::clone(&lookup_schema);
            let source = source.clone();
            let fetch_semaphore = Arc::clone(&fetch_semaphore);
            async move {
                let batch = result?;
                if batch.num_rows() == 0 {
                    return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
                }
                probe_partial_batch_with_fallback(
                    &batch,
                    &converter,
                    &foyer_cache,
                    &stream_key_indices,
                    join_type,
                    &schema,
                    stream_field_count,
                    &lookup_schema,
                    source.as_deref(),
                    &fetch_semaphore,
                )
                .await
            }
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            output,
        )))
    }
}

impl datafusion::physical_plan::ExecutionPlanProperties for PartialLookupJoinExec {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties.output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        Boundedness::Unbounded {
            requires_infinite_memory: false,
        }
    }

    fn pipeline_behavior(&self) -> EmissionType {
        EmissionType::Incremental
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties.equivalence_properties()
    }
}

/// Probes the foyer cache for each row in `stream_batch`, falling back
/// to the async source for cache misses. Inserts source results into
/// the cache before building the output.
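///
/// Each cached value is assumed to be a single-row `RecordBatch`: the
/// column assembly below emits exactly one lookup row (real or null)
/// per kept stream row, so a multi-row cache entry would make the
/// lookup-side columns longer than the stream side and fail batch
/// construction.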
#[allow(clippy::too_many_arguments)]
async fn probe_partial_batch_with_fallback(
    stream_batch: &RecordBatch,
    converter: &RowConverter,
    foyer_cache: &FoyerMemoryCache,
    stream_key_indices: &[usize],
    join_type: LookupJoinType,
    output_schema: &SchemaRef,
    stream_field_count: usize,
    lookup_schema: &SchemaRef,
    source: Option<&dyn LookupSourceDyn>,
    fetch_semaphore: &Semaphore,
) -> Result<RecordBatch> {
    let key_cols: Vec<_> = stream_key_indices
        .iter()
        .map(|&i| stream_batch.column(i).clone())
        .collect();
    let rows = converter.convert_columns(&key_cols)?;

    let num_rows = stream_batch.num_rows();
    let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
    let mut lookup_batches: Vec<Option<RecordBatch>> = Vec::with_capacity(num_rows);
    let mut miss_keys: Vec<(usize, Vec<u8>)> = Vec::new();

    #[allow(clippy::cast_possible_truncation)]
    for row in 0..num_rows {
        // SQL semantics: NULL != NULL, so rows with any null key never match.
        if key_cols.iter().any(|c| c.is_null(row)) {
            if join_type == LookupJoinType::LeftOuter {
                stream_indices.push(row as u32);
                lookup_batches.push(None);
            }
            continue;
        }

        let key = rows.row(row);
        let result = foyer_cache.get_cached(key.as_ref());
        if let Some(batch) = result.into_batch() {
            stream_indices.push(row as u32);
            lookup_batches.push(Some(batch));
        } else {
            let idx = stream_indices.len();
            stream_indices.push(row as u32);
            lookup_batches.push(None);
            miss_keys.push((idx, key.as_ref().to_vec()));
        }
    }

    // Fetch missed keys from the source in a single batch query
    if let Some(source) = source {
        if !miss_keys.is_empty() {
            let _permit = fetch_semaphore
                .acquire()
                .await
                .map_err(|_| DataFusionError::Internal("fetch semaphore closed".into()))?;

            let key_refs: Vec<&[u8]> = miss_keys.iter().map(|(_, k)| k.as_slice()).collect();
            let source_results = source.query_batch(&key_refs, &[], &[]).await;

            match source_results {
                Ok(results) => {
                    for ((idx, key_bytes), maybe_batch) in miss_keys.iter().zip(results) {
                        if let Some(batch) = maybe_batch {
                            foyer_cache.insert(key_bytes, batch.clone());
                            lookup_batches[*idx] = Some(batch);
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        missed_keys = miss_keys.len(),
                        "partial lookup: source fallback failed, serving cache-only results"
                    );
                }
            }
        }
    }

    // For inner joins, remove rows that still have no match
    if join_type == LookupJoinType::Inner {
        let mut write = 0;
        for read in 0..stream_indices.len() {
            if lookup_batches[read].is_some() {
                stream_indices[write] = stream_indices[read];
                lookup_batches.swap(write, read);
                write += 1;
            }
        }
        stream_indices.truncate(write);
        lookup_batches.truncate(write);
    }

    if stream_indices.is_empty() {
        return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
    }

    let take_indices = UInt32Array::from(stream_indices);
    let mut columns = Vec::with_capacity(output_schema.fields().len());

    for col in stream_batch.columns() {
        columns.push(take(col.as_ref(), &take_indices, None)?);
    }

    let lookup_col_count = lookup_schema.fields().len();
    for col_idx in 0..lookup_col_count {
        let arrays: Vec<_> = lookup_batches
            .iter()
            .map(|opt| match opt {
                Some(b) => b.column(col_idx).clone(),
                None => arrow_array::new_null_array(lookup_schema.field(col_idx).data_type(), 1),
            })
            .collect();
        let refs: Vec<&dyn arrow_array::Array> = arrays.iter().map(AsRef::as_ref).collect();
        columns.push(arrow::compute::concat(&refs)?);
    }

    debug_assert_eq!(
        columns.len(),
        stream_field_count + lookup_col_count,
        "output column count mismatch"
    );

    Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
}

// ── Extension Planner ────────────────────────────────────────────

/// Converts `LookupJoinNode` logical plans to [`LookupJoinExec`],
/// [`PartialLookupJoinExec`], or [`VersionedLookupJoinExec`] physical
/// plans by resolving table data from the registry.
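///
/// # Example
///
/// A wiring sketch (assuming a session built around DataFusion's
/// `DefaultPhysicalPlanner::with_extension_planners`; adapt to the
/// surrounding session setup):
///
/// ```ignore
/// let registry = Arc::new(LookupTableRegistry::new());
/// let planner = DefaultPhysicalPlanner::with_extension_planners(vec![
///     Arc::new(LookupJoinExtensionPlanner::new(Arc::clone(&registry))),
/// ]);
/// ```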
1404pub struct LookupJoinExtensionPlanner {
1405    registry: Arc<LookupTableRegistry>,
1406}
1407
1408impl LookupJoinExtensionPlanner {
1409    /// Creates a planner backed by the given registry.
1410    pub fn new(registry: Arc<LookupTableRegistry>) -> Self {
1411        Self { registry }
1412    }
1413}
1414
1415#[async_trait]
1416impl ExtensionPlanner for LookupJoinExtensionPlanner {
1417    #[allow(clippy::too_many_lines)]
1418    async fn plan_extension(
1419        &self,
1420        _planner: &dyn PhysicalPlanner,
1421        node: &dyn UserDefinedLogicalNode,
1422        _logical_inputs: &[&LogicalPlan],
1423        physical_inputs: &[Arc<dyn ExecutionPlan>],
1424        session_state: &SessionState,
1425    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1426        let Some(lookup_node) = node.as_any().downcast_ref::<LookupJoinNode>() else {
1427            return Ok(None);
1428        };
1429
1430        let entry = self
            .registry
            .get_entry(lookup_node.lookup_table_name())
            .ok_or_else(|| {
                DataFusionError::Plan(format!(
                    "lookup table '{}' not registered",
                    lookup_node.lookup_table_name()
                ))
            })?;

        let input = Arc::clone(&physical_inputs[0]);
        let stream_schema = input.schema();

        match entry {
            RegisteredLookup::Partial(partial_state) => {
                let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;

                let mut output_fields = stream_schema.fields().to_vec();
                output_fields.extend(partial_state.schema.fields().iter().cloned());
                let output_schema = Arc::new(Schema::new(output_fields));

                let exec = PartialLookupJoinExec::try_new_with_source(
                    input,
                    Arc::clone(&partial_state.foyer_cache),
                    stream_key_indices,
                    partial_state.key_sort_fields.clone(),
                    lookup_node.join_type(),
                    Arc::clone(&partial_state.schema),
                    output_schema,
                    partial_state.source.clone(),
                    Arc::clone(&partial_state.fetch_semaphore),
                )?;
                Ok(Some(Arc::new(exec)))
            }
            RegisteredLookup::Snapshot(snapshot) => {
                let lookup_schema = snapshot.batch.schema();
                let lookup_key_indices = resolve_lookup_keys(lookup_node, &lookup_schema)?;

                let lookup_batch = if lookup_node.pushdown_predicates().is_empty()
                    || snapshot.batch.num_rows() == 0
                {
                    snapshot.batch.clone()
                } else {
                    apply_pushdown_predicates(
                        &snapshot.batch,
                        lookup_node.pushdown_predicates(),
                        session_state,
                    )?
                };

                let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;

                // Validate join key types are compatible.
                for (si, li) in stream_key_indices.iter().zip(&lookup_key_indices) {
                    let st = stream_schema.field(*si).data_type();
                    let lt = lookup_schema.field(*li).data_type();
                    if st != lt {
                        return Err(DataFusionError::Plan(format!(
                            "Lookup join key type mismatch: stream '{}' is {st:?} \
                             but lookup '{}' is {lt:?}",
                            stream_schema.field(*si).name(),
                            lookup_schema.field(*li).name(),
                        )));
                    }
                }

                let mut output_fields = stream_schema.fields().to_vec();
                output_fields.extend(lookup_batch.schema().fields().iter().cloned());
                let output_schema = Arc::new(Schema::new(output_fields));

                let exec = LookupJoinExec::try_new(
                    input,
                    lookup_batch,
                    stream_key_indices,
                    lookup_key_indices,
                    lookup_node.join_type(),
                    output_schema,
                )?;

                Ok(Some(Arc::new(exec)))
            }
            RegisteredLookup::Versioned(versioned_state) => {
                let table_schema = versioned_state.batch.schema();
                let lookup_key_indices = resolve_lookup_keys(lookup_node, &table_schema)?;
                let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;

                // Validate key type compatibility.
                for (si, li) in stream_key_indices.iter().zip(&lookup_key_indices) {
                    let st = stream_schema.field(*si).data_type();
                    let lt = table_schema.field(*li).data_type();
                    if st != lt {
                        return Err(DataFusionError::Plan(format!(
                            "Temporal join key type mismatch: stream '{}' is {st:?} \
                             but table '{}' is {lt:?}",
                            stream_schema.field(*si).name(),
                            table_schema.field(*li).name(),
                        )));
                    }
                }

                let stream_time_col_idx = stream_schema
                    .index_of(&versioned_state.stream_time_column)
                    .map_err(|_| {
                        DataFusionError::Plan(format!(
                            "stream time column '{}' not found in stream schema",
                            versioned_state.stream_time_column
                        ))
                    })?;

                let key_sort_fields: Vec<SortField> = lookup_key_indices
                    .iter()
                    .map(|&i| SortField::new(table_schema.field(i).data_type().clone()))
                    .collect();

                let mut output_fields = stream_schema.fields().to_vec();
                output_fields.extend(table_schema.fields().iter().cloned());
                let output_schema = Arc::new(Schema::new(output_fields));

                let exec = VersionedLookupJoinExec::try_new(
                    input,
                    versioned_state.batch.clone(),
                    Arc::clone(&versioned_state.index),
                    stream_key_indices,
                    stream_time_col_idx,
                    lookup_node.join_type(),
                    output_schema,
                    key_sort_fields,
                )?;

                Ok(Some(Arc::new(exec)))
            }
        }
    }
}
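
// For orientation: an `ExtensionPlanner` like this one is installed by handing
// it to DataFusion's physical planner. A minimal sketch (the planner value
// shown here is illustrative; how it is constructed is up to the db layer):
//
// ```ignore
// let planner = DefaultPhysicalPlanner::with_extension_planners(vec![
//     Arc::new(lookup_join_planner), // e.g. built from an Arc<LookupTableRegistry>
// ]);
// ```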

/// Evaluates pushdown predicates against the lookup snapshot, returning
/// only the rows that pass all predicates. This shrinks the hash index.
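///
/// Call shape as used by the planner above (a sketch; `lookup_node` and
/// `session_state` come from `plan_extension`):
///
/// ```ignore
/// let filtered = apply_pushdown_predicates(
///     &snapshot.batch,
///     lookup_node.pushdown_predicates(),
///     session_state,
/// )?;
/// ```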
fn apply_pushdown_predicates(
    batch: &RecordBatch,
    predicates: &[Expr],
    session_state: &SessionState,
) -> Result<RecordBatch> {
    use arrow::compute::filter_record_batch;
    use datafusion::physical_expr::create_physical_expr;

    let schema = batch.schema();
    let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;

    let mut mask = None::<arrow_array::BooleanArray>;
    for pred in predicates {
        let phys_expr = create_physical_expr(pred, &df_schema, session_state.execution_props())?;
        let result = phys_expr.evaluate(batch)?;
        let bool_arr = result
            .into_array(batch.num_rows())?
            .as_any()
            .downcast_ref::<arrow_array::BooleanArray>()
            .ok_or_else(|| {
                DataFusionError::Internal("pushdown predicate did not evaluate to boolean".into())
            })?
            .clone();
        mask = Some(match mask {
            Some(existing) => arrow::compute::and(&existing, &bool_arr)?,
            None => bool_arr,
        });
    }

    match mask {
        Some(m) => Ok(filter_record_batch(batch, &m)?),
        None => Ok(batch.clone()),
    }
}

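/// Resolves each join key's stream-side expression to a column index in the
/// physical stream schema. Only bare column references are supported; any
/// other expression is rejected as `NotImplemented`.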
fn resolve_stream_keys(node: &LookupJoinNode, schema: &SchemaRef) -> Result<Vec<usize>> {
    node.join_keys()
        .iter()
        .map(|pair| match &pair.stream_expr {
            Expr::Column(col) => schema.index_of(&col.name).map_err(|_| {
                DataFusionError::Plan(format!(
                    "stream key column '{}' not found in physical schema",
                    col.name
                ))
            }),
            other => Err(DataFusionError::NotImplemented(format!(
                "lookup join requires column references as stream keys, got: {other}"
            ))),
        })
        .collect()
}

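/// Resolves each join key's lookup-side column name to an index in the
/// lookup table schema.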
fn resolve_lookup_keys(node: &LookupJoinNode, schema: &SchemaRef) -> Result<Vec<usize>> {
    node.join_keys()
        .iter()
        .map(|pair| {
            schema.index_of(&pair.lookup_column).map_err(|_| {
                DataFusionError::Plan(format!(
                    "lookup key column '{}' not found in lookup table schema",
                    pair.lookup_column
                ))
            })
        })
        .collect()
}

// ── Tests ────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field};
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter as TestStreamAdapter;
    use futures::TryStreamExt;

    /// Creates a bounded `ExecutionPlan` from a single `RecordBatch`.
    fn batch_exec(batch: RecordBatch) -> Arc<dyn ExecutionPlan> {
        let schema = batch.schema();
        let batches = vec![batch];
        let stream_schema = Arc::clone(&schema);
        Arc::new(StreamExecStub {
            schema,
            batches: std::sync::Mutex::new(Some(batches)),
            stream_schema,
        })
    }

    /// Minimal bounded exec for tests — produces one partition of batches.
    struct StreamExecStub {
        schema: SchemaRef,
        batches: std::sync::Mutex<Option<Vec<RecordBatch>>>,
        stream_schema: SchemaRef,
    }

    impl Debug for StreamExecStub {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            write!(f, "StreamExecStub")
        }
    }

    impl DisplayAs for StreamExecStub {
        fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
            write!(f, "StreamExecStub")
        }
    }

    impl ExecutionPlan for StreamExecStub {
        fn name(&self) -> &'static str {
            "StreamExecStub"
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
        fn properties(&self) -> &PlanProperties {
            // Leak a fresh `PlanProperties` on every call to get a `&'static`;
            // acceptable for short-lived tests.
            Box::leak(Box::new(PlanProperties::new(
                EquivalenceProperties::new(Arc::clone(&self.schema)),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Bounded,
            )))
        }
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }
        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }
        fn execute(&self, _: usize, _: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
            let batches = self.batches.lock().unwrap().take().unwrap_or_default();
            let schema = Arc::clone(&self.stream_schema);
            let stream = futures::stream::iter(batches.into_iter().map(Ok));
            Ok(Box::pin(TestStreamAdapter::new(schema, stream)))
        }
    }

    impl datafusion::physical_plan::ExecutionPlanProperties for StreamExecStub {
        fn output_partitioning(&self) -> &Partitioning {
            self.properties().output_partitioning()
        }
        fn output_ordering(&self) -> Option<&LexOrdering> {
            None
        }
        fn boundedness(&self) -> Boundedness {
            Boundedness::Bounded
        }
        fn pipeline_behavior(&self) -> EmissionType {
            EmissionType::Final
        }
        fn equivalence_properties(&self) -> &EquivalenceProperties {
            self.properties().equivalence_properties()
        }
    }

    fn orders_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, false),
            Field::new("amount", DataType::Float64, false),
        ]))
    }

    fn customers_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    fn output_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, false),
            Field::new("amount", DataType::Float64, false),
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    fn customers_batch() -> RecordBatch {
        RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
            ],
        )
        .unwrap()
    }

    fn orders_batch() -> RecordBatch {
        RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100, 101, 102, 103])),
                Arc::new(Int64Array::from(vec![1, 2, 99, 3])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0, 40.0])),
            ],
        )
        .unwrap()
    }

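    /// Builds a `LookupJoinExec` joining `orders.customer_id` to `customers.id`.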
    fn make_exec(join_type: LookupJoinType) -> LookupJoinExec {
        let input = batch_exec(orders_batch());
        LookupJoinExec::try_new(
            input,
            customers_batch(),
            vec![1], // customer_id
            vec![0], // id
            join_type,
            output_schema(),
        )
        .unwrap()
    }

    #[tokio::test]
    async fn inner_join_filters_non_matches() {
        let exec = make_exec(LookupJoinType::Inner);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 3, "customer_id=99 has no match, filtered by inner");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Alice");
        assert_eq!(names.value(1), "Bob");
        assert_eq!(names.value(2), "Charlie");
    }

    #[tokio::test]
    async fn left_outer_preserves_non_matches() {
        let exec = make_exec(LookupJoinType::LeftOuter);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4, "all 4 stream rows preserved in left outer");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        // Row 2 (customer_id=99) should have null name
        assert!(names.is_null(2));
    }

    #[tokio::test]
    async fn empty_lookup_inner_produces_no_rows() {
        let empty = RecordBatch::new_empty(customers_schema());
        let input = batch_exec(orders_batch());
        let exec = LookupJoinExec::try_new(
            input,
            empty,
            vec![1],
            vec![0],
            LookupJoinType::Inner,
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 0);
    }

    #[tokio::test]
    async fn empty_lookup_left_outer_preserves_all_stream_rows() {
        let empty = RecordBatch::new_empty(customers_schema());
        let input = batch_exec(orders_batch());
        let exec = LookupJoinExec::try_new(
            input,
            empty,
            vec![1],
            vec![0],
            LookupJoinType::LeftOuter,
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4);
    }

    #[tokio::test]
    async fn duplicate_keys_produce_multiple_rows() {
        let lookup = RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 1])),
                Arc::new(StringArray::from(vec!["Alice-A", "Alice-B"])),
            ],
        )
        .unwrap();

        let stream = RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100])),
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(Float64Array::from(vec![10.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(stream);
        let exec = LookupJoinExec::try_new(
            input,
            lookup,
            vec![1],
            vec![0],
            LookupJoinType::Inner,
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 2, "one stream row matched two lookup rows");
    }

    #[test]
    fn with_new_children_preserves_state() {
        let exec = Arc::new(make_exec(LookupJoinType::Inner));
        let expected_schema = exec.schema();
        let children = exec.children().into_iter().cloned().collect();
        let rebuilt = exec.with_new_children(children).unwrap();
        assert_eq!(rebuilt.schema(), expected_schema);
        assert_eq!(rebuilt.name(), "LookupJoinExec");
    }

    #[test]
    fn display_format() {
        let exec = make_exec(LookupJoinType::Inner);
        let s = format!("{exec:?}");
        assert!(s.contains("LookupJoinExec"));
        assert!(s.contains("lookup_rows: 3"));
    }

    #[test]
    fn registry_crud() {
        let reg = LookupTableRegistry::new();
        assert!(reg.get("customers").is_none());

        reg.register(
            "customers",
            LookupSnapshot {
                batch: customers_batch(),
                key_columns: vec!["id".into()],
            },
        );
        assert!(reg.get("customers").is_some());
        assert!(reg.get("CUSTOMERS").is_some(), "case-insensitive");

        reg.unregister("customers");
        assert!(reg.get("customers").is_none());
    }

    #[test]
    fn registry_update_replaces() {
        let reg = LookupTableRegistry::new();
        reg.register(
            "t",
            LookupSnapshot {
                batch: RecordBatch::new_empty(customers_schema()),
                key_columns: vec![],
            },
        );
        assert_eq!(reg.get("t").unwrap().batch.num_rows(), 0);

        reg.register(
            "t",
            LookupSnapshot {
                batch: customers_batch(),
                key_columns: vec![],
            },
        );
        assert_eq!(reg.get("t").unwrap().batch.num_rows(), 3);
    }

    #[test]
    fn pushdown_predicates_filter_snapshot() {
        use datafusion::logical_expr::{col, lit};

        let batch = customers_batch(); // id=[1,2,3], name=[Alice,Bob,Charlie]
        let ctx = datafusion::prelude::SessionContext::new();
        let state = ctx.state();

        // Filter: id > 1 (should keep ids 2 and 3)
        let predicates = vec![col("id").gt(lit(1i64))];
        let filtered = apply_pushdown_predicates(&batch, &predicates, &state).unwrap();
        assert_eq!(filtered.num_rows(), 2);

        let ids = filtered
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 2);
        assert_eq!(ids.value(1), 3);
    }

    #[test]
    fn pushdown_predicates_empty_passes_all() {
        let batch = customers_batch();
        let ctx = datafusion::prelude::SessionContext::new();
        let state = ctx.state();

        let filtered = apply_pushdown_predicates(&batch, &[], &state).unwrap();
        assert_eq!(filtered.num_rows(), 3);
    }

    #[test]
    fn pushdown_predicates_multiple_and() {
        use datafusion::logical_expr::{col, lit};

        let batch = customers_batch(); // id=[1,2,3]
        let ctx = datafusion::prelude::SessionContext::new();
        let state = ctx.state();

        // id >= 2 AND id < 3 → only the row with id=2
        let predicates = vec![col("id").gt_eq(lit(2i64)), col("id").lt(lit(3i64))];
        let filtered = apply_pushdown_predicates(&batch, &predicates, &state).unwrap();
        assert_eq!(filtered.num_rows(), 1);
    }

    // ── PartialLookupJoinExec Tests ──────────────────────────────

    use laminar_core::lookup::foyer_cache::FoyerMemoryCacheConfig;

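    /// In-memory foyer cache for tests: cache table id 1, capacity 64, 4 shards.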
    fn make_foyer_cache() -> Arc<FoyerMemoryCache> {
        Arc::new(FoyerMemoryCache::new(
            1,
            FoyerMemoryCacheConfig {
                capacity: 64,
                shards: 4,
            },
        ))
    }

    fn customer_row(id: i64, name: &str) -> RecordBatch {
        RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
            ],
        )
        .unwrap()
    }

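    /// Pre-populates the cache with ids 1–3 (Alice, Bob, Charlie), keyed by
    /// the arrow row-encoding of the `Int64` join key.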
    fn warm_cache(cache: &FoyerMemoryCache) {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int64)]).unwrap();

        for (id, name) in [(1, "Alice"), (2, "Bob"), (3, "Charlie")] {
            let key_col = Int64Array::from(vec![id]);
            let rows = converter.convert_columns(&[Arc::new(key_col)]).unwrap();
            let key = rows.row(0);
            cache.insert(key.as_ref(), customer_row(id, name));
        }
    }

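    /// Builds a `PartialLookupJoinExec` over a warmed cache; no source
    /// fallback is configured.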
    fn make_partial_exec(join_type: LookupJoinType) -> PartialLookupJoinExec {
        let cache = make_foyer_cache();
        warm_cache(&cache);

        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        PartialLookupJoinExec::try_new(
            input,
            cache,
            vec![1], // customer_id
            key_sort_fields,
            join_type,
            customers_schema(),
            output_schema(),
        )
        .unwrap()
    }

    #[tokio::test]
    async fn partial_inner_join_filters_non_matches() {
        let exec = make_partial_exec(LookupJoinType::Inner);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 3, "customer_id=99 has no match, filtered by inner");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Alice");
        assert_eq!(names.value(1), "Bob");
        assert_eq!(names.value(2), "Charlie");
    }

    #[tokio::test]
    async fn partial_left_outer_preserves_non_matches() {
        let exec = make_partial_exec(LookupJoinType::LeftOuter);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4, "all 4 stream rows preserved in left outer");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(names.is_null(2), "customer_id=99 should have null name");
    }

    #[tokio::test]
    async fn partial_empty_cache_inner_produces_no_rows() {
        let cache = make_foyer_cache();
        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        let exec = PartialLookupJoinExec::try_new(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::Inner,
            customers_schema(),
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 0);
    }

    #[tokio::test]
    async fn partial_empty_cache_left_outer_preserves_all() {
        let cache = make_foyer_cache();
        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        let exec = PartialLookupJoinExec::try_new(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::LeftOuter,
            customers_schema(),
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4);
    }

    #[test]
    fn partial_with_new_children_preserves_state() {
        let exec = Arc::new(make_partial_exec(LookupJoinType::Inner));
        let expected_schema = exec.schema();
        let children = exec.children().into_iter().cloned().collect();
        let rebuilt = exec.with_new_children(children).unwrap();
        assert_eq!(rebuilt.schema(), expected_schema);
        assert_eq!(rebuilt.name(), "PartialLookupJoinExec");
    }

    #[test]
    fn partial_display_format() {
        let exec = make_partial_exec(LookupJoinType::Inner);
        let s = format!("{exec:?}");
        assert!(s.contains("PartialLookupJoinExec"));
        assert!(s.contains("cache_table_id: 1"));
    }

    #[test]
    fn registry_partial_entry() {
        let reg = LookupTableRegistry::new();
        let cache = make_foyer_cache();
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        reg.register_partial(
            "customers",
            PartialLookupState {
                foyer_cache: cache,
                schema: customers_schema(),
                key_columns: vec!["id".into()],
                key_sort_fields,
                source: None,
                fetch_semaphore: Arc::new(Semaphore::new(64)),
            },
        );

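        // get() is snapshot-only; partial entries are reached via get_entry().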
        assert!(reg.get("customers").is_none());

        let entry = reg.get_entry("customers");
        assert!(entry.is_some());
        assert!(matches!(entry.unwrap(), RegisteredLookup::Partial(_)));
    }

    #[tokio::test]
    async fn partial_source_fallback_on_miss() {
        use laminar_core::lookup::source::LookupError;
        use laminar_core::lookup::source::LookupSourceDyn;

        struct TestSource;

        #[async_trait]
        impl LookupSourceDyn for TestSource {
            async fn query_batch(
                &self,
                keys: &[&[u8]],
                _predicates: &[laminar_core::lookup::predicate::Predicate],
                _projection: &[laminar_core::lookup::source::ColumnId],
            ) -> std::result::Result<Vec<Option<RecordBatch>>, LookupError> {
                Ok(keys
                    .iter()
                    .map(|_| Some(customer_row(99, "FromSource")))
                    .collect())
            }

            fn schema(&self) -> SchemaRef {
                customers_schema()
            }
        }

        let cache = make_foyer_cache();
        // Warm ids 1–3 in the cache; id=99 is absent and will fall through to the source
        warm_cache(&cache);

        let orders = RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![200])),
                Arc::new(Int64Array::from(vec![99])), // not in cache
                Arc::new(Float64Array::from(vec![50.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(orders);
        let key_sort_fields = vec![SortField::new(DataType::Int64)];
        let source: Arc<dyn LookupSourceDyn> = Arc::new(TestSource);

        let exec = PartialLookupJoinExec::try_new_with_source(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::Inner,
            customers_schema(),
            output_schema(),
            Some(source),
            Arc::new(Semaphore::new(64)),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 1, "source fallback should produce 1 row");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "FromSource");
    }

    #[tokio::test]
    async fn partial_source_error_graceful_degradation() {
        use laminar_core::lookup::source::LookupError;
        use laminar_core::lookup::source::LookupSourceDyn;

        struct FailingSource;

        #[async_trait]
        impl LookupSourceDyn for FailingSource {
            async fn query_batch(
                &self,
                _keys: &[&[u8]],
                _predicates: &[laminar_core::lookup::predicate::Predicate],
                _projection: &[laminar_core::lookup::source::ColumnId],
            ) -> std::result::Result<Vec<Option<RecordBatch>>, LookupError> {
                Err(LookupError::Internal("source unavailable".into()))
            }

            fn schema(&self) -> SchemaRef {
                customers_schema()
            }
        }

        let cache = make_foyer_cache();
        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];
        let source: Arc<dyn LookupSourceDyn> = Arc::new(FailingSource);

        let exec = PartialLookupJoinExec::try_new_with_source(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::LeftOuter,
            customers_schema(),
            output_schema(),
            Some(source),
            Arc::new(Semaphore::new(64)),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // All rows preserved in left outer, but all lookup columns null
        assert_eq!(total, 4);
    }

    #[test]
    fn registry_snapshot_entry_via_get_entry() {
        let reg = LookupTableRegistry::new();
        reg.register(
            "t",
            LookupSnapshot {
                batch: customers_batch(),
                key_columns: vec!["id".into()],
            },
        );

        let entry = reg.get_entry("t");
        assert!(matches!(entry.unwrap(), RegisteredLookup::Snapshot(_)));
        assert!(reg.get("t").is_some());
    }

    // ── NULL key tests ────────────────────────────────────────────────

    fn nullable_orders_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, true), // nullable key
            Field::new("amount", DataType::Float64, false),
        ]))
    }

    fn nullable_output_schema(join_type: LookupJoinType) -> SchemaRef {
        let lookup_nullable = join_type == LookupJoinType::LeftOuter;
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, true),
            Field::new("amount", DataType::Float64, false),
            Field::new("id", DataType::Int64, lookup_nullable),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    #[tokio::test]
    async fn null_key_inner_join_no_match() {
        // Stream: customer_id = [1, NULL, 2]
        let stream_batch = RecordBatch::try_new(
            nullable_orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100, 101, 102])),
                Arc::new(Int64Array::from(vec![Some(1), None, Some(2)])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(stream_batch);
        let exec = LookupJoinExec::try_new(
            input,
            customers_batch(),
            vec![1],
            vec![0],
            LookupJoinType::Inner,
            nullable_output_schema(LookupJoinType::Inner),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Only customer_id=1 and customer_id=2 match; NULL is skipped
        assert_eq!(total, 2, "NULL key row should not match in inner join");
    }

    #[tokio::test]
    async fn null_key_left_outer_produces_nulls() {
        // Stream: customer_id = [1, NULL, 2]
        let stream_batch = RecordBatch::try_new(
            nullable_orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100, 101, 102])),
                Arc::new(Int64Array::from(vec![Some(1), None, Some(2)])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(stream_batch);
        let out_schema = nullable_output_schema(LookupJoinType::LeftOuter);
        let exec = LookupJoinExec::try_new(
            input,
            customers_batch(),
            vec![1],
            vec![0],
            LookupJoinType::LeftOuter,
            out_schema,
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // All 3 rows preserved; NULL key row has null lookup columns
        assert_eq!(total, 3, "all rows preserved in left outer");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Alice");
        assert!(
            names.is_null(1),
            "NULL key row should have null lookup name"
        );
        assert_eq!(names.value(2), "Bob");
    }

    // ── Versioned Lookup Join Tests ────────────────────────────────

    fn versioned_table_batch() -> RecordBatch {
        // Table with key=currency, version_ts=valid_from, rate=value
        // Two currencies with multiple versions each
        let schema = Arc::new(Schema::new(vec![
            Field::new("currency", DataType::Utf8, false),
            Field::new("valid_from", DataType::Int64, false),
            Field::new("rate", DataType::Float64, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["USD", "USD", "EUR", "EUR", "EUR"])),
                Arc::new(Int64Array::from(vec![100, 200, 100, 150, 300])),
                Arc::new(Float64Array::from(vec![1.0, 1.1, 0.85, 0.90, 0.88])),
            ],
        )
        .unwrap()
    }

    fn stream_batch_with_time() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("currency", DataType::Utf8, false),
            Field::new("event_ts", DataType::Int64, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
                Arc::new(StringArray::from(vec!["USD", "EUR", "USD", "EUR"])),
                Arc::new(Int64Array::from(vec![150, 160, 250, 50])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_versioned_index_build_and_probe() {
        let batch = versioned_table_batch();
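        // Build args, as used throughout these tests: the table batch, the key
        // column indices (&[0] = currency), the version column index
        // (1 = valid_from), and the maximum versions retained per key.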
        let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX).unwrap();

        // USD has versions at 100 and 200
        // Probe at 150 → should find version 100 (latest <= 150)
        let key_sf = vec![SortField::new(DataType::Utf8)];
        let converter = RowConverter::new(key_sf).unwrap();
        let usd_col = Arc::new(StringArray::from(vec!["USD"]));
        let usd_rows = converter.convert_columns(&[usd_col]).unwrap();
        let usd_key = usd_rows.row(0);

        let result = index.probe_at_time(usd_key.as_ref(), 150);
        assert!(result.is_some());
        // Row 0 is USD@100, Row 1 is USD@200. At time 150, should get row 0.
        assert_eq!(result.unwrap(), 0);

        // Probe at 250 → should find version 200 (row 1)
        let result = index.probe_at_time(usd_key.as_ref(), 250);
        assert_eq!(result.unwrap(), 1);
    }

    #[test]
    fn test_versioned_index_no_version_before_ts() {
        let batch = versioned_table_batch();
        let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX).unwrap();

        let key_sf = vec![SortField::new(DataType::Utf8)];
        let converter = RowConverter::new(key_sf).unwrap();
        let eur_col = Arc::new(StringArray::from(vec!["EUR"]));
        let eur_rows = converter.convert_columns(&[eur_col]).unwrap();
        let eur_key = eur_rows.row(0);

        // EUR versions start at 100. Probe at 50 → None
        let result = index.probe_at_time(eur_key.as_ref(), 50);
        assert!(result.is_none());
    }

    /// Helper to build a `VersionedLookupJoinExec` for tests.
    fn build_versioned_exec(
        table: RecordBatch,
        stream: &RecordBatch,
        join_type: LookupJoinType,
    ) -> VersionedLookupJoinExec {
        let input = batch_exec(stream.clone());
        let index = Arc::new(VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap());
        let key_sort_fields = vec![SortField::new(DataType::Utf8)];
        let mut output_fields = stream.schema().fields().to_vec();
        output_fields.extend(table.schema().fields().iter().cloned());
        let output_schema = Arc::new(Schema::new(output_fields));
        VersionedLookupJoinExec::try_new(
            input,
            table,
            index,
            vec![1], // stream key col: currency
            2,       // stream time col: event_ts
            join_type,
            output_schema,
            key_sort_fields,
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_versioned_join_exec_inner() {
        let table = versioned_table_batch();
        let stream = stream_batch_with_time();
        let exec = build_versioned_exec(table, &stream, LookupJoinType::Inner);

        let ctx = Arc::new(TaskContext::default());
        let stream_out = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream_out.try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        // Row 1: order_id=1, USD, ts=150 → USD@100 (rate=1.0)
        // Row 2: order_id=2, EUR, ts=160 → EUR@150 (rate=0.90)
        // Row 3: order_id=3, USD, ts=250 → USD@200 (rate=1.1)
        // Row 4: order_id=4, EUR, ts=50 → no EUR version <= 50 → SKIP (inner)
        assert_eq!(batch.num_rows(), 3);

        let rates = batch
            .column(5) // rate is the 6th column (3 stream + 3 table; rate is table col 2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!((rates.value(0) - 1.0).abs() < f64::EPSILON); // USD@100
        assert!((rates.value(1) - 0.90).abs() < f64::EPSILON); // EUR@150
        assert!((rates.value(2) - 1.1).abs() < f64::EPSILON); // USD@200
    }

    #[tokio::test]
    async fn test_versioned_join_exec_left_outer() {
        let table = versioned_table_batch();
        let stream = stream_batch_with_time();
        let exec = build_versioned_exec(table, &stream, LookupJoinType::LeftOuter);

        let ctx = Arc::new(TaskContext::default());
        let stream_out = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream_out.try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        // All 4 rows present (left outer)
        assert_eq!(batch.num_rows(), 4);

        // Row 4 (EUR@50): no version → null rate
        let rates = batch
            .column(5)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!(rates.is_null(3), "EUR@50 should have null rate");
    }

    #[test]
    fn test_versioned_index_empty_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("k", DataType::Utf8, false),
            Field::new("v", DataType::Int64, false),
        ]));
        let batch = RecordBatch::new_empty(schema);
        let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX).unwrap();
        assert!(index.map.is_empty());
    }

    #[test]
    fn test_versioned_lookup_registry() {
        let registry = LookupTableRegistry::new();
        let table = versioned_table_batch();
        let index = Arc::new(VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap());

        registry.register_versioned(
            "rates",
            VersionedLookupState {
                batch: table,
                index,
                key_columns: vec!["currency".to_string()],
                version_column: "valid_from".to_string(),
                stream_time_column: "event_ts".to_string(),
                max_versions_per_key: usize::MAX,
            },
        );

        let entry = registry.get_entry("rates");
        assert!(entry.is_some());
        assert!(matches!(entry.unwrap(), RegisteredLookup::Versioned(_)));

        // get() should return None for versioned entries (snapshot-only)
        assert!(registry.get("rates").is_none());
    }
}