1use std::any::Any;
17use std::collections::HashMap;
18use std::fmt::{self, Debug, Formatter};
19use std::sync::Arc;
20
21use parking_lot::RwLock;
22
23use std::collections::BTreeMap;
24
25use arrow::compute::take;
26use arrow::row::{RowConverter, SortField};
27use arrow_array::{Array, Int64Array, RecordBatch, UInt32Array};
28use arrow_schema::{Schema, SchemaRef};
29use async_trait::async_trait;
30use datafusion::execution::{SendableRecordBatchStream, SessionState, TaskContext};
31use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
32use datafusion::physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};
33use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
35use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
36use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
37use datafusion_common::{DataFusionError, Result};
38use datafusion_expr::Expr;
39use futures::StreamExt;
40use laminar_core::lookup::lookup_cache::LookupMemoryCache;
41use laminar_core::lookup::source::{ColumnId, LookupSourceDyn};
42use tokio::sync::Semaphore;
43
44use super::lookup_join::{LookupJoinNode, LookupJoinType};
45
/// Maximum time a single batched query against a lookup source may take.
///
/// The partial-cache probe wraps its source query in a timeout of this length
/// and reports an execution error when it elapses.
const LOOKUP_SOURCE_TIMEOUT: std::time::Duration =
    std::time::Duration::from_secs(30);
