Skip to main content

laminar_sql/datafusion/
lookup_join_exec.rs

1//! Physical execution plan for lookup joins.
2//!
3//! Bridges `LookupJoinNode` (logical) to a hash-probe executor that
4//! joins streaming input against a pre-indexed lookup table snapshot.
5//!
6//! ## Data flow
7//!
8//! ```text
9//! Stream input ──► LookupJoinExec ──► Output (stream + lookup columns)
10//!                       │
11//!                  HashIndex probe
12//!                       │
13//!                  LookupSnapshot (pre-indexed RecordBatch)
14//! ```
15
16use std::any::Any;
17use std::collections::HashMap;
18use std::fmt::{self, Debug, Formatter};
19use std::sync::Arc;
20
21use parking_lot::RwLock;
22
23use std::collections::BTreeMap;
24
25use arrow::compute::take;
26use arrow::row::{RowConverter, SortField};
27use arrow_array::{Array, Int64Array, RecordBatch, UInt32Array};
28use arrow_schema::{Schema, SchemaRef};
29use async_trait::async_trait;
30use datafusion::execution::{SendableRecordBatchStream, SessionState, TaskContext};
31use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
32use datafusion::physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};
33use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
35use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
36use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
37use datafusion_common::{DataFusionError, Result};
38use datafusion_expr::Expr;
39use futures::StreamExt;
40use laminar_core::lookup::lookup_cache::LookupMemoryCache;
41use laminar_core::lookup::source::{ColumnId, LookupSourceDyn};
42use tokio::sync::Semaphore;
43
44use super::lookup_join::{LookupJoinNode, LookupJoinType};
45
/// Upper bound on how long a single cache-miss fetch from a lookup source
/// may run. Such fetches are awaited inline on the single compute thread,
/// so without this bound one hung source would stall the whole pipeline.
const LOOKUP_SOURCE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(30_000);
49
50// ── Registry ─────────────────────────────────────────────────────
51
52/// Thread-safe registry of lookup table entries (snapshot or partial).
53///
54/// The db layer populates this when `CREATE LOOKUP TABLE` executes;
55/// the [`LookupJoinExtensionPlanner`] reads it at physical plan time.
56#[derive(Default)]
57pub struct LookupTableRegistry {
58    tables: RwLock<HashMap<String, RegisteredLookup>>,
59}
60
61/// A registered lookup table entry — snapshot, partial (on-demand), or
62/// versioned (temporal join with version history).
63pub enum RegisteredLookup {
64    /// Full snapshot: all rows pre-loaded in a single batch.
65    Snapshot(Arc<LookupSnapshot>),
66    /// Partial (on-demand): bounded lookup cache with S3-FIFO eviction.
67    Partial(Arc<PartialLookupState>),
68    /// Versioned: all versions of all keys for temporal joins.
69    Versioned(Arc<VersionedLookupState>),
70}
71
72/// Point-in-time snapshot of a lookup table for join execution.
73pub struct LookupSnapshot {
74    /// All rows concatenated into a single batch.
75    pub batch: RecordBatch,
76}
77
78/// State for a versioned (temporal) lookup table.
79///
80/// Holds all versions of all keys in a single `RecordBatch`, plus a
81/// pre-built `VersionedIndex` for efficient point-in-time lookups.
82/// The index is built once at registration time and rebuilt only when
83/// the table is updated via CDC.
84pub struct VersionedLookupState {
85    /// All rows (all versions) concatenated into a single batch.
86    pub batch: RecordBatch,
87    /// Pre-built versioned index (built at registration time, not per-cycle).
88    pub index: Arc<VersionedIndex>,
89    /// Primary key column names for the equi-join.
90    pub key_columns: Vec<String>,
91    /// Column containing the version timestamp in the table.
92    pub version_column: String,
93    /// Stream-side column name for event time (the AS OF column).
94    pub stream_time_column: String,
95    /// Maximum versions to retain per key. `usize::MAX` = unbounded.
96    pub max_versions_per_key: usize,
97}
98
99/// State for a partial (on-demand) lookup table.
100pub struct PartialLookupState {
101    /// Bounded in-memory cache with S3-FIFO eviction.
102    pub lookup_cache: Arc<LookupMemoryCache>,
103    /// Schema of the lookup table.
104    pub schema: SchemaRef,
105    /// Key column names for row encoding.
106    pub key_columns: Vec<String>,
107    /// `SortField` descriptors for key encoding via `RowConverter`.
108    pub key_sort_fields: Vec<SortField>,
109    /// Async source for cache miss fallback (None = cache-only mode).
110    pub source: Option<Arc<dyn LookupSourceDyn>>,
111    /// Limits concurrent source queries to avoid overloading the source.
112    pub fetch_semaphore: Arc<Semaphore>,
113    /// Column indices (into `schema`) to fetch from the source — the union of
114    /// every column any query references, plus the key. Empty = fetch all.
115    /// Per-table (the cache is shared across queries), so it must be a superset.
116    pub projection: Vec<ColumnId>,
117}
118
119impl LookupTableRegistry {
120    /// Creates an empty registry.
121    #[must_use]
122    pub fn new() -> Self {
123        Self::default()
124    }
125
126    /// Registers or replaces a lookup table snapshot.
127    ///
128    /// # Panics
129    ///
130    /// Panics if the internal lock is poisoned.
131    pub fn register(&self, name: &str, snapshot: LookupSnapshot) {
132        self.tables.write().insert(
133            name.to_lowercase(),
134            RegisteredLookup::Snapshot(Arc::new(snapshot)),
135        );
136    }
137
138    /// Registers or replaces a partial (on-demand) lookup table.
139    ///
140    /// # Panics
141    ///
142    /// Panics if the internal lock is poisoned.
143    pub fn register_partial(&self, name: &str, state: PartialLookupState) {
144        self.tables.write().insert(
145            name.to_lowercase(),
146            RegisteredLookup::Partial(Arc::new(state)),
147        );
148    }
149
150    /// Registers or replaces a versioned (temporal) lookup table.
151    ///
152    /// # Panics
153    ///
154    /// Panics if the internal lock is poisoned.
155    pub fn register_versioned(&self, name: &str, state: VersionedLookupState) {
156        self.tables.write().insert(
157            name.to_lowercase(),
158            RegisteredLookup::Versioned(Arc::new(state)),
159        );
160    }
161
162    /// Removes a lookup table from the registry.
163    ///
164    /// # Panics
165    ///
166    /// Panics if the internal lock is poisoned.
167    pub fn unregister(&self, name: &str) {
168        self.tables.write().remove(&name.to_lowercase());
169    }
170
171    /// Returns the current snapshot for a table, if registered as a snapshot.
172    ///
173    /// # Panics
174    ///
175    /// Panics if the internal lock is poisoned.
176    #[must_use]
177    pub fn get(&self, name: &str) -> Option<Arc<LookupSnapshot>> {
178        let tables = self.tables.read();
179        match tables.get(&name.to_lowercase())? {
180            RegisteredLookup::Snapshot(s) => Some(Arc::clone(s)),
181            RegisteredLookup::Partial(_) | RegisteredLookup::Versioned(_) => None,
182        }
183    }
184
185    /// Returns the registered lookup entry (snapshot, partial, or versioned).
186    ///
187    /// # Panics
188    ///
189    /// Panics if the internal lock is poisoned.
190    pub fn get_entry(&self, name: &str) -> Option<RegisteredLookup> {
191        let tables = self.tables.read();
192        tables.get(&name.to_lowercase()).map(|e| match e {
193            RegisteredLookup::Snapshot(s) => RegisteredLookup::Snapshot(Arc::clone(s)),
194            RegisteredLookup::Partial(p) => RegisteredLookup::Partial(Arc::clone(p)),
195            RegisteredLookup::Versioned(v) => RegisteredLookup::Versioned(Arc::clone(v)),
196        })
197    }
198}
199
200// ── Hash Index ───────────────────────────────────────────────────
201
202/// Pre-built hash index mapping encoded key bytes to row indices.
203struct HashIndex {
204    map: HashMap<Box<[u8]>, Vec<u32>>,
205}
206
207impl HashIndex {
208    /// Builds an index over `key_indices` columns in `batch`.
209    ///
210    /// Uses Arrow's `RowConverter` for binary-comparable key encoding
211    /// so any Arrow data type is handled without manual serialization.
212    fn build(batch: &RecordBatch, key_indices: &[usize]) -> Result<Self> {
213        if batch.num_rows() == 0 {
214            return Ok(Self {
215                map: HashMap::new(),
216            });
217        }
218
219        let sort_fields: Vec<SortField> = key_indices
220            .iter()
221            .map(|&i| SortField::new(batch.schema().field(i).data_type().clone()))
222            .collect();
223        let converter = RowConverter::new(sort_fields)?;
224
225        let key_cols: Vec<_> = key_indices
226            .iter()
227            .map(|&i| batch.column(i).clone())
228            .collect();
229        let rows = converter.convert_columns(&key_cols)?;
230
231        let num_rows = batch.num_rows();
232        let mut map: HashMap<Box<[u8]>, Vec<u32>> = HashMap::with_capacity(num_rows);
233        #[allow(clippy::cast_possible_truncation)] // batch row count fits u32
234        for i in 0..num_rows {
235            map.entry(Box::from(rows.row(i).as_ref()))
236                .or_default()
237                .push(i as u32);
238        }
239
240        Ok(Self { map })
241    }
242
243    fn probe(&self, key: &[u8]) -> Option<&[u32]> {
244        self.map.get(key).map(Vec::as_slice)
245    }
246}
247
248// ── Versioned Index ──────────────────────────────────────────────
249
250/// Pre-built versioned index mapping encoded key bytes to a BTreeMap
251/// of version timestamps to row indices. Supports point-in-time lookups
252/// via `probe_at_time` for temporal joins.
253#[derive(Default)]
254pub struct VersionedIndex {
255    map: HashMap<Box<[u8]>, BTreeMap<i64, Vec<u32>>>,
256}
257
258impl VersionedIndex {
259    /// Builds a versioned index over `key_indices` and `version_col_idx`
260    /// columns in `batch`.
261    ///
262    /// Uses Arrow's `RowConverter` for binary-comparable key encoding.
263    /// Null keys and null version timestamps are skipped.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if key encoding or timestamp extraction fails.
268    pub fn build(
269        batch: &RecordBatch,
270        key_indices: &[usize],
271        version_col_idx: usize,
272        max_versions_per_key: usize,
273    ) -> Result<Self> {
274        if batch.num_rows() == 0 {
275            return Ok(Self {
276                map: HashMap::new(),
277            });
278        }
279
280        let sort_fields: Vec<SortField> = key_indices
281            .iter()
282            .map(|&i| SortField::new(batch.schema().field(i).data_type().clone()))
283            .collect();
284        let converter = RowConverter::new(sort_fields)?;
285
286        let key_cols: Vec<_> = key_indices
287            .iter()
288            .map(|&i| batch.column(i).clone())
289            .collect();
290        let rows = converter.convert_columns(&key_cols)?;
291
292        let timestamps = extract_i64_timestamps(batch.column(version_col_idx))?;
293
294        let num_rows = batch.num_rows();
295        let mut map: HashMap<Box<[u8]>, BTreeMap<i64, Vec<u32>>> = HashMap::with_capacity(num_rows);
296        #[allow(clippy::cast_possible_truncation)]
297        for i in 0..timestamps.len() {
298            if timestamps.is_null(i) || key_cols.iter().any(|c| c.is_null(i)) {
299                continue;
300            }
301            let key = Box::from(rows.row(i).as_ref());
302            map.entry(key)
303                .or_default()
304                .entry(timestamps.value(i))
305                .or_default()
306                .push(i as u32);
307        }
308
309        // GC: keep only the N most recent versions per key.
310        if max_versions_per_key < usize::MAX {
311            for versions in map.values_mut() {
312                while versions.len() > max_versions_per_key {
313                    versions.pop_first();
314                }
315            }
316        }
317
318        Ok(Self { map })
319    }
320
321    /// Finds the row index for the latest version `<= event_ts` for the
322    /// given key. Returns the last row index at that version.
323    fn probe_at_time(&self, key: &[u8], event_ts: i64) -> Option<u32> {
324        let versions = self.map.get(key)?;
325        let (_, indices) = versions.range(..=event_ts).next_back()?;
326        indices.last().copied()
327    }
328}
329
330/// Normalises a timestamp-like column to milliseconds as `Int64Array`.
331///
332/// Supports `Int64` (interpreted as milliseconds), all `Timestamp` variants,
333/// and `Float64` (saturating truncation, interpreted as milliseconds). The
334/// returned array's null buffer mirrors the source.
335fn extract_i64_timestamps(col: &dyn arrow_array::Array) -> Result<Int64Array> {
336    use arrow::compute::cast;
337    use arrow_array::types::Int64Type;
338    use arrow_array::{ArrayRef, Float64Array};
339    use arrow_schema::{DataType, TimeUnit};
340
341    let int64_arr: ArrayRef = match col.data_type() {
342        DataType::Int64 => col.slice(0, col.len()),
343        DataType::Timestamp(TimeUnit::Millisecond, _) => cast(col, &DataType::Int64)?,
344        DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Second | TimeUnit::Nanosecond, _) => {
345            let ms = cast(col, &DataType::Timestamp(TimeUnit::Millisecond, None))?;
346            cast(ms.as_ref(), &DataType::Int64)?
347        }
348        DataType::Float64 => {
349            let arr = col
350                .as_any()
351                .downcast_ref::<Float64Array>()
352                .ok_or_else(|| DataFusionError::Internal("expected Float64Array".into()))?;
353            // A non-finite/out-of-range float join key would saturate
354            // (NaN -> 0, huge -> i64::MIN/MAX) and silently mis-join.
355            return arr.try_unary::<_, Int64Type, DataFusionError>(|v| {
356                let r = v.round();
357                if !(-9_223_372_036_854_775_808.0..9_223_372_036_854_775_808.0).contains(&r) {
358                    return Err(DataFusionError::Plan(format!(
359                        "Float64 temporal-join key {v} out of i64 range"
360                    )));
361                }
362                #[allow(clippy::cast_possible_truncation)] // range-checked; integral
363                let ms = r as i64;
364                Ok(ms)
365            });
366        }
367        other => {
368            return Err(DataFusionError::Plan(format!(
369                "unsupported timestamp type for temporal join: {other:?}"
370            )));
371        }
372    };
373
374    int64_arr
375        .as_any()
376        .downcast_ref::<Int64Array>()
377        .cloned()
378        .ok_or_else(|| DataFusionError::Internal("cast produced non-Int64 array".into()))
379}
380
381// ── Physical Execution Plan ──────────────────────────────────────
382
383/// Physical plan that hash-probes a pre-indexed lookup table for
384/// each batch from the streaming input.
385pub struct LookupJoinExec {
386    input: Arc<dyn ExecutionPlan>,
387    index: Arc<HashIndex>,
388    lookup_batch: Arc<RecordBatch>,
389    stream_key_indices: Vec<usize>,
390    join_type: LookupJoinType,
391    schema: SchemaRef,
392    properties: PlanProperties,
393    /// Prebuilt `RowConverter` for encoding probe keys. Shared across
394    /// every `execute()` call so we don't rebuild per-type encoders on
395    /// every cycle of a cached physical plan.
396    converter: Arc<RowConverter>,
397    stream_field_count: usize,
398    projection: Vec<usize>,
399}
400
401impl LookupJoinExec {
402    /// Creates a new lookup join executor.
403    ///
404    /// `stream_key_indices` and `lookup_key_indices` must be the same
405    /// length and correspond pairwise (stream key 0 matches lookup key 0).
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if the hash index cannot be built (e.g., unsupported key type).
410    #[allow(clippy::needless_pass_by_value)] // lookup_batch is moved into Arc
411    pub fn try_new(
412        input: Arc<dyn ExecutionPlan>,
413        lookup_batch: RecordBatch,
414        stream_key_indices: Vec<usize>,
415        lookup_key_indices: Vec<usize>,
416        join_type: LookupJoinType,
417        output_schema: SchemaRef,
418    ) -> Result<Self> {
419        let index = HashIndex::build(&lookup_batch, &lookup_key_indices)?;
420
421        let key_sort_fields: Vec<SortField> = lookup_key_indices
422            .iter()
423            .map(|&i| SortField::new(lookup_batch.schema().field(i).data_type().clone()))
424            .collect();
425        let converter = Arc::new(RowConverter::new(key_sort_fields)?);
426
427        // Left outer joins produce NULLs for non-matching lookup rows,
428        // so force all lookup columns nullable in the output schema.
429        let output_schema = if join_type == LookupJoinType::LeftOuter {
430            let stream_count = input.schema().fields().len();
431            let mut fields = output_schema.fields().to_vec();
432            for f in &mut fields[stream_count..] {
433                if !f.is_nullable() {
434                    *f = Arc::new(f.as_ref().clone().with_nullable(true));
435                }
436            }
437            Arc::new(Schema::new_with_metadata(
438                fields,
439                output_schema.metadata().clone(),
440            ))
441        } else {
442            output_schema
443        };
444
445        let properties = PlanProperties::new(
446            EquivalenceProperties::new(Arc::clone(&output_schema)),
447            Partitioning::UnknownPartitioning(1),
448            EmissionType::Incremental,
449            Boundedness::Unbounded {
450                requires_infinite_memory: false,
451            },
452        );
453
454        let stream_field_count = input.schema().fields().len();
455        let projection = (0..(stream_field_count + lookup_batch.num_columns())).collect();
456
457        Ok(Self {
458            input,
459            index: Arc::new(index),
460            lookup_batch: Arc::new(lookup_batch),
461            stream_key_indices,
462            join_type,
463            schema: output_schema,
464            properties,
465            converter,
466            stream_field_count,
467            projection,
468        })
469    }
470
471    /// Sets the projection list for output columns.
472    #[must_use]
473    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
474        self.projection = projection;
475        self
476    }
477}
478
479impl Debug for LookupJoinExec {
480    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
481        f.debug_struct("LookupJoinExec")
482            .field("join_type", &self.join_type)
483            .field("stream_keys", &self.stream_key_indices)
484            .field("lookup_rows", &self.lookup_batch.num_rows())
485            .finish_non_exhaustive()
486    }
487}
488
489impl DisplayAs for LookupJoinExec {
490    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
491        match t {
492            DisplayFormatType::Default | DisplayFormatType::Verbose => {
493                write!(
494                    f,
495                    "LookupJoinExec: type={}, stream_keys={:?}, lookup_rows={}",
496                    self.join_type,
497                    self.stream_key_indices,
498                    self.lookup_batch.num_rows(),
499                )
500            }
501            DisplayFormatType::TreeRender => write!(f, "LookupJoinExec"),
502        }
503    }
504}
505
506impl ExecutionPlan for LookupJoinExec {
507    fn name(&self) -> &'static str {
508        "LookupJoinExec"
509    }
510
511    fn as_any(&self) -> &dyn Any {
512        self
513    }
514
515    fn schema(&self) -> SchemaRef {
516        Arc::clone(&self.schema)
517    }
518
519    fn properties(&self) -> &PlanProperties {
520        &self.properties
521    }
522
523    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
524        vec![&self.input]
525    }
526
527    fn with_new_children(
528        self: Arc<Self>,
529        mut children: Vec<Arc<dyn ExecutionPlan>>,
530    ) -> Result<Arc<dyn ExecutionPlan>> {
531        if children.len() != 1 {
532            return Err(DataFusionError::Plan(
533                "LookupJoinExec requires exactly one child".into(),
534            ));
535        }
536        Ok(Arc::new(Self {
537            input: children.swap_remove(0),
538            index: Arc::clone(&self.index),
539            lookup_batch: Arc::clone(&self.lookup_batch),
540            stream_key_indices: self.stream_key_indices.clone(),
541            join_type: self.join_type,
542            schema: Arc::clone(&self.schema),
543            properties: self.properties.clone(),
544            converter: Arc::clone(&self.converter),
545            stream_field_count: self.stream_field_count,
546            projection: self.projection.clone(),
547        }))
548    }
549
550    fn execute(
551        &self,
552        partition: usize,
553        context: Arc<TaskContext>,
554    ) -> Result<SendableRecordBatchStream> {
555        let input_stream = self.input.execute(partition, context)?;
556        let converter = Arc::clone(&self.converter);
557        let index = Arc::clone(&self.index);
558        let lookup_batch = Arc::clone(&self.lookup_batch);
559        let stream_key_indices = self.stream_key_indices.clone();
560        let join_type = self.join_type;
561        let schema = self.schema();
562        let stream_field_count = self.stream_field_count;
563        let projection = self.projection.clone();
564
565        let output = input_stream.map(move |result| {
566            let batch = result?;
567            if batch.num_rows() == 0 {
568                return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
569            }
570            probe_batch(
571                &batch,
572                &converter,
573                &index,
574                &lookup_batch,
575                &stream_key_indices,
576                join_type,
577                &schema,
578                stream_field_count,
579                &projection,
580            )
581        });
582
583        Ok(Box::pin(RecordBatchStreamAdapter::new(
584            self.schema(),
585            output,
586        )))
587    }
588}
589
590impl datafusion::physical_plan::ExecutionPlanProperties for LookupJoinExec {
591    fn output_partitioning(&self) -> &Partitioning {
592        self.properties.output_partitioning()
593    }
594
595    fn output_ordering(&self) -> Option<&LexOrdering> {
596        self.properties.output_ordering()
597    }
598
599    fn boundedness(&self) -> Boundedness {
600        Boundedness::Unbounded {
601            requires_infinite_memory: false,
602        }
603    }
604
605    fn pipeline_behavior(&self) -> EmissionType {
606        EmissionType::Incremental
607    }
608
609    fn equivalence_properties(&self) -> &EquivalenceProperties {
610        self.properties.equivalence_properties()
611    }
612}
613
614// ── Probe Logic ──────────────────────────────────────────────────
615
616/// Probes the hash index for each row in `stream_batch` and builds
617/// the joined output batch.
618#[allow(clippy::too_many_arguments)]
619fn probe_batch(
620    stream_batch: &RecordBatch,
621    converter: &RowConverter,
622    index: &HashIndex,
623    lookup_batch: &RecordBatch,
624    stream_key_indices: &[usize],
625    join_type: LookupJoinType,
626    output_schema: &SchemaRef,
627    stream_field_count: usize,
628    projection: &[usize],
629) -> Result<RecordBatch> {
630    let key_cols: Vec<_> = stream_key_indices
631        .iter()
632        .map(|&i| stream_batch.column(i).clone())
633        .collect();
634    let rows = converter.convert_columns(&key_cols)?;
635
636    let num_rows = stream_batch.num_rows();
637    let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
638    let mut lookup_indices: Vec<Option<u32>> = Vec::with_capacity(num_rows);
639
640    #[allow(clippy::cast_possible_truncation)] // batch row count fits u32
641    for row in 0..num_rows {
642        // SQL semantics: NULL != NULL, so rows with any null key never match.
643        if key_cols.iter().any(|c| c.is_null(row)) {
644            if join_type == LookupJoinType::LeftOuter {
645                stream_indices.push(row as u32);
646                lookup_indices.push(None);
647            }
648            continue;
649        }
650
651        let key = rows.row(row);
652        match index.probe(key.as_ref()) {
653            Some(matches) => {
654                for &lookup_row in matches {
655                    stream_indices.push(row as u32);
656                    lookup_indices.push(Some(lookup_row));
657                }
658            }
659            None if join_type == LookupJoinType::LeftOuter => {
660                stream_indices.push(row as u32);
661                lookup_indices.push(None);
662            }
663            None => {}
664        }
665    }
666
667    if stream_indices.is_empty() {
668        return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
669    }
670
671    // Gather stream-side columns
672    let take_stream = UInt32Array::from(stream_indices);
673    let mut columns = Vec::with_capacity(stream_field_count + lookup_batch.num_columns());
674
675    for col in stream_batch.columns() {
676        columns.push(take(col.as_ref(), &take_stream, None)?);
677    }
678
679    // Gather lookup-side columns (None → null in output)
680    let take_lookup: UInt32Array = lookup_indices.into_iter().collect();
681    for col in lookup_batch.columns() {
682        columns.push(take(col.as_ref(), &take_lookup, None)?);
683    }
684
685    debug_assert_eq!(
686        columns.len(),
687        stream_field_count + lookup_batch.num_columns(),
688        "output column count mismatch"
689    );
690
691    let projected_columns: Vec<_> = projection.iter().map(|&idx| columns[idx].clone()).collect();
692
693    debug_assert_eq!(
694        projected_columns.len(),
695        output_schema.fields().len(),
696        "projected column count mismatch"
697    );
698
699    Ok(RecordBatch::try_new(
700        Arc::clone(output_schema),
701        projected_columns,
702    )?)
703}
704
705// ── Versioned Lookup Join Exec ────────────────────────────────────
706
707/// Physical plan that probes a versioned (temporal) index for each
708/// batch from the streaming input. For each stream row, finds the
709/// table row with the latest version timestamp `<= event_ts`.
710pub struct VersionedLookupJoinExec {
711    input: Arc<dyn ExecutionPlan>,
712    index: Arc<VersionedIndex>,
713    table_batch: Arc<RecordBatch>,
714    stream_key_indices: Vec<usize>,
715    stream_time_col_idx: usize,
716    join_type: LookupJoinType,
717    schema: SchemaRef,
718    properties: PlanProperties,
719    /// Prebuilt `RowConverter` — built once at planning time and reused
720    /// across every `execute()` call. Previously rebuilt per cycle.
721    converter: Arc<RowConverter>,
722    stream_field_count: usize,
723}
724
725impl VersionedLookupJoinExec {
726    /// Creates a new versioned lookup join executor.
727    ///
728    /// The `index` should be pre-built via `VersionedIndex::build()` and
729    /// cached in `VersionedLookupState`. The index is only rebuilt when
730    /// the table data changes (CDC update), not per execution cycle.
731    ///
732    /// # Errors
733    ///
734    /// Returns an error if the output schema cannot be constructed.
735    #[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)]
736    pub fn try_new(
737        input: Arc<dyn ExecutionPlan>,
738        table_batch: RecordBatch,
739        index: Arc<VersionedIndex>,
740        stream_key_indices: Vec<usize>,
741        stream_time_col_idx: usize,
742        join_type: LookupJoinType,
743        output_schema: SchemaRef,
744        key_sort_fields: Vec<SortField>,
745    ) -> Result<Self> {
746        let output_schema = if join_type == LookupJoinType::LeftOuter {
747            let stream_count = input.schema().fields().len();
748            let mut fields = output_schema.fields().to_vec();
749            for f in &mut fields[stream_count..] {
750                if !f.is_nullable() {
751                    *f = Arc::new(f.as_ref().clone().with_nullable(true));
752                }
753            }
754            Arc::new(Schema::new_with_metadata(
755                fields,
756                output_schema.metadata().clone(),
757            ))
758        } else {
759            output_schema
760        };
761
762        let properties = PlanProperties::new(
763            EquivalenceProperties::new(Arc::clone(&output_schema)),
764            Partitioning::UnknownPartitioning(1),
765            EmissionType::Incremental,
766            Boundedness::Unbounded {
767                requires_infinite_memory: false,
768            },
769        );
770
771        let stream_field_count = input.schema().fields().len();
772        let converter = Arc::new(RowConverter::new(key_sort_fields)?);
773
774        Ok(Self {
775            input,
776            index,
777            table_batch: Arc::new(table_batch),
778            stream_key_indices,
779            stream_time_col_idx,
780            join_type,
781            schema: output_schema,
782            properties,
783            converter,
784            stream_field_count,
785        })
786    }
787}
788
789impl Debug for VersionedLookupJoinExec {
790    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
791        f.debug_struct("VersionedLookupJoinExec")
792            .field("join_type", &self.join_type)
793            .field("stream_keys", &self.stream_key_indices)
794            .field("table_rows", &self.table_batch.num_rows())
795            .finish_non_exhaustive()
796    }
797}
798
799impl DisplayAs for VersionedLookupJoinExec {
800    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
801        match t {
802            DisplayFormatType::Default | DisplayFormatType::Verbose => {
803                write!(
804                    f,
805                    "VersionedLookupJoinExec: type={}, stream_keys={:?}, table_rows={}",
806                    self.join_type,
807                    self.stream_key_indices,
808                    self.table_batch.num_rows(),
809                )
810            }
811            DisplayFormatType::TreeRender => write!(f, "VersionedLookupJoinExec"),
812        }
813    }
814}
815
816impl ExecutionPlan for VersionedLookupJoinExec {
817    fn name(&self) -> &'static str {
818        "VersionedLookupJoinExec"
819    }
820
821    fn as_any(&self) -> &dyn Any {
822        self
823    }
824
825    fn schema(&self) -> SchemaRef {
826        Arc::clone(&self.schema)
827    }
828
829    fn properties(&self) -> &PlanProperties {
830        &self.properties
831    }
832
833    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
834        vec![&self.input]
835    }
836
837    fn with_new_children(
838        self: Arc<Self>,
839        mut children: Vec<Arc<dyn ExecutionPlan>>,
840    ) -> Result<Arc<dyn ExecutionPlan>> {
841        if children.len() != 1 {
842            return Err(DataFusionError::Plan(
843                "VersionedLookupJoinExec requires exactly one child".into(),
844            ));
845        }
846        Ok(Arc::new(Self {
847            input: children.swap_remove(0),
848            index: Arc::clone(&self.index),
849            table_batch: Arc::clone(&self.table_batch),
850            stream_key_indices: self.stream_key_indices.clone(),
851            stream_time_col_idx: self.stream_time_col_idx,
852            join_type: self.join_type,
853            schema: Arc::clone(&self.schema),
854            properties: self.properties.clone(),
855            converter: Arc::clone(&self.converter),
856            stream_field_count: self.stream_field_count,
857        }))
858    }
859
860    fn execute(
861        &self,
862        partition: usize,
863        context: Arc<TaskContext>,
864    ) -> Result<SendableRecordBatchStream> {
865        let input_stream = self.input.execute(partition, context)?;
866        let converter = Arc::clone(&self.converter);
867        let index = Arc::clone(&self.index);
868        let table_batch = Arc::clone(&self.table_batch);
869        let stream_key_indices = self.stream_key_indices.clone();
870        let stream_time_col_idx = self.stream_time_col_idx;
871        let join_type = self.join_type;
872        let schema = self.schema();
873        let stream_field_count = self.stream_field_count;
874
875        let output = input_stream.map(move |result| {
876            let batch = result?;
877            if batch.num_rows() == 0 {
878                return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
879            }
880            probe_versioned_batch(
881                &batch,
882                &converter,
883                &index,
884                &table_batch,
885                &stream_key_indices,
886                stream_time_col_idx,
887                join_type,
888                &schema,
889                stream_field_count,
890            )
891        });
892
893        Ok(Box::pin(RecordBatchStreamAdapter::new(
894            self.schema(),
895            output,
896        )))
897    }
898}
899
900impl datafusion::physical_plan::ExecutionPlanProperties for VersionedLookupJoinExec {
901    fn output_partitioning(&self) -> &Partitioning {
902        self.properties.output_partitioning()
903    }
904
905    fn output_ordering(&self) -> Option<&LexOrdering> {
906        self.properties.output_ordering()
907    }
908
909    fn boundedness(&self) -> Boundedness {
910        Boundedness::Unbounded {
911            requires_infinite_memory: false,
912        }
913    }
914
915    fn pipeline_behavior(&self) -> EmissionType {
916        EmissionType::Incremental
917    }
918
919    fn equivalence_properties(&self) -> &EquivalenceProperties {
920        self.properties.equivalence_properties()
921    }
922}
923
924/// Probes the versioned index for each row in `stream_batch`, finding
925/// the table row with the latest version `<= event_ts`.
926#[allow(clippy::too_many_arguments)]
927fn probe_versioned_batch(
928    stream_batch: &RecordBatch,
929    converter: &RowConverter,
930    index: &VersionedIndex,
931    table_batch: &RecordBatch,
932    stream_key_indices: &[usize],
933    stream_time_col_idx: usize,
934    join_type: LookupJoinType,
935    output_schema: &SchemaRef,
936    stream_field_count: usize,
937) -> Result<RecordBatch> {
938    let key_cols: Vec<_> = stream_key_indices
939        .iter()
940        .map(|&i| stream_batch.column(i).clone())
941        .collect();
942    let rows = converter.convert_columns(&key_cols)?;
943    let event_timestamps =
944        extract_i64_timestamps(stream_batch.column(stream_time_col_idx).as_ref())?;
945
946    let num_rows = stream_batch.num_rows();
947    let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
948    let mut lookup_indices: Vec<Option<u32>> = Vec::with_capacity(num_rows);
949
950    #[allow(clippy::cast_possible_truncation)]
951    for row in 0..event_timestamps.len() {
952        if event_timestamps.is_null(row) || key_cols.iter().any(|c| c.is_null(row)) {
953            if join_type == LookupJoinType::LeftOuter {
954                stream_indices.push(row as u32);
955                lookup_indices.push(None);
956            }
957            continue;
958        }
959
960        let key = rows.row(row);
961        let event_ts = event_timestamps.value(row);
962        match index.probe_at_time(key.as_ref(), event_ts) {
963            Some(table_row_idx) => {
964                stream_indices.push(row as u32);
965                lookup_indices.push(Some(table_row_idx));
966            }
967            None if join_type == LookupJoinType::LeftOuter => {
968                stream_indices.push(row as u32);
969                lookup_indices.push(None);
970            }
971            None => {}
972        }
973    }
974
975    if stream_indices.is_empty() {
976        return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
977    }
978
979    let take_stream = UInt32Array::from(stream_indices);
980    let mut columns = Vec::with_capacity(output_schema.fields().len());
981
982    for col in stream_batch.columns() {
983        columns.push(take(col.as_ref(), &take_stream, None)?);
984    }
985
986    let take_lookup: UInt32Array = lookup_indices.into_iter().collect();
987    for col in table_batch.columns() {
988        columns.push(take(col.as_ref(), &take_lookup, None)?);
989    }
990
991    debug_assert_eq!(
992        columns.len(),
993        stream_field_count + table_batch.num_columns(),
994        "output column count mismatch"
995    );
996
997    Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
998}
999
1000// ── Partial Lookup Join Exec ──────────────────────────────────────
1001
1002/// Physical plan that probes a bounded lookup cache per key for each
1003/// batch from the streaming input. Used for on-demand/partial tables
1004/// where the full dataset does not fit in memory.
1005pub struct PartialLookupJoinExec {
1006    input: Arc<dyn ExecutionPlan>,
1007    lookup_cache: Arc<LookupMemoryCache>,
1008    stream_key_indices: Vec<usize>,
1009    join_type: LookupJoinType,
1010    schema: SchemaRef,
1011    properties: PlanProperties,
1012    /// Prebuilt `RowConverter` — built once at planning time, reused on
1013    /// every `execute()`. Previously rebuilt per-cycle.
1014    converter: Arc<RowConverter>,
1015    stream_field_count: usize,
1016    lookup_schema: SchemaRef,
1017    source: Option<Arc<dyn LookupSourceDyn>>,
1018    fetch_semaphore: Arc<Semaphore>,
1019    projection: Vec<ColumnId>,
1020}
1021
1022impl PartialLookupJoinExec {
1023    /// Creates a new partial lookup join executor.
1024    ///
1025    /// # Errors
1026    ///
1027    /// Returns an error if the output schema cannot be constructed.
1028    pub fn try_new(
1029        input: Arc<dyn ExecutionPlan>,
1030        lookup_cache: Arc<LookupMemoryCache>,
1031        stream_key_indices: Vec<usize>,
1032        key_sort_fields: Vec<SortField>,
1033        join_type: LookupJoinType,
1034        lookup_schema: SchemaRef,
1035        output_schema: SchemaRef,
1036    ) -> Result<Self> {
1037        Self::try_new_with_source(
1038            input,
1039            lookup_cache,
1040            stream_key_indices,
1041            key_sort_fields,
1042            join_type,
1043            lookup_schema,
1044            output_schema,
1045            None,
1046            Arc::new(Semaphore::new(64)),
1047            vec![],
1048        )
1049    }
1050
1051    /// Creates a new partial lookup join executor with optional source fallback.
1052    ///
1053    /// # Errors
1054    ///
1055    /// Returns an error if the output schema cannot be constructed.
1056    #[allow(clippy::too_many_arguments)]
1057    pub fn try_new_with_source(
1058        input: Arc<dyn ExecutionPlan>,
1059        lookup_cache: Arc<LookupMemoryCache>,
1060        stream_key_indices: Vec<usize>,
1061        key_sort_fields: Vec<SortField>,
1062        join_type: LookupJoinType,
1063        lookup_schema: SchemaRef,
1064        output_schema: SchemaRef,
1065        source: Option<Arc<dyn LookupSourceDyn>>,
1066        fetch_semaphore: Arc<Semaphore>,
1067        projection: Vec<ColumnId>,
1068    ) -> Result<Self> {
1069        let output_schema = if join_type == LookupJoinType::LeftOuter {
1070            let stream_count = input.schema().fields().len();
1071            let mut fields = output_schema.fields().to_vec();
1072            for f in &mut fields[stream_count..] {
1073                if !f.is_nullable() {
1074                    *f = Arc::new(f.as_ref().clone().with_nullable(true));
1075                }
1076            }
1077            Arc::new(Schema::new_with_metadata(
1078                fields,
1079                output_schema.metadata().clone(),
1080            ))
1081        } else {
1082            output_schema
1083        };
1084
1085        let properties = PlanProperties::new(
1086            EquivalenceProperties::new(Arc::clone(&output_schema)),
1087            Partitioning::UnknownPartitioning(1),
1088            EmissionType::Incremental,
1089            Boundedness::Unbounded {
1090                requires_infinite_memory: false,
1091            },
1092        );
1093
1094        let stream_field_count = input.schema().fields().len();
1095        let converter = Arc::new(RowConverter::new(key_sort_fields)?);
1096
1097        Ok(Self {
1098            input,
1099            lookup_cache,
1100            stream_key_indices,
1101            join_type,
1102            schema: output_schema,
1103            properties,
1104            converter,
1105            stream_field_count,
1106            lookup_schema,
1107            source,
1108            fetch_semaphore,
1109            projection,
1110        })
1111    }
1112}
1113
1114impl Debug for PartialLookupJoinExec {
1115    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1116        f.debug_struct("PartialLookupJoinExec")
1117            .field("join_type", &self.join_type)
1118            .field("stream_keys", &self.stream_key_indices)
1119            .field("cache_table_id", &self.lookup_cache.table_id())
1120            .finish_non_exhaustive()
1121    }
1122}
1123
1124impl DisplayAs for PartialLookupJoinExec {
1125    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
1126        match t {
1127            DisplayFormatType::Default | DisplayFormatType::Verbose => {
1128                write!(
1129                    f,
1130                    "PartialLookupJoinExec: type={}, stream_keys={:?}, cache_entries={}",
1131                    self.join_type,
1132                    self.stream_key_indices,
1133                    self.lookup_cache.len(),
1134                )
1135            }
1136            DisplayFormatType::TreeRender => write!(f, "PartialLookupJoinExec"),
1137        }
1138    }
1139}
1140
1141impl ExecutionPlan for PartialLookupJoinExec {
1142    fn name(&self) -> &'static str {
1143        "PartialLookupJoinExec"
1144    }
1145
1146    fn as_any(&self) -> &dyn Any {
1147        self
1148    }
1149
1150    fn schema(&self) -> SchemaRef {
1151        Arc::clone(&self.schema)
1152    }
1153
1154    fn properties(&self) -> &PlanProperties {
1155        &self.properties
1156    }
1157
1158    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1159        vec![&self.input]
1160    }
1161
1162    fn with_new_children(
1163        self: Arc<Self>,
1164        mut children: Vec<Arc<dyn ExecutionPlan>>,
1165    ) -> Result<Arc<dyn ExecutionPlan>> {
1166        if children.len() != 1 {
1167            return Err(DataFusionError::Plan(
1168                "PartialLookupJoinExec requires exactly one child".into(),
1169            ));
1170        }
1171        Ok(Arc::new(Self {
1172            input: children.swap_remove(0),
1173            lookup_cache: Arc::clone(&self.lookup_cache),
1174            stream_key_indices: self.stream_key_indices.clone(),
1175            join_type: self.join_type,
1176            schema: Arc::clone(&self.schema),
1177            properties: self.properties.clone(),
1178            converter: Arc::clone(&self.converter),
1179            stream_field_count: self.stream_field_count,
1180            lookup_schema: Arc::clone(&self.lookup_schema),
1181            source: self.source.clone(),
1182            fetch_semaphore: Arc::clone(&self.fetch_semaphore),
1183            projection: self.projection.clone(),
1184        }))
1185    }
1186
1187    fn execute(
1188        &self,
1189        partition: usize,
1190        context: Arc<TaskContext>,
1191    ) -> Result<SendableRecordBatchStream> {
1192        let input_stream = self.input.execute(partition, context)?;
1193        let converter = Arc::clone(&self.converter);
1194        let lookup_cache = Arc::clone(&self.lookup_cache);
1195        let stream_key_indices = self.stream_key_indices.clone();
1196        let join_type = self.join_type;
1197        let schema = self.schema();
1198        let stream_field_count = self.stream_field_count;
1199        let lookup_schema = Arc::clone(&self.lookup_schema);
1200        let source = self.source.clone();
1201        let fetch_semaphore = Arc::clone(&self.fetch_semaphore);
1202        let projection = self.projection.clone();
1203
1204        let output = input_stream.then(move |result| {
1205            let lookup_cache = Arc::clone(&lookup_cache);
1206            let converter = Arc::clone(&converter);
1207            let stream_key_indices = stream_key_indices.clone();
1208            let schema = Arc::clone(&schema);
1209            let lookup_schema = Arc::clone(&lookup_schema);
1210            let source = source.clone();
1211            let fetch_semaphore = Arc::clone(&fetch_semaphore);
1212            let projection = projection.clone();
1213            async move {
1214                let batch = result?;
1215                if batch.num_rows() == 0 {
1216                    return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
1217                }
1218                probe_partial_batch_with_fallback(
1219                    &batch,
1220                    &converter,
1221                    &lookup_cache,
1222                    &stream_key_indices,
1223                    join_type,
1224                    &schema,
1225                    stream_field_count,
1226                    &lookup_schema,
1227                    source.as_deref(),
1228                    &fetch_semaphore,
1229                    &projection,
1230                )
1231                .await
1232            }
1233        });
1234
1235        Ok(Box::pin(RecordBatchStreamAdapter::new(
1236            self.schema(),
1237            output,
1238        )))
1239    }
1240}
1241
1242impl datafusion::physical_plan::ExecutionPlanProperties for PartialLookupJoinExec {
1243    fn output_partitioning(&self) -> &Partitioning {
1244        self.properties.output_partitioning()
1245    }
1246
1247    fn output_ordering(&self) -> Option<&LexOrdering> {
1248        self.properties.output_ordering()
1249    }
1250
1251    fn boundedness(&self) -> Boundedness {
1252        Boundedness::Unbounded {
1253            requires_infinite_memory: false,
1254        }
1255    }
1256
1257    fn pipeline_behavior(&self) -> EmissionType {
1258        EmissionType::Incremental
1259    }
1260
1261    fn equivalence_properties(&self) -> &EquivalenceProperties {
1262        self.properties.equivalence_properties()
1263    }
1264}
1265
1266/// Probes the lookup cache for each row in `stream_batch`, falling back
1267/// to the async source for cache misses. Inserts source results into
1268/// the cache before building the output.
1269#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1270async fn probe_partial_batch_with_fallback(
1271    stream_batch: &RecordBatch,
1272    converter: &RowConverter,
1273    lookup_cache: &LookupMemoryCache,
1274    stream_key_indices: &[usize],
1275    join_type: LookupJoinType,
1276    output_schema: &SchemaRef,
1277    stream_field_count: usize,
1278    lookup_schema: &SchemaRef,
1279    source: Option<&dyn LookupSourceDyn>,
1280    fetch_semaphore: &Semaphore,
1281    projection: &[ColumnId],
1282) -> Result<RecordBatch> {
1283    let key_cols: Vec<_> = stream_key_indices
1284        .iter()
1285        .map(|&i| stream_batch.column(i).clone())
1286        .collect();
1287    let rows = converter.convert_columns(&key_cols)?;
1288
1289    let num_rows = stream_batch.num_rows();
1290    let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
1291    let mut lookup_batches: Vec<Option<RecordBatch>> = Vec::with_capacity(num_rows);
1292    let mut miss_keys: Vec<(usize, Vec<u8>)> = Vec::new();
1293
1294    #[allow(clippy::cast_possible_truncation)]
1295    for row in 0..num_rows {
1296        // SQL semantics: NULL != NULL, so rows with any null key never match.
1297        if key_cols.iter().any(|c| c.is_null(row)) {
1298            if join_type == LookupJoinType::LeftOuter {
1299                stream_indices.push(row as u32);
1300                lookup_batches.push(None);
1301            }
1302            continue;
1303        }
1304
1305        let key = rows.row(row);
1306        let result = lookup_cache.get_cached(key.as_ref());
1307        if let Some(batch) = result.into_batch() {
1308            stream_indices.push(row as u32);
1309            lookup_batches.push(Some(batch));
1310        } else {
1311            let idx = stream_indices.len();
1312            stream_indices.push(row as u32);
1313            lookup_batches.push(None);
1314            miss_keys.push((idx, key.as_ref().to_vec()));
1315        }
1316    }
1317
1318    // Fetch missed keys from the source in a single batch query
1319    if let Some(source) = source {
1320        if !miss_keys.is_empty() {
1321            let _permit = fetch_semaphore
1322                .acquire()
1323                .await
1324                .map_err(|_| DataFusionError::Internal("fetch semaphore closed".into()))?;
1325
1326            let key_refs: Vec<&[u8]> = miss_keys.iter().map(|(_, k)| k.as_slice()).collect();
1327
1328            // Propagate fetch failures instead of silently serving cache-only
1329            // results: returning Err replays the cycle rather than dropping
1330            // inner-join matches or NULL-filling left-join rows.
1331            let results = match tokio::time::timeout(
1332                LOOKUP_SOURCE_TIMEOUT,
1333                source.query_batch(&key_refs, &[], projection),
1334            )
1335            .await
1336            {
1337                Ok(Ok(results)) => results,
1338                Ok(Err(e)) => {
1339                    return Err(DataFusionError::Execution(format!(
1340                        "lookup source query failed ({} keys): {e}",
1341                        miss_keys.len()
1342                    )));
1343                }
1344                Err(_elapsed) => {
1345                    return Err(DataFusionError::Execution(format!(
1346                        "lookup source query timed out after {LOOKUP_SOURCE_TIMEOUT:?} \
1347                         ({} keys)",
1348                        miss_keys.len()
1349                    )));
1350                }
1351            };
1352
1353            if results.len() != miss_keys.len() {
1354                return Err(DataFusionError::Execution(format!(
1355                    "Lookup source returned mismatched results cardinality. Expected {} results, but got {} results. Miss keys: {:?}",
1356                    miss_keys.len(),
1357                    results.len(),
1358                    miss_keys
1359                )));
1360            }
1361
1362            for ((idx, key_bytes), maybe_batch) in miss_keys.iter().zip(results) {
1363                if let Some(batch) = maybe_batch {
1364                    lookup_cache.insert(key_bytes, batch.clone());
1365                    lookup_batches[*idx] = Some(batch);
1366                }
1367            }
1368        }
1369    }
1370
1371    // For inner joins, remove rows that still have no match
1372    if join_type == LookupJoinType::Inner {
1373        let mut write = 0;
1374        for read in 0..stream_indices.len() {
1375            if lookup_batches[read].is_some() {
1376                stream_indices[write] = stream_indices[read];
1377                lookup_batches.swap(write, read);
1378                write += 1;
1379            }
1380        }
1381        stream_indices.truncate(write);
1382        lookup_batches.truncate(write);
1383    }
1384
1385    if stream_indices.is_empty() {
1386        return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
1387    }
1388
1389    let take_indices = UInt32Array::from(stream_indices);
1390    let mut columns = Vec::with_capacity(output_schema.fields().len());
1391
1392    for col in stream_batch.columns() {
1393        columns.push(take(col.as_ref(), &take_indices, None)?);
1394    }
1395
1396    let lookup_col_count = lookup_schema.fields().len();
1397    for col_idx in 0..lookup_col_count {
1398        let arrays: Vec<_> = lookup_batches
1399            .iter()
1400            .map(|opt| match opt {
1401                Some(b) => b.column(col_idx).clone(),
1402                None => arrow_array::new_null_array(lookup_schema.field(col_idx).data_type(), 1),
1403            })
1404            .collect();
1405        let refs: Vec<&dyn arrow_array::Array> = arrays.iter().map(AsRef::as_ref).collect();
1406        columns.push(arrow::compute::concat(&refs)?);
1407    }
1408
1409    debug_assert_eq!(
1410        columns.len(),
1411        stream_field_count + lookup_col_count,
1412        "output column count mismatch"
1413    );
1414
1415    Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
1416}
1417
1418fn resolve_physical_projection(
1419    logical_schema: &datafusion::common::DFSchema,
1420    stream_schema: &arrow::datatypes::Schema,
1421    lookup_schema: &arrow::datatypes::Schema,
1422    lookup_table: &str,
1423    lookup_alias: Option<&str>,
1424) -> Result<Vec<usize>> {
1425    let mut projection = Vec::with_capacity(logical_schema.fields().len());
1426    for idx in 0..logical_schema.fields().len() {
1427        let (qualifier, field) = logical_schema.qualified_field(idx);
1428        let name = field.name();
1429        let is_lookup = if let Some(relation) = qualifier {
1430            let table = relation.table();
1431            table == lookup_table || Some(table) == lookup_alias
1432        } else {
1433            // No qualifier: fallback to name-based detection.
1434            if stream_schema.index_of(name).is_ok() && lookup_schema.index_of(name).is_err() {
1435                false
1436            } else if lookup_schema.index_of(name).is_ok() && stream_schema.index_of(name).is_err()
1437            {
1438                true
1439            } else {
1440                stream_schema.index_of(name).is_ok()
1441            }
1442        };
1443
1444        if is_lookup {
1445            let lookup_idx = lookup_schema.index_of(name).map_err(|_| {
1446                DataFusionError::Plan(format!(
1447                    "lookup join projection: output field '{name}' not found in lookup \
1448                     schema {lookup_schema:?}"
1449                ))
1450            })?;
1451            projection.push(stream_schema.fields().len() + lookup_idx);
1452        } else {
1453            let stream_idx = stream_schema.index_of(name).map_err(|_| {
1454                DataFusionError::Plan(format!(
1455                    "lookup join projection: output field '{name}' not found in stream \
1456                     schema {stream_schema:?}"
1457                ))
1458            })?;
1459            projection.push(stream_idx);
1460        }
1461    }
1462    Ok(projection)
1463}
1464
1465// ── Extension Planner ────────────────────────────────────────────
1466
1467/// Converts `LookupJoinNode` logical plans to [`LookupJoinExec`]
1468/// or [`PartialLookupJoinExec`] physical plans by resolving table
1469/// data from the registry.
1470pub struct LookupJoinExtensionPlanner {
1471    registry: Arc<LookupTableRegistry>,
1472}
1473
1474impl LookupJoinExtensionPlanner {
1475    /// Creates a planner backed by the given registry.
1476    pub fn new(registry: Arc<LookupTableRegistry>) -> Self {
1477        Self { registry }
1478    }
1479}
1480
#[async_trait]
impl ExtensionPlanner for LookupJoinExtensionPlanner {
    /// Lowers a `LookupJoinNode` into the physical exec matching how its
    /// lookup table is registered; returns `Ok(None)` for any other node so
    /// other planners can handle it.
    ///
    /// # Errors
    ///
    /// Returns a plan error if the lookup table is not registered, if a key
    /// or time column cannot be resolved, or if stream/lookup key types
    /// differ (Snapshot and Versioned branches only).
    #[allow(clippy::too_many_lines)]
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let Some(lookup_node) = node.as_any().downcast_ref::<LookupJoinNode>() else {
            return Ok(None);
        };

