//! Physical execution for lookup joins: a registry of lookup tables plus
//! `ExecutionPlan` operators for snapshot, versioned (temporal), and
//! partially cached lookup tables, and the extension planner that lowers a
//! logical `LookupJoinNode` into the matching operator.

use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;

use parking_lot::RwLock;

use arrow::compute::take;
use arrow::row::{RowConverter, SortField};
use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::execution::{SendableRecordBatchStream, SessionState, TaskContext};
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Expr;
use futures::StreamExt;
use laminar_core::lookup::foyer_cache::FoyerMemoryCache;
use laminar_core::lookup::source::LookupSourceDyn;
use tokio::sync::Semaphore;

use super::lookup_join::{LookupJoinNode, LookupJoinType};

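/// Registry of lookup tables available to lookup joins, keyed by
/// lower-cased table name.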
#[derive(Default)]
pub struct LookupTableRegistry {
    tables: RwLock<HashMap<String, RegisteredLookup>>,
}

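/// A registered lookup table in one of three execution modes.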
pub enum RegisteredLookup {
    /// Fully materialized, immutable snapshot of the lookup table.
    Snapshot(Arc<LookupSnapshot>),
    /// Partially cached table, optionally backed by an external source.
    Partial(Arc<PartialLookupState>),
    /// Versioned table supporting temporal (point-in-time) lookups.
    Versioned(Arc<VersionedLookupState>),
}

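/// Fully materialized, immutable copy of a lookup table.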
pub struct LookupSnapshot {
    /// The materialized lookup data.
    pub batch: RecordBatch,
    /// Names of the join key columns.
    pub key_columns: Vec<String>,
}

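/// State for a versioned (temporal) lookup table: all retained versions live
/// in `batch`, and `index` resolves point-in-time probes.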
pub struct VersionedLookupState {
    /// The full versioned table data.
    pub batch: RecordBatch,
    /// Index from key bytes to row indices, ordered by version timestamp.
    pub index: Arc<VersionedIndex>,
    /// Names of the join key columns.
    pub key_columns: Vec<String>,
    /// Column in the table carrying the version timestamp.
    pub version_column: String,
    /// Column in the stream carrying the event timestamp.
    pub stream_time_column: String,
    /// Maximum number of versions retained per key.
    pub max_versions_per_key: usize,
}

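/// State for a partially cached lookup table: hot rows live in the Foyer
/// cache, and misses can optionally be fetched from an external source.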
pub struct PartialLookupState {
    /// In-memory cache of previously fetched rows.
    pub foyer_cache: Arc<FoyerMemoryCache>,
    /// Schema of the lookup table.
    pub schema: SchemaRef,
    /// Names of the join key columns.
    pub key_columns: Vec<String>,
    /// Sort fields used to row-encode join keys into cache keys.
    pub key_sort_fields: Vec<SortField>,
    /// Optional external source consulted on cache misses.
    pub source: Option<Arc<dyn LookupSourceDyn>>,
    /// Limits concurrent fallback fetches against the source.
    pub fetch_semaphore: Arc<Semaphore>,
}

impl LookupTableRegistry {
    /// Creates an empty registry.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a fully materialized snapshot under `name` (case-insensitive).
    pub fn register(&self, name: &str, snapshot: LookupSnapshot) {
        self.tables.write().insert(
            name.to_lowercase(),
            RegisteredLookup::Snapshot(Arc::new(snapshot)),
        );
    }

    /// Registers a partially cached lookup table under `name` (case-insensitive).
    pub fn register_partial(&self, name: &str, state: PartialLookupState) {
        self.tables.write().insert(
            name.to_lowercase(),
            RegisteredLookup::Partial(Arc::new(state)),
        );
    }

    /// Registers a versioned (temporal) lookup table under `name` (case-insensitive).
    pub fn register_versioned(&self, name: &str, state: VersionedLookupState) {
        self.tables.write().insert(
            name.to_lowercase(),
            RegisteredLookup::Versioned(Arc::new(state)),
        );
    }

    /// Removes the entry for `name`, if present.
    pub fn unregister(&self, name: &str) {
        self.tables.write().remove(&name.to_lowercase());
    }

    /// Returns the snapshot registered under `name`, or `None` if the entry
    /// is missing or is not a snapshot.
    #[must_use]
    pub fn get(&self, name: &str) -> Option<Arc<LookupSnapshot>> {
        let tables = self.tables.read();
        match tables.get(&name.to_lowercase())? {
            RegisteredLookup::Snapshot(s) => Some(Arc::clone(s)),
            RegisteredLookup::Partial(_) | RegisteredLookup::Versioned(_) => None,
        }
    }

    /// Returns a cloned handle to whatever kind of entry is registered under `name`.
    pub fn get_entry(&self, name: &str) -> Option<RegisteredLookup> {
        let tables = self.tables.read();
        tables.get(&name.to_lowercase()).map(|e| match e {
            RegisteredLookup::Snapshot(s) => RegisteredLookup::Snapshot(Arc::clone(s)),
            RegisteredLookup::Partial(p) => RegisteredLookup::Partial(Arc::clone(p)),
            RegisteredLookup::Versioned(v) => RegisteredLookup::Versioned(Arc::clone(v)),
        })
    }
}

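/// Hash index from row-encoded join keys to row indices in the lookup batch.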
struct HashIndex {
    map: HashMap<Box<[u8]>, Vec<u32>>,
}

impl HashIndex {
    fn build(batch: &RecordBatch, key_indices: &[usize]) -> Result<Self> {
        if batch.num_rows() == 0 {
            return Ok(Self {
                map: HashMap::new(),
            });
        }

        let sort_fields: Vec<SortField> = key_indices
            .iter()
            .map(|&i| SortField::new(batch.schema().field(i).data_type().clone()))
            .collect();
        let converter = RowConverter::new(sort_fields)?;

        let key_cols: Vec<_> = key_indices
            .iter()
            .map(|&i| batch.column(i).clone())
            .collect();
        let rows = converter.convert_columns(&key_cols)?;

        let num_rows = batch.num_rows();
        let mut map: HashMap<Box<[u8]>, Vec<u32>> = HashMap::with_capacity(num_rows);
        #[allow(clippy::cast_possible_truncation)]
        for i in 0..num_rows {
            map.entry(Box::from(rows.row(i).as_ref()))
                .or_default()
                .push(i as u32);
        }

        Ok(Self { map })
    }

    fn probe(&self, key: &[u8]) -> Option<&[u32]> {
        self.map.get(key).map(Vec::as_slice)
    }
}

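/// Index from row-encoded join keys to a version-timestamp-ordered map of
/// row indices, supporting "latest version at or before t" probes.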
#[derive(Default)]
pub struct VersionedIndex {
    map: HashMap<Box<[u8]>, BTreeMap<i64, Vec<u32>>>,
}

impl VersionedIndex {
    pub fn build(
        batch: &RecordBatch,
        key_indices: &[usize],
        version_col_idx: usize,
        max_versions_per_key: usize,
    ) -> Result<Self> {
        if batch.num_rows() == 0 {
            return Ok(Self {
                map: HashMap::new(),
            });
        }

        let sort_fields: Vec<SortField> = key_indices
            .iter()
            .map(|&i| SortField::new(batch.schema().field(i).data_type().clone()))
            .collect();
        let converter = RowConverter::new(sort_fields)?;

        let key_cols: Vec<_> = key_indices
            .iter()
            .map(|&i| batch.column(i).clone())
            .collect();
        let rows = converter.convert_columns(&key_cols)?;

        let timestamps = extract_i64_timestamps(batch.column(version_col_idx))?;

        let num_rows = batch.num_rows();
        let mut map: HashMap<Box<[u8]>, BTreeMap<i64, Vec<u32>>> = HashMap::with_capacity(num_rows);
        #[allow(clippy::cast_possible_truncation)]
        for (i, ts_opt) in timestamps.iter().enumerate() {
            let Some(version_ts) = ts_opt else { continue };
            // Rows with null join keys can never match; skip them.
            if key_cols.iter().any(|c| c.is_null(i)) {
                continue;
            }
            let key = Box::from(rows.row(i).as_ref());
            map.entry(key)
                .or_default()
                .entry(*version_ts)
                .or_default()
                .push(i as u32);
        }

        // Trim the oldest versions so each key keeps at most `max_versions_per_key`.
        if max_versions_per_key < usize::MAX {
            for versions in map.values_mut() {
                while versions.len() > max_versions_per_key {
                    versions.pop_first();
                }
            }
        }

        Ok(Self { map })
    }

    /// Returns the row index of the latest version at or before `event_ts`.
    fn probe_at_time(&self, key: &[u8], event_ts: i64) -> Option<u32> {
        let versions = self.map.get(key)?;
        let (_, indices) = versions.range(..=event_ts).next_back()?;
        indices.last().copied()
    }
}

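/// Normalizes a timestamp-like column to milliseconds since the epoch.
/// `Int64` values are assumed to already be milliseconds; `Float64` values
/// are truncated toward zero.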
fn extract_i64_timestamps(col: &dyn arrow_array::Array) -> Result<Vec<Option<i64>>> {
    use arrow_array::{
        Float64Array, Int64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
        TimestampNanosecondArray, TimestampSecondArray,
    };
    use arrow_schema::{DataType, TimeUnit};

    let n = col.len();
    let mut out = Vec::with_capacity(n);
    macro_rules! extract_typed {
        ($arr_type:ty, $scale:expr) => {{
            let arr = col.as_any().downcast_ref::<$arr_type>().ok_or_else(|| {
                DataFusionError::Internal(concat!("expected ", stringify!($arr_type)).into())
            })?;
            for i in 0..n {
                out.push(if col.is_null(i) {
                    None
                } else {
                    Some(arr.value(i) * $scale)
                });
            }
        }};
    }

    match col.data_type() {
        DataType::Int64 => extract_typed!(Int64Array, 1),
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            extract_typed!(TimestampMillisecondArray, 1);
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let arr = col
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .ok_or_else(|| {
                    DataFusionError::Internal("expected TimestampMicrosecondArray".into())
                })?;
            for i in 0..n {
                out.push(if col.is_null(i) {
                    None
                } else {
                    Some(arr.value(i) / 1000)
                });
            }
        }
        DataType::Timestamp(TimeUnit::Second, _) => {
            extract_typed!(TimestampSecondArray, 1000);
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let arr = col
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .ok_or_else(|| {
                    DataFusionError::Internal("expected TimestampNanosecondArray".into())
                })?;
            for i in 0..n {
                out.push(if col.is_null(i) {
                    None
                } else {
                    Some(arr.value(i) / 1_000_000)
                });
            }
        }
        DataType::Float64 => {
            let arr = col
                .as_any()
                .downcast_ref::<Float64Array>()
                .ok_or_else(|| DataFusionError::Internal("expected Float64Array".into()))?;
            #[allow(clippy::cast_possible_truncation)]
            for i in 0..n {
                out.push(if col.is_null(i) {
                    None
                } else {
                    Some(arr.value(i) as i64)
                });
            }
        }
        other => {
            return Err(DataFusionError::Plan(format!(
                "unsupported timestamp type for temporal join: {other:?}"
            )));
        }
    }

    Ok(out)
}

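/// Streaming hash join of an unbounded input against an in-memory lookup
/// snapshot. Each input batch is probed against a prebuilt `HashIndex`.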
pub struct LookupJoinExec {
    input: Arc<dyn ExecutionPlan>,
    index: Arc<HashIndex>,
    lookup_batch: Arc<RecordBatch>,
    stream_key_indices: Vec<usize>,
    join_type: LookupJoinType,
    schema: SchemaRef,
    properties: PlanProperties,
    converter: Arc<RowConverter>,
    /// Number of columns contributed by the stream side of the output.
    stream_field_count: usize,
}

impl LookupJoinExec {
    /// Builds the hash index over the lookup batch and derives the output schema.
    #[allow(clippy::needless_pass_by_value)]
    pub fn try_new(
        input: Arc<dyn ExecutionPlan>,
        lookup_batch: RecordBatch,
        stream_key_indices: Vec<usize>,
        lookup_key_indices: Vec<usize>,
        join_type: LookupJoinType,
        output_schema: SchemaRef,
    ) -> Result<Self> {
        let index = HashIndex::build(&lookup_batch, &lookup_key_indices)?;

        let key_sort_fields: Vec<SortField> = lookup_key_indices
            .iter()
            .map(|&i| SortField::new(lookup_batch.schema().field(i).data_type().clone()))
            .collect();
        let converter = Arc::new(RowConverter::new(key_sort_fields)?);

        // For left outer joins the lookup-side columns must be nullable,
        // since non-matching stream rows emit nulls there.
        let output_schema = if join_type == LookupJoinType::LeftOuter {
            let stream_count = input.schema().fields().len();
            let mut fields = output_schema.fields().to_vec();
            for f in &mut fields[stream_count..] {
                if !f.is_nullable() {
                    *f = Arc::new(f.as_ref().clone().with_nullable(true));
                }
            }
            Arc::new(Schema::new_with_metadata(
                fields,
                output_schema.metadata().clone(),
            ))
        } else {
            output_schema
        };

        let properties = PlanProperties::new(
            EquivalenceProperties::new(Arc::clone(&output_schema)),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            },
        );

        let stream_field_count = input.schema().fields().len();

        Ok(Self {
            input,
            index: Arc::new(index),
            lookup_batch: Arc::new(lookup_batch),
            stream_key_indices,
            join_type,
            schema: output_schema,
            properties,
            converter,
            stream_field_count,
        })
    }
}

impl Debug for LookupJoinExec {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("LookupJoinExec")
            .field("join_type", &self.join_type)
            .field("stream_keys", &self.stream_key_indices)
            .field("lookup_rows", &self.lookup_batch.num_rows())
            .finish_non_exhaustive()
    }
}

impl DisplayAs for LookupJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "LookupJoinExec: type={}, stream_keys={:?}, lookup_rows={}",
                    self.join_type,
                    self.stream_key_indices,
                    self.lookup_batch.num_rows(),
                )
            }
            DisplayFormatType::TreeRender => write!(f, "LookupJoinExec"),
        }
    }
}

impl ExecutionPlan for LookupJoinExec {
    fn name(&self) -> &'static str {
        "LookupJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(DataFusionError::Plan(
                "LookupJoinExec requires exactly one child".into(),
            ));
        }
        Ok(Arc::new(Self {
            input: children.swap_remove(0),
            index: Arc::clone(&self.index),
            lookup_batch: Arc::clone(&self.lookup_batch),
            stream_key_indices: self.stream_key_indices.clone(),
            join_type: self.join_type,
            schema: Arc::clone(&self.schema),
            properties: self.properties.clone(),
            converter: Arc::clone(&self.converter),
            stream_field_count: self.stream_field_count,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let converter = Arc::clone(&self.converter);
        let index = Arc::clone(&self.index);
        let lookup_batch = Arc::clone(&self.lookup_batch);
        let stream_key_indices = self.stream_key_indices.clone();
        let join_type = self.join_type;
        let schema = self.schema();
        let stream_field_count = self.stream_field_count;

        let output = input_stream.map(move |result| {
            let batch = result?;
            if batch.num_rows() == 0 {
                return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
            }
            probe_batch(
                &batch,
                &converter,
                &index,
                &lookup_batch,
                &stream_key_indices,
                join_type,
                &schema,
                stream_field_count,
            )
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            output,
        )))
    }
}

impl datafusion::physical_plan::ExecutionPlanProperties for LookupJoinExec {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties.output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        Boundedness::Unbounded {
            requires_infinite_memory: false,
        }
    }

    fn pipeline_behavior(&self) -> EmissionType {
        EmissionType::Incremental
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties.equivalence_properties()
    }
}

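/// Joins one stream batch against the lookup index, emitting one output row
/// per match (or per unmatched stream row for left outer joins).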
#[allow(clippy::too_many_arguments)]
fn probe_batch(
    stream_batch: &RecordBatch,
    converter: &RowConverter,
    index: &HashIndex,
    lookup_batch: &RecordBatch,
    stream_key_indices: &[usize],
    join_type: LookupJoinType,
    output_schema: &SchemaRef,
    stream_field_count: usize,
) -> Result<RecordBatch> {
    let key_cols: Vec<_> = stream_key_indices
        .iter()
        .map(|&i| stream_batch.column(i).clone())
        .collect();
    let rows = converter.convert_columns(&key_cols)?;

    let num_rows = stream_batch.num_rows();
    let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
    let mut lookup_indices: Vec<Option<u32>> = Vec::with_capacity(num_rows);

    #[allow(clippy::cast_possible_truncation)]
    for row in 0..num_rows {
        if key_cols.iter().any(|c| c.is_null(row)) {
            // Null keys never match; a left outer join still emits the row.
            if join_type == LookupJoinType::LeftOuter {
                stream_indices.push(row as u32);
                lookup_indices.push(None);
            }
            continue;
        }

        let key = rows.row(row);
        match index.probe(key.as_ref()) {
            Some(matches) => {
                for &lookup_row in matches {
                    stream_indices.push(row as u32);
                    lookup_indices.push(Some(lookup_row));
                }
            }
            None if join_type == LookupJoinType::LeftOuter => {
                stream_indices.push(row as u32);
                lookup_indices.push(None);
            }
            None => {}
        }
    }

    if stream_indices.is_empty() {
        return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
    }

    let take_stream = UInt32Array::from(stream_indices);
    let mut columns = Vec::with_capacity(output_schema.fields().len());

    for col in stream_batch.columns() {
        columns.push(take(col.as_ref(), &take_stream, None)?);
    }

    // `None` entries become null indices, so `take` emits nulls for those rows.
    let take_lookup: UInt32Array = lookup_indices.into_iter().collect();
    for col in lookup_batch.columns() {
        columns.push(take(col.as_ref(), &take_lookup, None)?);
    }

    debug_assert_eq!(
        columns.len(),
        stream_field_count + lookup_batch.num_columns(),
        "output column count mismatch"
    );

    Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
}

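/// Temporal variant of `LookupJoinExec`: each stream row joins against the
/// table version that was current at the row's event timestamp.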
pub struct VersionedLookupJoinExec {
    input: Arc<dyn ExecutionPlan>,
    index: Arc<VersionedIndex>,
    table_batch: Arc<RecordBatch>,
    stream_key_indices: Vec<usize>,
    stream_time_col_idx: usize,
    join_type: LookupJoinType,
    schema: SchemaRef,
    properties: PlanProperties,
    converter: Arc<RowConverter>,
    /// Number of columns contributed by the stream side of the output.
    stream_field_count: usize,
}

impl VersionedLookupJoinExec {
    /// Builds the operator from a prebuilt versioned index over `table_batch`.
    #[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)]
    pub fn try_new(
        input: Arc<dyn ExecutionPlan>,
        table_batch: RecordBatch,
        index: Arc<VersionedIndex>,
        stream_key_indices: Vec<usize>,
        stream_time_col_idx: usize,
        join_type: LookupJoinType,
        output_schema: SchemaRef,
        key_sort_fields: Vec<SortField>,
    ) -> Result<Self> {
        // As in `LookupJoinExec`, left outer joins need nullable table-side columns.
        let output_schema = if join_type == LookupJoinType::LeftOuter {
            let stream_count = input.schema().fields().len();
            let mut fields = output_schema.fields().to_vec();
            for f in &mut fields[stream_count..] {
                if !f.is_nullable() {
                    *f = Arc::new(f.as_ref().clone().with_nullable(true));
                }
            }
            Arc::new(Schema::new_with_metadata(
                fields,
                output_schema.metadata().clone(),
            ))
        } else {
            output_schema
        };

        let properties = PlanProperties::new(
            EquivalenceProperties::new(Arc::clone(&output_schema)),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            },
        );

        let stream_field_count = input.schema().fields().len();
        let converter = Arc::new(RowConverter::new(key_sort_fields)?);

        Ok(Self {
            input,
            index,
            table_batch: Arc::new(table_batch),
            stream_key_indices,
            stream_time_col_idx,
            join_type,
            schema: output_schema,
            properties,
            converter,
            stream_field_count,
        })
    }
}

impl Debug for VersionedLookupJoinExec {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("VersionedLookupJoinExec")
            .field("join_type", &self.join_type)
            .field("stream_keys", &self.stream_key_indices)
            .field("table_rows", &self.table_batch.num_rows())
            .finish_non_exhaustive()
    }
}

impl DisplayAs for VersionedLookupJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "VersionedLookupJoinExec: type={}, stream_keys={:?}, table_rows={}",
                    self.join_type,
                    self.stream_key_indices,
                    self.table_batch.num_rows(),
                )
            }
            DisplayFormatType::TreeRender => write!(f, "VersionedLookupJoinExec"),
        }
    }
}

impl ExecutionPlan for VersionedLookupJoinExec {
    fn name(&self) -> &'static str {
        "VersionedLookupJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(DataFusionError::Plan(
                "VersionedLookupJoinExec requires exactly one child".into(),
            ));
        }
        Ok(Arc::new(Self {
            input: children.swap_remove(0),
            index: Arc::clone(&self.index),
            table_batch: Arc::clone(&self.table_batch),
            stream_key_indices: self.stream_key_indices.clone(),
            stream_time_col_idx: self.stream_time_col_idx,
            join_type: self.join_type,
            schema: Arc::clone(&self.schema),
            properties: self.properties.clone(),
            converter: Arc::clone(&self.converter),
            stream_field_count: self.stream_field_count,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let converter = Arc::clone(&self.converter);
        let index = Arc::clone(&self.index);
        let table_batch = Arc::clone(&self.table_batch);
        let stream_key_indices = self.stream_key_indices.clone();
        let stream_time_col_idx = self.stream_time_col_idx;
        let join_type = self.join_type;
        let schema = self.schema();
        let stream_field_count = self.stream_field_count;

        let output = input_stream.map(move |result| {
            let batch = result?;
            if batch.num_rows() == 0 {
                return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
            }
            probe_versioned_batch(
                &batch,
                &converter,
                &index,
                &table_batch,
                &stream_key_indices,
                stream_time_col_idx,
                join_type,
                &schema,
                stream_field_count,
            )
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            output,
        )))
    }
}

impl datafusion::physical_plan::ExecutionPlanProperties for VersionedLookupJoinExec {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties.output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        Boundedness::Unbounded {
            requires_infinite_memory: false,
        }
    }

    fn pipeline_behavior(&self) -> EmissionType {
        EmissionType::Incremental
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties.equivalence_properties()
    }
}

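/// Joins one stream batch against the versioned index, picking for each row
/// the latest table version at or before its event timestamp.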
#[allow(clippy::too_many_arguments)]
fn probe_versioned_batch(
    stream_batch: &RecordBatch,
    converter: &RowConverter,
    index: &VersionedIndex,
    table_batch: &RecordBatch,
    stream_key_indices: &[usize],
    stream_time_col_idx: usize,
    join_type: LookupJoinType,
    output_schema: &SchemaRef,
    stream_field_count: usize,
) -> Result<RecordBatch> {
    let key_cols: Vec<_> = stream_key_indices
        .iter()
        .map(|&i| stream_batch.column(i).clone())
        .collect();
    let rows = converter.convert_columns(&key_cols)?;
    let event_timestamps =
        extract_i64_timestamps(stream_batch.column(stream_time_col_idx).as_ref())?;

    let num_rows = stream_batch.num_rows();
    let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
    let mut lookup_indices: Vec<Option<u32>> = Vec::with_capacity(num_rows);

    #[allow(clippy::cast_possible_truncation)]
    for (row, event_ts_opt) in event_timestamps.iter().enumerate() {
        if key_cols.iter().any(|c| c.is_null(row)) || event_ts_opt.is_none() {
            // Null keys or timestamps never match; left outer still emits the row.
            if join_type == LookupJoinType::LeftOuter {
                stream_indices.push(row as u32);
                lookup_indices.push(None);
            }
            continue;
        }

        let key = rows.row(row);
        let event_ts = event_ts_opt.unwrap();
        match index.probe_at_time(key.as_ref(), event_ts) {
            Some(table_row_idx) => {
                stream_indices.push(row as u32);
                lookup_indices.push(Some(table_row_idx));
            }
            None if join_type == LookupJoinType::LeftOuter => {
                stream_indices.push(row as u32);
                lookup_indices.push(None);
            }
            None => {}
        }
    }

    if stream_indices.is_empty() {
        return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
    }

    let take_stream = UInt32Array::from(stream_indices);
    let mut columns = Vec::with_capacity(output_schema.fields().len());

    for col in stream_batch.columns() {
        columns.push(take(col.as_ref(), &take_stream, None)?);
    }

    let take_lookup: UInt32Array = lookup_indices.into_iter().collect();
    for col in table_batch.columns() {
        columns.push(take(col.as_ref(), &take_lookup, None)?);
    }

    debug_assert_eq!(
        columns.len(),
        stream_field_count + table_batch.num_columns(),
        "output column count mismatch"
    );

    Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
}

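/// Lookup join against a partially cached table: probes the Foyer cache per
/// key and optionally falls back to an external source on misses.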
pub struct PartialLookupJoinExec {
    input: Arc<dyn ExecutionPlan>,
    foyer_cache: Arc<FoyerMemoryCache>,
    stream_key_indices: Vec<usize>,
    join_type: LookupJoinType,
    schema: SchemaRef,
    properties: PlanProperties,
    converter: Arc<RowConverter>,
    /// Number of columns contributed by the stream side of the output.
    stream_field_count: usize,
    lookup_schema: SchemaRef,
    source: Option<Arc<dyn LookupSourceDyn>>,
    fetch_semaphore: Arc<Semaphore>,
}

impl PartialLookupJoinExec {
    /// Convenience constructor for a cache-only join (no fallback source).
    pub fn try_new(
        input: Arc<dyn ExecutionPlan>,
        foyer_cache: Arc<FoyerMemoryCache>,
        stream_key_indices: Vec<usize>,
        key_sort_fields: Vec<SortField>,
        join_type: LookupJoinType,
        lookup_schema: SchemaRef,
        output_schema: SchemaRef,
    ) -> Result<Self> {
        Self::try_new_with_source(
            input,
            foyer_cache,
            stream_key_indices,
            key_sort_fields,
            join_type,
            lookup_schema,
            output_schema,
            None,
            Arc::new(Semaphore::new(64)),
        )
    }

    /// Full constructor; cache misses are resolved through `source` when present.
    #[allow(clippy::too_many_arguments)]
    pub fn try_new_with_source(
        input: Arc<dyn ExecutionPlan>,
        foyer_cache: Arc<FoyerMemoryCache>,
        stream_key_indices: Vec<usize>,
        key_sort_fields: Vec<SortField>,
        join_type: LookupJoinType,
        lookup_schema: SchemaRef,
        output_schema: SchemaRef,
        source: Option<Arc<dyn LookupSourceDyn>>,
        fetch_semaphore: Arc<Semaphore>,
    ) -> Result<Self> {
        // As in `LookupJoinExec`, left outer joins need nullable lookup-side columns.
        let output_schema = if join_type == LookupJoinType::LeftOuter {
            let stream_count = input.schema().fields().len();
            let mut fields = output_schema.fields().to_vec();
            for f in &mut fields[stream_count..] {
                if !f.is_nullable() {
                    *f = Arc::new(f.as_ref().clone().with_nullable(true));
                }
            }
            Arc::new(Schema::new_with_metadata(
                fields,
                output_schema.metadata().clone(),
            ))
        } else {
            output_schema
        };

        let properties = PlanProperties::new(
            EquivalenceProperties::new(Arc::clone(&output_schema)),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            },
        );

        let stream_field_count = input.schema().fields().len();
        let converter = Arc::new(RowConverter::new(key_sort_fields)?);

        Ok(Self {
            input,
            foyer_cache,
            stream_key_indices,
            join_type,
            schema: output_schema,
            properties,
            converter,
            stream_field_count,
            lookup_schema,
            source,
            fetch_semaphore,
        })
    }
}

impl Debug for PartialLookupJoinExec {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartialLookupJoinExec")
            .field("join_type", &self.join_type)
            .field("stream_keys", &self.stream_key_indices)
            .field("cache_table_id", &self.foyer_cache.table_id())
            .finish_non_exhaustive()
    }
}

impl DisplayAs for PartialLookupJoinExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "PartialLookupJoinExec: type={}, stream_keys={:?}, cache_entries={}",
                    self.join_type,
                    self.stream_key_indices,
                    self.foyer_cache.len(),
                )
            }
            DisplayFormatType::TreeRender => write!(f, "PartialLookupJoinExec"),
        }
    }
}

impl ExecutionPlan for PartialLookupJoinExec {
    fn name(&self) -> &'static str {
        "PartialLookupJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(DataFusionError::Plan(
                "PartialLookupJoinExec requires exactly one child".into(),
            ));
        }
        Ok(Arc::new(Self {
            input: children.swap_remove(0),
            foyer_cache: Arc::clone(&self.foyer_cache),
            stream_key_indices: self.stream_key_indices.clone(),
            join_type: self.join_type,
            schema: Arc::clone(&self.schema),
            properties: self.properties.clone(),
            converter: Arc::clone(&self.converter),
            stream_field_count: self.stream_field_count,
            lookup_schema: Arc::clone(&self.lookup_schema),
            source: self.source.clone(),
            fetch_semaphore: Arc::clone(&self.fetch_semaphore),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let converter = Arc::clone(&self.converter);
        let foyer_cache = Arc::clone(&self.foyer_cache);
        let stream_key_indices = self.stream_key_indices.clone();
        let join_type = self.join_type;
        let schema = self.schema();
        let stream_field_count = self.stream_field_count;
        let lookup_schema = Arc::clone(&self.lookup_schema);
        let source = self.source.clone();
        let fetch_semaphore = Arc::clone(&self.fetch_semaphore);

        let output = input_stream.then(move |result| {
            let foyer_cache = Arc::clone(&foyer_cache);
            let converter = Arc::clone(&converter);
            let stream_key_indices = stream_key_indices.clone();
            let schema = Arc::clone(&schema);
            let lookup_schema = Arc::clone(&lookup_schema);
            let source = source.clone();
            let fetch_semaphore = Arc::clone(&fetch_semaphore);
            async move {
                let batch = result?;
                if batch.num_rows() == 0 {
                    return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
                }
                probe_partial_batch_with_fallback(
                    &batch,
                    &converter,
                    &foyer_cache,
                    &stream_key_indices,
                    join_type,
                    &schema,
                    stream_field_count,
                    &lookup_schema,
                    source.as_deref(),
                    &fetch_semaphore,
                )
                .await
            }
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            output,
        )))
    }
}

impl datafusion::physical_plan::ExecutionPlanProperties for PartialLookupJoinExec {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties.output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        Boundedness::Unbounded {
            requires_infinite_memory: false,
        }
    }

    fn pipeline_behavior(&self) -> EmissionType {
        EmissionType::Incremental
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties.equivalence_properties()
    }
}

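/// Joins one stream batch against the cache, batching any misses into a
/// single fallback query against the external source (if configured).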
#[allow(clippy::too_many_arguments)]
async fn probe_partial_batch_with_fallback(
    stream_batch: &RecordBatch,
    converter: &RowConverter,
    foyer_cache: &FoyerMemoryCache,
    stream_key_indices: &[usize],
    join_type: LookupJoinType,
    output_schema: &SchemaRef,
    stream_field_count: usize,
    lookup_schema: &SchemaRef,
    source: Option<&dyn LookupSourceDyn>,
    fetch_semaphore: &Semaphore,
) -> Result<RecordBatch> {
    let key_cols: Vec<_> = stream_key_indices
        .iter()
        .map(|&i| stream_batch.column(i).clone())
        .collect();
    let rows = converter.convert_columns(&key_cols)?;

    let num_rows = stream_batch.num_rows();
    let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
    let mut lookup_batches: Vec<Option<RecordBatch>> = Vec::with_capacity(num_rows);
    let mut miss_keys: Vec<(usize, Vec<u8>)> = Vec::new();

    #[allow(clippy::cast_possible_truncation)]
    for row in 0..num_rows {
        if key_cols.iter().any(|c| c.is_null(row)) {
            // Null keys never match; a left outer join still emits the row.
            if join_type == LookupJoinType::LeftOuter {
                stream_indices.push(row as u32);
                lookup_batches.push(None);
            }
            continue;
        }

        let key = rows.row(row);
        let result = foyer_cache.get_cached(key.as_ref());
        if let Some(batch) = result.into_batch() {
            stream_indices.push(row as u32);
            lookup_batches.push(Some(batch));
        } else {
            let idx = stream_indices.len();
            stream_indices.push(row as u32);
            lookup_batches.push(None);
            miss_keys.push((idx, key.as_ref().to_vec()));
        }
    }

    // Resolve cache misses through the external source, if one is configured.
    if let Some(source) = source {
        if !miss_keys.is_empty() {
            let _permit = fetch_semaphore
                .acquire()
                .await
                .map_err(|_| DataFusionError::Internal("fetch semaphore closed".into()))?;

            let key_refs: Vec<&[u8]> = miss_keys.iter().map(|(_, k)| k.as_slice()).collect();
            let source_results = source.query_batch(&key_refs, &[], &[]).await;

            match source_results {
                Ok(results) => {
                    for ((idx, key_bytes), maybe_batch) in miss_keys.iter().zip(results) {
                        if let Some(batch) = maybe_batch {
                            foyer_cache.insert(key_bytes, batch.clone());
                            lookup_batches[*idx] = Some(batch);
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        missed_keys = miss_keys.len(),
                        "partial lookup: source fallback failed, serving cache-only results"
                    );
                }
            }
        }
    }

    // Inner joins drop rows that are still unmatched; compact the vectors in place.
    if join_type == LookupJoinType::Inner {
        let mut write = 0;
        for read in 0..stream_indices.len() {
            if lookup_batches[read].is_some() {
                stream_indices[write] = stream_indices[read];
                lookup_batches.swap(write, read);
                write += 1;
            }
        }
        stream_indices.truncate(write);
        lookup_batches.truncate(write);
    }

    if stream_indices.is_empty() {
        return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
    }

    let take_indices = UInt32Array::from(stream_indices);
    let mut columns = Vec::with_capacity(output_schema.fields().len());

    for col in stream_batch.columns() {
        columns.push(take(col.as_ref(), &take_indices, None)?);
    }

    // Each per-key batch holds a single row; misses contribute a one-row null array.
    let lookup_col_count = lookup_schema.fields().len();
    for col_idx in 0..lookup_col_count {
        let arrays: Vec<_> = lookup_batches
            .iter()
            .map(|opt| match opt {
                Some(b) => b.column(col_idx).clone(),
                None => arrow_array::new_null_array(lookup_schema.field(col_idx).data_type(), 1),
            })
            .collect();
        let refs: Vec<&dyn arrow_array::Array> = arrays.iter().map(AsRef::as_ref).collect();
        columns.push(arrow::compute::concat(&refs)?);
    }

    debug_assert_eq!(
        columns.len(),
        stream_field_count + lookup_col_count,
        "output column count mismatch"
    );

    Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
}

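/// Extension planner that lowers a logical `LookupJoinNode` into the
/// physical operator matching the registered lookup table kind.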
pub struct LookupJoinExtensionPlanner {
    registry: Arc<LookupTableRegistry>,
}

impl LookupJoinExtensionPlanner {
    /// Creates a planner backed by the given registry.
    pub fn new(registry: Arc<LookupTableRegistry>) -> Self {
        Self { registry }
    }
}

#[async_trait]
impl ExtensionPlanner for LookupJoinExtensionPlanner {
    #[allow(clippy::too_many_lines)]
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let Some(lookup_node) = node.as_any().downcast_ref::<LookupJoinNode>() else {
            return Ok(None);
        };

        let entry = self
            .registry
            .get_entry(lookup_node.lookup_table_name())
            .ok_or_else(|| {
                DataFusionError::Plan(format!(
                    "lookup table '{}' not registered",
                    lookup_node.lookup_table_name()
                ))
            })?;

        let input = Arc::clone(&physical_inputs[0]);
        let stream_schema = input.schema();

        match entry {
            RegisteredLookup::Partial(partial_state) => {
                let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;

                let mut output_fields = stream_schema.fields().to_vec();
                output_fields.extend(partial_state.schema.fields().iter().cloned());
                let output_schema = Arc::new(Schema::new(output_fields));

                let exec = PartialLookupJoinExec::try_new_with_source(
                    input,
                    Arc::clone(&partial_state.foyer_cache),
                    stream_key_indices,
                    partial_state.key_sort_fields.clone(),
                    lookup_node.join_type(),
                    Arc::clone(&partial_state.schema),
                    output_schema,
                    partial_state.source.clone(),
                    Arc::clone(&partial_state.fetch_semaphore),
                )?;
                Ok(Some(Arc::new(exec)))
            }
            RegisteredLookup::Snapshot(snapshot) => {
                let lookup_schema = snapshot.batch.schema();
                let lookup_key_indices = resolve_lookup_keys(lookup_node, &lookup_schema)?;

                let lookup_batch = if lookup_node.pushdown_predicates().is_empty()
                    || snapshot.batch.num_rows() == 0
                {
                    snapshot.batch.clone()
                } else {
                    apply_pushdown_predicates(
                        &snapshot.batch,
                        lookup_node.pushdown_predicates(),
                        session_state,
                    )?
                };

                let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;

                // Reject joins whose key types differ between the two sides.
                for (si, li) in stream_key_indices.iter().zip(&lookup_key_indices) {
                    let st = stream_schema.field(*si).data_type();
                    let lt = lookup_schema.field(*li).data_type();
                    if st != lt {
                        return Err(DataFusionError::Plan(format!(
                            "Lookup join key type mismatch: stream '{}' is {st:?} \
                             but lookup '{}' is {lt:?}",
                            stream_schema.field(*si).name(),
                            lookup_schema.field(*li).name(),
                        )));
                    }
                }

                let mut output_fields = stream_schema.fields().to_vec();
                output_fields.extend(lookup_batch.schema().fields().iter().cloned());
                let output_schema = Arc::new(Schema::new(output_fields));

                let exec = LookupJoinExec::try_new(
                    input,
                    lookup_batch,
                    stream_key_indices,
                    lookup_key_indices,
                    lookup_node.join_type(),
                    output_schema,
                )?;

                Ok(Some(Arc::new(exec)))
            }
            RegisteredLookup::Versioned(versioned_state) => {
                let table_schema = versioned_state.batch.schema();
                let lookup_key_indices = resolve_lookup_keys(lookup_node, &table_schema)?;
                let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;

                // Reject joins whose key types differ between the two sides.
                for (si, li) in stream_key_indices.iter().zip(&lookup_key_indices) {
                    let st = stream_schema.field(*si).data_type();
                    let lt = table_schema.field(*li).data_type();
                    if st != lt {
                        return Err(DataFusionError::Plan(format!(
                            "Temporal join key type mismatch: stream '{}' is {st:?} \
                             but table '{}' is {lt:?}",
                            stream_schema.field(*si).name(),
                            table_schema.field(*li).name(),
                        )));
                    }
                }

                let stream_time_col_idx = stream_schema
                    .index_of(&versioned_state.stream_time_column)
                    .map_err(|_| {
                        DataFusionError::Plan(format!(
                            "stream time column '{}' not found in stream schema",
                            versioned_state.stream_time_column
                        ))
                    })?;

                let key_sort_fields: Vec<SortField> = lookup_key_indices
                    .iter()
                    .map(|&i| SortField::new(table_schema.field(i).data_type().clone()))
                    .collect();

                let mut output_fields = stream_schema.fields().to_vec();
                output_fields.extend(table_schema.fields().iter().cloned());
                let output_schema = Arc::new(Schema::new(output_fields));

                let exec = VersionedLookupJoinExec::try_new(
                    input,
                    versioned_state.batch.clone(),
                    Arc::clone(&versioned_state.index),
                    stream_key_indices,
                    stream_time_col_idx,
                    lookup_node.join_type(),
                    output_schema,
                    key_sort_fields,
                )?;

                Ok(Some(Arc::new(exec)))
            }
        }
    }
}

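/// Filters the snapshot batch by evaluating the pushed-down predicates and
/// AND-ing their boolean masks.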
fn apply_pushdown_predicates(
    batch: &RecordBatch,
    predicates: &[Expr],
    session_state: &SessionState,
) -> Result<RecordBatch> {
    use arrow::compute::filter_record_batch;
    use datafusion::physical_expr::create_physical_expr;

    let schema = batch.schema();
    let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;

    let mut mask = None::<arrow_array::BooleanArray>;
    for pred in predicates {
        let phys_expr = create_physical_expr(pred, &df_schema, session_state.execution_props())?;
        let result = phys_expr.evaluate(batch)?;
        let bool_arr = result
            .into_array(batch.num_rows())?
            .as_any()
            .downcast_ref::<arrow_array::BooleanArray>()
            .ok_or_else(|| {
                DataFusionError::Internal("pushdown predicate did not evaluate to boolean".into())
            })?
            .clone();
        mask = Some(match mask {
            Some(existing) => arrow::compute::and(&existing, &bool_arr)?,
            None => bool_arr,
        });
    }

    match mask {
        Some(m) => Ok(filter_record_batch(batch, &m)?),
        None => Ok(batch.clone()),
    }
}

fn resolve_stream_keys(node: &LookupJoinNode, schema: &SchemaRef) -> Result<Vec<usize>> {
    node.join_keys()
        .iter()
        .map(|pair| match &pair.stream_expr {
            Expr::Column(col) => schema.index_of(&col.name).map_err(|_| {
                DataFusionError::Plan(format!(
                    "stream key column '{}' not found in physical schema",
                    col.name
                ))
            }),
            other => Err(DataFusionError::NotImplemented(format!(
                "lookup join requires column references as stream keys, got: {other}"
            ))),
        })
        .collect()
}

fn resolve_lookup_keys(node: &LookupJoinNode, schema: &SchemaRef) -> Result<Vec<usize>> {
    node.join_keys()
        .iter()
        .map(|pair| {
            schema.index_of(&pair.lookup_column).map_err(|_| {
                DataFusionError::Plan(format!(
                    "lookup key column '{}' not found in lookup table schema",
                    pair.lookup_column
                ))
            })
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field};
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter as TestStreamAdapter;
    use futures::TryStreamExt;

    /// Wraps a single batch in a one-shot `ExecutionPlan` stub.
    fn batch_exec(batch: RecordBatch) -> Arc<dyn ExecutionPlan> {
        let schema = batch.schema();
        let batches = vec![batch];
        let stream_schema = Arc::clone(&schema);
        Arc::new(StreamExecStub {
            schema,
            batches: std::sync::Mutex::new(Some(batches)),
            stream_schema,
        })
    }

    /// Minimal `ExecutionPlan` that replays a fixed set of batches once.
    struct StreamExecStub {
        schema: SchemaRef,
        batches: std::sync::Mutex<Option<Vec<RecordBatch>>>,
        stream_schema: SchemaRef,
    }

    impl Debug for StreamExecStub {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            write!(f, "StreamExecStub")
        }
    }

    impl DisplayAs for StreamExecStub {
        fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
            write!(f, "StreamExecStub")
        }
    }

    impl ExecutionPlan for StreamExecStub {
        fn name(&self) -> &'static str {
            "StreamExecStub"
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
        fn properties(&self) -> &PlanProperties {
            // Leaking is acceptable in tests: it gives the reference a 'static lifetime.
            Box::leak(Box::new(PlanProperties::new(
                EquivalenceProperties::new(Arc::clone(&self.schema)),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Bounded,
            )))
        }
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }
        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }
        fn execute(&self, _: usize, _: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
            let batches = self.batches.lock().unwrap().take().unwrap_or_default();
            let schema = Arc::clone(&self.stream_schema);
            let stream = futures::stream::iter(batches.into_iter().map(Ok));
            Ok(Box::pin(TestStreamAdapter::new(schema, stream)))
        }
    }

    impl datafusion::physical_plan::ExecutionPlanProperties for StreamExecStub {
        fn output_partitioning(&self) -> &Partitioning {
            self.properties().output_partitioning()
        }
        fn output_ordering(&self) -> Option<&LexOrdering> {
            None
        }
        fn boundedness(&self) -> Boundedness {
            Boundedness::Bounded
        }
        fn pipeline_behavior(&self) -> EmissionType {
            EmissionType::Final
        }
        fn equivalence_properties(&self) -> &EquivalenceProperties {
            self.properties().equivalence_properties()
        }
    }

    fn orders_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, false),
            Field::new("amount", DataType::Float64, false),
        ]))
    }

    fn customers_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    fn output_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, false),
            Field::new("amount", DataType::Float64, false),
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    fn customers_batch() -> RecordBatch {
        RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
            ],
        )
        .unwrap()
    }

    fn orders_batch() -> RecordBatch {
        RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100, 101, 102, 103])),
                Arc::new(Int64Array::from(vec![1, 2, 99, 3])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0, 40.0])),
            ],
        )
        .unwrap()
    }

    fn make_exec(join_type: LookupJoinType) -> LookupJoinExec {
        let input = batch_exec(orders_batch());
        LookupJoinExec::try_new(
            input,
            customers_batch(),
            vec![1], // stream key: customer_id
            vec![0], // lookup key: id
            join_type,
            output_schema(),
        )
        .unwrap()
    }

    #[tokio::test]
    async fn inner_join_filters_non_matches() {
        let exec = make_exec(LookupJoinType::Inner);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 3, "customer_id=99 has no match, filtered by inner");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Alice");
        assert_eq!(names.value(1), "Bob");
        assert_eq!(names.value(2), "Charlie");
    }

    #[tokio::test]
    async fn left_outer_preserves_non_matches() {
        let exec = make_exec(LookupJoinType::LeftOuter);
        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4, "all 4 stream rows preserved in left outer");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(names.is_null(2), "customer_id=99 should have null name");
    }

    #[tokio::test]
    async fn empty_lookup_inner_produces_no_rows() {
        let empty = RecordBatch::new_empty(customers_schema());
        let input = batch_exec(orders_batch());
        let exec = LookupJoinExec::try_new(
            input,
            empty,
            vec![1],
            vec![0],
            LookupJoinType::Inner,
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 0);
    }

    #[tokio::test]
    async fn empty_lookup_left_outer_preserves_all_stream_rows() {
        let empty = RecordBatch::new_empty(customers_schema());
        let input = batch_exec(orders_batch());
        let exec = LookupJoinExec::try_new(
            input,
            empty,
            vec![1],
            vec![0],
            LookupJoinType::LeftOuter,
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4);
    }

    #[tokio::test]
    async fn duplicate_keys_produce_multiple_rows() {
        let lookup = RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 1])),
                Arc::new(StringArray::from(vec!["Alice-A", "Alice-B"])),
            ],
        )
        .unwrap();

        let stream = RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100])),
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(Float64Array::from(vec![10.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(stream);
        let exec = LookupJoinExec::try_new(
            input,
            lookup,
            vec![1],
            vec![0],
            LookupJoinType::Inner,
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 2, "one stream row matched two lookup rows");
    }

    #[test]
    fn with_new_children_preserves_state() {
        let exec = Arc::new(make_exec(LookupJoinType::Inner));
        let expected_schema = exec.schema();
        let children = exec.children().into_iter().cloned().collect();
        let rebuilt = exec.with_new_children(children).unwrap();
        assert_eq!(rebuilt.schema(), expected_schema);
        assert_eq!(rebuilt.name(), "LookupJoinExec");
    }

    #[test]
    fn display_format() {
        let exec = make_exec(LookupJoinType::Inner);
        let s = format!("{exec:?}");
        assert!(s.contains("LookupJoinExec"));
        assert!(s.contains("lookup_rows: 3"));
    }

    #[test]
    fn registry_crud() {
        let reg = LookupTableRegistry::new();
        assert!(reg.get("customers").is_none());

        reg.register(
            "customers",
            LookupSnapshot {
                batch: customers_batch(),
                key_columns: vec!["id".into()],
            },
        );
        assert!(reg.get("customers").is_some());
        assert!(reg.get("CUSTOMERS").is_some(), "case-insensitive");

        reg.unregister("customers");
        assert!(reg.get("customers").is_none());
    }

    #[test]
    fn registry_update_replaces() {
        let reg = LookupTableRegistry::new();
        reg.register(
            "t",
            LookupSnapshot {
                batch: RecordBatch::new_empty(customers_schema()),
                key_columns: vec![],
            },
        );
        assert_eq!(reg.get("t").unwrap().batch.num_rows(), 0);

        reg.register(
            "t",
            LookupSnapshot {
                batch: customers_batch(),
                key_columns: vec![],
            },
        );
        assert_eq!(reg.get("t").unwrap().batch.num_rows(), 3);
    }

    #[test]
    fn pushdown_predicates_filter_snapshot() {
        use datafusion::logical_expr::{col, lit};

        let batch = customers_batch();
        let ctx = datafusion::prelude::SessionContext::new();
        let state = ctx.state();

        let predicates = vec![col("id").gt(lit(1i64))];
        let filtered = apply_pushdown_predicates(&batch, &predicates, &state).unwrap();
        assert_eq!(filtered.num_rows(), 2);

        let ids = filtered
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 2);
        assert_eq!(ids.value(1), 3);
    }

    #[test]
    fn pushdown_predicates_empty_passes_all() {
        let batch = customers_batch();
        let ctx = datafusion::prelude::SessionContext::new();
        let state = ctx.state();

        let filtered = apply_pushdown_predicates(&batch, &[], &state).unwrap();
        assert_eq!(filtered.num_rows(), 3);
    }

    #[test]
    fn pushdown_predicates_multiple_and() {
        use datafusion::logical_expr::{col, lit};

        let batch = customers_batch();
        let ctx = datafusion::prelude::SessionContext::new();
        let state = ctx.state();

        let predicates = vec![col("id").gt_eq(lit(2i64)), col("id").lt(lit(3i64))];
        let filtered = apply_pushdown_predicates(&batch, &predicates, &state).unwrap();
        assert_eq!(filtered.num_rows(), 1);
    }

    use laminar_core::lookup::foyer_cache::FoyerMemoryCacheConfig;

    /// Builds a small in-memory cache for the partial-lookup tests.
    fn make_foyer_cache() -> Arc<FoyerMemoryCache> {
        Arc::new(FoyerMemoryCache::new(
            1,
            FoyerMemoryCacheConfig {
                capacity: 64,
                shards: 4,
            },
        ))
    }

    fn customer_row(id: i64, name: &str) -> RecordBatch {
        RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
            ],
        )
        .unwrap()
    }

    /// Inserts rows for ids 1..=3 into the cache, keyed by row-encoded id.
    fn warm_cache(cache: &FoyerMemoryCache) {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int64)]).unwrap();

        for (id, name) in [(1, "Alice"), (2, "Bob"), (3, "Charlie")] {
            let key_col = Int64Array::from(vec![id]);
            let rows = converter.convert_columns(&[Arc::new(key_col)]).unwrap();
            let key = rows.row(0);
            cache.insert(key.as_ref(), customer_row(id, name));
        }
    }

    fn make_partial_exec(join_type: LookupJoinType) -> PartialLookupJoinExec {
        let cache = make_foyer_cache();
        warm_cache(&cache);

        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        PartialLookupJoinExec::try_new(
            input,
            cache,
            vec![1], // stream key: customer_id
            key_sort_fields,
            join_type,
            customers_schema(),
            output_schema(),
        )
        .unwrap()
    }

2066 #[tokio::test]
2067 async fn partial_inner_join_filters_non_matches() {
2068 let exec = make_partial_exec(LookupJoinType::Inner);
2069 let ctx = Arc::new(TaskContext::default());
2070 let stream = exec.execute(0, ctx).unwrap();
2071 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
2072
2073 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
2074 assert_eq!(total, 3, "customer_id=99 has no match, filtered by inner");
2075
2076 let names = batches[0]
2077 .column(4)
2078 .as_any()
2079 .downcast_ref::<StringArray>()
2080 .unwrap();
2081 assert_eq!(names.value(0), "Alice");
2082 assert_eq!(names.value(1), "Bob");
2083 assert_eq!(names.value(2), "Charlie");
2084 }
2085
2086 #[tokio::test]
2087 async fn partial_left_outer_preserves_non_matches() {
2088 let exec = make_partial_exec(LookupJoinType::LeftOuter);
2089 let ctx = Arc::new(TaskContext::default());
2090 let stream = exec.execute(0, ctx).unwrap();
2091 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
2092
2093 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
2094 assert_eq!(total, 4, "all 4 stream rows preserved in left outer");
2095
2096 let names = batches[0]
2097 .column(4)
2098 .as_any()
2099 .downcast_ref::<StringArray>()
2100 .unwrap();
2101 assert!(names.is_null(2), "customer_id=99 should have null name");
2102 }
2103
2104 #[tokio::test]
2105 async fn partial_empty_cache_inner_produces_no_rows() {
2106 let cache = make_foyer_cache();
2107 let input = batch_exec(orders_batch());
2108 let key_sort_fields = vec![SortField::new(DataType::Int64)];
2109
2110 let exec = PartialLookupJoinExec::try_new(
2111 input,
2112 cache,
2113 vec![1],
2114 key_sort_fields,
2115 LookupJoinType::Inner,
2116 customers_schema(),
2117 output_schema(),
2118 )
2119 .unwrap();
2120
2121 let ctx = Arc::new(TaskContext::default());
2122 let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
2123 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
2124 assert_eq!(total, 0);
2125 }
2126
    #[tokio::test]
    async fn partial_empty_cache_left_outer_preserves_all() {
        let cache = make_foyer_cache();
        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        let exec = PartialLookupJoinExec::try_new(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::LeftOuter,
            customers_schema(),
            output_schema(),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 4);
    }

    #[test]
    fn partial_with_new_children_preserves_state() {
        let exec = Arc::new(make_partial_exec(LookupJoinType::Inner));
        let expected_schema = exec.schema();
        let children = exec.children().into_iter().cloned().collect();
        let rebuilt = exec.with_new_children(children).unwrap();
        assert_eq!(rebuilt.schema(), expected_schema);
        assert_eq!(rebuilt.name(), "PartialLookupJoinExec");
    }

    #[test]
    fn partial_display_format() {
        let exec = make_partial_exec(LookupJoinType::Inner);
        let s = format!("{exec:?}");
        assert!(s.contains("PartialLookupJoinExec"));
        assert!(s.contains("cache_table_id: 1"));
    }

    #[test]
    fn registry_partial_entry() {
        let reg = LookupTableRegistry::new();
        let cache = make_foyer_cache();
        let key_sort_fields = vec![SortField::new(DataType::Int64)];

        reg.register_partial(
            "customers",
            PartialLookupState {
                foyer_cache: cache,
                schema: customers_schema(),
                key_columns: vec!["id".into()],
                key_sort_fields,
                source: None,
                fetch_semaphore: Arc::new(Semaphore::new(64)),
            },
        );

        // `get` only returns full snapshots; partial entries come via `get_entry`.
        assert!(reg.get("customers").is_none());

        let entry = reg.get_entry("customers");
        assert!(entry.is_some());
        assert!(matches!(entry.unwrap(), RegisteredLookup::Partial(_)));
    }

    #[tokio::test]
    async fn partial_source_fallback_on_miss() {
        use laminar_core::lookup::source::LookupError;
        use laminar_core::lookup::source::LookupSourceDyn;

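        // Stub source that answers every requested key with a synthetic
        // customer row, so a cache miss can be served from the "external" source.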
        struct TestSource;

        #[async_trait]
        impl LookupSourceDyn for TestSource {
            async fn query_batch(
                &self,
                keys: &[&[u8]],
                _predicates: &[laminar_core::lookup::predicate::Predicate],
                _projection: &[laminar_core::lookup::source::ColumnId],
            ) -> std::result::Result<Vec<Option<RecordBatch>>, LookupError> {
                Ok(keys
                    .iter()
                    .map(|_| Some(customer_row(99, "FromSource")))
                    .collect())
            }

            fn schema(&self) -> SchemaRef {
                customers_schema()
            }
        }

        let cache = make_foyer_cache();
        warm_cache(&cache);

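        // A single order whose customer_id (99) is not among the warmed keys
        // (1-3), forcing the executor to fall back to the source.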
        let orders = RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![200])),
                Arc::new(Int64Array::from(vec![99])),
                Arc::new(Float64Array::from(vec![50.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(orders);
        let key_sort_fields = vec![SortField::new(DataType::Int64)];
        let source: Arc<dyn LookupSourceDyn> = Arc::new(TestSource);

        let exec = PartialLookupJoinExec::try_new_with_source(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::Inner,
            customers_schema(),
            output_schema(),
            Some(source),
            Arc::new(Semaphore::new(64)),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 1, "source fallback should produce 1 row");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "FromSource");
    }

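    /// A failing source must not fail the query: misses simply stay misses,
    /// and a left outer join still emits every stream row.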
    #[tokio::test]
    async fn partial_source_error_graceful_degradation() {
        use laminar_core::lookup::source::LookupError;
        use laminar_core::lookup::source::LookupSourceDyn;

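        // Source whose lookups always error, exercising the degradation path.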
        struct FailingSource;

        #[async_trait]
        impl LookupSourceDyn for FailingSource {
            async fn query_batch(
                &self,
                _keys: &[&[u8]],
                _predicates: &[laminar_core::lookup::predicate::Predicate],
                _projection: &[laminar_core::lookup::source::ColumnId],
            ) -> std::result::Result<Vec<Option<RecordBatch>>, LookupError> {
                Err(LookupError::Internal("source unavailable".into()))
            }

            fn schema(&self) -> SchemaRef {
                customers_schema()
            }
        }

        let cache = make_foyer_cache();
        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];
        let source: Arc<dyn LookupSourceDyn> = Arc::new(FailingSource);

        let exec = PartialLookupJoinExec::try_new_with_source(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::LeftOuter,
            customers_schema(),
            output_schema(),
            Some(source),
            Arc::new(Semaphore::new(64)),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // All four stream rows survive, with null lookup columns.
        assert_eq!(total, 4);
    }

    #[test]
    fn registry_snapshot_entry_via_get_entry() {
        let reg = LookupTableRegistry::new();
        reg.register(
            "t",
            LookupSnapshot {
                batch: customers_batch(),
                key_columns: vec!["id".into()],
            },
        );

        let entry = reg.get_entry("t");
        assert!(matches!(entry.unwrap(), RegisteredLookup::Snapshot(_)));
        assert!(reg.get("t").is_some());
    }

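    /// Orders schema whose `customer_id` join key is nullable, for the
    /// NULL-key semantics tests below.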
    fn nullable_orders_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, true),
            Field::new("amount", DataType::Float64, false),
        ]))
    }

    fn nullable_output_schema(join_type: LookupJoinType) -> SchemaRef {
        let lookup_nullable = join_type == LookupJoinType::LeftOuter;
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, true),
            Field::new("amount", DataType::Float64, false),
            Field::new("id", DataType::Int64, lookup_nullable),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

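    /// SQL semantics: NULL never equals NULL, so a NULL join key must not
    /// match any lookup row in an inner join.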
    #[tokio::test]
    async fn null_key_inner_join_no_match() {
        let stream_batch = RecordBatch::try_new(
            nullable_orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100, 101, 102])),
                Arc::new(Int64Array::from(vec![Some(1), None, Some(2)])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(stream_batch);
        let exec = LookupJoinExec::try_new(
            input,
            customers_batch(),
            vec![1],
            vec![0],
            LookupJoinType::Inner,
            nullable_output_schema(LookupJoinType::Inner),
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 2, "NULL key row should not match in inner join");
    }

    #[tokio::test]
    async fn null_key_left_outer_produces_nulls() {
        let stream_batch = RecordBatch::try_new(
            nullable_orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100, 101, 102])),
                Arc::new(Int64Array::from(vec![Some(1), None, Some(2)])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(stream_batch);
        let out_schema = nullable_output_schema(LookupJoinType::LeftOuter);
        let exec = LookupJoinExec::try_new(
            input,
            customers_batch(),
            vec![1],
            vec![0],
            LookupJoinType::LeftOuter,
            out_schema,
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let stream = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 3, "all rows preserved in left outer");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Alice");
        assert!(
            names.is_null(1),
            "NULL key row should have null lookup name"
        );
        assert_eq!(names.value(2), "Bob");
    }

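    /// Temporal rate table with two USD versions (valid_from 100 and 200) and
    /// three EUR versions (valid_from 100, 150, and 300).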
    fn versioned_table_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("currency", DataType::Utf8, false),
            Field::new("valid_from", DataType::Int64, false),
            Field::new("rate", DataType::Float64, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["USD", "USD", "EUR", "EUR", "EUR"])),
                Arc::new(Int64Array::from(vec![100, 200, 100, 150, 300])),
                Arc::new(Float64Array::from(vec![1.0, 1.1, 0.85, 0.90, 0.88])),
            ],
        )
        .unwrap()
    }

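    /// Four orders whose event times straddle the rate versions above,
    /// including one (EUR at ts=50) that predates every EUR version.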
    fn stream_batch_with_time() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("currency", DataType::Utf8, false),
            Field::new("event_ts", DataType::Int64, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
                Arc::new(StringArray::from(vec!["USD", "EUR", "USD", "EUR"])),
                Arc::new(Int64Array::from(vec![150, 160, 250, 50])),
            ],
        )
        .unwrap()
    }

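    /// `probe_at_time` is expected to return the row index of the newest
    /// version whose `valid_from` is <= the probe timestamp (event-time
    /// temporal-join semantics); that is what the assertions below check.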
    #[test]
    fn test_versioned_index_build_and_probe() {
        let batch = versioned_table_batch();
        let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX).unwrap();

        let key_sf = vec![SortField::new(DataType::Utf8)];
        let converter = RowConverter::new(key_sf).unwrap();
        let usd_col = Arc::new(StringArray::from(vec!["USD"]));
        let usd_rows = converter.convert_columns(&[usd_col]).unwrap();
        let usd_key = usd_rows.row(0);

        // At ts=150 only the first USD version (valid_from=100, row 0) applies.
        let result = index.probe_at_time(usd_key.as_ref(), 150);
        assert!(result.is_some());
        assert_eq!(result.unwrap(), 0);

        // At ts=250 the second USD version (valid_from=200, row 1) is newest.
        let result = index.probe_at_time(usd_key.as_ref(), 250);
        assert_eq!(result.unwrap(), 1);
    }

    #[test]
    fn test_versioned_index_no_version_before_ts() {
        let batch = versioned_table_batch();
        let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX).unwrap();

        let key_sf = vec![SortField::new(DataType::Utf8)];
        let converter = RowConverter::new(key_sf).unwrap();
        let eur_col = Arc::new(StringArray::from(vec!["EUR"]));
        let eur_rows = converter.convert_columns(&[eur_col]).unwrap();
        let eur_key = eur_rows.row(0);

        // The earliest EUR version starts at valid_from=100, so ts=50 finds none.
        let result = index.probe_at_time(eur_key.as_ref(), 50);
        assert!(result.is_none());
    }

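    /// Wires a `VersionedLookupJoinExec` over the rate table: stream key
    /// column 1 (`currency`), stream time column 2 (`event_ts`).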
    fn build_versioned_exec(
        table: RecordBatch,
        stream: &RecordBatch,
        join_type: LookupJoinType,
    ) -> VersionedLookupJoinExec {
        let input = batch_exec(stream.clone());
        let index = Arc::new(VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap());
        let key_sort_fields = vec![SortField::new(DataType::Utf8)];
        let mut output_fields = stream.schema().fields().to_vec();
        output_fields.extend(table.schema().fields().iter().cloned());
        let output_schema = Arc::new(Schema::new(output_fields));
        VersionedLookupJoinExec::try_new(
            input,
            table,
            index,
            vec![1],
            2,
            join_type,
            output_schema,
            key_sort_fields,
        )
        .unwrap()
    }

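    /// Expected inner-join matches: order 1 (USD@150 -> rate 1.0), order 2
    /// (EUR@160 -> 0.90), order 3 (USD@250 -> 1.1); order 4 (EUR@50) has no
    /// applicable version and is dropped.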
    #[tokio::test]
    async fn test_versioned_join_exec_inner() {
        let table = versioned_table_batch();
        let stream = stream_batch_with_time();
        let exec = build_versioned_exec(table, &stream, LookupJoinType::Inner);

        let ctx = Arc::new(TaskContext::default());
        let stream_out = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream_out.try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 3);

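        // Output column 5 is `rate` (3 stream columns + currency + valid_from).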
        let rates = batch
            .column(5)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!((rates.value(0) - 1.0).abs() < f64::EPSILON);
        assert!((rates.value(1) - 0.90).abs() < f64::EPSILON);
        assert!((rates.value(2) - 1.1).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn test_versioned_join_exec_left_outer() {
        let table = versioned_table_batch();
        let stream = stream_batch_with_time();
        let exec = build_versioned_exec(table, &stream, LookupJoinType::LeftOuter);

        let ctx = Arc::new(TaskContext::default());
        let stream_out = exec.execute(0, ctx).unwrap();
        let batches: Vec<RecordBatch> = stream_out.try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 4);

        let rates = batch
            .column(5)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!(rates.is_null(3), "EUR@50 should have null rate");
    }

    #[test]
    fn test_versioned_index_empty_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("k", DataType::Utf8, false),
            Field::new("v", DataType::Int64, false),
        ]));
        let batch = RecordBatch::new_empty(schema);
        let index = VersionedIndex::build(&batch, &[0], 1, usize::MAX).unwrap();
        assert!(index.map.is_empty());
    }

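    /// Versioned entries surface through `get_entry`; the snapshot-only `get`
    /// accessor must not return them.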
    #[test]
    fn test_versioned_lookup_registry() {
        let registry = LookupTableRegistry::new();
        let table = versioned_table_batch();
        let index = Arc::new(VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap());

        registry.register_versioned(
            "rates",
            VersionedLookupState {
                batch: table,
                index,
                key_columns: vec!["currency".to_string()],
                version_column: "valid_from".to_string(),
                stream_time_column: "event_ts".to_string(),
                max_versions_per_key: usize::MAX,
            },
        );

        let entry = registry.get_entry("rates");
        assert!(entry.is_some());
        assert!(matches!(entry.unwrap(), RegisteredLookup::Versioned(_)));

        assert!(registry.get("rates").is_none());
    }
}