49
50#[derive(Default)]
57pub struct LookupTableRegistry {
58 tables: RwLock<HashMap<String, RegisteredLookup>>,
59}
60
61pub enum RegisteredLookup {
64 Snapshot(Arc<LookupSnapshot>),
66 Partial(Arc<PartialLookupState>),
68 Versioned(Arc<VersionedLookupState>),
70}
71
72pub struct LookupSnapshot {
74 pub batch: RecordBatch,
76}
77
78pub struct VersionedLookupState {
85 pub batch: RecordBatch,
87 pub index: Arc<VersionedIndex>,
89 pub key_columns: Vec<String>,
91 pub version_column: String,
93 pub stream_time_column: String,
95 pub max_versions_per_key: usize,
97}
98
99pub struct PartialLookupState {
101 pub lookup_cache: Arc<LookupMemoryCache>,
103 pub schema: SchemaRef,
105 pub key_columns: Vec<String>,
107 pub key_sort_fields: Vec<SortField>,
109 pub source: Option<Arc<dyn LookupSourceDyn>>,
111 pub fetch_semaphore: Arc<Semaphore>,
113 pub projection: Vec<ColumnId>,
117}
118
119impl LookupTableRegistry {
120 #[must_use]
122 pub fn new() -> Self {
123 Self::default()
124 }
125
126 pub fn register(&self, name: &str, snapshot: LookupSnapshot) {
132 self.tables.write().insert(
133 name.to_lowercase(),
134 RegisteredLookup::Snapshot(Arc::new(snapshot)),
135 );
136 }
137
138 pub fn register_partial(&self, name: &str, state: PartialLookupState) {
144 self.tables.write().insert(
145 name.to_lowercase(),
146 RegisteredLookup::Partial(Arc::new(state)),
147 );
148 }
149
150 pub fn register_versioned(&self, name: &str, state: VersionedLookupState) {
156 self.tables.write().insert(
157 name.to_lowercase(),
158 RegisteredLookup::Versioned(Arc::new(state)),
159 );
160 }
161
162 pub fn unregister(&self, name: &str) {
168 self.tables.write().remove(&name.to_lowercase());
169 }
170
171 #[must_use]
177 pub fn get(&self, name: &str) -> Option<Arc<LookupSnapshot>> {
178 let tables = self.tables.read();
179 match tables.get(&name.to_lowercase())? {
180 RegisteredLookup::Snapshot(s) => Some(Arc::clone(s)),
181 RegisteredLookup::Partial(_) | RegisteredLookup::Versioned(_) => None,
182 }
183 }
184
185 pub fn get_entry(&self, name: &str) -> Option<RegisteredLookup> {
191 let tables = self.tables.read();
192 tables.get(&name.to_lowercase()).map(|e| match e {
193 RegisteredLookup::Snapshot(s) => RegisteredLookup::Snapshot(Arc::clone(s)),
194 RegisteredLookup::Partial(p) => RegisteredLookup::Partial(Arc::clone(p)),
195 RegisteredLookup::Versioned(v) => RegisteredLookup::Versioned(Arc::clone(v)),
196 })
197 }
198}
199
/// Hash index from an encoded join key to the lookup-table rows holding it.
struct HashIndex {
    /// Row-encoded key bytes mapped to the positions of all rows with that
    /// key, in ascending row order.
    map: HashMap<Box<[u8]>, Vec<u32>>,
}
206
207impl HashIndex {
208 fn build(batch: &RecordBatch, key_indices: &[usize]) -> Result<Self> {
213 if batch.num_rows() == 0 {
214 return Ok(Self {
215 map: HashMap::new(),
216 });
217 }
218
219 let sort_fields: Vec<SortField> = key_indices
220 .iter()
221 .map(|&i| SortField::new(batch.schema().field(i).data_type().clone()))
222 .collect();
223 let converter = RowConverter::new(sort_fields)?;
224
225 let key_cols: Vec<_> = key_indices
226 .iter()
227 .map(|&i| batch.column(i).clone())
228 .collect();
229 let rows = converter.convert_columns(&key_cols)?;
230
231 let num_rows = batch.num_rows();
232 let mut map: HashMap<Box<[u8]>, Vec<u32>> = HashMap::with_capacity(num_rows);
233 #[allow(clippy::cast_possible_truncation)] for i in 0..num_rows {
235 map.entry(Box::from(rows.row(i).as_ref()))
236 .or_default()
237 .push(i as u32);
238 }
239
240 Ok(Self { map })
241 }
242
243 fn probe(&self, key: &[u8]) -> Option<&[u32]> {
244 self.map.get(key).map(Vec::as_slice)
245 }
246}
247
/// Index from an encoded join key to that key's versions, ordered by version
/// timestamp.
#[derive(Default)]
pub struct VersionedIndex {
    /// Row-encoded key bytes mapped to an ordered map of version timestamp to
    /// the positions of the rows carrying that timestamp.
    map: HashMap<Box<[u8]>, BTreeMap<i64, Vec<u32>>>,
}
257
258impl VersionedIndex {
259 pub fn build(
269 batch: &RecordBatch,
270 key_indices: &[usize],
271 version_col_idx: usize,
272 max_versions_per_key: usize,
273 ) -> Result<Self> {
274 if batch.num_rows() == 0 {
275 return Ok(Self {
276 map: HashMap::new(),
277 });
278 }
279
280 let sort_fields: Vec<SortField> = key_indices
281 .iter()
282 .map(|&i| SortField::new(batch.schema().field(i).data_type().clone()))
283 .collect();
284 let converter = RowConverter::new(sort_fields)?;
285
286 let key_cols: Vec<_> = key_indices
287 .iter()
288 .map(|&i| batch.column(i).clone())
289 .collect();
290 let rows = converter.convert_columns(&key_cols)?;
291
292 let timestamps = extract_i64_timestamps(batch.column(version_col_idx))?;
293
294 let num_rows = batch.num_rows();
295 let mut map: HashMap<Box<[u8]>, BTreeMap<i64, Vec<u32>>> = HashMap::with_capacity(num_rows);
296 #[allow(clippy::cast_possible_truncation)]
297 for i in 0..timestamps.len() {
298 if timestamps.is_null(i) || key_cols.iter().any(|c| c.is_null(i)) {
299 continue;
300 }
301 let key = Box::from(rows.row(i).as_ref());
302 map.entry(key)
303 .or_default()
304 .entry(timestamps.value(i))
305 .or_default()
306 .push(i as u32);
307 }
308
309 if max_versions_per_key < usize::MAX {
311 for versions in map.values_mut() {
312 while versions.len() > max_versions_per_key {
313 versions.pop_first();
314 }
315 }
316 }
317
318 Ok(Self { map })
319 }
320
321 fn probe_at_time(&self, key: &[u8], event_ts: i64) -> Option<u32> {
324 let versions = self.map.get(key)?;
325 let (_, indices) = versions.range(..=event_ts).next_back()?;
326 indices.last().copied()
327 }
328}
329
330fn extract_i64_timestamps(col: &dyn arrow_array::Array) -> Result<Int64Array> {
336 use arrow::compute::cast;
337 use arrow_array::types::Int64Type;
338 use arrow_array::{ArrayRef, Float64Array};
339 use arrow_schema::{DataType, TimeUnit};
340
341 let int64_arr: ArrayRef = match col.data_type() {
342 DataType::Int64 => col.slice(0, col.len()),
343 DataType::Timestamp(TimeUnit::Millisecond, _) => cast(col, &DataType::Int64)?,
344 DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Second | TimeUnit::Nanosecond, _) => {
345 let ms = cast(col, &DataType::Timestamp(TimeUnit::Millisecond, None))?;
346 cast(ms.as_ref(), &DataType::Int64)?
347 }
348 DataType::Float64 => {
349 let arr = col
350 .as_any()
351 .downcast_ref::<Float64Array>()
352 .ok_or_else(|| DataFusionError::Internal("expected Float64Array".into()))?;
353 return arr.try_unary::<_, Int64Type, DataFusionError>(|v| {
356 let r = v.round();
357 if !(-9_223_372_036_854_775_808.0..9_223_372_036_854_775_808.0).contains(&r) {
358 return Err(DataFusionError::Plan(format!(
359 "Float64 temporal-join key {v} out of i64 range"
360 )));
361 }
362 #[allow(clippy::cast_possible_truncation)] let ms = r as i64;
364 Ok(ms)
365 });
366 }
367 other => {
368 return Err(DataFusionError::Plan(format!(
369 "unsupported timestamp type for temporal join: {other:?}"
370 )));
371 }
372 };
373
374 int64_arr
375 .as_any()
376 .downcast_ref::<Int64Array>()
377 .cloned()
378 .ok_or_else(|| DataFusionError::Internal("cast produced non-Int64 array".into()))
379}
380
381pub struct LookupJoinExec {
386 input: Arc<dyn ExecutionPlan>,
387 index: Arc<HashIndex>,
388 lookup_batch: Arc<RecordBatch>,
389 stream_key_indices: Vec<usize>,
390 join_type: LookupJoinType,
391 schema: SchemaRef,
392 properties: PlanProperties,
393 converter: Arc<RowConverter>,
397 stream_field_count: usize,
398 projection: Vec<usize>,
399}
400
401impl LookupJoinExec {
402 #[allow(clippy::needless_pass_by_value)] pub fn try_new(
412 input: Arc<dyn ExecutionPlan>,
413 lookup_batch: RecordBatch,
414 stream_key_indices: Vec<usize>,
415 lookup_key_indices: Vec<usize>,
416 join_type: LookupJoinType,
417 output_schema: SchemaRef,
418 ) -> Result<Self> {
419 let index = HashIndex::build(&lookup_batch, &lookup_key_indices)?;
420
421 let key_sort_fields: Vec<SortField> = lookup_key_indices
422 .iter()
423 .map(|&i| SortField::new(lookup_batch.schema().field(i).data_type().clone()))
424 .collect();
425 let converter = Arc::new(RowConverter::new(key_sort_fields)?);
426
427 let output_schema = if join_type == LookupJoinType::LeftOuter {
430 let stream_count = input.schema().fields().len();
431 let mut fields = output_schema.fields().to_vec();
432 for f in &mut fields[stream_count..] {
433 if !f.is_nullable() {
434 *f = Arc::new(f.as_ref().clone().with_nullable(true));
435 }
436 }
437 Arc::new(Schema::new_with_metadata(
438 fields,
439 output_schema.metadata().clone(),
440 ))
441 } else {
442 output_schema
443 };
444
445 let properties = PlanProperties::new(
446 EquivalenceProperties::new(Arc::clone(&output_schema)),
447 Partitioning::UnknownPartitioning(1),
448 EmissionType::Incremental,
449 Boundedness::Unbounded {
450 requires_infinite_memory: false,
451 },
452 );
453
454 let stream_field_count = input.schema().fields().len();
455 let projection = (0..(stream_field_count + lookup_batch.num_columns())).collect();
456
457 Ok(Self {
458 input,
459 index: Arc::new(index),
460 lookup_batch: Arc::new(lookup_batch),
461 stream_key_indices,
462 join_type,
463 schema: output_schema,
464 properties,
465 converter,
466 stream_field_count,
467 projection,
468 })
469 }
470
471 #[must_use]
473 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
474 self.projection = projection;
475 self
476 }
477}
478
479impl Debug for LookupJoinExec {
480 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
481 f.debug_struct("LookupJoinExec")
482 .field("join_type", &self.join_type)
483 .field("stream_keys", &self.stream_key_indices)
484 .field("lookup_rows", &self.lookup_batch.num_rows())
485 .finish_non_exhaustive()
486 }
487}
488
489impl DisplayAs for LookupJoinExec {
490 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
491 match t {
492 DisplayFormatType::Default | DisplayFormatType::Verbose => {
493 write!(
494 f,
495 "LookupJoinExec: type={}, stream_keys={:?}, lookup_rows={}",
496 self.join_type,
497 self.stream_key_indices,
498 self.lookup_batch.num_rows(),
499 )
500 }
501 DisplayFormatType::TreeRender => write!(f, "LookupJoinExec"),
502 }
503 }
504}
505
506impl ExecutionPlan for LookupJoinExec {
507 fn name(&self) -> &'static str {
508 "LookupJoinExec"
509 }
510
511 fn as_any(&self) -> &dyn Any {
512 self
513 }
514
515 fn schema(&self) -> SchemaRef {
516 Arc::clone(&self.schema)
517 }
518
519 fn properties(&self) -> &PlanProperties {
520 &self.properties
521 }
522
523 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
524 vec![&self.input]
525 }
526
527 fn with_new_children(
528 self: Arc<Self>,
529 mut children: Vec<Arc<dyn ExecutionPlan>>,
530 ) -> Result<Arc<dyn ExecutionPlan>> {
531 if children.len() != 1 {
532 return Err(DataFusionError::Plan(
533 "LookupJoinExec requires exactly one child".into(),
534 ));
535 }
536 Ok(Arc::new(Self {
537 input: children.swap_remove(0),
538 index: Arc::clone(&self.index),
539 lookup_batch: Arc::clone(&self.lookup_batch),
540 stream_key_indices: self.stream_key_indices.clone(),
541 join_type: self.join_type,
542 schema: Arc::clone(&self.schema),
543 properties: self.properties.clone(),
544 converter: Arc::clone(&self.converter),
545 stream_field_count: self.stream_field_count,
546 projection: self.projection.clone(),
547 }))
548 }
549
550 fn execute(
551 &self,
552 partition: usize,
553 context: Arc<TaskContext>,
554 ) -> Result<SendableRecordBatchStream> {
555 let input_stream = self.input.execute(partition, context)?;
556 let converter = Arc::clone(&self.converter);
557 let index = Arc::clone(&self.index);
558 let lookup_batch = Arc::clone(&self.lookup_batch);
559 let stream_key_indices = self.stream_key_indices.clone();
560 let join_type = self.join_type;
561 let schema = self.schema();
562 let stream_field_count = self.stream_field_count;
563 let projection = self.projection.clone();
564
565 let output = input_stream.map(move |result| {
566 let batch = result?;
567 if batch.num_rows() == 0 {
568 return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
569 }
570 probe_batch(
571 &batch,
572 &converter,
573 &index,
574 &lookup_batch,
575 &stream_key_indices,
576 join_type,
577 &schema,
578 stream_field_count,
579 &projection,
580 )
581 });
582
583 Ok(Box::pin(RecordBatchStreamAdapter::new(
584 self.schema(),
585 output,
586 )))
587 }
588}
589
590impl datafusion::physical_plan::ExecutionPlanProperties for LookupJoinExec {
591 fn output_partitioning(&self) -> &Partitioning {
592 self.properties.output_partitioning()
593 }
594
595 fn output_ordering(&self) -> Option<&LexOrdering> {
596 self.properties.output_ordering()
597 }
598
599 fn boundedness(&self) -> Boundedness {
600 Boundedness::Unbounded {
601 requires_infinite_memory: false,
602 }
603 }
604
605 fn pipeline_behavior(&self) -> EmissionType {
606 EmissionType::Incremental
607 }
608
609 fn equivalence_properties(&self) -> &EquivalenceProperties {
610 self.properties.equivalence_properties()
611 }
612}
613
614#[allow(clippy::too_many_arguments)]
619fn probe_batch(
620 stream_batch: &RecordBatch,
621 converter: &RowConverter,
622 index: &HashIndex,
623 lookup_batch: &RecordBatch,
624 stream_key_indices: &[usize],
625 join_type: LookupJoinType,
626 output_schema: &SchemaRef,
627 stream_field_count: usize,
628 projection: &[usize],
629) -> Result<RecordBatch> {
630 let key_cols: Vec<_> = stream_key_indices
631 .iter()
632 .map(|&i| stream_batch.column(i).clone())
633 .collect();
634 let rows = converter.convert_columns(&key_cols)?;
635
636 let num_rows = stream_batch.num_rows();
637 let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
638 let mut lookup_indices: Vec<Option<u32>> = Vec::with_capacity(num_rows);
639
640 #[allow(clippy::cast_possible_truncation)] for row in 0..num_rows {
642 if key_cols.iter().any(|c| c.is_null(row)) {
644 if join_type == LookupJoinType::LeftOuter {
645 stream_indices.push(row as u32);
646 lookup_indices.push(None);
647 }
648 continue;
649 }
650
651 let key = rows.row(row);
652 match index.probe(key.as_ref()) {
653 Some(matches) => {
654 for &lookup_row in matches {
655 stream_indices.push(row as u32);
656 lookup_indices.push(Some(lookup_row));
657 }
658 }
659 None if join_type == LookupJoinType::LeftOuter => {
660 stream_indices.push(row as u32);
661 lookup_indices.push(None);
662 }
663 None => {}
664 }
665 }
666
667 if stream_indices.is_empty() {
668 return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
669 }
670
671 let take_stream = UInt32Array::from(stream_indices);
673 let mut columns = Vec::with_capacity(stream_field_count + lookup_batch.num_columns());
674
675 for col in stream_batch.columns() {
676 columns.push(take(col.as_ref(), &take_stream, None)?);
677 }
678
679 let take_lookup: UInt32Array = lookup_indices.into_iter().collect();
681 for col in lookup_batch.columns() {
682 columns.push(take(col.as_ref(), &take_lookup, None)?);
683 }
684
685 debug_assert_eq!(
686 columns.len(),
687 stream_field_count + lookup_batch.num_columns(),
688 "output column count mismatch"
689 );
690
691 let projected_columns: Vec<_> = projection.iter().map(|&idx| columns[idx].clone()).collect();
692
693 debug_assert_eq!(
694 projected_columns.len(),
695 output_schema.fields().len(),
696 "projected column count mismatch"
697 );
698
699 Ok(RecordBatch::try_new(
700 Arc::clone(output_schema),
701 projected_columns,
702 )?)
703}
704
705pub struct VersionedLookupJoinExec {
711 input: Arc<dyn ExecutionPlan>,
712 index: Arc<VersionedIndex>,
713 table_batch: Arc<RecordBatch>,
714 stream_key_indices: Vec<usize>,
715 stream_time_col_idx: usize,
716 join_type: LookupJoinType,
717 schema: SchemaRef,
718 properties: PlanProperties,
719 converter: Arc<RowConverter>,
722 stream_field_count: usize,
723}
724
725impl VersionedLookupJoinExec {
726 #[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)]
736 pub fn try_new(
737 input: Arc<dyn ExecutionPlan>,
738 table_batch: RecordBatch,
739 index: Arc<VersionedIndex>,
740 stream_key_indices: Vec<usize>,
741 stream_time_col_idx: usize,
742 join_type: LookupJoinType,
743 output_schema: SchemaRef,
744 key_sort_fields: Vec<SortField>,
745 ) -> Result<Self> {
746 let output_schema = if join_type == LookupJoinType::LeftOuter {
747 let stream_count = input.schema().fields().len();
748 let mut fields = output_schema.fields().to_vec();
749 for f in &mut fields[stream_count..] {
750 if !f.is_nullable() {
751 *f = Arc::new(f.as_ref().clone().with_nullable(true));
752 }
753 }
754 Arc::new(Schema::new_with_metadata(
755 fields,
756 output_schema.metadata().clone(),
757 ))
758 } else {
759 output_schema
760 };
761
762 let properties = PlanProperties::new(
763 EquivalenceProperties::new(Arc::clone(&output_schema)),
764 Partitioning::UnknownPartitioning(1),
765 EmissionType::Incremental,
766 Boundedness::Unbounded {
767 requires_infinite_memory: false,
768 },
769 );
770
771 let stream_field_count = input.schema().fields().len();
772 let converter = Arc::new(RowConverter::new(key_sort_fields)?);
773
774 Ok(Self {
775 input,
776 index,
777 table_batch: Arc::new(table_batch),
778 stream_key_indices,
779 stream_time_col_idx,
780 join_type,
781 schema: output_schema,
782 properties,
783 converter,
784 stream_field_count,
785 })
786 }
787}
788
789impl Debug for VersionedLookupJoinExec {
790 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
791 f.debug_struct("VersionedLookupJoinExec")
792 .field("join_type", &self.join_type)
793 .field("stream_keys", &self.stream_key_indices)
794 .field("table_rows", &self.table_batch.num_rows())
795 .finish_non_exhaustive()
796 }
797}
798
799impl DisplayAs for VersionedLookupJoinExec {
800 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
801 match t {
802 DisplayFormatType::Default | DisplayFormatType::Verbose => {
803 write!(
804 f,
805 "VersionedLookupJoinExec: type={}, stream_keys={:?}, table_rows={}",
806 self.join_type,
807 self.stream_key_indices,
808 self.table_batch.num_rows(),
809 )
810 }
811 DisplayFormatType::TreeRender => write!(f, "VersionedLookupJoinExec"),
812 }
813 }
814}
815
816impl ExecutionPlan for VersionedLookupJoinExec {
817 fn name(&self) -> &'static str {
818 "VersionedLookupJoinExec"
819 }
820
821 fn as_any(&self) -> &dyn Any {
822 self
823 }
824
825 fn schema(&self) -> SchemaRef {
826 Arc::clone(&self.schema)
827 }
828
829 fn properties(&self) -> &PlanProperties {
830 &self.properties
831 }
832
833 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
834 vec![&self.input]
835 }
836
837 fn with_new_children(
838 self: Arc<Self>,
839 mut children: Vec<Arc<dyn ExecutionPlan>>,
840 ) -> Result<Arc<dyn ExecutionPlan>> {
841 if children.len() != 1 {
842 return Err(DataFusionError::Plan(
843 "VersionedLookupJoinExec requires exactly one child".into(),
844 ));
845 }
846 Ok(Arc::new(Self {
847 input: children.swap_remove(0),
848 index: Arc::clone(&self.index),
849 table_batch: Arc::clone(&self.table_batch),
850 stream_key_indices: self.stream_key_indices.clone(),
851 stream_time_col_idx: self.stream_time_col_idx,
852 join_type: self.join_type,
853 schema: Arc::clone(&self.schema),
854 properties: self.properties.clone(),
855 converter: Arc::clone(&self.converter),
856 stream_field_count: self.stream_field_count,
857 }))
858 }
859
860 fn execute(
861 &self,
862 partition: usize,
863 context: Arc<TaskContext>,
864 ) -> Result<SendableRecordBatchStream> {
865 let input_stream = self.input.execute(partition, context)?;
866 let converter = Arc::clone(&self.converter);
867 let index = Arc::clone(&self.index);
868 let table_batch = Arc::clone(&self.table_batch);
869 let stream_key_indices = self.stream_key_indices.clone();
870 let stream_time_col_idx = self.stream_time_col_idx;
871 let join_type = self.join_type;
872 let schema = self.schema();
873 let stream_field_count = self.stream_field_count;
874
875 let output = input_stream.map(move |result| {
876 let batch = result?;
877 if batch.num_rows() == 0 {
878 return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
879 }
880 probe_versioned_batch(
881 &batch,
882 &converter,
883 &index,
884 &table_batch,
885 &stream_key_indices,
886 stream_time_col_idx,
887 join_type,
888 &schema,
889 stream_field_count,
890 )
891 });
892
893 Ok(Box::pin(RecordBatchStreamAdapter::new(
894 self.schema(),
895 output,
896 )))
897 }
898}
899
900impl datafusion::physical_plan::ExecutionPlanProperties for VersionedLookupJoinExec {
901 fn output_partitioning(&self) -> &Partitioning {
902 self.properties.output_partitioning()
903 }
904
905 fn output_ordering(&self) -> Option<&LexOrdering> {
906 self.properties.output_ordering()
907 }
908
909 fn boundedness(&self) -> Boundedness {
910 Boundedness::Unbounded {
911 requires_infinite_memory: false,
912 }
913 }
914
915 fn pipeline_behavior(&self) -> EmissionType {
916 EmissionType::Incremental
917 }
918
919 fn equivalence_properties(&self) -> &EquivalenceProperties {
920 self.properties.equivalence_properties()
921 }
922}
923
924#[allow(clippy::too_many_arguments)]
927fn probe_versioned_batch(
928 stream_batch: &RecordBatch,
929 converter: &RowConverter,
930 index: &VersionedIndex,
931 table_batch: &RecordBatch,
932 stream_key_indices: &[usize],
933 stream_time_col_idx: usize,
934 join_type: LookupJoinType,
935 output_schema: &SchemaRef,
936 stream_field_count: usize,
937) -> Result<RecordBatch> {
938 let key_cols: Vec<_> = stream_key_indices
939 .iter()
940 .map(|&i| stream_batch.column(i).clone())
941 .collect();
942 let rows = converter.convert_columns(&key_cols)?;
943 let event_timestamps =
944 extract_i64_timestamps(stream_batch.column(stream_time_col_idx).as_ref())?;
945
946 let num_rows = stream_batch.num_rows();
947 let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
948 let mut lookup_indices: Vec<Option<u32>> = Vec::with_capacity(num_rows);
949
950 #[allow(clippy::cast_possible_truncation)]
951 for row in 0..event_timestamps.len() {
952 if event_timestamps.is_null(row) || key_cols.iter().any(|c| c.is_null(row)) {
953 if join_type == LookupJoinType::LeftOuter {
954 stream_indices.push(row as u32);
955 lookup_indices.push(None);
956 }
957 continue;
958 }
959
960 let key = rows.row(row);
961 let event_ts = event_timestamps.value(row);
962 match index.probe_at_time(key.as_ref(), event_ts) {
963 Some(table_row_idx) => {
964 stream_indices.push(row as u32);
965 lookup_indices.push(Some(table_row_idx));
966 }
967 None if join_type == LookupJoinType::LeftOuter => {
968 stream_indices.push(row as u32);
969 lookup_indices.push(None);
970 }
971 None => {}
972 }
973 }
974
975 if stream_indices.is_empty() {
976 return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
977 }
978
979 let take_stream = UInt32Array::from(stream_indices);
980 let mut columns = Vec::with_capacity(output_schema.fields().len());
981
982 for col in stream_batch.columns() {
983 columns.push(take(col.as_ref(), &take_stream, None)?);
984 }
985
986 let take_lookup: UInt32Array = lookup_indices.into_iter().collect();
987 for col in table_batch.columns() {
988 columns.push(take(col.as_ref(), &take_lookup, None)?);
989 }
990
991 debug_assert_eq!(
992 columns.len(),
993 stream_field_count + table_batch.num_columns(),
994 "output column count mismatch"
995 );
996
997 Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
998}
999
1000pub struct PartialLookupJoinExec {
1006 input: Arc<dyn ExecutionPlan>,
1007 lookup_cache: Arc<LookupMemoryCache>,
1008 stream_key_indices: Vec<usize>,
1009 join_type: LookupJoinType,
1010 schema: SchemaRef,
1011 properties: PlanProperties,
1012 converter: Arc<RowConverter>,
1015 stream_field_count: usize,
1016 lookup_schema: SchemaRef,
1017 source: Option<Arc<dyn LookupSourceDyn>>,
1018 fetch_semaphore: Arc<Semaphore>,
1019 projection: Vec<ColumnId>,
1020}
1021
1022impl PartialLookupJoinExec {
1023 pub fn try_new(
1029 input: Arc<dyn ExecutionPlan>,
1030 lookup_cache: Arc<LookupMemoryCache>,
1031 stream_key_indices: Vec<usize>,
1032 key_sort_fields: Vec<SortField>,
1033 join_type: LookupJoinType,
1034 lookup_schema: SchemaRef,
1035 output_schema: SchemaRef,
1036 ) -> Result<Self> {
1037 Self::try_new_with_source(
1038 input,
1039 lookup_cache,
1040 stream_key_indices,
1041 key_sort_fields,
1042 join_type,
1043 lookup_schema,
1044 output_schema,
1045 None,
1046 Arc::new(Semaphore::new(64)),
1047 vec![],
1048 )
1049 }
1050
1051 #[allow(clippy::too_many_arguments)]
1057 pub fn try_new_with_source(
1058 input: Arc<dyn ExecutionPlan>,
1059 lookup_cache: Arc<LookupMemoryCache>,
1060 stream_key_indices: Vec<usize>,
1061 key_sort_fields: Vec<SortField>,
1062 join_type: LookupJoinType,
1063 lookup_schema: SchemaRef,
1064 output_schema: SchemaRef,
1065 source: Option<Arc<dyn LookupSourceDyn>>,
1066 fetch_semaphore: Arc<Semaphore>,
1067 projection: Vec<ColumnId>,
1068 ) -> Result<Self> {
1069 let output_schema = if join_type == LookupJoinType::LeftOuter {
1070 let stream_count = input.schema().fields().len();
1071 let mut fields = output_schema.fields().to_vec();
1072 for f in &mut fields[stream_count..] {
1073 if !f.is_nullable() {
1074 *f = Arc::new(f.as_ref().clone().with_nullable(true));
1075 }
1076 }
1077 Arc::new(Schema::new_with_metadata(
1078 fields,
1079 output_schema.metadata().clone(),
1080 ))
1081 } else {
1082 output_schema
1083 };
1084
1085 let properties = PlanProperties::new(
1086 EquivalenceProperties::new(Arc::clone(&output_schema)),
1087 Partitioning::UnknownPartitioning(1),
1088 EmissionType::Incremental,
1089 Boundedness::Unbounded {
1090 requires_infinite_memory: false,
1091 },
1092 );
1093
1094 let stream_field_count = input.schema().fields().len();
1095 let converter = Arc::new(RowConverter::new(key_sort_fields)?);
1096
1097 Ok(Self {
1098 input,
1099 lookup_cache,
1100 stream_key_indices,
1101 join_type,
1102 schema: output_schema,
1103 properties,
1104 converter,
1105 stream_field_count,
1106 lookup_schema,
1107 source,
1108 fetch_semaphore,
1109 projection,
1110 })
1111 }
1112}
1113
1114impl Debug for PartialLookupJoinExec {
1115 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1116 f.debug_struct("PartialLookupJoinExec")
1117 .field("join_type", &self.join_type)
1118 .field("stream_keys", &self.stream_key_indices)
1119 .field("cache_table_id", &self.lookup_cache.table_id())
1120 .finish_non_exhaustive()
1121 }
1122}
1123
1124impl DisplayAs for PartialLookupJoinExec {
1125 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
1126 match t {
1127 DisplayFormatType::Default | DisplayFormatType::Verbose => {
1128 write!(
1129 f,
1130 "PartialLookupJoinExec: type={}, stream_keys={:?}, cache_entries={}",
1131 self.join_type,
1132 self.stream_key_indices,
1133 self.lookup_cache.len(),
1134 )
1135 }
1136 DisplayFormatType::TreeRender => write!(f, "PartialLookupJoinExec"),
1137 }
1138 }
1139}
1140
1141impl ExecutionPlan for PartialLookupJoinExec {
1142 fn name(&self) -> &'static str {
1143 "PartialLookupJoinExec"
1144 }
1145
1146 fn as_any(&self) -> &dyn Any {
1147 self
1148 }
1149
1150 fn schema(&self) -> SchemaRef {
1151 Arc::clone(&self.schema)
1152 }
1153
1154 fn properties(&self) -> &PlanProperties {
1155 &self.properties
1156 }
1157
1158 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1159 vec![&self.input]
1160 }
1161
1162 fn with_new_children(
1163 self: Arc<Self>,
1164 mut children: Vec<Arc<dyn ExecutionPlan>>,
1165 ) -> Result<Arc<dyn ExecutionPlan>> {
1166 if children.len() != 1 {
1167 return Err(DataFusionError::Plan(
1168 "PartialLookupJoinExec requires exactly one child".into(),
1169 ));
1170 }
1171 Ok(Arc::new(Self {
1172 input: children.swap_remove(0),
1173 lookup_cache: Arc::clone(&self.lookup_cache),
1174 stream_key_indices: self.stream_key_indices.clone(),
1175 join_type: self.join_type,
1176 schema: Arc::clone(&self.schema),
1177 properties: self.properties.clone(),
1178 converter: Arc::clone(&self.converter),
1179 stream_field_count: self.stream_field_count,
1180 lookup_schema: Arc::clone(&self.lookup_schema),
1181 source: self.source.clone(),
1182 fetch_semaphore: Arc::clone(&self.fetch_semaphore),
1183 projection: self.projection.clone(),
1184 }))
1185 }
1186
1187 fn execute(
1188 &self,
1189 partition: usize,
1190 context: Arc<TaskContext>,
1191 ) -> Result<SendableRecordBatchStream> {
1192 let input_stream = self.input.execute(partition, context)?;
1193 let converter = Arc::clone(&self.converter);
1194 let lookup_cache = Arc::clone(&self.lookup_cache);
1195 let stream_key_indices = self.stream_key_indices.clone();
1196 let join_type = self.join_type;
1197 let schema = self.schema();
1198 let stream_field_count = self.stream_field_count;
1199 let lookup_schema = Arc::clone(&self.lookup_schema);
1200 let source = self.source.clone();
1201 let fetch_semaphore = Arc::clone(&self.fetch_semaphore);
1202 let projection = self.projection.clone();
1203
1204 let output = input_stream.then(move |result| {
1205 let lookup_cache = Arc::clone(&lookup_cache);
1206 let converter = Arc::clone(&converter);
1207 let stream_key_indices = stream_key_indices.clone();
1208 let schema = Arc::clone(&schema);
1209 let lookup_schema = Arc::clone(&lookup_schema);
1210 let source = source.clone();
1211 let fetch_semaphore = Arc::clone(&fetch_semaphore);
1212 let projection = projection.clone();
1213 async move {
1214 let batch = result?;
1215 if batch.num_rows() == 0 {
1216 return Ok(RecordBatch::new_empty(Arc::clone(&schema)));
1217 }
1218 probe_partial_batch_with_fallback(
1219 &batch,
1220 &converter,
1221 &lookup_cache,
1222 &stream_key_indices,
1223 join_type,
1224 &schema,
1225 stream_field_count,
1226 &lookup_schema,
1227 source.as_deref(),
1228 &fetch_semaphore,
1229 &projection,
1230 )
1231 .await
1232 }
1233 });
1234
1235 Ok(Box::pin(RecordBatchStreamAdapter::new(
1236 self.schema(),
1237 output,
1238 )))
1239 }
1240}
1241
1242impl datafusion::physical_plan::ExecutionPlanProperties for PartialLookupJoinExec {
1243 fn output_partitioning(&self) -> &Partitioning {
1244 self.properties.output_partitioning()
1245 }
1246
1247 fn output_ordering(&self) -> Option<&LexOrdering> {
1248 self.properties.output_ordering()
1249 }
1250
1251 fn boundedness(&self) -> Boundedness {
1252 Boundedness::Unbounded {
1253 requires_infinite_memory: false,
1254 }
1255 }
1256
1257 fn pipeline_behavior(&self) -> EmissionType {
1258 EmissionType::Incremental
1259 }
1260
1261 fn equivalence_properties(&self) -> &EquivalenceProperties {
1262 self.properties.equivalence_properties()
1263 }
1264}
1265
1266#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1270async fn probe_partial_batch_with_fallback(
1271 stream_batch: &RecordBatch,
1272 converter: &RowConverter,
1273 lookup_cache: &LookupMemoryCache,
1274 stream_key_indices: &[usize],
1275 join_type: LookupJoinType,
1276 output_schema: &SchemaRef,
1277 stream_field_count: usize,
1278 lookup_schema: &SchemaRef,
1279 source: Option<&dyn LookupSourceDyn>,
1280 fetch_semaphore: &Semaphore,
1281 projection: &[ColumnId],
1282) -> Result<RecordBatch> {
1283 let key_cols: Vec<_> = stream_key_indices
1284 .iter()
1285 .map(|&i| stream_batch.column(i).clone())
1286 .collect();
1287 let rows = converter.convert_columns(&key_cols)?;
1288
1289 let num_rows = stream_batch.num_rows();
1290 let mut stream_indices: Vec<u32> = Vec::with_capacity(num_rows);
1291 let mut lookup_batches: Vec<Option<RecordBatch>> = Vec::with_capacity(num_rows);
1292 let mut miss_keys: Vec<(usize, Vec<u8>)> = Vec::new();
1293
1294 #[allow(clippy::cast_possible_truncation)]
1295 for row in 0..num_rows {
1296 if key_cols.iter().any(|c| c.is_null(row)) {
1298 if join_type == LookupJoinType::LeftOuter {
1299 stream_indices.push(row as u32);
1300 lookup_batches.push(None);
1301 }
1302 continue;
1303 }
1304
1305 let key = rows.row(row);
1306 let result = lookup_cache.get_cached(key.as_ref());
1307 if let Some(batch) = result.into_batch() {
1308 stream_indices.push(row as u32);
1309 lookup_batches.push(Some(batch));
1310 } else {
1311 let idx = stream_indices.len();
1312 stream_indices.push(row as u32);
1313 lookup_batches.push(None);
1314 miss_keys.push((idx, key.as_ref().to_vec()));
1315 }
1316 }
1317
1318 if let Some(source) = source {
1320 if !miss_keys.is_empty() {
1321 let _permit = fetch_semaphore
1322 .acquire()
1323 .await
1324 .map_err(|_| DataFusionError::Internal("fetch semaphore closed".into()))?;
1325
1326 let key_refs: Vec<&[u8]> = miss_keys.iter().map(|(_, k)| k.as_slice()).collect();
1327
1328 let results = match tokio::time::timeout(
1332 LOOKUP_SOURCE_TIMEOUT,
1333 source.query_batch(&key_refs, &[], projection),
1334 )
1335 .await
1336 {
1337 Ok(Ok(results)) => results,
1338 Ok(Err(e)) => {
1339 return Err(DataFusionError::Execution(format!(
1340 "lookup source query failed ({} keys): {e}",
1341 miss_keys.len()
1342 )));
1343 }
1344 Err(_elapsed) => {
1345 return Err(DataFusionError::Execution(format!(
1346 "lookup source query timed out after {LOOKUP_SOURCE_TIMEOUT:?} \
1347 ({} keys)",
1348 miss_keys.len()
1349 )));
1350 }
1351 };
1352
1353 if results.len() != miss_keys.len() {
1354 return Err(DataFusionError::Execution(format!(
1355 "Lookup source returned mismatched results cardinality. Expected {} results, but got {} results. Miss keys: {:?}",
1356 miss_keys.len(),
1357 results.len(),
1358 miss_keys
1359 )));
1360 }
1361
1362 for ((idx, key_bytes), maybe_batch) in miss_keys.iter().zip(results) {
1363 if let Some(batch) = maybe_batch {
1364 lookup_cache.insert(key_bytes, batch.clone());
1365 lookup_batches[*idx] = Some(batch);
1366 }
1367 }
1368 }
1369 }
1370
1371 if join_type == LookupJoinType::Inner {
1373 let mut write = 0;
1374 for read in 0..stream_indices.len() {
1375 if lookup_batches[read].is_some() {
1376 stream_indices[write] = stream_indices[read];
1377 lookup_batches.swap(write, read);
1378 write += 1;
1379 }
1380 }
1381 stream_indices.truncate(write);
1382 lookup_batches.truncate(write);
1383 }
1384
1385 if stream_indices.is_empty() {
1386 return Ok(RecordBatch::new_empty(Arc::clone(output_schema)));
1387 }
1388
1389 let take_indices = UInt32Array::from(stream_indices);
1390 let mut columns = Vec::with_capacity(output_schema.fields().len());
1391
1392 for col in stream_batch.columns() {
1393 columns.push(take(col.as_ref(), &take_indices, None)?);
1394 }
1395
1396 let lookup_col_count = lookup_schema.fields().len();
1397 for col_idx in 0..lookup_col_count {
1398 let arrays: Vec<_> = lookup_batches
1399 .iter()
1400 .map(|opt| match opt {
1401 Some(b) => b.column(col_idx).clone(),
1402 None => arrow_array::new_null_array(lookup_schema.field(col_idx).data_type(), 1),
1403 })
1404 .collect();
1405 let refs: Vec<&dyn arrow_array::Array> = arrays.iter().map(AsRef::as_ref).collect();
1406 columns.push(arrow::compute::concat(&refs)?);
1407 }
1408
1409 debug_assert_eq!(
1410 columns.len(),
1411 stream_field_count + lookup_col_count,
1412 "output column count mismatch"
1413 );
1414
1415 Ok(RecordBatch::try_new(Arc::clone(output_schema), columns)?)
1416}
1417
1418fn resolve_physical_projection(
1419 logical_schema: &datafusion::common::DFSchema,
1420 stream_schema: &arrow::datatypes::Schema,
1421 lookup_schema: &arrow::datatypes::Schema,
1422 lookup_table: &str,
1423 lookup_alias: Option<&str>,
1424) -> Result<Vec<usize>> {
1425 let mut projection = Vec::with_capacity(logical_schema.fields().len());
1426 for idx in 0..logical_schema.fields().len() {
1427 let (qualifier, field) = logical_schema.qualified_field(idx);
1428 let name = field.name();
1429 let is_lookup = if let Some(relation) = qualifier {
1430 let table = relation.table();
1431 table == lookup_table || Some(table) == lookup_alias
1432 } else {
1433 if stream_schema.index_of(name).is_ok() && lookup_schema.index_of(name).is_err() {
1435 false
1436 } else if lookup_schema.index_of(name).is_ok() && stream_schema.index_of(name).is_err()
1437 {
1438 true
1439 } else {
1440 stream_schema.index_of(name).is_ok()
1441 }
1442 };
1443
1444 if is_lookup {
1445 let lookup_idx = lookup_schema.index_of(name).map_err(|_| {
1446 DataFusionError::Plan(format!(
1447 "lookup join projection: output field '{name}' not found in lookup \
1448 schema {lookup_schema:?}"
1449 ))
1450 })?;
1451 projection.push(stream_schema.fields().len() + lookup_idx);
1452 } else {
1453 let stream_idx = stream_schema.index_of(name).map_err(|_| {
1454 DataFusionError::Plan(format!(
1455 "lookup join projection: output field '{name}' not found in stream \
1456 schema {stream_schema:?}"
1457 ))
1458 })?;
1459 projection.push(stream_idx);
1460 }
1461 }
1462 Ok(projection)
1463}
1464
/// Extension planner that turns `LookupJoinNode` logical nodes into physical
/// lookup-join execution plans, resolving the lookup table through a shared
/// [`LookupTableRegistry`].
pub struct LookupJoinExtensionPlanner {
    /// Registry consulted for the lookup table named by the logical node.
    registry: Arc<LookupTableRegistry>,
}
1473
impl LookupJoinExtensionPlanner {
    /// Creates a planner that resolves lookup tables through `registry`.
    pub fn new(registry: Arc<LookupTableRegistry>) -> Self {
        Self { registry }
    }
}
1480
1481#[async_trait]
1482impl ExtensionPlanner for LookupJoinExtensionPlanner {
1483 #[allow(clippy::too_many_lines)]
1484 async fn plan_extension(
1485 &self,
1486 _planner: &dyn PhysicalPlanner,
1487 node: &dyn UserDefinedLogicalNode,
1488 _logical_inputs: &[&LogicalPlan],
1489 physical_inputs: &[Arc<dyn ExecutionPlan>],
1490 session_state: &SessionState,
1491 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1492 let Some(lookup_node) = node.as_any().downcast_ref::<LookupJoinNode>() else {
1493 return Ok(None);
1494 };
1495
1496 let entry = self
1497 .registry
1498 .get_entry(lookup_node.lookup_table_name())
1499 .ok_or_else(|| {
1500 DataFusionError::Plan(format!(
1501 "lookup table '{}' not registered",
1502 lookup_node.lookup_table_name()
1503 ))
1504 })?;
1505
1506 let input = Arc::clone(&physical_inputs[0]);
1507 let stream_schema = input.schema();
1508
1509 match entry {
1510 RegisteredLookup::Partial(partial_state) => {
1511 let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;
1512
1513 let mut output_fields = stream_schema.fields().to_vec();
1514 output_fields.extend(partial_state.schema.fields().iter().cloned());
1515 let output_schema = Arc::new(Schema::new(output_fields));
1516
1517 let exec = PartialLookupJoinExec::try_new_with_source(
1518 input,
1519 Arc::clone(&partial_state.lookup_cache),
1520 stream_key_indices,
1521 partial_state.key_sort_fields.clone(),
1522 lookup_node.join_type(),
1523 Arc::clone(&partial_state.schema),
1524 output_schema,
1525 partial_state.source.clone(),
1526 Arc::clone(&partial_state.fetch_semaphore),
1527 partial_state.projection.clone(),
1528 )?;
1529 Ok(Some(Arc::new(exec)))
1530 }
1531 RegisteredLookup::Snapshot(snapshot) => {
1532 let lookup_schema = snapshot.batch.schema();
1533 let lookup_key_indices = resolve_lookup_keys(lookup_node, &lookup_schema)?;
1534
1535 let lookup_batch = if lookup_node.pushdown_predicates().is_empty()
1536 || snapshot.batch.num_rows() == 0
1537 {
1538 snapshot.batch.clone()
1539 } else {
1540 apply_pushdown_predicates(
1541 &snapshot.batch,
1542 lookup_node.pushdown_predicates(),
1543 session_state,
1544 )?
1545 };
1546
1547 let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;
1548
1549 for (si, li) in stream_key_indices.iter().zip(&lookup_key_indices) {
1551 let st = stream_schema.field(*si).data_type();
1552 let lt = lookup_schema.field(*li).data_type();
1553 if st != lt {
1554 return Err(DataFusionError::Plan(format!(
1555 "Lookup join key type mismatch: stream '{}' is {st:?} \
1556 but lookup '{}' is {lt:?}",
1557 stream_schema.field(*si).name(),
1558 lookup_schema.field(*li).name(),
1559 )));
1560 }
1561 }
1562
1563 let logical_schema = lookup_node.schema();
1564 let physical_projection = resolve_physical_projection(
1565 logical_schema,
1566 &stream_schema,
1567 &lookup_schema,
1568 lookup_node.lookup_table_name(),
1569 lookup_node.lookup_alias(),
1570 )?;
1571
1572 let output_schema = Arc::new(Schema::new(logical_schema.fields().to_vec()));
1573
1574 let exec = LookupJoinExec::try_new(
1575 input,
1576 lookup_batch,
1577 stream_key_indices,
1578 lookup_key_indices,
1579 lookup_node.join_type(),
1580 output_schema,
1581 )?
1582 .with_projection(physical_projection);
1583
1584 Ok(Some(Arc::new(exec)))
1585 }
1586 RegisteredLookup::Versioned(versioned_state) => {
1587 let table_schema = versioned_state.batch.schema();
1588 let lookup_key_indices = resolve_lookup_keys(lookup_node, &table_schema)?;
1589 let stream_key_indices = resolve_stream_keys(lookup_node, &stream_schema)?;
1590
1591 for (si, li) in stream_key_indices.iter().zip(&lookup_key_indices) {
1593 let st = stream_schema.field(*si).data_type();
1594 let lt = table_schema.field(*li).data_type();
1595 if st != lt {
1596 return Err(DataFusionError::Plan(format!(
1597 "Temporal join key type mismatch: stream '{}' is {st:?} \
1598 but table '{}' is {lt:?}",
1599 stream_schema.field(*si).name(),
1600 table_schema.field(*li).name(),
1601 )));
1602 }
1603 }
1604
1605 let stream_time_col_idx = stream_schema
1606 .index_of(&versioned_state.stream_time_column)
1607 .map_err(|_| {
1608 DataFusionError::Plan(format!(
1609 "stream time column '{}' not found in stream schema",
1610 versioned_state.stream_time_column
1611 ))
1612 })?;
1613
1614 let key_sort_fields: Vec<SortField> = lookup_key_indices
1615 .iter()
1616 .map(|&i| SortField::new(table_schema.field(i).data_type().clone()))
1617 .collect();
1618
1619 let mut output_fields = stream_schema.fields().to_vec();
1620 output_fields.extend(table_schema.fields().iter().cloned());
1621 let output_schema = Arc::new(Schema::new(output_fields));
1622
1623 let exec = VersionedLookupJoinExec::try_new(
1624 input,
1625 versioned_state.batch.clone(),
1626 Arc::clone(&versioned_state.index),
1627 stream_key_indices,
1628 stream_time_col_idx,
1629 lookup_node.join_type(),
1630 output_schema,
1631 key_sort_fields,
1632 )?;
1633
1634 Ok(Some(Arc::new(exec)))
1635 }
1636 }
1637 }
1638}
1639
1640fn apply_pushdown_predicates(
1643 batch: &RecordBatch,
1644 predicates: &[Expr],
1645 session_state: &SessionState,
1646) -> Result<RecordBatch> {
1647 use arrow::compute::filter_record_batch;
1648 use datafusion::physical_expr::create_physical_expr;
1649
1650 let schema = batch.schema();
1651 let df_schema = datafusion::common::DFSchema::try_from(schema.as_ref().clone())?;
1652
1653 let mut mask = None::<arrow_array::BooleanArray>;
1654 for pred in predicates {
1655 let phys_expr = create_physical_expr(pred, &df_schema, session_state.execution_props())?;
1656 let result = phys_expr.evaluate(batch)?;
1657 let bool_arr = result
1658 .into_array(batch.num_rows())?
1659 .as_any()
1660 .downcast_ref::<arrow_array::BooleanArray>()
1661 .ok_or_else(|| {
1662 DataFusionError::Internal("pushdown predicate did not evaluate to boolean".into())
1663 })?
1664 .clone();
1665 mask = Some(match mask {
1666 Some(existing) => arrow::compute::and(&existing, &bool_arr)?,
1667 None => bool_arr,
1668 });
1669 }
1670
1671 match mask {
1672 Some(m) => Ok(filter_record_batch(batch, &m)?),
1673 None => Ok(batch.clone()),
1674 }
1675}
1676
1677fn resolve_stream_keys(node: &LookupJoinNode, schema: &SchemaRef) -> Result<Vec<usize>> {
1678 node.join_keys()
1679 .iter()
1680 .map(|pair| match &pair.stream_expr {
1681 Expr::Column(col) => schema.index_of(&col.name).map_err(|_| {
1682 DataFusionError::Plan(format!(
1683 "stream key column '{}' not found in physical schema",
1684 col.name
1685 ))
1686 }),
1687 other => Err(DataFusionError::NotImplemented(format!(
1688 "lookup join requires column references as stream keys, got: {other}"
1689 ))),
1690 })
1691 .collect()
1692}
1693
1694fn resolve_lookup_keys(node: &LookupJoinNode, schema: &SchemaRef) -> Result<Vec<usize>> {
1695 node.join_keys()
1696 .iter()
1697 .map(|pair| {
1698 schema.index_of(&pair.lookup_column).map_err(|_| {
1699 DataFusionError::Plan(format!(
1700 "lookup key column '{}' not found in lookup table schema",
1701 pair.lookup_column
1702 ))
1703 })
1704 })
1705 .collect()
1706}
1707
1708#[cfg(test)]
1711mod tests {
1712 use super::*;
1713 use arrow_array::{Array, Float64Array, Int64Array, StringArray};
1714 use arrow_schema::{DataType, Field};
1715 use datafusion::physical_plan::stream::RecordBatchStreamAdapter as TestStreamAdapter;
1716 use futures::TryStreamExt;
1717
1718 fn batch_exec(batch: RecordBatch) -> Arc<dyn ExecutionPlan> {
1720 let schema = batch.schema();
1721 let batches = vec![batch];
1722 let stream_schema = Arc::clone(&schema);
1723 Arc::new(StreamExecStub {
1724 schema,
1725 batches: std::sync::Mutex::new(Some(batches)),
1726 stream_schema,
1727 })
1728 }
1729
    /// Minimal test-only execution plan that hands out a fixed set of batches.
    struct StreamExecStub {
        /// Schema reported by `ExecutionPlan::schema` and used for plan properties.
        schema: SchemaRef,
        /// Batches to emit; taken (left as `None`) by the first `execute` call.
        batches: std::sync::Mutex<Option<Vec<RecordBatch>>>,
        /// Schema attached to the stream returned by `execute`.
        stream_schema: SchemaRef,
    }
1736
1737 impl Debug for StreamExecStub {
1738 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1739 write!(f, "StreamExecStub")
1740 }
1741 }
1742
1743 impl DisplayAs for StreamExecStub {
1744 fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
1745 write!(f, "StreamExecStub")
1746 }
1747 }
1748
    /// Leaf execution plan used by the tests: no children, one unknown
    /// partition, bounded input with final emission.
    impl ExecutionPlan for StreamExecStub {
        fn name(&self) -> &'static str {
            "StreamExecStub"
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
        fn properties(&self) -> &PlanProperties {
            // NOTE(review): a new `PlanProperties` is allocated and leaked on
            // every call to obtain a reference that outlives this function.
            // Presumably acceptable for a test stub; caching it in the struct
            // would avoid the leak.
            Box::leak(Box::new(PlanProperties::new(
                EquivalenceProperties::new(Arc::clone(&self.schema)),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Bounded,
            )))
        }
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }
        // There are no children to replace, so the plan is returned unchanged.
        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }
        // The partition index and task context are ignored. The stored batches
        // are taken out on the first call, so later calls yield an empty stream.
        fn execute(&self, _: usize, _: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
            let batches = self.batches.lock().unwrap().take().unwrap_or_default();
            let schema = Arc::clone(&self.stream_schema);
            let stream = futures::stream::iter(batches.into_iter().map(Ok));
            Ok(Box::pin(TestStreamAdapter::new(schema, stream)))
        }
    }