        let entry = self
            .registry
            .get_entry(lookup_node.lookup_table_name())
            .ok_or_else(|| {
                DataFusionError::Plan(format!(
                    "lookup table '{}' not registered",
                    lookup_node.lookup_table_name()
                ))
            })?;

        // The streaming side is the node's single physical input.
        // NOTE(review): indexing panics if `physical_inputs` is empty.
        let input = Arc::clone(&physical_inputs[0]);
        let stream_schema = input.schema();

        match entry {
            // NOTE(review): unlike the Snapshot branch, this branch neither
            // applies `pushdown_predicates()` nor validates key types against
            // the lookup schema — confirm predicates are never pushed into
            // partial lookups (they would be dropped here).
            RegisteredLookup::Partial(partial_state) => {
                let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;

                // Output = all stream fields followed by all lookup fields.
                let mut output_fields = stream_schema.fields().to_vec();
                output_fields.extend(partial_state.schema.fields().iter().cloned());
                let output_schema = Arc::new(Schema::new(output_fields));

                let exec = PartialLookupJoinExec::try_new_with_source(
                    input,
                    Arc::clone(&partial_state.lookup_cache),
                    stream_key_indices,
                    partial_state.key_sort_fields.clone(),
                    lookup_node.join_type(),
                    Arc::clone(&partial_state.schema),
                    output_schema,
                    partial_state.source.clone(),
                    Arc::clone(&partial_state.fetch_semaphore),
                    partial_state.projection.clone(),
                )?;
                Ok(Some(Arc::new(exec)))
            }
            RegisteredLookup::Snapshot(snapshot) => {
                let lookup_schema = snapshot.batch.schema();
                let lookup_key_indices = resolve_lookup_keys(lookup_node, &lookup_schema)?;

                // Filter the snapshot once at planning time so the hash
                // index is built over the surviving rows only.
                let lookup_batch = if lookup_node.pushdown_predicates().is_empty()
                    || snapshot.batch.num_rows() == 0
                {
                    snapshot.batch.clone()
                } else {
                    apply_pushdown_predicates(
                        &snapshot.batch,
                        lookup_node.pushdown_predicates(),
                        session_state,
                    )?
                };

                let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;

                // Validate join key types are compatible
                for (si, li) in stream_key_indices.iter().zip(&lookup_key_indices) {
                    let st = stream_schema.field(*si).data_type();
                    let lt = lookup_schema.field(*li).data_type();
                    if st != lt {
                        return Err(DataFusionError::Plan(format!(
                            "Lookup join key type mismatch: stream '{}' is {st:?} \
                             but lookup '{}' is {lt:?}",
                            stream_schema.field(*si).name(),
                            lookup_schema.field(*li).name(),
                        )));
                    }
                }

                // Map the node's logical output fields onto the physical
                // stream ++ lookup column layout.
                let logical_schema = lookup_node.schema();
                let physical_projection = resolve_physical_projection(
                    logical_schema,
                    &stream_schema,
                    &lookup_schema,
                    lookup_node.lookup_table_name(),
                    lookup_node.lookup_alias(),
                )?;

                let output_schema = Arc::new(Schema::new(logical_schema.fields().to_vec()));

                let exec = LookupJoinExec::try_new(
                    input,
                    lookup_batch,
                    stream_key_indices,
                    lookup_key_indices,
                    lookup_node.join_type(),
                    output_schema,
                )?
                .with_projection(physical_projection);

                Ok(Some(Arc::new(exec)))
            }
            // NOTE(review): `pushdown_predicates()` are not applied here
            // either; filtering the batch would presumably invalidate the
            // prebuilt versioned index — confirm predicates are never pushed
            // into versioned lookups.
            RegisteredLookup::Versioned(versioned_state) => {
                let table_schema = versioned_state.batch.schema();
                let lookup_key_indices = resolve_lookup_keys(lookup_node, &table_schema)?;
                let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;

                // Validate key type compatibility.
                for (si, li) in stream_key_indices.iter().zip(&lookup_key_indices) {
                    let st = stream_schema.field(*si).data_type();
                    let lt = table_schema.field(*li).data_type();
                    if st != lt {
                        return Err(DataFusionError::Plan(format!(
                            "Temporal join key type mismatch: stream '{}' is {st:?} \
                             but table '{}' is {lt:?}",
                            stream_schema.field(*si).name(),
                            table_schema.field(*li).name(),
                        )));
                    }
                }

                // Event-time column used to pick the table version per row.
                let stream_time_col_idx = stream_schema
                    .index_of(&versioned_state.stream_time_column)
                    .map_err(|_| {
                        DataFusionError::Plan(format!(
                            "stream time column '{}' not found in stream schema",
                            versioned_state.stream_time_column
                        ))
                    })?;

                // Key encoding follows the table-side key types (already
                // checked equal to the stream-side types above).
                let key_sort_fields: Vec<SortField> = lookup_key_indices
                    .iter()
                    .map(|&i| SortField::new(table_schema.field(i).data_type().clone()))
                    .collect();

                // Output = all stream fields followed by all table fields.
                let mut output_fields = stream_schema.fields().to_vec();
                output_fields.extend(table_schema.fields().iter().cloned());
                let output_schema = Arc::new(Schema::new(output_fields));

                let exec = VersionedLookupJoinExec::try_new(
                    input,
                    versioned_state.batch.clone(),
                    Arc::clone(&versioned_state.index),
                    stream_key_indices,
                    stream_time_col_idx,
                    lookup_node.join_type(),
                    output_schema,
                    key_sort_fields,
                )?;

                Ok(Some(Arc::new(exec)))
            }
        }
    }
}
1639
1640/// Evaluates pushdown predicates against the lookup snapshot, returning
1641/// only the rows that pass all predicates. This shrinks the hash index.
1642fn apply_pushdown_predicates(
1643    batch: &RecordBatch,
1644    predicates: &[Expr],
1645    session_state: &SessionState,
1646) -> Result<RecordBatch> {
1647    use arrow::compute::filter_record_batch;
1648    use datafusion::physical_expr::create_physical_expr;
1649
1650    let schema = batch.schema();
1651    let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
1652
1653    let mut mask = None::<arrow_array::BooleanArray>;
1654    for pred in predicates {
1655        let phys_expr = create_physical_expr(pred, &df_schema, session_state.execution_props())?;
1656        let result = phys_expr.evaluate(batch)?;
1657        let bool_arr = result
1658            .into_array(batch.num_rows())?
1659            .as_any()
1660            .downcast_ref::<arrow_array::BooleanArray>()
1661            .ok_or_else(|| {
1662                DataFusionError::Internal("pushdown predicate did not evaluate to boolean".into())
1663            })?
1664            .clone();
1665        mask = Some(match mask {
1666            Some(existing) => arrow::compute::and(&existing, &bool_arr)?,
1667            None => bool_arr,
1668        });
1669    }
1670
1671    match mask {
1672        Some(m) => Ok(filter_record_batch(batch, &m)?),
1673        None => Ok(batch.clone()),
1674    }
1675}
1676
1677fn resolve_stream_keys(node: &LookupJoinNode, schema: &SchemaRef) -> Result<Vec<usize>> {
1678    node.join_keys()
1679        .iter()
1680        .map(|pair| match &pair.stream_expr {
1681            Expr::Column(col) => schema.index_of(&col.name).map_err(|_| {
1682                DataFusionError::Plan(format!(
1683                    "stream key column '{}' not found in physical schema",
1684                    col.name
1685                ))
1686            }),
1687            other => Err(DataFusionError::NotImplemented(format!(
1688                "lookup join requires column references as stream keys, got: {other}"
1689            ))),
1690        })
1691        .collect()
1692}
1693
1694fn resolve_lookup_keys(node: &LookupJoinNode, schema: &SchemaRef) -> Result<Vec<usize>> {
1695    node.join_keys()
1696        .iter()
1697        .map(|pair| {
1698            schema.index_of(&pair.lookup_column).map_err(|_| {
1699                DataFusionError::Plan(format!(
1700                    "lookup key column '{}' not found in lookup table schema",
1701                    pair.lookup_column
1702                ))
1703            })
1704        })
1705        .collect()
1706}
1707
1708// ── Tests ────────────────────────────────────────────────────────
1709
1710#[cfg(test)]
1711mod tests {
1712    use super::*;
1713    use arrow_array::{Array, Float64Array, Int64Array, StringArray};
1714    use arrow_schema::{DataType, Field};
1715    use datafusion::physical_plan::stream::RecordBatchStreamAdapter as TestStreamAdapter;
1716    use futures::TryStreamExt;
1717
1718    /// Creates a bounded `ExecutionPlan` from a single `RecordBatch`.
1719    fn batch_exec(batch: RecordBatch) -> Arc<dyn ExecutionPlan> {
1720        let schema = batch.schema();
1721        let batches = vec![batch];
1722        let stream_schema = Arc::clone(&schema);
1723        Arc::new(StreamExecStub {
1724            schema,
1725            batches: std::sync::Mutex::new(Some(batches)),
1726            stream_schema,
1727        })
1728    }
1729
1730    /// Minimal bounded exec for tests — produces one partition of batches.
1731    struct StreamExecStub {
1732        schema: SchemaRef,
1733        batches: std::sync::Mutex<Option<Vec<RecordBatch>>>,
1734        stream_schema: SchemaRef,
1735    }
1736
1737    impl Debug for StreamExecStub {
1738        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1739            write!(f, "StreamExecStub")
1740        }
1741    }
1742
1743    impl DisplayAs for StreamExecStub {
1744        fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
1745            write!(f, "StreamExecStub")
1746        }
1747    }
1748
1749    impl ExecutionPlan for StreamExecStub {
1750        fn name(&self) -> &'static str {
1751            "StreamExecStub"
1752        }
1753        fn as_any(&self) -> &dyn Any {
1754            self
1755        }
1756        fn schema(&self) -> SchemaRef {
1757            Arc::clone(&self.schema)
1758        }
1759        fn properties(&self) -> &PlanProperties {
1760            // Leak a static PlanProperties for test simplicity
1761            Box::leak(Box::new(PlanProperties::new(
1762                EquivalenceProperties::new(Arc::clone(&self.schema)),
1763                Partitioning::UnknownPartitioning(1),
1764                EmissionType::Final,
1765                Boundedness::Bounded,
1766            )))
1767        }
1768        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1769            vec![]
1770        }
1771        fn with_new_children(
1772            self: Arc<Self>,
1773            _: Vec<Arc<dyn ExecutionPlan>>,
1774        ) -> Result<Arc<dyn ExecutionPlan>> {
1775            Ok(self)
1776        }
1777        fn execute(&self, _: usize, _: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
1778            let batches = self.batches.lock().unwrap().take().unwrap_or_default();
1779            let schema = Arc::clone(&self.stream_schema);
1780            let stream = futures::stream::iter(batches.into_iter().map(Ok));
1781            Ok(Box::pin(TestStreamAdapter::new(schema, stream)))
1782        }
1783    }
1784
1785    impl datafusion::physical_plan::ExecutionPlanProperties for StreamExecStub {
1786        fn output_partitioning(&self) -> &Partitioning {
1787            self.properties().output_partitioning()
1788        }
1789        fn output_ordering(&self) -> Option<&LexOrdering> {
1790            None
1791        }
1792        fn boundedness(&self) -> Boundedness {
1793            Boundedness::Bounded
1794        }
1795        fn pipeline_behavior(&self) -> EmissionType {
1796            EmissionType::Final
1797        }
1798        fn equivalence_properties(&self) -> &EquivalenceProperties {
1799            self.properties().equivalence_properties()
1800        }
1801    }
1802
1803    fn orders_schema() -> SchemaRef {
1804        Arc::new(Schema::new(vec![
1805            Field::new("order_id", DataType::Int64, false),
1806            Field::new("customer_id", DataType::Int64, false),
1807            Field::new("amount", DataType::Float64, false),
1808        ]))
1809    }
1810
1811    fn customers_schema() -> SchemaRef {
1812        Arc::new(Schema::new(vec![
1813            Field::new("id", DataType::Int64, false),
1814            Field::new("name", DataType::Utf8, true),
1815        ]))
1816    }
1817
1818    fn output_schema() -> SchemaRef {
1819        Arc::new(Schema::new(vec![
1820            Field::new("order_id", DataType::Int64, false),
1821            Field::new("customer_id", DataType::Int64, false),
1822            Field::new("amount", DataType::Float64, false),
1823            Field::new("id", DataType::Int64, false),
1824            Field::new("name", DataType::Utf8, true),
1825        ]))
1826    }
1827
1828    fn customers_batch() -> RecordBatch {
1829        RecordBatch::try_new(
1830            customers_schema(),
1831            vec![
1832                Arc::new(Int64Array::from(vec![1, 2, 3])),
1833                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
1834            ],
1835        )
1836        .unwrap()
1837    }
1838
1839    fn orders_batch() -> RecordBatch {
1840        RecordBatch::try_new(
1841            orders_schema(),
1842            vec![
1843                Arc::new(Int64Array::from(vec![100, 101, 102, 103])),
1844                Arc::new(Int64Array::from(vec![1, 2, 99, 3])),
1845                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0, 40.0])),
1846            ],
1847        )
1848        .unwrap()
1849    }
1850
    /// Builds a `LookupJoinExec` joining `orders_batch()` (stream) against
    /// `customers_batch()` (lookup), matching stream column 1 (`customer_id`)
    /// to lookup column 0 (`id`).
    fn make_exec(join_type: LookupJoinType) -> LookupJoinExec {
        let input = batch_exec(orders_batch());
        LookupJoinExec::try_new(
            input,
            customers_batch(),
            vec![1], // customer_id
            vec![0], // id
            join_type,
            output_schema(),
        )
        .unwrap()
    }

    /// Inner join drops the stream row whose key (99) has no lookup match.
    #[tokio::test]
    async fn inner_join_filters_non_matches() {
        let exec = make_exec(LookupJoinType::Inner);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 3, "customer_id=99 has no match, filtered by inner");

        // NOTE(review): assumes all output rows land in the first batch
        // (single input batch) — confirm if the exec starts splitting output.
        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Alice");
        assert_eq!(names.value(1), "Bob");
        assert_eq!(names.value(2), "Charlie");
    }

    /// Left outer join keeps every stream row; the unmatched row gets a NULL name.
    #[tokio::test]
    async fn left_outer_preserves_non_matches() {
        let exec = make_exec(LookupJoinType::LeftOuter);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4, "all 4 stream rows preserved in left outer");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        // Row 2 (customer_id=99) should have null name
        assert!(names.is_null(2));
    }

    /// An empty lookup table produces no rows under an inner join.
    #[tokio::test]
    async fn empty_lookup_inner_produces_no_rows() {
        let empty = RecordBatch::new_empty(customers_schema());
        let input = batch_exec(orders_batch());
        let exec = LookupJoinExec::try_new(
            input,
            empty,
            vec![1],
            vec![0],
            LookupJoinType::Inner,
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 0);
    }

    /// An empty lookup table still preserves all stream rows under a left outer join.
    #[tokio::test]
    async fn empty_lookup_left_outer_preserves_all_stream_rows() {
        let empty = RecordBatch::new_empty(customers_schema());
        let input = batch_exec(orders_batch());
        let exec = LookupJoinExec::try_new(
            input,
            empty,
            vec![1],
            vec![0],
            LookupJoinType::LeftOuter,
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4);
    }

    /// A stream key that matches two lookup rows fans out into two output rows.
    #[tokio::test]
    async fn duplicate_keys_produce_multiple_rows() {
        let lookup = RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 1])),
                Arc::new(StringArray::from(vec!["Alice-A", "Alice-B"])),
            ],
        )
        .unwrap();

        let stream = RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100])),
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(Float64Array::from(vec![10.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(stream);
        let exec = LookupJoinExec::try_new(
            input,
            lookup,
            vec![1],
            vec![0],
            LookupJoinType::Inner,
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 2, "one stream row matched two lookup rows");
    }

    /// Rebuilding with the same children keeps the output schema and operator name.
    #[test]
    fn with_new_children_preserves_state() {
        let exec = Arc::new(make_exec(LookupJoinType::Inner));
        let expected_schema = exec.schema();
        let children = exec.children().into_iter().cloned().collect();
        let rebuilt = exec.with_new_children(children).unwrap();
        assert_eq!(rebuilt.schema(), expected_schema);
        assert_eq!(rebuilt.name(), "LookupJoinExec");
    }

    /// `Debug` output names the operator and reports the lookup row count.
    #[test]
    fn display_format() {
        let exec = make_exec(LookupJoinType::Inner);
        let s = format!("{exec:?}");
        assert!(s.contains("LookupJoinExec"));
        assert!(s.contains("lookup_rows: 3"));
    }
1998
    /// Register / get / unregister round-trip; name lookup is case-insensitive.
    #[test]
    fn registry_crud() {
        let reg = LookupTableRegistry::new();
        assert!(reg.get("customers").is_none());

        reg.register(
            "customers",
            LookupSnapshot {
                batch: customers_batch(),
            },
        );
        assert!(reg.get("customers").is_some());
        assert!(reg.get("CUSTOMERS").is_some(), "case-insensitive");

        reg.unregister("customers");
        assert!(reg.get("customers").is_none());
    }

    /// Registering an existing name replaces the previous snapshot.
    #[test]
    fn registry_update_replaces() {
        let reg = LookupTableRegistry::new();
        reg.register(
            "t",
            LookupSnapshot {
                batch: RecordBatch::new_empty(customers_schema()),
            },
        );
        assert_eq!(reg.get("t").unwrap().batch.num_rows(), 0);

        reg.register(
            "t",
            LookupSnapshot {
                batch: customers_batch(),
            },
        );
        assert_eq!(reg.get("t").unwrap().batch.num_rows(), 3);
    }
2036
    /// A single pushdown predicate keeps only the matching snapshot rows, in order.
    #[test]
    fn pushdown_predicates_filter_snapshot() {
        use datafusion::logical_expr::{col, lit};

        let batch = customers_batch(); // id=[1,2,3], name=[Alice,Bob,Charlie]
        let ctx = datafusion::prelude::SessionContext::new();
        let state = ctx.state();

        // Filter: id > 1 (should keep rows 2 and 3)
        let predicates = vec![col("id").gt(lit(1i64))];
        let filtered = apply_pushdown_predicates(&batch, &predicates, &state).unwrap();
        assert_eq!(filtered.num_rows(), 2);

        let ids = filtered
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 2);
        assert_eq!(ids.value(1), 3);
    }

    /// With no predicates the snapshot passes through unfiltered.
    #[test]
    fn pushdown_predicates_empty_passes_all() {
        let batch = customers_batch();
        let ctx = datafusion::prelude::SessionContext::new();
        let state = ctx.state();

        let filtered = apply_pushdown_predicates(&batch, &[], &state).unwrap();
        assert_eq!(filtered.num_rows(), 3);
    }

    /// Multiple predicates are ANDed together.
    #[test]
    fn pushdown_predicates_multiple_and() {
        use datafusion::logical_expr::{col, lit};

        let batch = customers_batch(); // id=[1,2,3]
        let ctx = datafusion::prelude::SessionContext::new();
        let state = ctx.state();

        // id >= 2 AND id < 3 → only row with id=2
        let predicates = vec![col("id").gt_eq(lit(2i64)), col("id").lt(lit(3i64))];
        let filtered = apply_pushdown_predicates(&batch, &predicates, &state).unwrap();
        assert_eq!(filtered.num_rows(), 1);
    }
2082
2083    // ── PartialLookupJoinExec Tests ──────────────────────────────
2084
2085    use laminar_core::lookup::lookup_cache::LookupMemoryCacheConfig;
2086
    /// Creates an empty lookup cache with a 64 KiB capacity and no TTL.
    // NOTE(review): the leading `1` is presumably the cache table id (the
    // `partial_display_format` test expects `cache_table_id: 1`) — confirm.
    fn make_lookup_cache() -> Arc<LookupMemoryCache> {
        Arc::new(LookupMemoryCache::new(
            1,
            LookupMemoryCacheConfig {
                capacity_bytes: 64 * 1024,
                ttl: None,
            },
        ))
    }

    /// Builds a one-row batch in the `customers_schema()` layout.
    fn customer_row(id: i64, name: &str) -> RecordBatch {
        RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
            ],
        )
        .unwrap()
    }

    /// Inserts customers 1 (Alice), 2 (Bob) and 3 (Charlie) into `cache`,
    /// each keyed by the arrow row-format encoding of its Int64 id.
    // NOTE(review): the key bytes must match what the exec produces when it
    // encodes stream keys (a single Int64 `SortField` here) — keep in sync.
    fn warm_cache(cache: &LookupMemoryCache) {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int64)]).unwrap();

        for (id, name) in [(1, "Alice"), (2, "Bob"), (3, "Charlie")] {
            let key_col = Int64Array::from(vec![id]);
            let rows = converter.convert_columns(&[Arc::new(key_col)]).unwrap();
            let key = rows.row(0);
            cache.insert(key.as_ref(), customer_row(id, name));
        }
    }

    /// Builds a cache-only `PartialLookupJoinExec` over `orders_batch()`,
    /// keyed on stream column 1 (`customer_id`), with customers 1–3 cached.
    fn make_partial_exec(join_type: LookupJoinType) -> PartialLookupJoinExec {
        let cache = make_lookup_cache();
        warm_cache(&cache);

        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        PartialLookupJoinExec::try_new(
            input,
            cache,
            vec![1], // customer_id
            key_sort_fields,
            join_type,
            customers_schema(),
            output_schema(),
        )
        .unwrap()
    }
