1use super::{
63 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
64 TimerKey,
65};
66use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
67use arrow_schema::{DataType, Field, Schema, SchemaRef};
68use rkyv::{
69 rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
70};
71use rustc_hash::FxHashMap;
72use smallvec::SmallVec;
73use std::collections::BTreeMap;
74use std::sync::atomic::{AtomicU64, Ordering};
75use std::sync::Arc;
76
/// Selects which clock drives the table-version lookup of a temporal join.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TemporalJoinSemantics {
    /// Use the stream event's own timestamp: the join picks the newest table
    /// version whose version timestamp is not after the event. This is the
    /// default.
    #[default]
    EventTime,

    /// Use the time at which the event is processed: the join picks the
    /// newest table version currently stored for the key.
    ProcessTime,
}
89
/// Describes how the versioned (table) side of the join is allowed to change.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TableCharacteristics {
    /// Rows are only ever inserted. Matched stream events are not retained,
    /// because no emitted join ever needs to be retracted. This is the default.
    #[default]
    AppendOnly,

    /// Rows may be updated or deleted. Matched stream events are retained so
    /// previously emitted joins can be retracted and, where possible,
    /// re-emitted.
    NonAppendOnly,
}
102
/// Controls whether stream events without a matching table version produce
/// output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TemporalJoinType {
    /// Emit only stream events that found a table version (default).
    #[default]
    Inner,
    /// Additionally emit unmatched stream events, padded with nulls for the
    /// table columns.
    Left,
}

impl TemporalJoinType {
    /// Returns `true` when unmatched stream events must still be emitted.
    #[must_use]
    pub fn emits_unmatched(&self) -> bool {
        match self {
            Self::Inner => false,
            Self::Left => true,
        }
    }
}
120
121#[derive(Debug, Clone)]
123pub struct TemporalJoinConfig {
124 pub stream_key_column: String,
126 pub table_key_column: String,
128 pub table_version_column: String,
130 pub semantics: TemporalJoinSemantics,
132 pub table_characteristics: TableCharacteristics,
134 pub join_type: TemporalJoinType,
136 pub operator_id: Option<String>,
138 pub max_versions_per_key: usize,
140}
141
142impl TemporalJoinConfig {
143 #[must_use]
145 pub fn builder() -> TemporalJoinConfigBuilder {
146 TemporalJoinConfigBuilder::default()
147 }
148}
149
150#[derive(Debug, Default)]
152pub struct TemporalJoinConfigBuilder {
153 stream_key_column: Option<String>,
154 table_key_column: Option<String>,
155 table_version_column: Option<String>,
156 semantics: Option<TemporalJoinSemantics>,
157 table_characteristics: Option<TableCharacteristics>,
158 join_type: Option<TemporalJoinType>,
159 operator_id: Option<String>,
160 max_versions_per_key: Option<usize>,
161}
162
163impl TemporalJoinConfigBuilder {
164 #[must_use]
166 pub fn stream_key_column(mut self, column: String) -> Self {
167 self.stream_key_column = Some(column);
168 self
169 }
170
171 #[must_use]
173 pub fn table_key_column(mut self, column: String) -> Self {
174 self.table_key_column = Some(column);
175 self
176 }
177
178 #[must_use]
180 pub fn table_version_column(mut self, column: String) -> Self {
181 self.table_version_column = Some(column);
182 self
183 }
184
185 #[must_use]
187 pub fn semantics(mut self, semantics: TemporalJoinSemantics) -> Self {
188 self.semantics = Some(semantics);
189 self
190 }
191
192 #[must_use]
194 pub fn table_characteristics(mut self, characteristics: TableCharacteristics) -> Self {
195 self.table_characteristics = Some(characteristics);
196 self
197 }
198
199 #[must_use]
201 pub fn join_type(mut self, join_type: TemporalJoinType) -> Self {
202 self.join_type = Some(join_type);
203 self
204 }
205
206 #[must_use]
208 pub fn operator_id(mut self, id: String) -> Self {
209 self.operator_id = Some(id);
210 self
211 }
212
213 #[must_use]
215 pub fn max_versions_per_key(mut self, max: usize) -> Self {
216 self.max_versions_per_key = Some(max);
217 self
218 }
219
220 pub fn build(self) -> std::result::Result<TemporalJoinConfig, OperatorError> {
226 Ok(TemporalJoinConfig {
227 stream_key_column: self.stream_key_column.ok_or_else(|| {
228 OperatorError::ConfigError("stream_key_column is required".into())
229 })?,
230 table_key_column: self
231 .table_key_column
232 .ok_or_else(|| OperatorError::ConfigError("table_key_column is required".into()))?,
233 table_version_column: self.table_version_column.ok_or_else(|| {
234 OperatorError::ConfigError("table_version_column is required".into())
235 })?,
236 semantics: self.semantics.unwrap_or_default(),
237 table_characteristics: self.table_characteristics.unwrap_or_default(),
238 join_type: self.join_type.unwrap_or_default(),
239 operator_id: self.operator_id,
240 max_versions_per_key: self.max_versions_per_key.unwrap_or(0),
241 })
242 }
243}
244
/// First byte of timer keys owned by the temporal join; `on_timer` reacts
/// only to timers whose key starts with this byte.
const TEMPORAL_TIMER_PREFIX: u8 = 0x60;