1784
    /// Property accessors for the stub: partitioning and equivalence properties
    /// come from `properties()`, the rest are fixed values.
    impl datafusion::physical_plan::ExecutionPlanProperties for StreamExecStub {
        fn output_partitioning(&self) -> &Partitioning {
            self.properties().output_partitioning()
        }
        // The stub declares no output ordering.
        fn output_ordering(&self) -> Option<&LexOrdering> {
            None
        }
        fn boundedness(&self) -> Boundedness {
            Boundedness::Bounded
        }
        fn pipeline_behavior(&self) -> EmissionType {
            EmissionType::Final
        }
        fn equivalence_properties(&self) -> &EquivalenceProperties {
            self.properties().equivalence_properties()
        }
    }
1802
    /// Stream-side schema: `order_id`, `customer_id` (the join key used by the
    /// tests) and `amount`, all non-nullable.
    fn orders_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, false),
            Field::new("amount", DataType::Float64, false),
        ]))
    }
1810
    /// Lookup-side schema: non-nullable `id` and nullable `name`.
    fn customers_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }
1817
    /// Joined output schema: the three order fields followed by the two
    /// customer fields.
    fn output_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Int64, false),
            Field::new("customer_id", DataType::Int64, false),
            Field::new("amount", DataType::Float64, false),
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }
1827
    /// Lookup fixture with three customers: ids 1, 2, 3 named Alice, Bob, Charlie.
    fn customers_batch() -> RecordBatch {
        RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
            ],
        )
        .unwrap()
    }