2137
    /// Inner join over a warm cache drops the order whose key (99) is not cached.
    #[tokio::test]
    async fn partial_inner_join_filters_non_matches() {
        let exec = make_partial_exec(LookupJoinType::Inner);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 3, "customer_id=99 has no match, filtered by inner");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Alice");
        assert_eq!(names.value(1), "Bob");
        assert_eq!(names.value(2), "Charlie");
    }

    /// Left outer join over a warm cache keeps all orders; the miss gets a NULL name.
    #[tokio::test]
    async fn partial_left_outer_preserves_non_matches() {
        let exec = make_partial_exec(LookupJoinType::LeftOuter);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4, "all 4 stream rows preserved in left outer");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(names.is_null(2), "customer_id=99 should have null name");
    }

    /// With an empty cache and no source, an inner join emits nothing.
    #[tokio::test]
    async fn partial_empty_cache_inner_produces_no_rows() {
        let cache = make_lookup_cache();
        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        let exec = PartialLookupJoinExec::try_new(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::Inner,
            customers_schema(),
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 0);
    }

    /// With an empty cache and no source, a left outer join keeps every order.
    #[tokio::test]
    async fn partial_empty_cache_left_outer_preserves_all() {
        let cache = make_lookup_cache();
        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        let exec = PartialLookupJoinExec::try_new(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::LeftOuter,
            customers_schema(),
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4);
    }

    /// Rebuilding with the same children keeps the output schema and operator name.
    #[test]
    fn partial_with_new_children_preserves_state() {
        let exec = Arc::new(make_partial_exec(LookupJoinType::Inner));
        let expected_schema = exec.schema();
        let children = exec.children().into_iter().cloned().collect();
        let rebuilt = exec.with_new_children(children).unwrap();
        assert_eq!(rebuilt.schema(), expected_schema);
        assert_eq!(rebuilt.name(), "PartialLookupJoinExec");
    }

    /// `Debug` output names the operator and reports the cache table id.
    #[test]
    fn partial_display_format() {
        let exec = make_partial_exec(LookupJoinType::Inner);
        let s = format!("{exec:?}");
        assert!(s.contains("PartialLookupJoinExec"));
        assert!(s.contains("cache_table_id: 1"));
    }