/// Monotonic counter that makes generated operator ids unique per process.
static TEMPORAL_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
250
251#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
253pub struct TableRow {
254 pub version_timestamp: i64,
256 pub key_value: Vec<u8>,
258 pub data: Vec<u8>,
260}
261
262impl TableRow {
263 fn new(
265 version_timestamp: i64,
266 key_value: Vec<u8>,
267 batch: &RecordBatch,
268 ) -> Result<Self, OperatorError> {
269 let data = Self::serialize_batch(batch)?;
270 Ok(Self {
271 version_timestamp,
272 key_value,
273 data,
274 })
275 }
276
277 fn serialize_batch(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
278 Ok(crate::serialization::serialize_batch_stream(batch)?)
279 }
280
281 fn deserialize_batch(data: &[u8]) -> Result<RecordBatch, OperatorError> {
282 Ok(crate::serialization::deserialize_batch_stream(data)?)
283 }
284
285 pub fn to_batch(&self) -> Result<RecordBatch, OperatorError> {
291 Self::deserialize_batch(&self.data)
292 }
293}
294
295#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
297pub struct JoinedEventRecord {
298 pub event_timestamp: i64,
300 pub event_data: Vec<u8>,
302 pub table_version: i64,
304 pub key_value: Vec<u8>,
306}
307
308#[derive(Debug, Clone, Default)]
310pub struct VersionedKeyState {
311 pub versions: BTreeMap<i64, SmallVec<[TableRow; 1]>>,
314 pub min_version: i64,
316 pub max_version: i64,
318}
319
320impl VersionedKeyState {
321 #[must_use]
323 pub fn new() -> Self {
324 Self {
325 versions: BTreeMap::new(),
326 min_version: i64::MAX,
327 max_version: i64::MIN,
328 }
329 }
330
331 pub fn insert(&mut self, row: TableRow) {
333 let version = row.version_timestamp;
334 self.versions.entry(version).or_default().push(row);
335 self.min_version = self.min_version.min(version);
336 self.max_version = self.max_version.max(version);
337 }
338
339 #[must_use]
341 pub fn len(&self) -> usize {
342 self.versions.values().map(SmallVec::len).sum()
343 }
344
345 #[must_use]
347 pub fn is_empty(&self) -> bool {
348 self.versions.is_empty()
349 }
350
351 #[must_use]
354 pub fn lookup_at_time(&self, timestamp: i64) -> Option<&TableRow> {
355 let (_, rows) = self.versions.range(..=timestamp).next_back()?;
356 rows.last()
357 }
358
359 #[must_use]
361 pub fn lookup_latest(&self) -> Option<&TableRow> {
362 let (_, rows) = self.versions.iter().next_back()?;
363 rows.last()
364 }
365
366 pub fn cleanup_before(&mut self, threshold: i64) {
368 self.versions = self.versions.split_off(&threshold);
369 self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
370 }
371
372 pub fn remove_version(&mut self, version: i64) -> Option<SmallVec<[TableRow; 1]>> {
374 let removed = self.versions.remove(&version);
375 if removed.is_some() {
376 self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
377 self.max_version = self
378 .versions
379 .keys()
380 .next_back()
381 .copied()
382 .unwrap_or(i64::MIN);
383 }
384 removed
385 }
386
387 pub fn limit_versions(&mut self, max_versions: usize) {
389 if max_versions == 0 || self.versions.len() <= max_versions {
390 return;
391 }
392
393 let to_remove = self.versions.len() - max_versions;
394 if let Some(&split_key) = self.versions.keys().nth(to_remove) {
397 self.versions = self.versions.split_off(&split_key);
398 }
399 self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
400 }
401}
402
403#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
405struct SerializableVersionedKeyState {
406 versions: Vec<(i64, Vec<TableRow>)>,
407 min_version: i64,
408 max_version: i64,
409}
410
411impl From<&VersionedKeyState> for SerializableVersionedKeyState {
412 fn from(state: &VersionedKeyState) -> Self {
413 Self {
414 versions: state
415 .versions
416 .iter()
417 .map(|(ts, rows)| (*ts, rows.to_vec()))
418 .collect(),
419 min_version: state.min_version,
420 max_version: state.max_version,
421 }
422 }
423}
424
425impl From<SerializableVersionedKeyState> for VersionedKeyState {
426 fn from(state: SerializableVersionedKeyState) -> Self {
427 let mut versions = BTreeMap::new();
428 for (ts, rows) in state.versions {
429 versions.insert(ts, SmallVec::from_vec(rows));
430 }
431 Self {
432 versions,
433 min_version: state.min_version,
434 max_version: state.max_version,
435 }
436 }
437}
438
439#[derive(Debug, Clone)]
441pub enum TableChange {
442 Insert(TableRow),
444 Update {
446 old: TableRow,
448 new: TableRow,
450 },
451 Delete(TableRow),
453}
454
/// Counters describing the work done by a temporal join operator.
#[derive(Debug, Clone, Default)]
pub struct TemporalJoinMetrics {
    /// Stream-side events processed.
    pub stream_events: u64,
    /// Table row insertions.
    pub table_inserts: u64,
    /// Table updates applied.
    pub table_updates: u64,
    /// Table deletes applied.
    pub table_deletes: u64,
    /// Stream events that found a table version.
    pub matches: u64,
    /// Stream events whose key had no applicable table version.
    pub unmatched: u64,
    /// Previously emitted joins sent again as retractions.
    pub retractions: u64,
    /// Buffered stream records evicted by watermark or timer cleanup.
    pub state_cleanups: u64,
}