1838
    /// Stream fixture with four orders; the third row uses customer id 99,
    /// which is absent from `customers_batch`.
    fn orders_batch() -> RecordBatch {
        RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![100, 101, 102, 103])),
                Arc::new(Int64Array::from(vec![1, 2, 99, 3])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0, 40.0])),
            ],
        )
        .unwrap()
    }
1850
1851 fn make_exec(join_type: LookupJoinType) -> LookupJoinExec {
1852 let input = batch_exec(orders_batch());
1853 LookupJoinExec::try_new(
1854 input,
1855 customers_batch(),
1856 vec![1], vec![0], join_type,
1859 output_schema(),
1860 )
1861 .unwrap()
1862 }
1863
1864 #[tokio::test]
1865 async fn inner_join_filters_non_matches() {
1866 let exec = make_exec(LookupJoinType::Inner);
1867 let ctx = Arc::new(TaskContext::default());
1868 let stream = exec.execute(0, ctx).unwrap();
1869 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1870
1871 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
1872 assert_eq!(total, 3, "customer_id=99 has no match, filtered by inner");
1873
1874 let names = batches[0]
1875 .column(4)
1876 .as_any()
1877 .downcast_ref::<StringArray>()
1878 .unwrap();
1879 assert_eq!(names.value(0), "Alice");
1880 assert_eq!(names.value(1), "Bob");
1881 assert_eq!(names.value(2), "Charlie");
1882 }
1883
1884 #[tokio::test]
1885 async fn left_outer_preserves_non_matches() {
1886 let exec = make_exec(LookupJoinType::LeftOuter);
1887 let ctx = Arc::new(TaskContext::default());
1888 let stream = exec.execute(0, ctx).unwrap();
1889 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1890
1891 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
1892 assert_eq!(total, 4, "all 4 stream rows preserved in left outer");
1893
1894 let names = batches[0]
1895 .column(4)
1896 .as_any()
1897 .downcast_ref::<StringArray>()
1898 .unwrap();
1899 assert!(names.is_null(2));
1901 }
1902
1903 #[tokio::test]
1904 async fn empty_lookup_inner_produces_no_rows() {
1905 let empty = RecordBatch::new_empty(customers_schema());
1906 let input = batch_exec(orders_batch());
1907 let exec = LookupJoinExec::try_new(
1908 input,
1909 empty,
1910 vec![1],
1911 vec![0],
1912 LookupJoinType::Inner,
1913 output_schema(),
1914 )
1915 .unwrap();
1916
1917 let ctx = Arc::new(TaskContext::default());
1918 let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
1919 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
1920 assert_eq!(total, 0);
1921 }
1922
1923 #[tokio::test]
1924 async fn empty_lookup_left_outer_preserves_all_stream_rows() {
1925 let empty = RecordBatch::new_empty(customers_schema());
1926 let input = batch_exec(orders_batch());
1927 let exec = LookupJoinExec::try_new(
1928 input,
1929 empty,
1930 vec![1],
1931 vec![0],
1932 LookupJoinType::LeftOuter,
1933 output_schema(),
1934 )
1935 .unwrap();
1936
1937 let ctx = Arc::new(TaskContext::default());
1938 let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
1939 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
1940 assert_eq!(total, 4);
1941 }
1942
1943 #[tokio::test]
1944 async fn duplicate_keys_produce_multiple_rows() {
1945 let lookup = RecordBatch::try_new(
1946 customers_schema(),
1947 vec![
1948 Arc::new(Int64Array::from(vec![1, 1])),
1949 Arc::new(StringArray::from(vec!["Alice-A", "Alice-B"])),
1950 ],
1951 )
1952 .unwrap();
1953
1954 let stream = RecordBatch::try_new(
1955 orders_schema(),
1956 vec![
1957 Arc::new(Int64Array::from(vec![100])),
1958 Arc::new(Int64Array::from(vec![1])),
1959 Arc::new(Float64Array::from(vec![10.0])),
1960 ],
1961 )
1962 .unwrap();
1963
1964 let input = batch_exec(stream);
1965 let exec = LookupJoinExec::try_new(
1966 input,
1967 lookup,
1968 vec![1],
1969 vec![0],
1970 LookupJoinType::Inner,
1971 output_schema(),
1972 )
1973 .unwrap();
1974
1975 let ctx = Arc::new(TaskContext::default());
1976 let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
1977 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
1978 assert_eq!(total, 2, "one stream row matched two lookup rows");
1979 }
1980
1981 #[test]
1982 fn with_new_children_preserves_state() {
1983 let exec = Arc::new(make_exec(LookupJoinType::Inner));
1984 let expected_schema = exec.schema();
1985 let children = exec.children().into_iter().cloned().collect();
1986 let rebuilt = exec.with_new_children(children).unwrap();
1987 assert_eq!(rebuilt.schema(), expected_schema);
1988 assert_eq!(rebuilt.name(), "LookupJoinExec");
1989 }
1990
1991 #[test]
1992 fn display_format() {
1993 let exec = make_exec(LookupJoinType::Inner);
1994 let s = format!("{exec:?}");
1995 assert!(s.contains("LookupJoinExec"));
1996 assert!(s.contains("lookup_rows: 3"));
1997 }
1998
1999 #[test]
2000 fn registry_crud() {
2001 let reg = LookupTableRegistry::new();
2002 assert!(reg.get("customers").is_none());
2003
2004 reg.register(
2005 "customers",
2006 LookupSnapshot {
2007 batch: customers_batch(),
2008 },
2009 );
2010 assert!(reg.get("customers").is_some());
2011 assert!(reg.get("CUSTOMERS").is_some(), "case-insensitive");
2012
2013 reg.unregister("customers");
2014 assert!(reg.get("customers").is_none());
2015 }
2016
2017 #[test]
2018 fn registry_update_replaces() {
2019 let reg = LookupTableRegistry::new();
2020 reg.register(
2021 "t",
2022 LookupSnapshot {
2023 batch: RecordBatch::new_empty(customers_schema()),
2024 },
2025 );
2026 assert_eq!(reg.get("t").unwrap().batch.num_rows(), 0);
2027
2028 reg.register(
2029 "t",
2030 LookupSnapshot {
2031 batch: customers_batch(),
2032 },
2033 );
2034 assert_eq!(reg.get("t").unwrap().batch.num_rows(), 3);
2035 }
2036
2037 #[test]
2038 fn pushdown_predicates_filter_snapshot() {
2039 use datafusion::logical_expr::{col, lit};
2040
2041 let batch = customers_batch(); let ctx = datafusion::prelude::SessionContext::new();
2043 let state = ctx.state();
2044
2045 let predicates = vec![col("id").gt(lit(1i64))];
2047 let filtered = apply_pushdown_predicates(&batch, &predicates, &state).unwrap();
2048 assert_eq!(filtered.num_rows(), 2);
2049
2050 let ids = filtered
2051 .column(0)
2052 .as_any()
2053 .downcast_ref::<Int64Array>()
2054 .unwrap();
2055 assert_eq!(ids.value(0), 2);
2056 assert_eq!(ids.value(1), 3);
2057 }
2058
2059 #[test]
2060 fn pushdown_predicates_empty_passes_all() {
2061 let batch = customers_batch();
2062 let ctx = datafusion::prelude::SessionContext::new();
2063 let state = ctx.state();
2064
2065 let filtered = apply_pushdown_predicates(&batch, &[], &state).unwrap();
2066 assert_eq!(filtered.num_rows(), 3);
2067 }
2068
2069 #[test]
2070 fn pushdown_predicates_multiple_and() {
2071 use datafusion::logical_expr::{col, lit};
2072
2073 let batch = customers_batch(); let ctx = datafusion::prelude::SessionContext::new();
2075 let state = ctx.state();
2076
2077 let predicates = vec![col("id").gt_eq(lit(2i64)), col("id").lt(lit(3i64))];
2079 let filtered = apply_pushdown_predicates(&batch, &predicates, &state).unwrap();
2080 assert_eq!(filtered.num_rows(), 1);
2081 }
2082
2083 use laminar_core::lookup::lookup_cache::LookupMemoryCacheConfig;
2086
2087 fn make_lookup_cache() -> Arc<LookupMemoryCache> {
2088 Arc::new(LookupMemoryCache::new(
2089 1,
2090 LookupMemoryCacheConfig {
2091 capacity_bytes: 64 * 1024,
2092 ttl: None,
2093 },
2094 ))
2095 }
2096
    /// Builds a one-row customers batch holding the given `id` and `name`.
    fn customer_row(id: i64, name: &str) -> RecordBatch {
        RecordBatch::try_new(
            customers_schema(),
            vec![
                Arc::new(Int64Array::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
            ],
        )
        .unwrap()
    }
2107
2108 fn warm_cache(cache: &LookupMemoryCache) {
2109 let converter = RowConverter::new(vec![SortField::new(DataType::Int64)]).unwrap();
2110
2111 for (id, name) in [(1, "Alice"), (2, "Bob"), (3, "Charlie")] {
2112 let key_col = Int64Array::from(vec![id]);
2113 let rows = converter.convert_columns(&[Arc::new(key_col)]).unwrap();
2114 let key = rows.row(0);
2115 cache.insert(key.as_ref(), customer_row(id, name));
2116 }
2117 }
2118
2119 fn make_partial_exec(join_type: LookupJoinType) -> PartialLookupJoinExec {
2120 let cache = make_lookup_cache();
2121 warm_cache(&cache);
2122
2123 let input = batch_exec(orders_batch());
2124 let key_sort_fields = vec![SortField::new(DataType::Int64)];
2125
2126 PartialLookupJoinExec::try_new(
2127 input,
2128 cache,
2129 vec![1], key_sort_fields,
2131 join_type,
2132 customers_schema(),
2133 output_schema(),
2134 )
2135 .unwrap()
2136 }
2137
2138 #[tokio::test]
2139 async fn partial_inner_join_filters_non_matches() {
2140 let exec = make_partial_exec(LookupJoinType::Inner);
2141 let ctx = Arc::new(TaskContext::default());
2142 let stream = exec.execute(0, ctx).unwrap();
2143 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
2144
2145 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
2146 assert_eq!(total, 3, "customer_id=99 has no match, filtered by inner");
2147
2148 let names = batches[0]
2149 .column(4)
2150 .as_any()
2151 .downcast_ref::<StringArray>()
2152 .unwrap();
2153 assert_eq!(names.value(0), "Alice");
2154 assert_eq!(names.value(1), "Bob");
2155 assert_eq!(names.value(2), "Charlie");
2156 }
2157
2158 #[tokio::test]
2159 async fn partial_left_outer_preserves_non_matches() {
2160 let exec = make_partial_exec(LookupJoinType::LeftOuter);
2161 let ctx = Arc::new(TaskContext::default());
2162 let stream = exec.execute(0, ctx).unwrap();
2163 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
2164
2165 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
2166 assert_eq!(total, 4, "all 4 stream rows preserved in left outer");
2167
2168 let names = batches[0]
2169 .column(4)
2170 .as_any()
2171 .downcast_ref::<StringArray>()
2172 .unwrap();
2173 assert!(names.is_null(2), "customer_id=99 should have null name");
2174 }
2175
2176 #[tokio::test]
2177 async fn partial_empty_cache_inner_produces_no_rows() {
2178 let cache = make_lookup_cache();
2179 let input = batch_exec(orders_batch());
2180 let key_sort_fields = vec![SortField::new(DataType::Int64)];
2181
2182 let exec = PartialLookupJoinExec::try_new(
2183 input,
2184 cache,
2185 vec![1],
2186 key_sort_fields,
2187 LookupJoinType::Inner,
2188 customers_schema(),
2189 output_schema(),
2190 )
2191 .unwrap();
2192
2193 let ctx = Arc::new(TaskContext::default());
2194 let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
2195 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
2196 assert_eq!(total, 0);
2197 }
2198
2199 #[tokio::test]
2200 async fn partial_empty_cache_left_outer_preserves_all() {
2201 let cache = make_lookup_cache();
2202 let input = batch_exec(orders_batch());
2203 let key_sort_fields = vec![SortField::new(DataType::Int64)];
2204
2205 let exec = PartialLookupJoinExec::try_new(
2206 input,
2207 cache,
2208 vec![1],
2209 key_sort_fields,
2210 LookupJoinType::LeftOuter,
2211 customers_schema(),
2212 output_schema(),
2213 )
2214 .unwrap();
2215
2216 let ctx = Arc::new(TaskContext::default());
2217 let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
2218 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
2219 assert_eq!(total, 4);
2220 }
2221
2222 #[test]
2223 fn partial_with_new_children_preserves_state() {
2224 let exec = Arc::new(make_partial_exec(LookupJoinType::Inner));
2225 let expected_schema = exec.schema();
2226 let children = exec.children().into_iter().cloned().collect();
2227 let rebuilt = exec.with_new_children(children).unwrap();
2228 assert_eq!(rebuilt.schema(), expected_schema);
2229 assert_eq!(rebuilt.name(), "PartialLookupJoinExec");
2230 }
2231
2232 #[test]
2233 fn partial_display_format() {
2234 let exec = make_partial_exec(LookupJoinType::Inner);
2235 let s = format!("{exec:?}");
2236 assert!(s.contains("PartialLookupJoinExec"));
2237 assert!(s.contains("cache_table_id: 1"));
2238 }
2239
2240 #[test]
2241 fn registry_partial_entry() {
2242 let reg = LookupTableRegistry::new();
2243 let cache = make_lookup_cache();
2244 let key_sort_fields = vec![SortField::new(DataType::Int64)];
2245
2246 reg.register_partial(
2247 "customers",
2248 PartialLookupState {
2249 lookup_cache: cache,
2250 schema: customers_schema(),
2251 key_columns: vec!["id".into()],
2252 key_sort_fields,
2253 source: None,
2254 fetch_semaphore: Arc::new(Semaphore::new(64)),
2255 projection: Vec::new(),
2256 },
2257 );
2258
2259 assert!(reg.get("customers").is_none());
2260
2261 let entry = reg.get_entry("customers");
2262 assert!(entry.is_some());
2263 assert!(matches!(entry.unwrap(), RegisteredLookup::Partial(_)));
2264 }
2265
    /// A key missing from the cache is fetched from the lookup source and the
    /// returned row is joined into the output.
    #[tokio::test]
    async fn partial_source_fallback_on_miss() {
        use laminar_core::lookup::source::LookupError;
        use laminar_core::lookup::source::LookupSourceDyn;

        // Source that answers every requested key with the same customer row.
        struct TestSource;

        #[async_trait]
        impl LookupSourceDyn for TestSource {
            async fn query_batch(
                &self,
                keys: &[&[u8]],
                _predicates: &[laminar_core::lookup::predicate::Predicate],
                _projection: &[laminar_core::lookup::source::ColumnId],
            ) -> std::result::Result<Vec<Option<RecordBatch>>, LookupError> {
                Ok(keys
                    .iter()
                    .map(|_| Some(customer_row(99, "FromSource")))
                    .collect())
            }

            fn schema(&self) -> SchemaRef {
                customers_schema()
            }
        }

        // The cache holds ids 1..=3 only.
        let cache = make_lookup_cache();
        warm_cache(&cache);

        // Single order whose customer id (99) is not in the cache.
        let orders = RecordBatch::try_new(
            orders_schema(),
            vec![
                Arc::new(Int64Array::from(vec![200])),
                Arc::new(Int64Array::from(vec![99])),
                Arc::new(Float64Array::from(vec![50.0])),
            ],
        )
        .unwrap();

        let input = batch_exec(orders);
        let key_sort_fields = vec![SortField::new(DataType::Int64)];
        let source: Arc<dyn LookupSourceDyn> = Arc::new(TestSource);

        let exec = PartialLookupJoinExec::try_new_with_source(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::Inner,
            customers_schema(),
            output_schema(),
            Some(source),
            Arc::new(Semaphore::new(64)),
            vec![],
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let batches: Vec<RecordBatch> = exec.execute(0, ctx).unwrap().try_collect().await.unwrap();
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 1, "source fallback should produce 1 row");

        let names = batches[0]
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "FromSource");
    }