2239
    /// A partial (cache-backed) registration is reachable via `get_entry` as
    /// `RegisteredLookup::Partial`, while `get` returns `None` for it.
    #[test]
    fn registry_partial_entry() {
        let reg = LookupTableRegistry::new();
        let cache = make_lookup_cache();
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        reg.register_partial(
            "customers",
            PartialLookupState {
                lookup_cache: cache,
                schema: customers_schema(),
                key_columns: vec!["id".into()],
                key_sort_fields,
                source: None,
                fetch_semaphore: Arc::new(Semaphore::new(64)),
                projection: Vec::new(),
            },
        );

        assert!(reg.get("customers").is_none());

        let entry = reg.get_entry("customers");
        assert!(entry.is_some());
        assert!(matches!(entry.unwrap(), RegisteredLookup::Partial(_)));
    }
2265
    /// A cache miss falls back to the configured source, and the row the
    /// source returns is joined into the output.
    #[tokio::test]
    async fn partial_source_fallback_on_miss() {
        use laminar_core::lookup::source::LookupError;
        use laminar_core::lookup::source::LookupSourceDyn;

        /// Test source that answers every requested key with one fixed
        /// `(99, "FromSource")` row.
        struct TestSource;

        #[async_trait]
        impl LookupSourceDyn for TestSource {
            async fn query_batch(
                &self,
                keys: &[&[u8]],
                _predicates: &[laminar_core::lookup::predicate::Predicate],
                _projection: &[laminar_core::lookup::source::ColumnId],
            ) -> std::result::Result<Vec<Option<RecordBatch>>, LookupError> {
                Ok(keys
                    .iter()
                    .map(|_| Some(customer_row(99, "FromSource")))
                    .collect())
            }

            fn schema(&self) -> SchemaRef {
                customers_schema()
            }
        }

        let cache = make_lookup_cache();
        // Warm ids 1-3 in the cache; id=99 is not cached, so it misses and
        // goes to the source.
        warm_cache(&cache);

        let orders = RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![200])),
                Arc::new(Int64Array::from(vec![99])), // not in cache
                Arc::new(Float64Array::from(vec![50.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(orders);
        let key_sort_fields = vec![SortField::new(DataType::Int64)];
        let source: Arc<dyn LookupSourceDyn> = Arc::new(TestSource);

        let exec = PartialLookupJoinExec::try_new_with_source(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::Inner,
            customers_schema(),
            output_schema(),
            Some(source),
            Arc::new(Semaphore::new(64)),
            vec![],
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 1, "source fallback should produce 1 row");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "FromSource");
    }
2336
    /// A failing source surfaces as a stream error instead of silently
    /// falling back to cache-only output.
    #[tokio::test]
    async fn partial_source_error_propagates() {
        use laminar_core::lookup::source::LookupError;
        use laminar_core::lookup::source::LookupSourceDyn;

        /// Test source whose every query fails.
        struct FailingSource;

        #[async_trait]
        impl LookupSourceDyn for FailingSource {
            async fn query_batch(
                &self,
                _keys: &[&[u8]],
                _predicates: &[laminar_core::lookup::predicate::Predicate],
                _projection: &[laminar_core::lookup::source::ColumnId],
            ) -> std::result::Result<Vec<Option<RecordBatch>>, LookupError> {
                Err(LookupError::Internal("source unavailable".into()))
            }

            fn schema(&self) -> SchemaRef {
                customers_schema()
            }
        }

        // Empty cache: every order key misses and must hit the failing source.
        let cache = make_lookup_cache();
        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];
        let source: Arc<dyn LookupSourceDyn> = Arc::new(FailingSource);

        let exec = PartialLookupJoinExec::try_new_with_source(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::LeftOuter,
            customers_schema(),
            output_schema(),
            Some(source),
            Arc::new(Semaphore::new(64)),
            vec![],
        )
        .unwrap();

        // A source error must propagate (fail the cycle → replay), NOT silently
        // serve cache-only results that drop/NULL-fill the missed rows.
        let ctx = Arc::new(TaskContext::default());
        let result: std::result::Result<Vec<RecordBatch>, _> =
            exec.execute(0, ctx).unwrap().try_collect().await;
        let err = result.expect_err("source error must propagate, not degrade silently");
        assert!(
            err.to_string().contains("lookup source query failed"),
            "unexpected error: {err}"
        );
    }

    /// A snapshot registration is visible through both `get_entry` and `get`.
    #[test]
    fn registry_snapshot_entry_via_get_entry() {
        let reg = LookupTableRegistry::new();
        reg.register(
            "t",
            LookupSnapshot {
                batch: customers_batch(),
            },
        );

        let entry = reg.get_entry("t");
        assert!(matches!(entry.unwrap(), RegisteredLookup::Snapshot(_)));
        assert!(reg.get("t").is_some());
    }