impl TemporalJoinMetrics {
    /// Creates a metrics set with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets every counter back to zero.
    pub fn reset(&mut self) {
        let zeroed = Self::new();
        *self = zeroed;
    }
}
488
489pub struct TemporalJoinOperator {
495 config: TemporalJoinConfig,
497 operator_id: String,
499 table_state: FxHashMap<Vec<u8>, VersionedKeyState>,
501 stream_state: FxHashMap<Vec<u8>, Vec<JoinedEventRecord>>,
504 watermark: i64,
506 metrics: TemporalJoinMetrics,
508 output_schema: Option<SchemaRef>,
510 stream_schema: Option<SchemaRef>,
512 table_schema: Option<SchemaRef>,
514 stream_key_index: Option<usize>,
516 table_key_index: Option<usize>,
518 table_ts_index: Option<usize>,
520}
521
522impl TemporalJoinOperator {
523 #[must_use]
525 pub fn new(config: TemporalJoinConfig) -> Self {
526 let operator_id = config.operator_id.clone().unwrap_or_else(|| {
527 let num = TEMPORAL_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
528 format!("temporal_join_{num}")
529 });
530
531 Self {
532 config,
533 operator_id,
534 table_state: FxHashMap::default(),
535 stream_state: FxHashMap::default(),
536 watermark: i64::MIN,
537 metrics: TemporalJoinMetrics::new(),
538 output_schema: None,
539 stream_schema: None,
540 table_schema: None,
541 stream_key_index: None,
542 table_key_index: None,
543 table_ts_index: None,
544 }
545 }
546
547 #[must_use]
549 pub fn with_id(mut config: TemporalJoinConfig, operator_id: String) -> Self {
550 config.operator_id = Some(operator_id);
551 Self::new(config)
552 }
553
554 #[must_use]
556 pub fn config(&self) -> &TemporalJoinConfig {
557 &self.config
558 }
559
560 #[must_use]
562 pub fn metrics(&self) -> &TemporalJoinMetrics {
563 &self.metrics
564 }
565
566 pub fn reset_metrics(&mut self) {
568 self.metrics.reset();
569 }
570
571 #[must_use]
573 pub fn watermark(&self) -> i64 {
574 self.watermark
575 }
576
577 #[must_use]
579 pub fn table_state_size(&self) -> usize {
580 self.table_state.values().map(VersionedKeyState::len).sum()
581 }
582
583 #[must_use]
585 pub fn stream_state_size(&self) -> usize {
586 self.stream_state.values().map(Vec::len).sum()
587 }
588
589 pub fn process_stream(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
591 self.metrics.stream_events += 1;
592
593 if self.stream_schema.is_none() {
595 self.stream_schema = Some(event.data.schema());
596 self.update_output_schema();
597 }
598
599 let mut output = OutputVec::new();
600
601 let Some(key_value) = Self::extract_key(
603 &event.data,
604 &self.config.stream_key_column,
605 &mut self.stream_key_index,
606 ) else {
607 return output;
608 };
609
610 let lookup_ts = match self.config.semantics {
612 TemporalJoinSemantics::EventTime => event.timestamp,
613 TemporalJoinSemantics::ProcessTime => ctx.processing_time,
614 };
615
616 if let Some(table_row) = self.lookup_table(&key_value, lookup_ts) {
618 self.metrics.matches += 1;
619
620 if self.config.table_characteristics == TableCharacteristics::NonAppendOnly {
622 if let Ok(event_data) = TableRow::serialize_batch(&event.data) {
623 let record = JoinedEventRecord {
624 event_timestamp: event.timestamp,
625 event_data,
626 table_version: table_row.version_timestamp,
627 key_value: key_value.clone(),
628 };
629 self.stream_state.entry(key_value).or_default().push(record);
630 }
631 }
632
633 if let Some(joined) = self.create_joined_event(event, &table_row) {
635 output.push(Output::Event(joined));
636 }
637 } else {
638 self.metrics.unmatched += 1;
639 if self.config.join_type.emits_unmatched() {
640 if let Some(unmatched) = self.create_unmatched_event(event) {
641 output.push(Output::Event(unmatched));
642 }
643 }
644 }
645
646 output
647 }
648
649 pub fn process_table_insert(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
651 self.metrics.table_inserts += 1;
652
653 if self.table_schema.is_none() {
655 self.table_schema = Some(event.data.schema());
656 self.update_output_schema();
657 }
658
659 let Some(key_value) = Self::extract_key(
661 &event.data,
662 &self.config.table_key_column,
663 &mut self.table_key_index,
664 ) else {
665 return OutputVec::new();
666 };
667
668 let version_ts = Self::extract_timestamp(
669 &event.data,
670 &self.config.table_version_column,
671 &mut self.table_ts_index,
672 )
673 .unwrap_or(event.timestamp);
674
675 let Ok(row) = TableRow::new(version_ts, key_value.clone(), &event.data) else {
677 return OutputVec::new();
678 };
679
680 let key_state = self.table_state.entry(key_value).or_default();
681 key_state.insert(row);
682
683 if self.config.max_versions_per_key > 0 {
685 key_state.limit_versions(self.config.max_versions_per_key);
686 }
687
688 OutputVec::new()
689 }
690
691 pub fn process_table_change(
694 &mut self,
695 change: &TableChange,
696 _ctx: &mut OperatorContext,
697 ) -> OutputVec {
698 if self.config.table_characteristics != TableCharacteristics::NonAppendOnly {
699 if let TableChange::Insert(row) = change {
701 let key_state = self.table_state.entry(row.key_value.clone()).or_default();
702 key_state.insert(row.clone());
703 }
704 return OutputVec::new();
705 }
706
707 let mut output = OutputVec::new();
708
709 match change {
710 TableChange::Insert(row) => {
711 self.metrics.table_inserts += 1;
712 let key_state = self.table_state.entry(row.key_value.clone()).or_default();
713 key_state.insert(row.clone());
714 }
715 TableChange::Update { old, new } => {
716 self.metrics.table_updates += 1;
717
718 self.emit_retractions_for_version(
720 &old.key_value,
721 old.version_timestamp,
722 &mut output,
723 );
724
725 if let Some(key_state) = self.table_state.get_mut(&old.key_value) {
727 key_state.remove_version(old.version_timestamp);
728 }
729 let key_state = self.table_state.entry(new.key_value.clone()).or_default();
730 key_state.insert(new.clone());
731
732 self.rejoin_affected_events(&new.key_value, new.version_timestamp, &mut output);
734 }
735 TableChange::Delete(row) => {
736 self.metrics.table_deletes += 1;
737
738 self.emit_retractions_for_version(
740 &row.key_value,
741 row.version_timestamp,
742 &mut output,
743 );
744
745 if let Some(key_state) = self.table_state.get_mut(&row.key_value) {
747 key_state.remove_version(row.version_timestamp);
748 }
749 }
750 }
751
752 output
753 }
754
755 fn emit_retractions_for_version(&mut self, key: &[u8], version: i64, output: &mut OutputVec) {
757 let Some(records) = self.stream_state.get(key) else {
758 return;
759 };
760
761 for record in records {
762 if record.table_version == version {
763 if let Ok(event_batch) = TableRow::deserialize_batch(&record.event_data) {
765 let event = Event::new(record.event_timestamp, event_batch);
766
767 if let Some(key_state) = self.table_state.get(key) {
769 if let Some((_, rows)) = key_state.versions.get_key_value(&version) {
770 if let Some(table_row) = rows.last() {
771 if let Some(joined) = self.create_joined_event(&event, table_row) {
772 output.push(Output::LateEvent(joined));
775 self.metrics.retractions += 1;
776 }
777 }
778 }
779 }
780 }
781 }
782 }
783 }
784
785 fn rejoin_affected_events(&mut self, key: &[u8], new_version: i64, output: &mut OutputVec) {
787 let events_to_rejoin: Vec<(i64, Vec<u8>)> = {
789 let Some(records) = self.stream_state.get(key) else {
790 return;
791 };
792 let Some(key_state) = self.table_state.get(key) else {
793 return;
794 };
795
796 records
797 .iter()
798 .filter_map(|record| {
799 let lookup_ts = record.event_timestamp;
800 if let Some(new_row) = key_state.lookup_at_time(lookup_ts) {
801 if new_row.version_timestamp == new_version {
802 return Some((record.event_timestamp, record.event_data.clone()));
803 }
804 }
805 None
806 })
807 .collect()
808 };
809
810 if let Some(key_state) = self.table_state.get(key) {
812 for (event_ts, event_data) in &events_to_rejoin {
813 if let Ok(event_batch) = TableRow::deserialize_batch(event_data) {
814 let event = Event::new(*event_ts, event_batch);
815 if let Some(new_row) = key_state.lookup_at_time(*event_ts) {
816 if let Some(joined) = self.create_joined_event(&event, new_row) {
817 output.push(Output::Event(joined));
818 }
819 }
820 }
821 }
822 }
823
824 if let Some(records) = self.stream_state.get_mut(key) {
826 for record in records.iter_mut() {
827 if events_to_rejoin
828 .iter()
829 .any(|(ts, _)| *ts == record.event_timestamp)
830 {
831 record.table_version = new_version;
832 }
833 }
834 }
835 }
836
837 fn lookup_table(&self, key: &[u8], timestamp: i64) -> Option<TableRow> {
839 let key_state = self.table_state.get(key)?;
840
841 match self.config.semantics {
842 TemporalJoinSemantics::EventTime => key_state.lookup_at_time(timestamp).cloned(),
843 TemporalJoinSemantics::ProcessTime => key_state.lookup_latest().cloned(),
844 }
845 }
846
847 pub fn on_watermark(&mut self, watermark: i64, _ctx: &mut OperatorContext) -> OutputVec {
849 self.watermark = watermark;
850
851 if self.config.table_characteristics == TableCharacteristics::NonAppendOnly {
853 self.cleanup_stream_state(watermark);
854 }
855
856 OutputVec::new()
857 }
858
859 fn cleanup_stream_state(&mut self, watermark: i64) {
861 let initial_count: usize = self.stream_state.values().map(Vec::len).sum();
862
863 for records in self.stream_state.values_mut() {
864 records.retain(|r| r.event_timestamp >= watermark);
865 }
866 self.stream_state.retain(|_, v| !v.is_empty());
867
868 let final_count: usize = self.stream_state.values().map(Vec::len).sum();
869 if final_count < initial_count {
870 self.metrics.state_cleanups += (initial_count - final_count) as u64;
871 }
872 }
873
874 fn extract_key(
878 batch: &RecordBatch,
879 column_name: &str,
880 cached_index: &mut Option<usize>,
881 ) -> Option<Vec<u8>> {
882 let column_index = if let Some(idx) = *cached_index {
883 idx
884 } else {
885 let idx = batch.schema().index_of(column_name).ok()?;
886 *cached_index = Some(idx);
887 idx
888 };
889 let column = batch.column(column_index);
890
891 if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
892 if string_array.is_empty() || string_array.is_null(0) {
893 return None;
894 }
895 return Some(string_array.value(0).as_bytes().to_vec());
896 }
897
898 if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
899 if int_array.is_empty() || int_array.is_null(0) {
900 return None;
901 }
902 return Some(int_array.value(0).to_le_bytes().to_vec());
903 }
904
905 None
906 }
907
908 fn extract_timestamp(
912 batch: &RecordBatch,
913 column_name: &str,
914 cached_index: &mut Option<usize>,
915 ) -> Option<i64> {
916 let column_index = if let Some(idx) = *cached_index {
917 idx
918 } else {
919 let idx = batch.schema().index_of(column_name).ok()?;
920 *cached_index = Some(idx);
921 idx
922 };
923 let column = batch.column(column_index);
924
925 if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
926 if int_array.is_empty() || int_array.is_null(0) {
927 return None;
928 }
929 return Some(int_array.value(0));
930 }
931
932 None
933 }
934
935 #[allow(dead_code)]
938 fn make_cleanup_timer_key(key_suffix: &[u8]) -> TimerKey {
939 let mut key = TimerKey::new();
940 key.push(TEMPORAL_TIMER_PREFIX);
941 key.extend_from_slice(key_suffix);
942 key
943 }
944
945 fn update_output_schema(&mut self) {
947 if let (Some(stream), Some(table)) = (&self.stream_schema, &self.table_schema) {
948 let mut fields: Vec<Field> =
949 Vec::with_capacity(stream.fields().len() + table.fields().len());
950 fields.extend(stream.fields().iter().map(|f| f.as_ref().clone()));
951
952 for field in table.fields() {
954 let name = if stream.field_with_name(field.name()).is_ok() {
955 format!("table_{}", field.name())
956 } else {
957 field.name().clone()
958 };
959 fields.push(Field::new(
960 name,
961 field.data_type().clone(),
962 true, ));
964 }
965
966 self.output_schema = Some(Arc::new(Schema::new(fields)));
967 }
968 }
969
970 fn create_joined_event(&self, stream_event: &Event, table_row: &TableRow) -> Option<Event> {
972 let schema = self.output_schema.as_ref()?;
973 let table_batch = table_row.to_batch().ok()?;
974
975 let stream_cols = stream_event.data.columns();
976 let table_cols = table_batch.columns();
977 let mut columns: Vec<ArrayRef> = Vec::with_capacity(stream_cols.len() + table_cols.len());
978 columns.extend_from_slice(stream_cols);
979 for column in table_cols {
980 columns.push(Arc::clone(column));
981 }
982
983 let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
984
985 Some(Event::new(stream_event.timestamp, joined_batch))
986 }
987
988 fn create_unmatched_event(&self, stream_event: &Event) -> Option<Event> {
990 let schema = self.output_schema.as_ref()?;
991 let table_schema = self.table_schema.as_ref()?;
992
993 let num_rows = stream_event.data.num_rows();
994 let stream_cols = stream_event.data.columns();
995 let mut columns: Vec<ArrayRef> =
996 Vec::with_capacity(stream_cols.len() + table_schema.fields().len());
997 columns.extend_from_slice(stream_cols);
998
999 for field in table_schema.fields() {
1001 columns.push(Self::create_null_array(field.data_type(), num_rows));
1002 }
1003
1004 let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
1005
1006 Some(Event::new(stream_event.timestamp, joined_batch))
1007 }
1008
1009 fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
1011 match data_type {
1012 DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
1013 DataType::Float64 => {
1014 use arrow_array::Float64Array;
1015 Arc::new(Float64Array::from(vec![None; num_rows])) as ArrayRef
1016 }
1017 _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
1018 }
1019 }
1020}
1021
1022impl Operator for TemporalJoinOperator {
1023 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
1024 self.process_stream(event, ctx)
1026 }
1027
1028 fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
1029 if timer.key.first() == Some(&TEMPORAL_TIMER_PREFIX) {
1030 self.cleanup_stream_state(timer.timestamp);
1032 }
1033 OutputVec::new()
1034 }
1035
1036 fn checkpoint(&self) -> OperatorState {
1037 let table_entries: Vec<(Vec<u8>, SerializableVersionedKeyState)> = self
1039 .table_state
1040 .iter()
1041 .map(|(k, v)| (k.clone(), v.into()))
1042 .collect();
1043
1044 let stream_entries: Vec<(Vec<u8>, Vec<JoinedEventRecord>)> = self
1046 .stream_state
1047 .iter()
1048 .map(|(k, v)| (k.clone(), v.clone()))
1049 .collect();
1050
1051 let checkpoint_data = (
1052 self.watermark,
1053 self.metrics.stream_events,
1054 self.metrics.table_inserts,
1055 self.metrics.matches,
1056 self.metrics.unmatched,
1057 self.metrics.retractions,
1058 table_entries,
1059 stream_entries,
1060 );
1061
1062 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
1063 .map(|v| v.to_vec())
1064 .unwrap_or_default();
1065
1066 OperatorState {
1067 operator_id: self.operator_id.clone(),
1068 data,
1069 }
1070 }
1071
1072 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
1073 type CheckpointData = (
1074 i64,
1075 u64,
1076 u64,
1077 u64,
1078 u64,
1079 u64,
1080 Vec<(Vec<u8>, SerializableVersionedKeyState)>,
1081 Vec<(Vec<u8>, Vec<JoinedEventRecord>)>,
1082 );
1083
1084 if state.operator_id != self.operator_id {
1085 return Err(OperatorError::StateAccessFailed(format!(
1086 "Operator ID mismatch: expected {}, got {}",
1087 self.operator_id, state.operator_id
1088 )));
1089 }
1090
1091 let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
1092 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
1093 let (
1094 watermark,
1095 stream_events,
1096 table_inserts,
1097 matches,
1098 unmatched,
1099 retractions,
1100 table_entries,
1101 stream_entries,
1102 ) = rkyv::deserialize::<CheckpointData, RkyvError>(archived)
1103 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
1104
1105 self.watermark = watermark;
1106 self.metrics.stream_events = stream_events;
1107 self.metrics.table_inserts = table_inserts;
1108 self.metrics.matches = matches;
1109 self.metrics.unmatched = unmatched;
1110 self.metrics.retractions = retractions;
1111
1112 self.table_state.clear();
1114 for (key, serializable) in table_entries {
1115 self.table_state.insert(key, serializable.into());
1116 }
1117
1118 self.stream_state.clear();
1120 for (key, records) in stream_entries {
1121 self.stream_state.insert(key, records);
1122 }
1123
1124 Ok(())
1125 }
1126}
1127
1128#[cfg(test)]
1129#[allow(clippy::cast_precision_loss)]
1130#[allow(clippy::unnecessary_to_owned)]
1131mod tests {
1132 use super::*;
1133 use crate::state::{InMemoryStore, StateStore};
1134 use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
1135 use arrow_array::Float64Array;
1136 use arrow_schema::{DataType, Field, Schema};
1137
1138 fn create_order_event(timestamp: i64, currency: &str, amount: f64) -> Event {
1140 let schema = Arc::new(Schema::new(vec![
1141 Field::new("currency", DataType::Utf8, false),
1142 Field::new("amount", DataType::Float64, false),
1143 ]));
1144 let batch = RecordBatch::try_new(
1145 schema,
1146 vec![
1147 Arc::new(StringArray::from(vec![currency])),
1148 Arc::new(Float64Array::from(vec![amount])),
1149 ],
1150 )
1151 .unwrap();
1152 Event::new(timestamp, batch)
1153 }
1154
1155 fn create_rate_event(timestamp: i64, currency: &str, rate: f64, valid_from: i64) -> Event {
1157 let schema = Arc::new(Schema::new(vec![
1158 Field::new("currency", DataType::Utf8, false),
1159 Field::new("rate", DataType::Float64, false),
1160 Field::new("valid_from", DataType::Int64, false),
1161 ]));
1162 let batch = RecordBatch::try_new(
1163 schema,
1164 vec![
1165 Arc::new(StringArray::from(vec![currency])),
1166 Arc::new(Float64Array::from(vec![rate])),
1167 Arc::new(Int64Array::from(vec![valid_from])),
1168 ],
1169 )
1170 .unwrap();
1171 Event::new(timestamp, batch)
1172 }
1173
1174 fn create_test_context<'a>(
1175 timers: &'a mut TimerService,
1176 state: &'a mut dyn StateStore,
1177 watermark_gen: &'a mut dyn WatermarkGenerator,
1178 ) -> OperatorContext<'a> {
1179 OperatorContext {
1180 event_time: 0,
1181 processing_time: 0,
1182 timers,
1183 state,
1184 watermark_generator: watermark_gen,
1185 operator_index: 0,
1186 }
1187 }
1188
1189 #[test]
1190 fn test_temporal_join_semantics_default() {
1191 assert_eq!(
1192 TemporalJoinSemantics::default(),
1193 TemporalJoinSemantics::EventTime
1194 );
1195 }
1196
1197 #[test]
1198 fn test_table_characteristics_default() {
1199 assert_eq!(
1200 TableCharacteristics::default(),
1201 TableCharacteristics::AppendOnly
1202 );
1203 }
1204
1205 #[test]
1206 fn test_temporal_join_type_properties() {
1207 assert!(!TemporalJoinType::Inner.emits_unmatched());
1208 assert!(TemporalJoinType::Left.emits_unmatched());
1209 }
1210
1211 #[test]
1212 fn test_config_builder() {
1213 let config = TemporalJoinConfig::builder()
1214 .stream_key_column("currency".to_string())
1215 .table_key_column("currency".to_string())
1216 .table_version_column("valid_from".to_string())
1217 .semantics(TemporalJoinSemantics::EventTime)
1218 .table_characteristics(TableCharacteristics::AppendOnly)
1219 .join_type(TemporalJoinType::Left)
1220 .max_versions_per_key(100)
1221 .operator_id("test_temporal".to_string())
1222 .build()
1223 .unwrap();
1224
1225 assert_eq!(config.stream_key_column, "currency");
1226 assert_eq!(config.table_key_column, "currency");
1227 assert_eq!(config.table_version_column, "valid_from");
1228 assert_eq!(config.semantics, TemporalJoinSemantics::EventTime);
1229 assert_eq!(
1230 config.table_characteristics,
1231 TableCharacteristics::AppendOnly
1232 );
1233 assert_eq!(config.join_type, TemporalJoinType::Left);
1234 assert_eq!(config.max_versions_per_key, 100);
1235 }
1236
1237 #[test]
1238 fn test_event_time_temporal_join_basic() {
1239 let config = TemporalJoinConfig::builder()
1240 .stream_key_column("currency".to_string())
1241 .table_key_column("currency".to_string())
1242 .table_version_column("valid_from".to_string())
1243 .semantics(TemporalJoinSemantics::EventTime)
1244 .join_type(TemporalJoinType::Inner)
1245 .build()
1246 .unwrap();
1247
1248 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1249
1250 let mut timers = TimerService::new();
1251 let mut state = InMemoryStore::new();
1252 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1253
1254 let rate1 = create_rate_event(500, "USD", 1.1, 500);
1257 {
1258 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1259 operator.process_table_insert(&rate1, &mut ctx);
1260 }
1261
1262 let rate2 = create_rate_event(800, "USD", 1.2, 800);
1264 {
1265 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1266 operator.process_table_insert(&rate2, &mut ctx);
1267 }
1268
1269 let rate3 = create_rate_event(1200, "USD", 1.3, 1200);
1271 {
1272 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1273 operator.process_table_insert(&rate3, &mut ctx);
1274 }
1275
1276 let order = create_order_event(1000, "USD", 100.0);
1278 let outputs = {
1279 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1280 operator.process_stream(&order, &mut ctx)
1281 };
1282
1283 assert_eq!(
1284 outputs
1285 .iter()
1286 .filter(|o| matches!(o, Output::Event(_)))
1287 .count(),
1288 1
1289 );
1290 assert_eq!(operator.metrics().matches, 1);
1291
1292 if let Some(Output::Event(event)) = outputs.first() {
1294 assert_eq!(event.data.num_columns(), 5); }
1296 }
1297
/// Event-time inner join: each order joins against one of three USD versions.
#[test]
fn test_event_time_multiple_versions() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .semantics(TemporalJoinSemantics::EventTime)
        .join_type(TemporalJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Three USD versions with version timestamps 100, 200 and 300.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&create_rate_event(100, "USD", 1.0, 100), &mut ctx);
        operator.process_table_insert(&create_rate_event(200, "USD", 1.1, 200), &mut ctx);
        operator.process_table_insert(&create_rate_event(300, "USD", 1.2, 300), &mut ctx);
    }

    // Each order lands after a different version timestamp, and each must
    // produce exactly one joined event. A fresh context is built per order.
    for order_time in [150, 250, 350] {
        let order = create_order_event(order_time, "USD", 100.0);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_stream(&order, &mut ctx)
        };
        let joined_events = outputs
            .iter()
            .filter(|out| matches!(out, Output::Event(_)))
            .count();
        assert_eq!(joined_events, 1);
    }

    assert_eq!(operator.metrics().matches, 3);
}
1367
/// An order older than the earliest table version must not join under an inner join.
#[test]
fn test_no_match_before_first_version() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .semantics(TemporalJoinSemantics::EventTime)
        .join_type(TemporalJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // The only USD version starts at 500.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
    }

    // An order at 400 precedes that version, so nothing is emitted.
    let early_order = create_order_event(400, "USD", 100.0);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&early_order, &mut ctx)
    };

    assert!(outputs.is_empty());
    assert_eq!(operator.metrics().unmatched, 1);
}
1402
/// Left join: an order whose key has no table version is still emitted.
#[test]
fn test_left_join_no_match() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .semantics(TemporalJoinSemantics::EventTime)
        .join_type(TemporalJoinType::Left)
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Only a USD version exists in the table.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
    }
    // A matching USD order goes through first; its output is not inspected.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&create_order_event(600, "USD", 100.0), &mut ctx);
    }

    // EUR has no table version at all.
    let order = create_order_event(700, "EUR", 100.0);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&order, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
    assert_eq!(operator.metrics().unmatched, 1);

    // Locate the event explicitly instead of assuming it is the first output,
    // so the column check below can never be skipped silently.
    let event = outputs
        .iter()
        .find_map(|o| match o {
            Output::Event(event) => Some(event),
            _ => None,
        })
        .expect("left join must emit the unmatched stream row as an event");
    // Same 5-column width as a matched row; the table side is presumably
    // null-filled for the unmatched case.
    assert_eq!(event.data.num_columns(), 5);
}
1451
/// Process-time join: an order still produces one joined event when the
/// context's processing time is set past every version timestamp.
#[test]
fn test_process_time_semantics() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .semantics(TemporalJoinSemantics::ProcessTime)
        .join_type(TemporalJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&create_rate_event(100, "USD", 1.0, 100), &mut ctx);
        operator.process_table_insert(&create_rate_event(200, "USD", 1.1, 200), &mut ctx);
        operator.process_table_insert(&create_rate_event(300, "USD", 1.2, 300), &mut ctx);
    }

    let order = create_order_event(150, "USD", 100.0);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        // Processing time (1000) is later than every version timestamp above.
        ctx.processing_time = 1000;
        operator.process_stream(&order, &mut ctx)
    };

    let joined_events = outputs
        .iter()
        .filter(|out| matches!(out, Output::Event(_)))
        .count();
    assert_eq!(joined_events, 1);
}
1494
/// With an append-only table, a matched order leaves no stream-side state behind.
#[test]
fn test_append_only_no_stream_state() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .table_characteristics(TableCharacteristics::AppendOnly)
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let rate = create_rate_event(500, "USD", 1.1, 500);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&rate, &mut ctx);
    }

    let order = create_order_event(600, "USD", 100.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&order, &mut ctx);
    }

    // Nothing is retained on the stream side.
    assert_eq!(operator.stream_state_size(), 0);
}
1527
/// With a non-append-only table, the processed order is kept in stream state.
#[test]
fn test_non_append_only_tracks_stream_state() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .table_characteristics(TableCharacteristics::NonAppendOnly)
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let rate = create_rate_event(500, "USD", 1.1, 500);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&rate, &mut ctx);
    }

    let order = create_order_event(600, "USD", 100.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&order, &mut ctx);
    }

    // Exactly the one order processed above is tracked.
    assert_eq!(operator.stream_state_size(), 1);
}
1560
/// Deleting a table version that an order already joined against is counted
/// as a delete, and is surfaced either as a retraction or as a late-event output.
#[test]
fn test_table_delete_emits_retraction() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .table_characteristics(TableCharacteristics::NonAppendOnly)
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let rate = create_rate_event(500, "USD", 1.1, 500);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&rate, &mut ctx);
    }

    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&create_order_event(600, "USD", 100.0), &mut ctx);
    }

    // Build a delete for exactly the version inserted above.
    let deleted_row = TableRow::new(500, b"USD".to_vec(), &rate.data).unwrap();
    let change = TableChange::Delete(deleted_row);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_change(&change, &mut ctx)
    };

    assert_eq!(operator.metrics().table_deletes, 1);

    // Either signal is accepted by this test.
    let retraction_counted = operator.metrics().retractions >= 1;
    let late_event_emitted = outputs
        .iter()
        .any(|out| matches!(out, Output::LateEvent(_)));
    assert!(retraction_counted || late_event_emitted);
}
1607
/// Versions for different keys are kept apart: each key's order joins once.
#[test]
fn test_multiple_keys() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // One version per currency, all at 500.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        for (currency, rate) in [("USD", 1.1), ("EUR", 0.9), ("GBP", 0.8)] {
            operator.process_table_insert(&create_rate_event(500, currency, rate, 500), &mut ctx);
        }
    }

    // Only USD and EUR orders are sent; GBP stays unused.
    for currency in ["USD", "EUR"] {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        let outputs = operator.process_stream(&create_order_event(600, currency, 100.0), &mut ctx);
        let joined_events = outputs
            .iter()
            .filter(|out| matches!(out, Output::Event(_)))
            .count();
        assert_eq!(joined_events, 1);
    }

    assert_eq!(operator.metrics().matches, 2);
}
1660
/// With `max_versions_per_key(2)`, only the two newest of five USD versions survive.
#[test]
fn test_max_versions_per_key() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .max_versions_per_key(2)
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Versions at 100, 200, ..., 500 with rates stepping by 0.1 from 1.0.
    for step in 0..5 {
        let version = 100 * (step + 1);
        let rate_value = 1.0 + (step as f64) * 0.1;
        let rate_event = create_rate_event(version, "USD", rate_value, version);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&rate_event, &mut ctx);
    }

    // Only one key exists, so this counts the retained USD versions.
    assert_eq!(operator.table_state_size(), 2);

    let usd_versions = operator.table_state.get(&b"USD".to_vec()).unwrap();
    // The 400 and 500 versions remain.
    for retained in [400, 500] {
        assert!(usd_versions.lookup_at_time(retained).is_some());
    }
    // Older versions were evicted, so nothing covers t=100 any more.
    assert!(usd_versions.lookup_at_time(100).is_none());
}
1695
/// Watermark, metrics, table state and stream state all survive a
/// checkpoint/restore round trip into a freshly built operator.
#[test]
fn test_checkpoint_restore() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .table_characteristics(TableCharacteristics::NonAppendOnly)
        .build()
        .unwrap();

    let mut operator =
        TemporalJoinOperator::with_id(config.clone(), "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Two USD versions and one order in stream state.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
        operator.process_table_insert(&create_rate_event(600, "USD", 1.2, 600), &mut ctx);
    }
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&create_order_event(550, "USD", 100.0), &mut ctx);
    }

    // Force recognisable values so the round trip is observable.
    operator.watermark = 500;
    operator.metrics.matches = 10;
    operator.metrics.retractions = 2;

    let snapshot = operator.checkpoint();

    let mut restored = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
    restored.restore(snapshot).unwrap();

    assert_eq!(restored.watermark(), 500);
    let restored_metrics = restored.metrics();
    assert_eq!(restored_metrics.matches, 10);
    assert_eq!(restored_metrics.retractions, 2);
    assert_eq!(restored.table_state_size(), 2);
    assert_eq!(restored.stream_state_size(), 1);
}
1743
/// A matched join emits one event whose schema combines order-side and table-side columns.
#[test]
fn test_schema_composition() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
    }

    let order = create_order_event(600, "USD", 100.0);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&order, &mut ctx)
    };

    assert_eq!(outputs.len(), 1);

    // Fail loudly if the single output is not an event; an `if let` here
    // would make every schema assertion below vacuous.
    let event = match outputs.first() {
        Some(Output::Event(event)) => event,
        _ => panic!("the single output of a matched join must be an event"),
    };
    let schema = event.data.schema();

    // Column presumably taken from the order (stream) side.
    assert!(schema.field_with_name("amount").is_ok());

    // Table-side columns. The table key appears as `table_currency`,
    // presumably to avoid clashing with the stream key column.
    assert!(schema.field_with_name("table_currency").is_ok());
    assert!(schema.field_with_name("rate").is_ok());
    assert!(schema.field_with_name("valid_from").is_ok());
}
1786
/// Table inserts, stream events and matches are each counted once per occurrence.
#[test]
fn test_metrics_tracking() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Two USD versions at 500 and 600, inserted through one context.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        for (valid_from, rate) in [(500, 1.1), (600, 1.2)] {
            operator.process_table_insert(
                &create_rate_event(valid_from, "USD", rate, valid_from),
                &mut ctx,
            );
        }
    }

    // Two orders, each after the first version, processed through one context.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        for (order_time, amount) in [(550, 100.0), (650, 200.0)] {
            operator.process_stream(&create_order_event(order_time, "USD", amount), &mut ctx);
        }
    }

    let metrics = operator.metrics();
    assert_eq!(metrics.table_inserts, 2);
    assert_eq!(metrics.stream_events, 2);
    assert_eq!(metrics.matches, 2);
}
1819
/// Exercises `VersionedKeyState` directly: insert, point-in-time lookup,
/// latest lookup, cleanup below a version, and removal of one version.
#[test]
fn test_versioned_key_state_operations() {
    let mut key_state = VersionedKeyState::new();
    assert!(key_state.is_empty());

    // Three versions of the same key, with empty payloads.
    for version in [100, 200, 300] {
        key_state.insert(TableRow {
            version_timestamp: version,
            key_value: b"test".to_vec(),
            data: vec![],
        });
    }

    assert_eq!(key_state.len(), 3);
    assert_eq!(key_state.min_version, 100);
    assert_eq!(key_state.max_version, 300);

    // Before the first version there is nothing to find.
    assert!(key_state.lookup_at_time(50).is_none());
    // Otherwise the newest version at or before the probe time is returned.
    for (probe, expected_version) in [(150, 100), (250, 200), (350, 300)] {
        assert_eq!(
            key_state.lookup_at_time(probe).unwrap().version_timestamp,
            expected_version
        );
    }

    assert_eq!(key_state.lookup_latest().unwrap().version_timestamp, 300);

    // Dropping everything before 200 leaves the 200 and 300 versions.
    key_state.cleanup_before(200);
    assert_eq!(key_state.len(), 2);
    assert_eq!(key_state.min_version, 200);

    key_state.remove_version(200);
    assert_eq!(key_state.len(), 1);
}
1877
/// `limit_versions(3)` keeps only the three newest of ten versions.
#[test]
fn test_version_limiting() {
    let mut key_state = VersionedKeyState::new();

    // Versions 100, 200, ..., 1000.
    for step in 1..=10 {
        key_state.insert(TableRow {
            version_timestamp: 100 * step,
            key_value: b"test".to_vec(),
            data: vec![],
        });
    }
    assert_eq!(key_state.len(), 10);

    key_state.limit_versions(3);
    assert_eq!(key_state.len(), 3);

    // 800 is now the oldest retained version, so a probe at 700 misses.
    assert!(key_state.lookup_at_time(700).is_none());
    assert!(key_state.lookup_at_time(800).is_some());
}
1901
/// `reset` zeroes the counters that were previously set.
#[test]
fn test_metrics_reset() {
    let mut metrics = TemporalJoinMetrics::new();
    metrics.retractions = 5;
    metrics.matches = 50;
    metrics.stream_events = 100;

    metrics.reset();

    assert_eq!(
        (metrics.stream_events, metrics.matches, metrics.retractions),
        (0, 0, 0)
    );
}
1915
/// A single-row batch survives `TableRow::new` followed by `to_batch`
/// with its row and column counts intact.
#[test]
fn test_table_row_serialization() {
    let rate_schema = Arc::new(Schema::new(vec![
        Field::new("currency", DataType::Utf8, false),
        Field::new("rate", DataType::Float64, false),
    ]));
    let source_batch = RecordBatch::try_new(
        rate_schema,
        vec![
            Arc::new(StringArray::from(vec!["USD"])),
            Arc::new(Float64Array::from(vec![1.25])),
        ],
    )
    .unwrap();

    let stored_row = TableRow::new(1000, b"USD".to_vec(), &source_batch).unwrap();
    let round_tripped = stored_row.to_batch().unwrap();

    assert_eq!(
        (round_tripped.num_rows(), round_tripped.num_columns()),
        (1, 2)
    );
}
1938
/// Advancing the watermark prunes tracked stream state and records a cleanup.
#[test]
fn test_stream_state_cleanup() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .table_characteristics(TableCharacteristics::NonAppendOnly)
        .build()
        .unwrap();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
    }

    // Orders at 600, 700, 800, 900 and 1000, each with its own context.
    for order_time in (600..=1000).step_by(100) {
        let order = create_order_event(order_time, "USD", 100.0);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&order, &mut ctx);
    }

    // Non-append-only mode tracks every processed order.
    assert_eq!(operator.stream_state_size(), 5);

    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_watermark(900, &mut ctx);
    }

    // A watermark of 900 must have evicted at least some of those orders.
    assert!(operator.stream_state_size() < 5);
    assert!(operator.metrics().state_cleanups > 0);
}
1980}