2336
    /// An error from the lookup source fails the query, even for a left outer
    /// join, instead of being turned into unmatched rows.
    #[tokio::test]
    async fn partial_source_error_propagates() {
        use laminar_core::lookup::source::LookupError;
        use laminar_core::lookup::source::LookupSourceDyn;

        // Source whose every query fails.
        struct FailingSource;

        #[async_trait]
        impl LookupSourceDyn for FailingSource {
            async fn query_batch(
                &self,
                _keys: &[&[u8]],
                _predicates: &[laminar_core::lookup::predicate::Predicate],
                _projection: &[laminar_core::lookup::source::ColumnId],
            ) -> std::result::Result<Vec<Option<RecordBatch>>, LookupError> {
                Err(LookupError::Internal("source unavailable".into()))
            }

            fn schema(&self) -> SchemaRef {
                customers_schema()
            }
        }

        // Empty cache, so every key is a miss and goes to the source.
        let cache = make_lookup_cache();
        let input = batch_exec(orders_batch());
        let key_sort_fields = vec![SortField::new(DataType::Int64)];
        let source: Arc<dyn LookupSourceDyn> = Arc::new(FailingSource);

        let exec = PartialLookupJoinExec::try_new_with_source(
            input,
            cache,
            vec![1],
            key_sort_fields,
            LookupJoinType::LeftOuter,
            customers_schema(),
            output_schema(),
            Some(source),
            Arc::new(Semaphore::new(64)),
            vec![],
        )
        .unwrap();

        let ctx = Arc::new(TaskContext::default());
        let result: std::result::Result<Vec<RecordBatch>, _> =
            exec.execute(0, ctx).unwrap().try_collect().await;
        let err = result.expect_err("source error must propagate, not degrade silently");
        assert!(
            err.to_string().contains("lookup source query failed"),
            "unexpected error: {err}"
        );
    }