2405
2406    // ── NULL key tests ────────────────────────────────────────────────
2407
    /// Like `orders_schema()`, but the `customer_id` join key is nullable.
    fn nullable_orders_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, true), // nullable key
            Field::new("amount", DataType::Float64, false),
        ]))
    }

    /// Output schema for the nullable-key tests. The lookup `id` column is
    /// nullable only for left outer joins, where unmatched rows are NULL-filled.
    fn nullable_output_schema(join_type: LookupJoinType) -> SchemaRef {
        let lookup_nullable = join_type == LookupJoinType::LeftOuter;
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, true),
            Field::new("amount", DataType::Float64, false),
            Field::new("id", DataType::Int64, lookup_nullable),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// A NULL stream key never matches under an inner join.
    #[tokio::test]
    async fn null_key_inner_join_no_match() {
        // Stream: customer_id = [1, NULL, 2]
        let stream_batch = RecordBatch::try_new(
            nullable_orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100, 101, 102])),
                Arc::new(Int64Array::from(vec![Some(1), None, Some(2)])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(stream_batch);
        let exec = LookupJoinExec::try_new(
            input,
            customers_batch(),
            vec![1],
            vec![0],
            LookupJoinType::Inner,
            nullable_output_schema(LookupJoinType::Inner),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Only customer_id=1 and customer_id=2 match; NULL is skipped
        assert_eq!(total, 2, "NULL key row should not match in inner join");
    }

    /// A NULL stream key is kept under a left outer join with NULL lookup columns.
    #[tokio::test]
    async fn null_key_left_outer_produces_nulls() {
        // Stream: customer_id = [1, NULL, 2]
        let stream_batch = RecordBatch::try_new(
            nullable_orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100, 101, 102])),
                Arc::new(Int64Array::from(vec![Some(1), None, Some(2)])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(stream_batch);
        let out_schema = nullable_output_schema(LookupJoinType::LeftOuter);
        let exec = LookupJoinExec::try_new(
            input,
            customers_batch(),
            vec![1],
            vec![0],
            LookupJoinType::LeftOuter,
            out_schema,
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // All 3 rows preserved; NULL key row has null lookup columns
        assert_eq!(total, 3, "all rows preserved in left outer");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Alice");
        assert!(
            names.is_null(1),
            "NULL key row should have null lookup name"
        );
        assert_eq!(names.value(2), "Bob");
    }
2505
2506    // ── Versioned Lookup Join Tests ────────────────────────────────
2507
2508    fn versioned_table_batch() -> RecordBatch {
2509        // Table with key=currency, version_ts=valid_from, rate=value
2510        // Two currencies with multiple versions each
2511        let schema = Arc::new(Schema::new(vec![
2512            Field::new("currency", DataType::Utf8, false),
2513            Field::new("valid_from", DataType::Int64, false),
2514            Field::new("rate", DataType::Float64, false),
2515        ]));
2516        RecordBatch::try_new(
2517            schema,
2518            vec![
2519                Arc::new(StringArray::from(vec!["USD", "USD", "EUR", "EUR", "EUR"])),
2520                Arc::new(Int64Array::from(vec![100, 200, 100, 150, 300])),
2521                Arc::new(Float64Array::from(vec![1.0, 1.1, 0.85, 0.90, 0.88])),
2522            ],
2523        )
2524        .unwrap()
2525    }
2526
2527    fn stream_batch_with_time() -> RecordBatch {
2528        let schema = Arc::new(Schema::new(vec![
2529            Field::new("order_id", DataType::Int64, false),
2530            Field::new("currency", DataType::Utf8, false),
2531            Field::new("event_ts", DataType::Int64, false),
2532        ]));
2533        RecordBatch::try_new(
2534            schema,
2535            vec![
2536                Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
2537                Arc::new(StringArray::from(vec!["USD", "EUR", "USD", "EUR"])),
2538                Arc::new(Int64Array::from(vec![150, 160, 250, 50])),
2539            ],
2540        )
2541        .unwrap()
2542    }
2543
2544    #[test]
2545    fn test_versioned_index_build_and_probe() {
2546        let batch = versioned_table_batch();
2547        let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX).unwrap();
2548
2549        // USD has versions at 100 and 200
2550        // Probe at 150 → should find version 100 (latest <= 150)
2551        let key_sf = vec![SortField::new(DataType::Utf8)];
2552        let converter = RowConverter::new(key_sf).unwrap();
2553        let usd_col = Arc::new(StringArray::from(vec!["USD"]));
2554        let usd_rows = converter.convert_columns(&[usd_col]).unwrap();
2555        let usd_key = usd_rows.row(0);
2556
2557        let result = index.probe_at_time(usd_key.as_ref(), 150);
2558        assert!(result.is_some());
2559        // Row 0 is USD@100, Row 1 is USD@200. At time 150, should get row 0.
2560        assert_eq!(result.unwrap(), 0);
2561
2562        // Probe at 250 → should find version 200 (row 1)
2563        let result = index.probe_at_time(usd_key.as_ref(), 250);
2564        assert_eq!(result.unwrap(), 1);
2565    }
2566
2567    #[test]
2568    fn test_versioned_index_no_version_before_ts() {
2569        let batch = versioned_table_batch();
2570        let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX).unwrap();
2571
2572        let key_sf = vec![SortField::new(DataType::Utf8)];
2573        let converter = RowConverter::new(key_sf).unwrap();
2574        let eur_col = Arc::new(StringArray::from(vec!["EUR"]));
2575        let eur_rows = converter.convert_columns(&[eur_col]).unwrap();
2576        let eur_key = eur_rows.row(0);
2577
2578        // EUR versions start at 100. Probe at 50 → None
2579        let result = index.probe_at_time(eur_key.as_ref(), 50);
2580        assert!(result.is_none());
2581    }
2582
2583    /// Helper to build a VersionedLookupJoinExec for tests.
2584    fn build_versioned_exec(
2585        table: RecordBatch,
2586        stream: &RecordBatch,
2587        join_type: LookupJoinType,
2588    ) -> VersionedLookupJoinExec {
2589        let input = batch_exec(stream.clone());
2590        let index = Arc::new(VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap());
2591        let key_sort_fields = vec![SortField::new(DataType::Utf8)];
2592        let mut output_fields = stream.schema().fields().to_vec();
2593        output_fields.extend(table.schema().fields().iter().cloned());
2594        let output_schema = Arc::new(Schema::new(output_fields));
2595        VersionedLookupJoinExec::try_new(
2596            input,
2597            table,
2598            index,
2599            vec![1], // stream key col: currency
2600            2,       // stream time col: event_ts
2601            join_type,
2602            output_schema,
2603            key_sort_fields,
2604        )
2605        .unwrap()
2606    }
2607
2608    #[tokio::test]
2609    async fn test_versioned_join_exec_inner() {
2610        let table = versioned_table_batch();
2611        let stream = stream_batch_with_time();
2612        let exec = build_versioned_exec(table, &stream, LookupJoinType::Inner);
2613
2614        let ctx = Arc::new(TaskContext::default());
2615        let stream_out = exec.execute(0, ctx).unwrap();
2616        let batches: Vec<RecordBatch> = stream_out.try_collect().await.unwrap();
2617
2618        assert_eq!(batches.len(), 1);
2619        let batch = &batches[0];
2620        // Row 1: order_id=1, USD, ts=150 → USD@100 (rate=1.0)
2621        // Row 2: order_id=2, EUR, ts=160 → EUR@150 (rate=0.90)
2622        // Row 3: order_id=3, USD, ts=250 → USD@200 (rate=1.1)
2623        // Row 4: order_id=4, EUR, ts=50 → no EUR version <= 50 → SKIP (inner)
2624        assert_eq!(batch.num_rows(), 3);
2625
2626        let rates = batch
2627            .column(5) // rate is 6th column (3 stream + 3 table, rate is table col 2)
2628            .as_any()
2629            .downcast_ref::<Float64Array>()
2630            .unwrap();
2631        assert!((rates.value(0) - 1.0).abs() < f64::EPSILON); // USD@100
2632        assert!((rates.value(1) - 0.90).abs() < f64::EPSILON); // EUR@150
2633        assert!((rates.value(2) - 1.1).abs() < f64::EPSILON); // USD@200
2634    }
2635
2636    #[tokio::test]
2637    async fn test_versioned_join_exec_left_outer() {
2638        let table = versioned_table_batch();
2639        let stream = stream_batch_with_time();
2640        let exec = build_versioned_exec(table, &stream, LookupJoinType::LeftOuter);
2641
2642        let ctx = Arc::new(TaskContext::default());
2643        let stream_out = exec.execute(0, ctx).unwrap();
2644        let batches: Vec<RecordBatch> = stream_out.try_collect().await.unwrap();
2645
2646        assert_eq!(batches.len(), 1);
2647        let batch = &batches[0];
2648        // All 4 rows present (left outer)
2649        assert_eq!(batch.num_rows(), 4);
2650
2651        // Row 4 (EUR@50): no version → null rate
2652        let rates = batch
2653            .column(5)
2654            .as_any()
2655            .downcast_ref::<Float64Array>()
2656            .unwrap();
2657        assert!(rates.is_null(3), "EUR@50 should have null rate");
2658    }
2659
2660    #[test]
2661    fn test_versioned_index_empty_batch() {
2662        let schema = Arc::new(Schema::new(vec![
2663            Field::new("k", DataType::Utf8, false),
2664            Field::new("v", DataType::Int64, false),
2665        ]));
2666        let batch = RecordBatch::new_empty(schema);
2667        let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX).unwrap();
2668        assert!(index.map.is_empty());
2669    }
2670
2671    #[test]
2672    fn test_versioned_lookup_registry() {
2673        let registry = LookupTableRegistry::new();
2674        let table = versioned_table_batch();
2675        let index = Arc::new(VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap());
2676
2677        registry.register_versioned(
2678            "rates",
2679            VersionedLookupState {
2680                batch: table,
2681                index,
2682                key_columns: vec!["currency".to_string()],
2683                version_column: "valid_from".to_string(),
2684                stream_time_column: "event_ts".to_string(),
2685                max_versions_per_key: usize::MAX,
2686            },
2687        );
2688
2689        let entry = registry.get_entry("rates");
2690        assert!(entry.is_some());
2691        assert!(matches!(entry.unwrap(), RegisteredLookup::Versioned(_)));
2692
2693        // get() should return None for versioned entries (snapshot-only)
2694        assert!(registry.get("rates").is_none());
2695    }
2696}