2390
/// A table registered as a snapshot is reported as `RegisteredLookup::Snapshot`
/// by `get_entry` and is also visible through `get`.
#[test]
fn registry_snapshot_entry_via_get_entry() {
    let registry = LookupTableRegistry::new();
    let snapshot = LookupSnapshot {
        batch: customers_batch(),
    };
    registry.register("t", snapshot);

    assert!(matches!(
        registry.get_entry("t").unwrap(),
        RegisteredLookup::Snapshot(_)
    ));
    assert!(registry.get("t").is_some());
}
2405
2406 fn nullable_orders_schema() -> SchemaRef {
2409 Arc::new(Schema::new(vec![
2410 Field::new("order_id", DataType::Int64, false),
2411 Field::new("customer_id", DataType::Int64, true), Field::new("amount", DataType::Float64, false),
2413 ]))
2414 }
2415
2416 fn nullable_output_schema(join_type: LookupJoinType) -> SchemaRef {
2417 let lookup_nullable = join_type == LookupJoinType::LeftOuter;
2418 Arc::new(Schema::new(vec![
2419 Field::new("order_id", DataType::Int64, false),
2420 Field::new("customer_id", DataType::Int64, true),
2421 Field::new("amount", DataType::Float64, false),
2422 Field::new("id", DataType::Int64, lookup_nullable),
2423 Field::new("name", DataType::Utf8, true),
2424 ]))
2425 }
2426
2427 #[tokio::test]
2428 async fn null_key_inner_join_no_match() {
2429 let stream_batch = RecordBatch::try_new(
2431 nullable_orders_schema(),
2432 vec![
2433 Arc::new(Int64Array::from(vec![100, 101, 102])),
2434 Arc::new(Int64Array::from(vec![Some(1), None, Some(2)])),
2435 Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
2436 ],
2437 )
2438 .unwrap();
2439
2440 let input = batch_exec(stream_batch);
2441 let exec = LookupJoinExec::try_new(
2442 input,
2443 customers_batch(),
2444 vec![1],
2445 vec![0],
2446 LookupJoinType::Inner,
2447 nullable_output_schema(LookupJoinType::Inner),
2448 )
2449 .unwrap();
2450
2451 let ctx = Arc::new(TaskContext::default());
2452 let stream = exec.execute(0, ctx).unwrap();
2453 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
2454
2455 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
2456 assert_eq!(total, 2, "NULL key row should not match in inner join");
2458 }
2459
2460 #[tokio::test]
2461 async fn null_key_left_outer_produces_nulls() {
2462 let stream_batch = RecordBatch::try_new(
2464 nullable_orders_schema(),
2465 vec![
2466 Arc::new(Int64Array::from(vec![100, 101, 102])),
2467 Arc::new(Int64Array::from(vec![Some(1), None, Some(2)])),
2468 Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
2469 ],
2470 )
2471 .unwrap();
2472
2473 let input = batch_exec(stream_batch);
2474 let out_schema = nullable_output_schema(LookupJoinType::LeftOuter);
2475 let exec = LookupJoinExec::try_new(
2476 input,
2477 customers_batch(),
2478 vec![1],
2479 vec![0],
2480 LookupJoinType::LeftOuter,
2481 out_schema,
2482 )
2483 .unwrap();
2484
2485 let ctx = Arc::new(TaskContext::default());
2486 let stream = exec.execute(0, ctx).unwrap();
2487 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
2488
2489 let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
2490 assert_eq!(total, 3, "all rows preserved in left outer");
2492
2493 let names = batches[0]
2494 .column(4)
2495 .as_any()
2496 .downcast_ref::<StringArray>()
2497 .unwrap();
2498 assert_eq!(names.value(0), "Alice");
2499 assert!(
2500 names.is_null(1),
2501 "NULL key row should have null lookup name"
2502 );
2503 assert_eq!(names.value(2), "Bob");
2504 }
2505
2506 fn versioned_table_batch() -> RecordBatch {
2509 let schema = Arc::new(Schema::new(vec![
2512 Field::new("currency", DataType::Utf8, false),
2513 Field::new("valid_from", DataType::Int64, false),
2514 Field::new("rate", DataType::Float64, false),
2515 ]));
2516 RecordBatch::try_new(
2517 schema,
2518 vec![
2519 Arc::new(StringArray::from(vec!["USD", "USD", "EUR", "EUR", "EUR"])),
2520 Arc::new(Int64Array::from(vec![100, 200, 100, 150, 300])),
2521 Arc::new(Float64Array::from(vec![1.0, 1.1, 0.85, 0.90, 0.88])),
2522 ],
2523 )
2524 .unwrap()
2525 }
2526
2527 fn stream_batch_with_time() -> RecordBatch {
2528 let schema = Arc::new(Schema::new(vec![
2529 Field::new("order_id", DataType::Int64, false),
2530 Field::new("currency", DataType::Utf8, false),
2531 Field::new("event_ts", DataType::Int64, false),
2532 ]));
2533 RecordBatch::try_new(
2534 schema,
2535 vec![
2536 Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
2537 Arc::new(StringArray::from(vec!["USD", "EUR", "USD", "EUR"])),
2538 Arc::new(Int64Array::from(vec![150, 160, 250, 50])),
2539 ],
2540 )
2541 .unwrap()
2542 }
2543
/// Builds a versioned index over the fixture table and probes the USD key at
/// two timestamps, checking which table row each probe resolves to.
#[test]
fn test_versioned_index_build_and_probe() {
    let table = versioned_table_batch();
    let index = VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap();

    // Encode "USD" with a row converter to obtain the key bytes for probing.
    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let usd_array = Arc::new(StringArray::from(vec!["USD"]));
    let usd_rows = converter.convert_columns(&[usd_array]).unwrap();
    let usd_key = usd_rows.row(0);

    // (probe timestamp, expected table row): ts 150 -> row 0 (USD@100),
    // ts 250 -> row 1 (USD@200).
    for (probe_ts, expected_row) in [(150, 0), (250, 1)] {
        let hit = index.probe_at_time(usd_key.as_ref(), probe_ts);
        assert!(hit.is_some());
        assert_eq!(hit.unwrap(), expected_row);
    }
}
2566
/// Probing EUR at timestamp 50, which precedes the smallest EUR `valid_from`
/// in the fixture (100), must find no version.
#[test]
fn test_versioned_index_no_version_before_ts() {
    let table = versioned_table_batch();
    let index = VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap();

    // Encode "EUR" with a row converter to obtain the key bytes for probing.
    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let eur_array = Arc::new(StringArray::from(vec!["EUR"]));
    let eur_rows = converter.convert_columns(&[eur_array]).unwrap();
    let eur_key = eur_rows.row(0);

    let hit = index.probe_at_time(eur_key.as_ref(), 50);
    assert!(hit.is_none());
}
2582
2583 fn build_versioned_exec(
2585 table: RecordBatch,
2586 stream: &RecordBatch,
2587 join_type: LookupJoinType,
2588 ) -> VersionedLookupJoinExec {
2589 let input = batch_exec(stream.clone());
2590 let index = Arc::new(VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap());
2591 let key_sort_fields = vec![SortField::new(DataType::Utf8)];
2592 let mut output_fields = stream.schema().fields().to_vec();
2593 output_fields.extend(table.schema().fields().iter().cloned());
2594 let output_schema = Arc::new(Schema::new(output_fields));
2595 VersionedLookupJoinExec::try_new(
2596 input,
2597 table,
2598 index,
2599 vec![1], 2, join_type,
2602 output_schema,
2603 key_sort_fields,
2604 )
2605 .unwrap()
2606 }
2607
2608 #[tokio::test]
2609 async fn test_versioned_join_exec_inner() {
2610 let table = versioned_table_batch();
2611 let stream = stream_batch_with_time();
2612 let exec = build_versioned_exec(table, &stream, LookupJoinType::Inner);
2613
2614 let ctx = Arc::new(TaskContext::default());
2615 let stream_out = exec.execute(0, ctx).unwrap();
2616 let batches: Vec<RecordBatch> = stream_out.try_collect().await.unwrap();
2617
2618 assert_eq!(batches.len(), 1);
2619 let batch = &batches[0];
2620 assert_eq!(batch.num_rows(), 3);
2625
2626 let rates = batch
2627 .column(5) .as_any()
2629 .downcast_ref::<Float64Array>()
2630 .unwrap();
2631 assert!((rates.value(0) - 1.0).abs() < f64::EPSILON); assert!((rates.value(1) - 0.90).abs() < f64::EPSILON); assert!((rates.value(2) - 1.1).abs() < f64::EPSILON); }
2635
2636 #[tokio::test]
2637 async fn test_versioned_join_exec_left_outer() {
2638 let table = versioned_table_batch();
2639 let stream = stream_batch_with_time();
2640 let exec = build_versioned_exec(table, &stream, LookupJoinType::LeftOuter);
2641
2642 let ctx = Arc::new(TaskContext::default());
2643 let stream_out = exec.execute(0, ctx).unwrap();
2644 let batches: Vec<RecordBatch> = stream_out.try_collect().await.unwrap();
2645
2646 assert_eq!(batches.len(), 1);
2647 let batch = &batches[0];
2648 assert_eq!(batch.num_rows(), 4);
2650
2651 let rates = batch
2653 .column(5)
2654 .as_any()
2655 .downcast_ref::<Float64Array>()
2656 .unwrap();
2657 assert!(rates.is_null(3), "EUR@50 should have null rate");
2658 }
2659
/// Building a versioned index from a zero-row batch succeeds and yields an
/// empty map.
#[test]
fn test_versioned_index_empty_batch() {
    let fields = vec![
        Field::new("k", DataType::Utf8, false),
        Field::new("v", DataType::Int64, false),
    ];
    let empty = RecordBatch::new_empty(Arc::new(Schema::new(fields)));

    let index = VersionedIndex::build(&empty, &[0], 1, usize::MAX).unwrap();
    assert!(index.map.is_empty());
}
2670
/// A table registered through `register_versioned` is reported as
/// `RegisteredLookup::Versioned` by `get_entry`, while `get` returns `None`
/// for it.
#[test]
fn test_versioned_lookup_registry() {
    let registry = LookupTableRegistry::new();

    let table = versioned_table_batch();
    let index = Arc::new(VersionedIndex::build(&table, &[0], 1, usize::MAX).unwrap());
    let state = VersionedLookupState {
        batch: table,
        index,
        key_columns: vec![String::from("currency")],
        version_column: String::from("valid_from"),
        stream_time_column: String::from("event_ts"),
        max_versions_per_key: usize::MAX,
    };
    registry.register_versioned("rates", state);

    assert!(matches!(
        registry.get_entry("rates"),
        Some(RegisteredLookup::Versioned(_))
    ));

    // `get` must not expose the versioned entry.
    assert!(registry.get("rates").is_none());
}
2696}