1use super::{
80 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
81 TimerKey,
82};
83use crate::state::{StateStore, StateStoreExt};
84use arrow_array::{Array, ArrayRef, Float64Array, Int64Array, RecordBatch, StringArray};
85use arrow_schema::{DataType, Field, Schema, SchemaRef};
86use bytes::Bytes;
87use rkyv::{
88 rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
89};
90use rustc_hash::FxHashMap;
91use smallvec::SmallVec;
92use std::hash::{Hash, Hasher};
93use std::sync::atomic::{AtomicU64, Ordering};
94use std::sync::Arc;
95use std::time::Duration;
96
/// Join semantics selectable for the stream join operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JoinType {
    /// Inner join; this is the `Default` variant.
    #[default]
    Inner,
    /// Left outer join (`emits_unmatched_left()` is `true`).
    Left,
    /// Right outer join (`emits_unmatched_right()` is `true`).
    Right,
    /// Full outer join (both `emits_unmatched_*()` predicates are `true`).
    Full,
    /// Semi join whose kept side is the left input.
    LeftSemi,
    /// Anti join whose kept side is the left input.
    LeftAnti,
    /// Semi join whose kept side is the right input.
    RightSemi,
    /// Anti join whose kept side is the right input.
    RightAnti,
}

impl JoinType {
    /// Returns `true` for `Left`, `Full` and `LeftAnti`.
    #[must_use]
    pub fn emits_unmatched_left(&self) -> bool {
        match self {
            Self::Left | Self::Full | Self::LeftAnti => true,
            Self::Inner
            | Self::Right
            | Self::LeftSemi
            | Self::RightSemi
            | Self::RightAnti => false,
        }
    }

    /// Returns `true` for `Right`, `Full` and `RightAnti`.
    #[must_use]
    pub fn emits_unmatched_right(&self) -> bool {
        match self {
            Self::Right | Self::Full | Self::RightAnti => true,
            Self::Inner
            | Self::Left
            | Self::LeftSemi
            | Self::LeftAnti
            | Self::RightSemi => false,
        }
    }

    /// Returns `true` for the two semi-join variants.
    #[must_use]
    pub fn is_semi(&self) -> bool {
        *self == Self::LeftSemi || *self == Self::RightSemi
    }

    /// Returns `true` for the two anti-join variants.
    #[must_use]
    pub fn is_anti(&self) -> bool {
        *self == Self::LeftAnti || *self == Self::RightAnti
    }

    /// Side whose rows a semi/anti join keeps; `None` for every other join type.
    #[must_use]
    pub fn kept_side(&self) -> Option<JoinSide> {
        match self {
            Self::LeftSemi | Self::LeftAnti => Some(JoinSide::Left),
            Self::RightSemi | Self::RightAnti => Some(JoinSide::Right),
            Self::Inner | Self::Left | Self::Right | Self::Full => None,
        }
    }
}

/// Identifies one of the two inputs of a join.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinSide {
    /// The left input.
    Left,
    /// The right input.
    Right,
}
163
/// Serialization format used for rows held in join state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JoinRowEncoding {
    /// Default encoding; displayed and parsed as `compact`.
    #[default]
    Compact,

    /// Alternative encoding; displayed as `cpu_friendly`.
    CpuFriendly,
}

impl std::fmt::Display for JoinRowEncoding {
    /// Writes the canonical lowercase name of the encoding.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Compact => "compact",
            Self::CpuFriendly => "cpu_friendly",
        };
        f.write_str(label)
    }
}

impl std::str::FromStr for JoinRowEncoding {
    type Err = String;

    /// Parses an encoding name case-insensitively.
    ///
    /// Accepts `compact`, and `cpu_friendly` / `cpufriendly` / `cpu-friendly`.
    ///
    /// # Errors
    ///
    /// Returns a message quoting the original input when it matches no known name.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const CPU_FRIENDLY_NAMES: [&str; 3] = ["cpu_friendly", "cpufriendly", "cpu-friendly"];

        let normalized = s.to_lowercase();
        if normalized == "compact" {
            return Ok(Self::Compact);
        }
        if CPU_FRIENDLY_NAMES.contains(&normalized.as_str()) {
            return Ok(Self::CpuFriendly);
        }
        Err(format!(
            "Unknown encoding: {s}. Expected 'compact' or 'cpu_friendly'"
        ))
    }
}
210
/// Configuration consumed by `StreamJoinOperator::from_config`.
///
/// Usually assembled through [`StreamJoinConfig::builder`]; every field is
/// copied verbatim into the operator.
#[derive(Debug, Clone)]
pub struct StreamJoinConfig {
    /// Name of the join key column on the left input (must be non-empty to pass `build()`).
    pub left_key_column: String,
    /// Name of the join key column on the right input (must be non-empty to pass `build()`).
    pub right_key_column: String,
    /// Join time bound in milliseconds.
    pub time_bound_ms: i64,
    /// Join semantics to apply.
    pub join_type: JoinType,
    /// Explicit operator id; when `None`, `from_config` generates `stream_join_{n}`.
    pub operator_id: Option<String>,

    /// Encoding used when rows are serialized into join state.
    pub row_encoding: JoinRowEncoding,
    /// Toggle for asymmetric compaction (defaults to `true`).
    pub asymmetric_compaction: bool,
    /// Threshold, in milliseconds, handed to `SideStats::is_idle` for whole-side idleness.
    pub idle_threshold_ms: i64,
    /// Whether per-key metadata is maintained while processing events.
    pub per_key_tracking: bool,
    /// Per-key idle threshold in milliseconds (consumer not visible in this chunk).
    pub key_idle_threshold_ms: i64,
    /// Whether build-side pruning runs while processing events.
    pub build_side_pruning: bool,
    /// Fixed build side; `None` lets `effective_build_side` choose from event counts.
    pub build_side: Option<JoinSide>,
}
260
261impl Default for StreamJoinConfig {
262 fn default() -> Self {
263 Self {
264 left_key_column: String::new(),
265 right_key_column: String::new(),
266 time_bound_ms: 0,
267 join_type: JoinType::Inner,
268 operator_id: None,
269 row_encoding: JoinRowEncoding::Compact,
270 asymmetric_compaction: true,
271 idle_threshold_ms: 60_000, per_key_tracking: true,
273 key_idle_threshold_ms: 300_000, build_side_pruning: true,
275 build_side: None,
276 }
277 }
278}
279
280impl StreamJoinConfig {
281 #[must_use]
283 pub fn builder() -> StreamJoinConfigBuilder {
284 StreamJoinConfigBuilder::default()
285 }
286}
287
/// Fluent builder for [`StreamJoinConfig`].
///
/// The derived `Default` starts from `StreamJoinConfig::default()`.
#[derive(Debug, Default)]
pub struct StreamJoinConfigBuilder {
    /// Configuration accumulated so far; returned by `build()` after validation.
    config: StreamJoinConfig,
}
293
294impl StreamJoinConfigBuilder {
295 #[must_use]
297 pub fn left_key_column(mut self, column: impl Into<String>) -> Self {
298 self.config.left_key_column = column.into();
299 self
300 }
301
302 #[must_use]
304 pub fn right_key_column(mut self, column: impl Into<String>) -> Self {
305 self.config.right_key_column = column.into();
306 self
307 }
308
309 #[must_use]
311 #[allow(clippy::cast_possible_truncation)] pub fn time_bound(mut self, duration: Duration) -> Self {
313 self.config.time_bound_ms = duration.as_millis() as i64;
314 self
315 }
316
317 #[must_use]
319 pub fn time_bound_ms(mut self, ms: i64) -> Self {
320 self.config.time_bound_ms = ms;
321 self
322 }
323
324 #[must_use]
326 pub fn join_type(mut self, join_type: JoinType) -> Self {
327 self.config.join_type = join_type;
328 self
329 }
330
331 #[must_use]
333 pub fn operator_id(mut self, id: impl Into<String>) -> Self {
334 self.config.operator_id = Some(id.into());
335 self
336 }
337
338 #[must_use]
340 pub fn row_encoding(mut self, encoding: JoinRowEncoding) -> Self {
341 self.config.row_encoding = encoding;
342 self
343 }
344
345 #[must_use]
347 pub fn asymmetric_compaction(mut self, enabled: bool) -> Self {
348 self.config.asymmetric_compaction = enabled;
349 self
350 }
351
352 #[must_use]
354 #[allow(clippy::cast_possible_truncation)] pub fn idle_threshold(mut self, duration: Duration) -> Self {
356 self.config.idle_threshold_ms = duration.as_millis() as i64;
357 self
358 }
359
360 #[must_use]
362 pub fn per_key_tracking(mut self, enabled: bool) -> Self {
363 self.config.per_key_tracking = enabled;
364 self
365 }
366
367 #[must_use]
369 #[allow(clippy::cast_possible_truncation)] pub fn key_idle_threshold(mut self, duration: Duration) -> Self {
371 self.config.key_idle_threshold_ms = duration.as_millis() as i64;
372 self
373 }
374
375 #[must_use]
377 pub fn build_side_pruning(mut self, enabled: bool) -> Self {
378 self.config.build_side_pruning = enabled;
379 self
380 }
381
382 #[must_use]
384 pub fn build_side(mut self, side: JoinSide) -> Self {
385 self.config.build_side = Some(side);
386 self
387 }
388
389 pub fn build(self) -> std::result::Result<StreamJoinConfig, OperatorError> {
395 if self.config.left_key_column.is_empty() {
396 return Err(OperatorError::ConfigError(
397 "left_key_column is required".into(),
398 ));
399 }
400 if self.config.right_key_column.is_empty() {
401 return Err(OperatorError::ConfigError(
402 "right_key_column is required".into(),
403 ));
404 }
405 Ok(self.config)
406 }
407}
408
/// Running arrival statistics for one input side of the join.
#[derive(Debug, Clone, Default)]
pub struct SideStats {
    /// Total number of events recorded.
    pub events_received: u64,
    /// Events recorded since the current rate window was last rolled over.
    pub events_this_window: u64,
    /// Processing time passed to the most recent `record_event` call.
    pub last_event_time: i64,
    /// Rate from the last rolled window: `events * 1000 / elapsed`
    /// (events per second if processing time is in milliseconds).
    pub write_rate: f64,
    /// Processing time at which the current window began; `0` means "not started".
    window_start: i64,
}

impl SideStats {
    /// Creates zeroed statistics.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one event observed at `processing_time`.
    ///
    /// The first event (while `window_start` is still `0`) opens the window.
    /// Later events roll the window once at least 1000 time units have elapsed:
    /// `write_rate` is refreshed and the per-window counter restarts at zero.
    #[allow(clippy::cast_precision_loss)]
    pub fn record_event(&mut self, processing_time: i64) {
        self.last_event_time = processing_time;
        self.events_received += 1;
        self.events_this_window += 1;

        if self.window_start == 0 {
            self.window_start = processing_time;
            return;
        }

        let elapsed_ms = processing_time - self.window_start;
        if elapsed_ms < 1000 {
            return;
        }

        self.write_rate = (self.events_this_window as f64 * 1000.0) / elapsed_ms as f64;
        self.events_this_window = 0;
        self.window_start = processing_time;
    }

    /// Reports whether this side counts as idle at `current_time`.
    ///
    /// A side that never received an event is not idle. Otherwise the gap since
    /// the last event must exceed `threshold_ms` *and* the current window must
    /// hold no events.
    #[must_use]
    pub fn is_idle(&self, current_time: i64, threshold_ms: i64) -> bool {
        self.events_received != 0
            && current_time - self.last_event_time > threshold_ms
            && self.events_this_window == 0
    }
}
462
/// Activity bookkeeping kept for a single join key.
#[derive(Debug, Clone)]
pub struct KeyMetadata {
    /// Processing time of the most recent recorded event.
    pub last_activity: i64,
    /// Number of events recorded for the key (starts at 1).
    pub event_count: u64,
    /// Counter of state entries attributed to the key (starts at 1).
    pub state_entries: u64,
}

impl KeyMetadata {
    /// Creates metadata for a key whose first event arrived at `processing_time`.
    #[must_use]
    pub fn new(processing_time: i64) -> Self {
        let (event_count, state_entries) = (1, 1);
        Self {
            last_activity: processing_time,
            event_count,
            state_entries,
        }
    }

    /// Registers another event: refreshes the activity time and bumps both counters.
    pub fn record_event(&mut self, processing_time: i64) {
        let Self {
            last_activity,
            event_count,
            state_entries,
        } = self;
        *last_activity = processing_time;
        *event_count += 1;
        *state_entries += 1;
    }

    /// Lowers the state-entry counter by one, stopping at zero.
    pub fn decrement_entries(&mut self) {
        if self.state_entries > 0 {
            self.state_entries -= 1;
        }
    }

    /// Returns `true` when more than `threshold_ms` has passed since the last activity.
    #[must_use]
    pub fn is_idle(&self, current_time: i64, threshold_ms: i64) -> bool {
        let quiet_for = current_time - self.last_activity;
        quiet_for > threshold_ms
    }
}
503
/// Four-byte tag `sjl:`; presumably the prefix of left-side state keys
/// (the key builder is not visible in this chunk — confirm).
const LEFT_STATE_PREFIX: &[u8; 4] = b"sjl:";
/// Four-byte tag `sjr:`; presumably the prefix of right-side state keys (confirm).
const RIGHT_STATE_PREFIX: &[u8; 4] = b"sjr:";

/// Tag byte for left-side timers (usage not visible in this chunk).
const LEFT_TIMER_PREFIX: u8 = 0x10;
/// Tag byte for right-side timers (usage not visible in this chunk).
const RIGHT_TIMER_PREFIX: u8 = 0x20;
/// Tag byte for unmatched-row timers (usage not visible in this chunk).
const UNMATCHED_TIMER_PREFIX: u8 = 0x30;
/// First byte of the 29-byte keys produced by `matched_flag_key`.
const MATCHED_FLAG_PREFIX: u8 = 0x40;
516
517#[inline]
519fn matched_flag_key(state_key: &[u8]) -> [u8; 29] {
520 debug_assert_eq!(state_key.len(), 28, "join state keys are always 28 bytes");
521 let mut k = [0u8; 29];
522 k[0] = MATCHED_FLAG_PREFIX;
523 k[1..].copy_from_slice(&state_key[..28]);
524 k
525}
526
527#[inline]
529fn mark_matched(state: &mut dyn StateStore, state_key: &[u8]) {
530 let k = matched_flag_key(state_key);
531 let _ = state.put(&k, Bytes::from_static(&[1]));
532}
533
534#[inline]
536fn is_matched(state: &dyn StateStore, state_key: &[u8]) -> bool {
537 let k = matched_flag_key(state_key);
538 state.get_ref(&k).is_some()
539}
540
/// Process-wide counter used to generate default operator ids (`stream_join_{n}`).
static JOIN_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Process-wide event id counter (its users are not visible in this chunk).
static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
546
/// One buffered input row as stored in join state (rkyv-archivable).
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct JoinRow {
    /// Timestamp supplied when the row was created.
    pub timestamp: i64,
    /// Raw bytes of the join key.
    pub key_value: Vec<u8>,
    /// Serialized record batch, in the format selected by `encoding`.
    pub data: Vec<u8>,
    /// Match flag; initialised to `false` by `with_encoding`.
    pub matched: bool,
    /// Encoding tag for `data`: `0` = compact, `1` = CPU-friendly.
    encoding: u8,
}
562
/// Magic bytes that open every CPU-friendly encoded buffer.
const CPU_FRIENDLY_MAGIC: [u8; 4] = *b"CPUF";

/// Per-column type tag written into the CPU-friendly format.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CpuFriendlyType {
    /// Written for any column type other than the three below; carries no column payload.
    Null = 0,
    /// 64-bit signed integer column.
    Int64 = 1,
    /// 64-bit floating point column.
    Float64 = 2,
    /// UTF-8 string column.
    Utf8 = 3,
}
575
576impl JoinRow {
577 #[cfg(test)]
580 fn new(timestamp: i64, key_value: Vec<u8>, batch: &RecordBatch) -> Result<Self, OperatorError> {
581 Self::with_encoding(timestamp, key_value, batch, JoinRowEncoding::Compact)
582 }
583
584 fn with_encoding(
586 timestamp: i64,
587 key_value: Vec<u8>,
588 batch: &RecordBatch,
589 encoding: JoinRowEncoding,
590 ) -> Result<Self, OperatorError> {
591 let (data, encoding_byte) = match encoding {
592 JoinRowEncoding::Compact => (Self::serialize_compact(batch)?, 0),
593 JoinRowEncoding::CpuFriendly => (Self::serialize_cpu_friendly(batch)?, 1),
594 };
595 Ok(Self {
596 timestamp,
597 key_value,
598 data,
599 matched: false,
600 encoding: encoding_byte,
601 })
602 }
603
604 fn serialize_compact(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
605 Ok(crate::serialization::serialize_batch_stream(batch)?)
606 }
607
608 #[allow(clippy::cast_possible_truncation)] fn serialize_cpu_friendly(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
625 let schema = batch.schema();
626 let num_rows = batch.num_rows();
627 let num_cols = batch.num_columns();
628
629 let mut buf = Vec::with_capacity(4 + 2 + 4 + num_cols * 64 + num_rows * num_cols * 8);
631
632 buf.extend_from_slice(&CPU_FRIENDLY_MAGIC);
634 buf.extend_from_slice(&(num_cols as u16).to_le_bytes());
635 buf.extend_from_slice(&(num_rows as u32).to_le_bytes());
636
637 for (i, field) in schema.fields().iter().enumerate() {
639 let column = batch.column(i);
640
641 let name_bytes = field.name().as_bytes();
643 buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
644 buf.extend_from_slice(name_bytes);
645
646 buf.push(u8::from(field.is_nullable()));
648
649 match field.data_type() {
651 DataType::Int64 => {
652 buf.push(CpuFriendlyType::Int64 as u8);
653 Self::write_int64_column(&mut buf, column, num_rows)?;
654 }
655 DataType::Float64 => {
656 buf.push(CpuFriendlyType::Float64 as u8);
657 Self::write_float64_column(&mut buf, column, num_rows)?;
658 }
659 DataType::Utf8 => {
660 buf.push(CpuFriendlyType::Utf8 as u8);
661 Self::write_utf8_column(&mut buf, column, num_rows)?;
662 }
663 _ => {
664 buf.push(CpuFriendlyType::Null as u8);
666 }
667 }
668 }
669
670 Ok(buf)
671 }
672
673 fn write_int64_column(
675 buf: &mut Vec<u8>,
676 column: &ArrayRef,
677 num_rows: usize,
678 ) -> Result<(), OperatorError> {
679 let arr = column
680 .as_any()
681 .downcast_ref::<Int64Array>()
682 .ok_or_else(|| OperatorError::SerializationFailed("Expected Int64Array".into()))?;
683
684 let validity_bytes = num_rows.div_ceil(8);
686 if let Some(nulls) = arr.nulls() {
687 let buffer = nulls.buffer();
689 let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
690 buf.extend_from_slice(slice);
691 for _ in slice.len()..validity_bytes {
693 buf.push(0xFF);
694 }
695 } else {
696 buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
698 }
699
700 let values = arr.values();
704 let value_bytes =
705 unsafe { std::slice::from_raw_parts(values.as_ptr().cast::<u8>(), values.len() * 8) };
706 buf.extend_from_slice(value_bytes);
707
708 Ok(())
709 }
710
711 fn write_float64_column(
713 buf: &mut Vec<u8>,
714 column: &ArrayRef,
715 num_rows: usize,
716 ) -> Result<(), OperatorError> {
717 let arr = column
718 .as_any()
719 .downcast_ref::<Float64Array>()
720 .ok_or_else(|| OperatorError::SerializationFailed("Expected Float64Array".into()))?;
721
722 let validity_bytes = num_rows.div_ceil(8);
724 if let Some(nulls) = arr.nulls() {
725 let buffer = nulls.buffer();
726 let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
727 buf.extend_from_slice(slice);
728 for _ in slice.len()..validity_bytes {
729 buf.push(0xFF);
730 }
731 } else {
732 buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
733 }
734
735 let values = arr.values();
738 let value_bytes =
739 unsafe { std::slice::from_raw_parts(values.as_ptr().cast::<u8>(), values.len() * 8) };
740 buf.extend_from_slice(value_bytes);
741
742 Ok(())
743 }
744
745 #[allow(clippy::cast_sign_loss)]
747 fn write_utf8_column(
748 buf: &mut Vec<u8>,
749 column: &ArrayRef,
750 num_rows: usize,
751 ) -> Result<(), OperatorError> {
752 let arr = column
753 .as_any()
754 .downcast_ref::<StringArray>()
755 .ok_or_else(|| OperatorError::SerializationFailed("Expected StringArray".into()))?;
756
757 let validity_bytes = num_rows.div_ceil(8);
759 if let Some(nulls) = arr.nulls() {
760 let buffer = nulls.buffer();
761 let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
762 buf.extend_from_slice(slice);
763 for _ in slice.len()..validity_bytes {
764 buf.push(0xFF);
765 }
766 } else {
767 buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
768 }
769
770 let offsets = arr.offsets();
773 for offset in offsets.iter() {
774 buf.extend_from_slice(&(*offset as u32).to_le_bytes());
775 }
776
777 let values = arr.values();
779 buf.extend_from_slice(values.as_slice());
780
781 Ok(())
782 }
783
784 fn deserialize_batch(data: &[u8], encoding: u8) -> Result<RecordBatch, OperatorError> {
786 if encoding == 1 && data.starts_with(&CPU_FRIENDLY_MAGIC) {
787 Self::deserialize_cpu_friendly(data)
788 } else {
789 Self::deserialize_compact(data)
790 }
791 }
792
793 fn deserialize_compact(data: &[u8]) -> Result<RecordBatch, OperatorError> {
794 Ok(crate::serialization::deserialize_batch_stream(data)?)
795 }
796
797 fn deserialize_cpu_friendly(data: &[u8]) -> Result<RecordBatch, OperatorError> {
799 if data.len() < 10 {
800 return Err(OperatorError::SerializationFailed(
801 "Buffer too short".into(),
802 ));
803 }
804
805 let num_cols = u16::from_le_bytes([data[4], data[5]]) as usize;
807 let num_rows = u32::from_le_bytes([data[6], data[7], data[8], data[9]]) as usize;
808
809 let mut offset = 10;
810 let mut fields = Vec::with_capacity(num_cols);
811 let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_cols);
812
813 for _ in 0..num_cols {
814 if offset + 2 > data.len() {
815 return Err(OperatorError::SerializationFailed(
816 "Truncated column header".into(),
817 ));
818 }
819
820 let name_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
822 offset += 2;
823
824 if offset + name_len > data.len() {
825 return Err(OperatorError::SerializationFailed(
826 "Truncated column name".into(),
827 ));
828 }
829 let name = String::from_utf8_lossy(&data[offset..offset + name_len]).to_string();
830 offset += name_len;
831
832 if offset + 2 > data.len() {
833 return Err(OperatorError::SerializationFailed(
834 "Truncated type info".into(),
835 ));
836 }
837
838 let nullable = data[offset] != 0;
840 offset += 1;
841 let type_tag = data[offset];
842 offset += 1;
843
844 let validity_bytes = num_rows.div_ceil(8);
846
847 match type_tag {
848 t if t == CpuFriendlyType::Int64 as u8 => {
849 let (arr, new_offset) =
850 Self::read_int64_column(data, offset, num_rows, validity_bytes)?;
851 offset = new_offset;
852 fields.push(Field::new(&name, DataType::Int64, nullable));
853 columns.push(Arc::new(arr));
854 }
855 t if t == CpuFriendlyType::Float64 as u8 => {
856 let (arr, new_offset) =
857 Self::read_float64_column(data, offset, num_rows, validity_bytes)?;
858 offset = new_offset;
859 fields.push(Field::new(&name, DataType::Float64, nullable));
860 columns.push(Arc::new(arr));
861 }
862 t if t == CpuFriendlyType::Utf8 as u8 => {
863 let (arr, new_offset) =
864 Self::read_utf8_column(data, offset, num_rows, validity_bytes)?;
865 offset = new_offset;
866 fields.push(Field::new(&name, DataType::Utf8, nullable));
867 columns.push(Arc::new(arr));
868 }
869 _ => {
870 fields.push(Field::new(&name, DataType::Int64, true));
872 columns.push(Arc::new(Int64Array::from(vec![None; num_rows])));
873 }
874 }
875 }
876
877 let schema = Arc::new(Schema::new(fields));
878 RecordBatch::try_new(schema, columns)
879 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
880 }
881
882 fn read_int64_column(
884 data: &[u8],
885 offset: usize,
886 num_rows: usize,
887 validity_bytes: usize,
888 ) -> Result<(Int64Array, usize), OperatorError> {
889 let mut pos = offset;
890
891 if pos + validity_bytes > data.len() {
893 return Err(OperatorError::SerializationFailed(
894 "Truncated validity".into(),
895 ));
896 }
897 pos += validity_bytes;
898
899 let values_bytes = num_rows * 8;
901 if pos + values_bytes > data.len() {
902 return Err(OperatorError::SerializationFailed(
903 "Truncated int64 values".into(),
904 ));
905 }
906
907 let mut values = Vec::with_capacity(num_rows);
908 for i in 0..num_rows {
909 let start = pos + i * 8;
910 let bytes = [
911 data[start],
912 data[start + 1],
913 data[start + 2],
914 data[start + 3],
915 data[start + 4],
916 data[start + 5],
917 data[start + 6],
918 data[start + 7],
919 ];
920 values.push(i64::from_le_bytes(bytes));
921 }
922 pos += values_bytes;
923
924 Ok((Int64Array::from(values), pos))
925 }
926
927 fn read_float64_column(
929 data: &[u8],
930 offset: usize,
931 num_rows: usize,
932 validity_bytes: usize,
933 ) -> Result<(Float64Array, usize), OperatorError> {
934 let mut pos = offset;
935
936 if pos + validity_bytes > data.len() {
938 return Err(OperatorError::SerializationFailed(
939 "Truncated validity".into(),
940 ));
941 }
942 pos += validity_bytes;
943
944 let values_bytes = num_rows * 8;
946 if pos + values_bytes > data.len() {
947 return Err(OperatorError::SerializationFailed(
948 "Truncated float64 values".into(),
949 ));
950 }
951
952 let mut values = Vec::with_capacity(num_rows);
953 for i in 0..num_rows {
954 let start = pos + i * 8;
955 let bytes = [
956 data[start],
957 data[start + 1],
958 data[start + 2],
959 data[start + 3],
960 data[start + 4],
961 data[start + 5],
962 data[start + 6],
963 data[start + 7],
964 ];
965 values.push(f64::from_le_bytes(bytes));
966 }
967 pos += values_bytes;
968
969 Ok((Float64Array::from(values), pos))
970 }
971
972 fn read_utf8_column(
974 data: &[u8],
975 offset: usize,
976 num_rows: usize,
977 validity_bytes: usize,
978 ) -> Result<(StringArray, usize), OperatorError> {
979 let mut pos = offset;
980
981 if pos + validity_bytes > data.len() {
983 return Err(OperatorError::SerializationFailed(
984 "Truncated validity".into(),
985 ));
986 }
987 pos += validity_bytes;
988
989 let offsets_bytes = (num_rows + 1) * 4;
991 if pos + offsets_bytes > data.len() {
992 return Err(OperatorError::SerializationFailed(
993 "Truncated offsets".into(),
994 ));
995 }
996
997 let mut offsets = Vec::with_capacity(num_rows + 1);
998 for i in 0..=num_rows {
999 let start = pos + i * 4;
1000 let bytes = [
1001 data[start],
1002 data[start + 1],
1003 data[start + 2],
1004 data[start + 3],
1005 ];
1006 offsets.push(u32::from_le_bytes(bytes) as usize);
1007 }
1008 pos += offsets_bytes;
1009
1010 let data_len = offsets.last().copied().unwrap_or(0);
1012 if pos + data_len > data.len() {
1013 return Err(OperatorError::SerializationFailed(
1014 "Truncated string data".into(),
1015 ));
1016 }
1017
1018 let string_data = &data[pos..pos + data_len];
1019 pos += data_len;
1020
1021 let mut strings = Vec::with_capacity(num_rows);
1023 for i in 0..num_rows {
1024 let start = offsets[i];
1025 let end = offsets[i + 1];
1026 let s = String::from_utf8_lossy(&string_data[start..end]).to_string();
1027 strings.push(s);
1028 }
1029
1030 Ok((StringArray::from(strings), pos))
1031 }
1032
1033 pub fn to_batch(&self) -> Result<RecordBatch, OperatorError> {
1039 Self::deserialize_batch(&self.data, self.encoding)
1040 }
1041
1042 #[must_use]
1044 pub fn encoding(&self) -> JoinRowEncoding {
1045 if self.encoding == 1 {
1046 JoinRowEncoding::CpuFriendly
1047 } else {
1048 JoinRowEncoding::Compact
1049 }
1050 }
1051}
1052
/// Counters exposed by the stream join operator.
#[derive(Debug, Clone, Default)]
pub struct JoinMetrics {
    /// Events received on the left input.
    pub left_events: u64,
    /// Events received on the right input.
    pub right_events: u64,
    /// Number of matches counted.
    pub matches: u64,
    /// Counter for unmatched left rows.
    pub unmatched_left: u64,
    /// Counter for unmatched right rows.
    pub unmatched_right: u64,
    /// Events rejected as late.
    pub late_events: u64,
    /// Counter for state cleanups.
    pub state_cleanups: u64,

    /// Rows encoded with the CPU-friendly encoding.
    pub cpu_friendly_encodes: u64,
    /// Rows encoded with the compact encoding.
    pub compact_encodes: u64,
    /// Counter for asymmetric-compaction skips.
    pub asymmetric_skips: u64,
    /// Counter for idle-key cleanups.
    pub idle_key_cleanups: u64,
    /// Counter for build-side prunes.
    pub build_side_prunes: u64,
    /// Number of keys currently tracked.
    pub tracked_keys: u64,
}

impl JoinMetrics {
    /// Creates a metrics set with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Puts every counter back to zero.
    pub fn reset(&mut self) {
        // Swap in the default value; the previous counters are discarded.
        let _previous = std::mem::take(self);
    }
}
1098
/// Stream-stream join operator.
///
/// Buffers rows from both inputs in the state store, registers cleanup timers
/// at `event_time + time_bound_ms`, and probes the opposite side for matches
/// as each event arrives.
pub struct StreamJoinOperator {
    /// Key column name on the left input.
    left_key_column: String,
    /// Key column name on the right input.
    right_key_column: String,
    /// Join time bound in milliseconds.
    time_bound_ms: i64,
    /// Join semantics.
    join_type: JoinType,
    /// Operator identifier (generated as `stream_join_{n}` when not supplied).
    operator_id: String,
    /// Runtime counters.
    metrics: JoinMetrics,
    /// Output schema, derived once both input schemas are known.
    output_schema: Option<SchemaRef>,
    /// Schema captured from the first left event.
    left_schema: Option<SchemaRef>,
    /// Schema captured from the first right event.
    right_schema: Option<SchemaRef>,

    /// Encoding applied to rows written into state.
    row_encoding: JoinRowEncoding,
    /// Asymmetric compaction toggle.
    asymmetric_compaction: bool,
    /// Idle threshold (ms) used by `is_side_idle`.
    idle_threshold_ms: i64,
    /// Whether `key_metadata` is maintained per event.
    per_key_tracking: bool,
    /// Per-key idle threshold in milliseconds.
    key_idle_threshold_ms: i64,
    /// Whether build-side pruning runs on every processed event.
    build_side_pruning: bool,
    /// Fixed build side, if configured.
    build_side: Option<JoinSide>,
    /// Arrival statistics for the left input.
    left_stats: SideStats,
    /// Arrival statistics for the right input.
    right_stats: SideStats,
    /// Per-key metadata, keyed by the Fx hash of the key bytes.
    key_metadata: FxHashMap<u64, KeyMetadata>,
    /// Highest watermark observed while processing left events (`i64::MIN` initially).
    left_watermark: i64,
    /// Highest watermark observed while processing right events (`i64::MIN` initially).
    right_watermark: i64,
    /// Reusable buffer of keys (presumably filled during pruning — confirm against user).
    prune_buffer: Vec<Bytes>,
    /// Cached index of the left key column, filled in by `extract_key`.
    left_key_index: Option<usize>,
    /// Cached index of the right key column, filled in by `extract_key`.
    right_key_index: Option<usize>,
}
1175
1176impl StreamJoinOperator {
1177 #[must_use]
1186 #[allow(clippy::cast_possible_truncation)] pub fn new(
1188 left_key_column: String,
1189 right_key_column: String,
1190 time_bound: Duration,
1191 join_type: JoinType,
1192 ) -> Self {
1193 let operator_num = JOIN_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
1194 Self {
1195 left_key_column,
1196 right_key_column,
1197 time_bound_ms: time_bound.as_millis() as i64,
1198 join_type,
1199 operator_id: format!("stream_join_{operator_num}"),
1200 metrics: JoinMetrics::new(),
1201 output_schema: None,
1202 left_schema: None,
1203 right_schema: None,
1204 row_encoding: JoinRowEncoding::Compact,
1206 asymmetric_compaction: true,
1207 idle_threshold_ms: 60_000,
1208 per_key_tracking: true,
1209 key_idle_threshold_ms: 300_000,
1210 build_side_pruning: true,
1211 build_side: None,
1212 left_stats: SideStats::new(),
1213 right_stats: SideStats::new(),
1214 key_metadata: FxHashMap::default(),
1215 left_watermark: i64::MIN,
1216 right_watermark: i64::MIN,
1217 prune_buffer: Vec::with_capacity(100),
1218 left_key_index: None,
1219 right_key_index: None,
1220 }
1221 }
1222
1223 #[must_use]
1225 #[allow(clippy::cast_possible_truncation)] pub fn with_id(
1227 left_key_column: String,
1228 right_key_column: String,
1229 time_bound: Duration,
1230 join_type: JoinType,
1231 operator_id: String,
1232 ) -> Self {
1233 Self {
1234 left_key_column,
1235 right_key_column,
1236 time_bound_ms: time_bound.as_millis() as i64,
1237 join_type,
1238 operator_id,
1239 metrics: JoinMetrics::new(),
1240 output_schema: None,
1241 left_schema: None,
1242 right_schema: None,
1243 row_encoding: JoinRowEncoding::Compact,
1245 asymmetric_compaction: true,
1246 idle_threshold_ms: 60_000,
1247 per_key_tracking: true,
1248 key_idle_threshold_ms: 300_000,
1249 build_side_pruning: true,
1250 build_side: None,
1251 left_stats: SideStats::new(),
1252 right_stats: SideStats::new(),
1253 key_metadata: FxHashMap::default(),
1254 left_watermark: i64::MIN,
1255 right_watermark: i64::MIN,
1256 prune_buffer: Vec::with_capacity(100),
1257 left_key_index: None,
1258 right_key_index: None,
1259 }
1260 }
1261
1262 #[must_use]
1287 pub fn from_config(config: StreamJoinConfig) -> Self {
1288 let operator_id = config.operator_id.unwrap_or_else(|| {
1289 let num = JOIN_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
1290 format!("stream_join_{num}")
1291 });
1292
1293 Self {
1294 left_key_column: config.left_key_column,
1295 right_key_column: config.right_key_column,
1296 time_bound_ms: config.time_bound_ms,
1297 join_type: config.join_type,
1298 operator_id,
1299 metrics: JoinMetrics::new(),
1300 output_schema: None,
1301 left_schema: None,
1302 right_schema: None,
1303 row_encoding: config.row_encoding,
1304 asymmetric_compaction: config.asymmetric_compaction,
1305 idle_threshold_ms: config.idle_threshold_ms,
1306 per_key_tracking: config.per_key_tracking,
1307 key_idle_threshold_ms: config.key_idle_threshold_ms,
1308 build_side_pruning: config.build_side_pruning,
1309 build_side: config.build_side,
1310 left_stats: SideStats::new(),
1311 right_stats: SideStats::new(),
1312 key_metadata: FxHashMap::default(),
1313 left_watermark: i64::MIN,
1314 right_watermark: i64::MIN,
1315 prune_buffer: Vec::with_capacity(100),
1316 left_key_index: None,
1317 right_key_index: None,
1318 }
1319 }
1320
    /// Returns the configured join type.
    #[must_use]
    pub fn join_type(&self) -> JoinType {
        self.join_type
    }

    /// Returns the join time bound in milliseconds.
    #[must_use]
    pub fn time_bound_ms(&self) -> i64 {
        self.time_bound_ms
    }

    /// Returns the operator's runtime counters.
    #[must_use]
    pub fn metrics(&self) -> &JoinMetrics {
        &self.metrics
    }

    /// Resets every runtime counter to zero.
    pub fn reset_metrics(&mut self) {
        self.metrics.reset();
    }

    /// Returns the encoding used for rows written into state.
    #[must_use]
    pub fn row_encoding(&self) -> JoinRowEncoding {
        self.row_encoding
    }

    /// Returns whether asymmetric compaction is enabled.
    #[must_use]
    pub fn asymmetric_compaction_enabled(&self) -> bool {
        self.asymmetric_compaction
    }

    /// Returns whether per-key tracking is enabled.
    #[must_use]
    pub fn per_key_tracking_enabled(&self) -> bool {
        self.per_key_tracking
    }

    /// Returns arrival statistics for the left input.
    #[must_use]
    pub fn left_stats(&self) -> &SideStats {
        &self.left_stats
    }

    /// Returns arrival statistics for the right input.
    #[must_use]
    pub fn right_stats(&self) -> &SideStats {
        &self.right_stats
    }

    /// Returns the number of keys that currently have metadata entries.
    #[must_use]
    pub fn tracked_key_count(&self) -> usize {
        self.key_metadata.len()
    }
1379
1380 #[must_use]
1382 pub fn is_side_idle(&self, side: JoinSide, current_time: i64) -> bool {
1383 match side {
1384 JoinSide::Left => self
1385 .left_stats
1386 .is_idle(current_time, self.idle_threshold_ms),
1387 JoinSide::Right => self
1388 .right_stats
1389 .is_idle(current_time, self.idle_threshold_ms),
1390 }
1391 }
1392
1393 #[must_use]
1395 pub fn effective_build_side(&self) -> JoinSide {
1396 if let Some(side) = self.build_side {
1398 return side;
1399 }
1400
1401 if self.left_stats.events_received < self.right_stats.events_received {
1403 JoinSide::Left
1404 } else {
1405 JoinSide::Right
1406 }
1407 }
1408
1409 pub fn process_side(
1414 &mut self,
1415 event: &Event,
1416 side: JoinSide,
1417 ctx: &mut OperatorContext,
1418 ) -> OutputVec {
1419 match side {
1420 JoinSide::Left => self.process_left(event, ctx),
1421 JoinSide::Right => self.process_right(event, ctx),
1422 }
1423 }
1424
1425 fn process_left(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
1427 self.metrics.left_events += 1;
1428
1429 self.left_stats.record_event(ctx.processing_time);
1431
1432 if self.left_schema.is_none() {
1434 self.left_schema = Some(event.data.schema());
1435 self.update_output_schema();
1436 }
1437
1438 self.process_event(event, JoinSide::Left, ctx)
1439 }
1440
1441 fn process_right(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
1443 self.metrics.right_events += 1;
1444
1445 self.right_stats.record_event(ctx.processing_time);
1447
1448 if self.right_schema.is_none() {
1450 self.right_schema = Some(event.data.schema());
1451 self.update_output_schema();
1452 }
1453
1454 self.process_event(event, JoinSide::Right, ctx)
1455 }
1456
1457 fn update_output_schema(&mut self) {
1459 if let (Some(left), Some(right)) = (&self.left_schema, &self.right_schema) {
1460 match self.join_type.kept_side() {
1462 Some(JoinSide::Left) => {
1463 self.output_schema = Some(Arc::clone(left));
1464 return;
1465 }
1466 Some(JoinSide::Right) => {
1467 self.output_schema = Some(Arc::clone(right));
1468 return;
1469 }
1470 None => {}
1471 }
1472
1473 let mut fields: Vec<Field> =
1474 Vec::with_capacity(left.fields().len() + right.fields().len());
1475 fields.extend(left.fields().iter().map(|f| f.as_ref().clone()));
1476
1477 for field in right.fields() {
1479 let name = if left.field_with_name(field.name()).is_ok() {
1480 format!("right_{}", field.name())
1481 } else {
1482 field.name().clone()
1483 };
1484 fields.push(Field::new(
1485 name,
1486 field.data_type().clone(),
1487 true, ));
1489 }
1490
1491 self.output_schema = Some(Arc::new(Schema::new(fields)));
1492 }
1493 }
1494
    /// Shared join logic for an event arriving on `side`.
    ///
    /// Steps, in order: advance the watermark, reject late events, extract the
    /// join key, update per-key metadata, encode and store the row, register
    /// cleanup (and, where the join type requires, unmatched-row) timers,
    /// optionally prune the build side, probe the opposite side and hand the
    /// matches to `process_matches`. A watermark emitted by the generator is
    /// appended after any match outputs.
    ///
    /// Failures to extract the key, encode the row or store it are swallowed:
    /// the outputs collected so far (none at those points) are returned.
    fn process_event(
        &mut self,
        event: &Event,
        side: JoinSide,
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        let mut output = OutputVec::new();
        let event_time = event.timestamp;

        // Feed the event time to the watermark generator; it may emit a new watermark.
        let emitted_watermark = ctx.watermark_generator.on_event(event_time);

        // Track the highest watermark seen while processing each side.
        let current_wm = ctx.watermark_generator.current_watermark();
        match side {
            JoinSide::Left => self.left_watermark = self.left_watermark.max(current_wm),
            JoinSide::Right => self.right_watermark = self.right_watermark.max(current_wm),
        }

        // Late-event check: the event's join window already lies behind the watermark.
        // NOTE(review): on this and the other early returns below, a watermark
        // held in `emitted_watermark` is not pushed to the output — confirm intended.
        if current_wm > i64::MIN && event_time + self.time_bound_ms < current_wm {
            self.metrics.late_events += 1;
            output.push(Output::LateEvent(event.clone()));
            return output;
        }

        // Pick the key column and its cached column index for this side.
        let (key_column, cached_index) = match side {
            JoinSide::Left => (&self.left_key_column, &mut self.left_key_index),
            JoinSide::Right => (&self.right_key_column, &mut self.right_key_index),
        };
        let Some(key_value) = Self::extract_key(&event.data, key_column, cached_index) else {
            // No key could be extracted: the event is dropped without output.
            return output;
        };

        // Hash the key bytes; the hash indexes `key_metadata`.
        let key_hash = {
            let mut hasher = rustc_hash::FxHasher::default();
            key_value.hash(&mut hasher);
            hasher.finish()
        };

        if self.per_key_tracking {
            self.key_metadata
                .entry(key_hash)
                .and_modify(|meta| meta.record_event(ctx.processing_time))
                .or_insert_with(|| KeyMetadata::new(ctx.processing_time));
            self.metrics.tracked_keys = self.key_metadata.len() as u64;
        }

        // Encode the row in the configured encoding and count the encode.
        let join_row = match JoinRow::with_encoding(
            event_time,
            key_value.to_vec(),
            &event.data,
            self.row_encoding,
        ) {
            Ok(row) => {
                match self.row_encoding {
                    JoinRowEncoding::Compact => self.metrics.compact_encodes += 1,
                    JoinRowEncoding::CpuFriendly => self.metrics.cpu_friendly_encodes += 1,
                }
                row
            }
            // Encoding failed: drop the event silently.
            Err(_) => return output,
        };

        // Persist the row under a key derived from side, join key and event time.
        let state_key = Self::make_state_key(side, &key_value, event_time);
        if ctx.state.put_typed(&state_key, &join_row).is_err() {
            return output;
        }

        // Register a cleanup timer for when the row's time bound has elapsed.
        let cleanup_time = event_time + self.time_bound_ms;
        let timer_key = Self::make_timer_key(side, &state_key);
        ctx.timers
            .register_timer(cleanup_time, Some(timer_key), Some(ctx.operator_index));

        // Join types that report unmatched rows for this side get a second timer.
        if (side == JoinSide::Left && self.join_type.emits_unmatched_left())
            || (side == JoinSide::Right && self.join_type.emits_unmatched_right())
        {
            let unmatched_timer_key = Self::make_unmatched_timer_key(side, &state_key);
            ctx.timers.register_timer(
                cleanup_time,
                Some(unmatched_timer_key),
                Some(ctx.operator_index),
            );
        }

        if self.build_side_pruning {
            self.prune_build_side(side, ctx);
        }

        // Look up rows on the other side that match this key.
        let matches = self.probe_opposite_side(side, &key_value, event_time, ctx.state);

        self.process_matches(
            matches,
            side,
            event,
            event_time,
            &join_row,
            &state_key,
            ctx,
            &mut output,
        );

        // Forward the watermark emitted for this event, after the match outputs.
        if let Some(wm) = emitted_watermark {
            output.push(Output::Watermark(wm.timestamp()));
        }

        output
    }
1616
1617 #[allow(clippy::too_many_arguments)]
1619 fn process_matches(
1620 &mut self,
1621 matches: SmallVec<[([u8; 28], JoinRow); 4]>,
1622 side: JoinSide,
1623 event: &Event,
1624 event_time: i64,
1625 join_row: &JoinRow,
1626 state_key: &[u8],
1627 ctx: &mut OperatorContext,
1628 output: &mut OutputVec,
1629 ) {
1630 if self.join_type.is_semi() {
1631 let kept_side = self.join_type.kept_side().unwrap();
1633 for (other_row_key, other_row) in matches {
1634 self.metrics.matches += 1;
1635 let was_other_matched = is_matched(&*ctx.state, &other_row_key);
1636 mark_matched(&mut *ctx.state, &other_row_key);
1637
1638 let was_our_matched = is_matched(&*ctx.state, state_key);
1639 mark_matched(&mut *ctx.state, state_key);
1640
1641 if side == kept_side && !was_our_matched {
1642 output.push(Output::Event(Event::new(
1643 event_time,
1644 RecordBatch::clone(&event.data),
1645 )));
1646 break; } else if side != kept_side && !was_other_matched {
1648 if let Ok(batch) = other_row.to_batch() {
1649 output.push(Output::Event(Event::new(other_row.timestamp, batch)));
1650 }
1651 }
1652 }
1653 } else if self.join_type.is_anti() {
1654 for (other_row_key, _other_row) in matches {
1656 self.metrics.matches += 1;
1657 mark_matched(&mut *ctx.state, &other_row_key);
1658 mark_matched(&mut *ctx.state, state_key);
1659 }
1660 } else {
1661 let needs_matched_flag =
1663 self.join_type.emits_unmatched_left() || self.join_type.emits_unmatched_right();
1664 for (other_row_key, other_row) in matches {
1665 self.metrics.matches += 1;
1666
1667 if needs_matched_flag {
1669 mark_matched(&mut *ctx.state, &other_row_key);
1670 mark_matched(&mut *ctx.state, state_key);
1671 }
1672
1673 if let Some(joined_event) = self.create_joined_event(
1675 side,
1676 join_row,
1677 &other_row,
1678 std::cmp::max(event_time, other_row.timestamp),
1679 ) {
1680 output.push(Output::Event(joined_event));
1681 }
1682 }
1683 }
1684 }
1685
1686 fn prune_build_side(&mut self, current_side: JoinSide, ctx: &mut OperatorContext) {
1691 let build_side = self.effective_build_side();
1692
1693 if current_side == build_side {
1695 return;
1696 }
1697
1698 let probe_watermark = match build_side {
1700 JoinSide::Left => self.right_watermark,
1701 JoinSide::Right => self.left_watermark,
1702 };
1703
1704 if probe_watermark == i64::MIN {
1705 return;
1706 }
1707
1708 let prune_threshold = probe_watermark - self.time_bound_ms;
1710 if prune_threshold == i64::MIN {
1711 return;
1712 }
1713
1714 if self.join_type == JoinType::Inner {
1716 let prefix = match build_side {
1717 JoinSide::Left => LEFT_STATE_PREFIX,
1718 JoinSide::Right => RIGHT_STATE_PREFIX,
1719 };
1720
1721 self.prune_buffer.clear();
1723 let time_bound = self.time_bound_ms;
1724 for (key, value) in ctx.state.prefix_scan(prefix) {
1725 if self.prune_buffer.len() >= 100 {
1726 break; }
1728 if key.len() >= 20 {
1730 if let Ok(ts_bytes) = <[u8; 8]>::try_from(&key[12..20]) {
1731 let timestamp = i64::from_be_bytes(ts_bytes);
1732 if timestamp + time_bound < prune_threshold {
1733 if let Ok(row) =
1735 rkyv::access::<rkyv::Archived<JoinRow>, RkyvError>(&value)
1736 .and_then(rkyv::deserialize::<JoinRow, RkyvError>)
1737 {
1738 if row.timestamp + time_bound < prune_threshold {
1739 self.prune_buffer.push(key);
1740 }
1741 }
1742 }
1743 }
1744 }
1745 }
1746
1747 for key in &self.prune_buffer {
1748 if ctx.state.delete(key).is_ok() {
1749 let _ = ctx.state.delete(&matched_flag_key(key));
1750 self.metrics.build_side_prunes += 1;
1751 }
1752 }
1753 }
1754 }
1755
1756 pub fn scan_idle_keys(&mut self, ctx: &mut OperatorContext) {
1761 if !self.per_key_tracking {
1762 return;
1763 }
1764
1765 let threshold = ctx.processing_time - self.key_idle_threshold_ms;
1766
1767 let idle_keys: Vec<u64> = self
1769 .key_metadata
1770 .iter()
1771 .filter(|(_, meta)| meta.last_activity < threshold && meta.state_entries == 0)
1772 .map(|(k, _)| *k)
1773 .collect();
1774
1775 for key_hash in idle_keys {
1777 self.key_metadata.remove(&key_hash);
1778 self.metrics.idle_key_cleanups += 1;
1779 }
1780
1781 self.metrics.tracked_keys = self.key_metadata.len() as u64;
1782 }
1783
1784 #[must_use]
1786 pub fn should_skip_compaction(&self, side: JoinSide, current_time: i64) -> bool {
1787 if !self.asymmetric_compaction {
1788 return false;
1789 }
1790
1791 let is_idle = match side {
1793 JoinSide::Left => self
1794 .left_stats
1795 .is_idle(current_time, self.idle_threshold_ms),
1796 JoinSide::Right => self
1797 .right_stats
1798 .is_idle(current_time, self.idle_threshold_ms),
1799 };
1800
1801 if is_idle {
1802 true
1804 } else {
1805 false
1806 }
1807 }
1808
1809 fn extract_key(
1813 batch: &RecordBatch,
1814 column_name: &str,
1815 cached_index: &mut Option<usize>,
1816 ) -> Option<SmallVec<[u8; 16]>> {
1817 let column_index = if let Some(idx) = *cached_index {
1818 idx
1819 } else {
1820 let idx = batch.schema().index_of(column_name).ok()?;
1821 *cached_index = Some(idx);
1822 idx
1823 };
1824 let column = batch.column(column_index);
1825
1826 if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
1828 if string_array.is_empty() || string_array.is_null(0) {
1829 return None;
1830 }
1831 return Some(SmallVec::from_slice(string_array.value(0).as_bytes()));
1832 }
1833
1834 if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
1835 if int_array.is_empty() || int_array.is_null(0) {
1836 return None;
1837 }
1838 return Some(SmallVec::from_slice(&int_array.value(0).to_le_bytes()));
1839 }
1840
1841 None
1844 }
1845
1846 #[allow(clippy::cast_sign_loss)]
1848 fn make_state_key(side: JoinSide, key_value: &[u8], timestamp: i64) -> [u8; 28] {
1849 let prefix = match side {
1850 JoinSide::Left => LEFT_STATE_PREFIX,
1851 JoinSide::Right => RIGHT_STATE_PREFIX,
1852 };
1853
1854 let event_id = EVENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
1855
1856 let mut key = [0u8; 28];
1858 key[..4].copy_from_slice(prefix);
1859
1860 let key_hash = {
1862 let mut hasher = rustc_hash::FxHasher::default();
1863 key_value.hash(&mut hasher);
1864 hasher.finish()
1865 };
1866 key[4..12].copy_from_slice(&key_hash.to_be_bytes());
1867 key[12..20].copy_from_slice(×tamp.to_be_bytes());
1868 key[20..28].copy_from_slice(&event_id.to_be_bytes());
1869
1870 key
1871 }
1872
1873 fn make_timer_key(side: JoinSide, state_key: &[u8]) -> TimerKey {
1875 let prefix = match side {
1876 JoinSide::Left => LEFT_TIMER_PREFIX,
1877 JoinSide::Right => RIGHT_TIMER_PREFIX,
1878 };
1879
1880 let mut key = TimerKey::new();
1881 key.push(prefix);
1882 key.extend_from_slice(state_key);
1883 key
1884 }
1885
1886 fn make_unmatched_timer_key(side: JoinSide, state_key: &[u8]) -> TimerKey {
1888 let side_byte = match side {
1889 JoinSide::Left => 0x01,
1890 JoinSide::Right => 0x02,
1891 };
1892
1893 let mut key = TimerKey::new();
1894 key.push(UNMATCHED_TIMER_PREFIX);
1895 key.push(side_byte);
1896 key.extend_from_slice(state_key);
1897 key
1898 }
1899
1900 fn probe_opposite_side(
1902 &self,
1903 current_side: JoinSide,
1904 key_value: &[u8],
1905 timestamp: i64,
1906 state: &dyn StateStore,
1907 ) -> SmallVec<[([u8; 28], JoinRow); 4]> {
1908 let mut matches = SmallVec::new();
1909
1910 let prefix = match current_side {
1911 JoinSide::Left => RIGHT_STATE_PREFIX,
1912 JoinSide::Right => LEFT_STATE_PREFIX,
1913 };
1914
1915 let key_hash = {
1917 let mut hasher = rustc_hash::FxHasher::default();
1918 key_value.hash(&mut hasher);
1919 hasher.finish()
1920 };
1921 let mut scan_prefix = [0u8; 12];
1922 scan_prefix[..4].copy_from_slice(prefix);
1923 scan_prefix[4..12].copy_from_slice(&key_hash.to_be_bytes());
1924
1925 for (state_key, value) in state.prefix_scan(&scan_prefix) {
1927 let Ok(row) = rkyv::access::<rkyv::Archived<JoinRow>, RkyvError>(&value)
1929 .and_then(rkyv::deserialize::<JoinRow, RkyvError>)
1930 else {
1931 continue;
1932 };
1933
1934 let time_diff = (timestamp - row.timestamp).abs();
1936 if time_diff <= self.time_bound_ms {
1937 if row.key_value == key_value {
1939 let mut key_buf = [0u8; 28];
1940 if state_key.len() == 28 {
1941 key_buf.copy_from_slice(&state_key);
1942 }
1943 matches.push((key_buf, row));
1944 }
1945 }
1946 }
1947
1948 matches
1949 }
1950
1951 fn create_joined_event(
1953 &self,
1954 current_side: JoinSide,
1955 current_row: &JoinRow,
1956 other_row: &JoinRow,
1957 output_timestamp: i64,
1958 ) -> Option<Event> {
1959 let (left_row, right_row) = match current_side {
1960 JoinSide::Left => (current_row, other_row),
1961 JoinSide::Right => (other_row, current_row),
1962 };
1963
1964 let left_batch = left_row.to_batch().ok()?;
1965 let right_batch = right_row.to_batch().ok()?;
1966
1967 let joined_batch = self.concat_batches(&left_batch, &right_batch)?;
1968
1969 Some(Event::new(output_timestamp, joined_batch))
1970 }
1971
1972 fn concat_batches(&self, left: &RecordBatch, right: &RecordBatch) -> Option<RecordBatch> {
1974 let schema = self.output_schema.as_ref()?;
1975
1976 let mut columns: Vec<ArrayRef> =
1977 Vec::with_capacity(left.num_columns() + right.num_columns());
1978 columns.extend(left.columns().iter().cloned());
1979 columns.extend(right.columns().iter().cloned());
1980
1981 RecordBatch::try_new(Arc::clone(schema), columns).ok()
1982 }
1983
1984 fn create_unmatched_event(&self, side: JoinSide, row: &JoinRow) -> Option<Event> {
1986 let batch = row.to_batch().ok()?;
1987 let schema = self.output_schema.as_ref()?;
1988
1989 let num_rows = batch.num_rows();
1990 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1991
1992 match side {
1993 JoinSide::Left => {
1994 columns.extend(batch.columns().iter().cloned());
1996
1997 if let Some(right_schema) = &self.right_schema {
1999 for field in right_schema.fields() {
2000 columns.push(Self::create_null_array(field.data_type(), num_rows));
2001 }
2002 }
2003 }
2004 JoinSide::Right => {
2005 if let Some(left_schema) = &self.left_schema {
2007 for field in left_schema.fields() {
2008 columns.push(Self::create_null_array(field.data_type(), num_rows));
2009 }
2010 }
2011
2012 columns.extend(batch.columns().iter().cloned());
2013 }
2014 }
2015
2016 let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
2017
2018 Some(Event::new(row.timestamp, joined_batch))
2019 }
2020
2021 fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
2023 match data_type {
2024 DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
2025 _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
2027 }
2028 }
2029
2030 fn handle_cleanup_timer(
2032 &mut self,
2033 side: JoinSide,
2034 state_key: &[u8],
2035 ctx: &mut OperatorContext,
2036 ) -> OutputVec {
2037 let output = OutputVec::new();
2038
2039 if self.should_skip_compaction(side, ctx.processing_time) {
2041 self.metrics.asymmetric_skips += 1;
2042 }
2044
2045 if self.per_key_tracking && state_key.len() >= 12 {
2047 if let Ok(hash_bytes) = state_key[4..12].try_into() {
2049 let key_hash = u64::from_be_bytes(hash_bytes);
2050 if let Some(meta) = self.key_metadata.get_mut(&key_hash) {
2051 meta.decrement_entries();
2052 }
2053 }
2054 }
2055
2056 let defer_cleanup = match side {
2059 JoinSide::Left => self.join_type.emits_unmatched_left(),
2060 JoinSide::Right => self.join_type.emits_unmatched_right(),
2061 };
2062 if !defer_cleanup && ctx.state.delete(state_key).is_ok() {
2063 let _ = ctx.state.delete(&matched_flag_key(state_key));
2064 self.metrics.state_cleanups += 1;
2065 }
2066
2067 output
2068 }
2069
2070 fn handle_unmatched_timer(
2072 &mut self,
2073 side: JoinSide,
2074 state_key: &[u8],
2075 ctx: &mut OperatorContext,
2076 ) -> OutputVec {
2077 let mut output = OutputVec::new();
2078
2079 let Ok(Some(row)) = ctx.state.get_typed::<JoinRow>(state_key) else {
2081 return output;
2082 };
2083
2084 if !is_matched(&*ctx.state, state_key) {
2086 match side {
2087 JoinSide::Left if self.join_type.emits_unmatched_left() => {
2088 self.metrics.unmatched_left += 1;
2089 if self.join_type.is_anti() {
2090 if let Ok(batch) = row.to_batch() {
2092 output.push(Output::Event(Event::new(row.timestamp, batch)));
2093 }
2094 } else if let Some(event) = self.create_unmatched_event(side, &row) {
2095 output.push(Output::Event(event));
2096 }
2097 }
2098 JoinSide::Right if self.join_type.emits_unmatched_right() => {
2099 self.metrics.unmatched_right += 1;
2100 if self.join_type.is_anti() {
2101 if let Ok(batch) = row.to_batch() {
2103 output.push(Output::Event(Event::new(row.timestamp, batch)));
2104 }
2105 } else if let Some(event) = self.create_unmatched_event(side, &row) {
2106 output.push(Output::Event(event));
2107 }
2108 }
2109 _ => {}
2110 }
2111 }
2112
2113 if ctx.state.delete(state_key).is_ok() {
2115 let _ = ctx.state.delete(&matched_flag_key(state_key));
2116 self.metrics.state_cleanups += 1;
2117 }
2118
2119 output
2120 }
2121
2122 fn parse_timer_key(key: &[u8]) -> Option<(TimerKeyType, JoinSide, Vec<u8>)> {
2124 if key.is_empty() {
2125 return None;
2126 }
2127
2128 match key[0] {
2129 LEFT_TIMER_PREFIX => {
2130 let state_key = key[1..].to_vec();
2131 Some((TimerKeyType::Cleanup, JoinSide::Left, state_key))
2132 }
2133 RIGHT_TIMER_PREFIX => {
2134 let state_key = key[1..].to_vec();
2135 Some((TimerKeyType::Cleanup, JoinSide::Right, state_key))
2136 }
2137 UNMATCHED_TIMER_PREFIX => {
2138 if key.len() < 2 {
2139 return None;
2140 }
2141 let side = match key[1] {
2142 0x01 => JoinSide::Left,
2143 0x02 => JoinSide::Right,
2144 _ => return None,
2145 };
2146 let state_key = key[2..].to_vec();
2147 Some((TimerKeyType::Unmatched, side, state_key))
2148 }
2149 _ => None,
2150 }
2151 }
2152}
2153
/// Kind of timer encoded in a timer key, as decoded by `parse_timer_key`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerKeyType {
    /// Timer whose key starts with a left/right timer prefix; dispatched to
    /// `handle_cleanup_timer`.
    Cleanup,
    /// Timer whose key starts with `UNMATCHED_TIMER_PREFIX`; dispatched to
    /// `handle_unmatched_timer`.
    Unmatched,
}
2162
2163impl Operator for StreamJoinOperator {
2164 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
2165 self.process_left(event, ctx)
2167 }
2168
2169 fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
2170 let Some((timer_type, side, state_key)) = Self::parse_timer_key(&timer.key) else {
2171 return OutputVec::new();
2172 };
2173
2174 match timer_type {
2175 TimerKeyType::Cleanup => self.handle_cleanup_timer(side, &state_key, ctx),
2176 TimerKeyType::Unmatched => self.handle_unmatched_timer(side, &state_key, ctx),
2177 }
2178 }
2179
2180 fn checkpoint(&self) -> OperatorState {
2181 let checkpoint_data = (
2185 (
2187 self.left_key_column.clone(),
2188 self.right_key_column.clone(),
2189 self.time_bound_ms,
2190 ),
2191 (
2193 self.metrics.left_events,
2194 self.metrics.right_events,
2195 self.metrics.matches,
2196 ),
2197 (
2199 self.metrics.cpu_friendly_encodes,
2200 self.metrics.compact_encodes,
2201 self.metrics.asymmetric_skips,
2202 self.metrics.idle_key_cleanups,
2203 self.metrics.build_side_prunes,
2204 ),
2205 (
2207 self.left_stats.events_received,
2208 self.right_stats.events_received,
2209 self.left_watermark,
2210 self.right_watermark,
2211 ),
2212 );
2213
2214 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
2215 .map(|v| v.to_vec())
2216 .unwrap_or_default();
2217
2218 OperatorState {
2219 operator_id: self.operator_id.clone(),
2220 data,
2221 }
2222 }
2223
2224 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
2225 type CheckpointData = (
2227 (String, String, i64), (u64, u64, u64), (u64, u64, u64, u64, u64), (u64, u64, i64, i64), );
2232 type LegacyCheckpointData = (String, String, i64, u64, u64, u64);
2234
2235 if state.operator_id != self.operator_id {
2236 return Err(OperatorError::StateAccessFailed(format!(
2237 "Operator ID mismatch: expected {}, got {}",
2238 self.operator_id, state.operator_id
2239 )));
2240 }
2241
2242 if let Ok(archived) = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
2244 {
2245 if let Ok(data) = rkyv::deserialize::<CheckpointData, RkyvError>(archived) {
2246 let (
2247 _config,
2248 (left_events, right_events, matches),
2249 (
2250 cpu_friendly_encodes,
2251 compact_encodes,
2252 asymmetric_skips,
2253 idle_key_cleanups,
2254 build_side_prunes,
2255 ),
2256 (left_received, right_received, left_wm, right_wm),
2257 ) = data;
2258
2259 self.metrics.left_events = left_events;
2260 self.metrics.right_events = right_events;
2261 self.metrics.matches = matches;
2262 self.metrics.cpu_friendly_encodes = cpu_friendly_encodes;
2263 self.metrics.compact_encodes = compact_encodes;
2264 self.metrics.asymmetric_skips = asymmetric_skips;
2265 self.metrics.idle_key_cleanups = idle_key_cleanups;
2266 self.metrics.build_side_prunes = build_side_prunes;
2267 self.left_stats.events_received = left_received;
2268 self.right_stats.events_received = right_received;
2269 self.left_watermark = left_wm;
2270 self.right_watermark = right_wm;
2271
2272 return Ok(());
2273 }
2274 }
2275
2276 let archived = rkyv::access::<rkyv::Archived<LegacyCheckpointData>, RkyvError>(&state.data)
2278 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
2279 let (_, _, _, left_events, right_events, matches) =
2280 rkyv::deserialize::<LegacyCheckpointData, RkyvError>(archived)
2281 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
2282
2283 self.metrics.left_events = left_events;
2284 self.metrics.right_events = right_events;
2285 self.metrics.matches = matches;
2286
2287 Ok(())
2288 }
2289}
2290
2291#[cfg(test)]
2292#[allow(clippy::cast_possible_wrap)]
2293mod tests {
2294 use super::*;
2295 use crate::state::InMemoryStore;
2296 use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
2297 use arrow_array::{Int64Array, StringArray};
2298 use arrow_schema::{DataType, Field, Schema};
2299
2300 fn create_order_event(timestamp: i64, order_id: &str, amount: i64) -> Event {
2301 let schema = Arc::new(Schema::new(vec![
2302 Field::new("order_id", DataType::Utf8, false),
2303 Field::new("amount", DataType::Int64, false),
2304 ]));
2305 let batch = RecordBatch::try_new(
2306 schema,
2307 vec![
2308 Arc::new(StringArray::from(vec![order_id])),
2309 Arc::new(Int64Array::from(vec![amount])),
2310 ],
2311 )
2312 .unwrap();
2313 Event::new(timestamp, batch)
2314 }
2315
2316 fn create_payment_event(timestamp: i64, order_id: &str, status: &str) -> Event {
2317 let schema = Arc::new(Schema::new(vec![
2318 Field::new("order_id", DataType::Utf8, false),
2319 Field::new("status", DataType::Utf8, false),
2320 ]));
2321 let batch = RecordBatch::try_new(
2322 schema,
2323 vec![
2324 Arc::new(StringArray::from(vec![order_id])),
2325 Arc::new(StringArray::from(vec![status])),
2326 ],
2327 )
2328 .unwrap();
2329 Event::new(timestamp, batch)
2330 }
2331
2332 fn create_test_context<'a>(
2333 timers: &'a mut TimerService,
2334 state: &'a mut dyn StateStore,
2335 watermark_gen: &'a mut dyn WatermarkGenerator,
2336 ) -> OperatorContext<'a> {
2337 OperatorContext {
2338 event_time: 0,
2339 processing_time: 0,
2340 timers,
2341 state,
2342 watermark_generator: watermark_gen,
2343 operator_index: 0,
2344 }
2345 }
2346
2347 #[test]
2348 fn test_join_type_properties() {
2349 assert!(!JoinType::Inner.emits_unmatched_left());
2350 assert!(!JoinType::Inner.emits_unmatched_right());
2351
2352 assert!(JoinType::Left.emits_unmatched_left());
2353 assert!(!JoinType::Left.emits_unmatched_right());
2354
2355 assert!(!JoinType::Right.emits_unmatched_left());
2356 assert!(JoinType::Right.emits_unmatched_right());
2357
2358 assert!(JoinType::Full.emits_unmatched_left());
2359 assert!(JoinType::Full.emits_unmatched_right());
2360
2361 assert!(!JoinType::LeftSemi.emits_unmatched_left());
2363 assert!(!JoinType::LeftSemi.emits_unmatched_right());
2364 assert!(!JoinType::RightSemi.emits_unmatched_left());
2365 assert!(!JoinType::RightSemi.emits_unmatched_right());
2366
2367 assert!(JoinType::LeftAnti.emits_unmatched_left());
2369 assert!(!JoinType::LeftAnti.emits_unmatched_right());
2370 assert!(!JoinType::RightAnti.emits_unmatched_left());
2371 assert!(JoinType::RightAnti.emits_unmatched_right());
2372
2373 assert!(JoinType::LeftSemi.is_semi());
2375 assert!(JoinType::RightSemi.is_semi());
2376 assert!(!JoinType::LeftAnti.is_semi());
2377 assert!(JoinType::LeftAnti.is_anti());
2378 assert!(JoinType::RightAnti.is_anti());
2379 assert!(!JoinType::Inner.is_semi());
2380 assert!(!JoinType::Inner.is_anti());
2381
2382 assert_eq!(JoinType::LeftSemi.kept_side(), Some(JoinSide::Left));
2384 assert_eq!(JoinType::LeftAnti.kept_side(), Some(JoinSide::Left));
2385 assert_eq!(JoinType::RightSemi.kept_side(), Some(JoinSide::Right));
2386 assert_eq!(JoinType::RightAnti.kept_side(), Some(JoinSide::Right));
2387 assert_eq!(JoinType::Inner.kept_side(), None);
2388 }
2389
2390 #[test]
2391 fn test_left_semi_join_only_emits_matched() {
2392 let mut operator = StreamJoinOperator::with_id(
2393 "order_id".to_string(),
2394 "order_id".to_string(),
2395 Duration::from_secs(3600),
2396 JoinType::LeftSemi,
2397 "test_semi".to_string(),
2398 );
2399
2400 let mut timers = TimerService::new();
2401 let mut state = InMemoryStore::new();
2402 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2403
2404 let order = create_order_event(1000, "order_1", 100);
2406 {
2407 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2408 let outputs = operator.process_side(&order, JoinSide::Left, &mut ctx);
2409 let events: Vec<_> = outputs
2410 .iter()
2411 .filter(|o| matches!(o, Output::Event(_)))
2412 .collect();
2413 assert_eq!(events.len(), 0, "semi join should not emit without match");
2414 }
2415
2416 let payment = create_payment_event(2000, "order_1", "paid");
2418 let outputs = {
2419 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2420 operator.process_side(&payment, JoinSide::Right, &mut ctx)
2421 };
2422 let events: Vec<_> = outputs
2423 .iter()
2424 .filter_map(|o| match o {
2425 Output::Event(e) => Some(e),
2426 _ => None,
2427 })
2428 .collect();
2429 assert_eq!(events.len(), 1, "semi join should emit left row on match");
2430 assert_eq!(events[0].data.num_columns(), 2);
2432 assert_eq!(events[0].data.schema().field(0).name(), "order_id");
2433 assert_eq!(events[0].data.schema().field(1).name(), "amount");
2434
2435 let payment2 = create_payment_event(2500, "order_1", "refunded");
2437 {
2438 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2439 let outputs = operator.process_side(&payment2, JoinSide::Right, &mut ctx);
2440 let events: Vec<_> = outputs
2441 .iter()
2442 .filter(|o| matches!(o, Output::Event(_)))
2443 .collect();
2444 assert_eq!(events.len(), 0, "semi join should not emit duplicate");
2445 }
2446 }
2447
2448 #[test]
2449 fn test_left_anti_join_only_emits_unmatched() {
2450 let mut operator = StreamJoinOperator::with_id(
2451 "order_id".to_string(),
2452 "order_id".to_string(),
2453 Duration::from_secs(3600),
2454 JoinType::LeftAnti,
2455 "test_anti".to_string(),
2456 );
2457
2458 let mut timers = TimerService::new();
2459 let mut state = InMemoryStore::new();
2460 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2461
2462 let matched_order = create_order_event(1000, "order_1", 100);
2464 {
2465 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2466 let outputs = operator.process_side(&matched_order, JoinSide::Left, &mut ctx);
2467 let events: Vec<_> = outputs
2468 .iter()
2469 .filter(|o| matches!(o, Output::Event(_)))
2470 .collect();
2471 assert_eq!(events.len(), 0, "anti join should not emit on insert");
2472 }
2473
2474 let unmatched_order = create_order_event(1100, "order_2", 200);
2476 {
2477 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2478 let outputs = operator.process_side(&unmatched_order, JoinSide::Left, &mut ctx);
2479 let events: Vec<_> = outputs
2480 .iter()
2481 .filter(|o| matches!(o, Output::Event(_)))
2482 .collect();
2483 assert_eq!(events.len(), 0, "anti join should not emit on insert");
2484 }
2485
2486 let payment = create_payment_event(2000, "order_1", "paid");
2488 {
2489 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2490 let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2491 let events: Vec<_> = outputs
2492 .iter()
2493 .filter(|o| matches!(o, Output::Event(_)))
2494 .collect();
2495 assert_eq!(events.len(), 0, "anti join should never emit on match");
2496 }
2497
2498 let pending_timers = timers.poll_timers(i64::MAX);
2500 let mut anti_outputs = 0;
2501 for timer_reg in pending_timers {
2502 let timer = Timer {
2503 key: timer_reg.key.unwrap_or_default(),
2504 timestamp: timer_reg.timestamp,
2505 };
2506 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2507 let outputs = operator.on_timer(timer, &mut ctx);
2508 for out in &outputs {
2509 if let Output::Event(e) = out {
2510 anti_outputs += 1;
2511 assert_eq!(e.data.num_columns(), 2);
2513 assert_eq!(e.data.schema().field(0).name(), "order_id");
2514 }
2515 }
2516 }
2517 assert!(
2518 anti_outputs >= 1,
2519 "anti join should emit unmatched left rows on timer"
2520 );
2521 }
2522
2523 #[test]
2524 fn test_join_operator_creation() {
2525 let operator = StreamJoinOperator::new(
2526 "order_id".to_string(),
2527 "order_id".to_string(),
2528 Duration::from_secs(3600),
2529 JoinType::Inner,
2530 );
2531
2532 assert_eq!(operator.join_type(), JoinType::Inner);
2533 assert_eq!(operator.time_bound_ms(), 3_600_000);
2534 }
2535
2536 #[test]
2537 fn test_join_operator_with_id() {
2538 let operator = StreamJoinOperator::with_id(
2539 "order_id".to_string(),
2540 "order_id".to_string(),
2541 Duration::from_secs(3600),
2542 JoinType::Left,
2543 "test_join".to_string(),
2544 );
2545
2546 assert_eq!(operator.operator_id, "test_join");
2547 assert_eq!(operator.join_type(), JoinType::Left);
2548 }
2549
2550 #[test]
2551 fn test_inner_join_basic() {
2552 let mut operator = StreamJoinOperator::with_id(
2553 "order_id".to_string(),
2554 "order_id".to_string(),
2555 Duration::from_secs(3600),
2556 JoinType::Inner,
2557 "test_join".to_string(),
2558 );
2559
2560 let mut timers = TimerService::new();
2561 let mut state = InMemoryStore::new();
2562 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2563
2564 let order = create_order_event(1000, "order_1", 100);
2566 {
2567 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2568 let outputs = operator.process_side(&order, JoinSide::Left, &mut ctx);
2569 assert!(
2571 outputs
2572 .iter()
2573 .filter(|o| matches!(o, Output::Event(_)))
2574 .count()
2575 == 0
2576 );
2577 }
2578
2579 let payment = create_payment_event(2000, "order_1", "paid");
2581 {
2582 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2583 let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2584 assert_eq!(
2586 outputs
2587 .iter()
2588 .filter(|o| matches!(o, Output::Event(_)))
2589 .count(),
2590 1
2591 );
2592 }
2593
2594 assert_eq!(operator.metrics().matches, 1);
2595 assert_eq!(operator.metrics().left_events, 1);
2596 assert_eq!(operator.metrics().right_events, 1);
2597 }
2598
2599 #[test]
2600 fn test_inner_join_no_match_different_key() {
2601 let mut operator = StreamJoinOperator::with_id(
2602 "order_id".to_string(),
2603 "order_id".to_string(),
2604 Duration::from_secs(3600),
2605 JoinType::Inner,
2606 "test_join".to_string(),
2607 );
2608
2609 let mut timers = TimerService::new();
2610 let mut state = InMemoryStore::new();
2611 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2612
2613 let order = create_order_event(1000, "order_1", 100);
2615 {
2616 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2617 operator.process_side(&order, JoinSide::Left, &mut ctx);
2618 }
2619
2620 let payment = create_payment_event(2000, "order_2", "paid");
2622 {
2623 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2624 let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2625 assert_eq!(
2627 outputs
2628 .iter()
2629 .filter(|o| matches!(o, Output::Event(_)))
2630 .count(),
2631 0
2632 );
2633 }
2634
2635 assert_eq!(operator.metrics().matches, 0);
2636 }
2637
2638 #[test]
2639 fn test_inner_join_no_match_outside_time_bound() {
2640 let mut operator = StreamJoinOperator::with_id(
2641 "order_id".to_string(),
2642 "order_id".to_string(),
2643 Duration::from_secs(1), JoinType::Inner,
2645 "test_join".to_string(),
2646 );
2647
2648 let mut timers = TimerService::new();
2649 let mut state = InMemoryStore::new();
2650 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2651
2652 let order = create_order_event(1000, "order_1", 100);
2654 {
2655 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2656 operator.process_side(&order, JoinSide::Left, &mut ctx);
2657 }
2658
2659 let payment = create_payment_event(5000, "order_1", "paid");
2661 {
2662 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2663 let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2664 assert_eq!(
2666 outputs
2667 .iter()
2668 .filter(|o| matches!(o, Output::Event(_)))
2669 .count(),
2670 0
2671 );
2672 }
2673
2674 assert_eq!(operator.metrics().matches, 0);
2675 }
2676
2677 #[test]
2678 fn test_join_multiple_matches() {
2679 let mut operator = StreamJoinOperator::with_id(
2680 "order_id".to_string(),
2681 "order_id".to_string(),
2682 Duration::from_secs(3600),
2683 JoinType::Inner,
2684 "test_join".to_string(),
2685 );
2686
2687 let mut timers = TimerService::new();
2688 let mut state = InMemoryStore::new();
2689 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2690
2691 for ts in [1000, 2000] {
2693 let order = create_order_event(ts, "order_1", 100);
2694 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2695 operator.process_side(&order, JoinSide::Left, &mut ctx);
2696 }
2697
2698 let payment = create_payment_event(1500, "order_1", "paid");
2700 {
2701 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2702 let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2703 assert_eq!(
2705 outputs
2706 .iter()
2707 .filter(|o| matches!(o, Output::Event(_)))
2708 .count(),
2709 2
2710 );
2711 }
2712
2713 assert_eq!(operator.metrics().matches, 2);
2714 }
2715
/// An event far behind previously seen event time is reported as late.
///
/// With a 1 s time bound and a watermark generator constructed with `0`, an order
/// at timestamp 10000 is processed first. A payment at timestamp 100 is then
/// expected to yield an `Output::LateEvent` and be counted once in `late_events`.
#[test]
fn test_join_late_event() {
    let mut join_op = StreamJoinOperator::with_id(
        String::from("order_id"),
        String::from("order_id"),
        Duration::from_secs(1),
        JoinType::Inner,
        String::from("test_join"),
    );

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(0);

    // Process an event with a much larger timestamp first.
    let future_order = create_order_event(10000, "order_2", 200);
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        join_op.process_side(&future_order, JoinSide::Left, &mut ctx);
    }

    // This payment's timestamp is far below the order processed above.
    let late_payment = create_payment_event(100, "order_1", "paid");
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let outputs = join_op.process_side(&late_payment, JoinSide::Right, &mut ctx);
        let saw_late = outputs
            .iter()
            .any(|out| matches!(out, Output::LateEvent(_)));
        assert!(saw_late);
    }

    assert_eq!(join_op.metrics().late_events, 1);
}
2748
/// Round-trips a single-row batch through `JoinRow::new` and `to_batch`.
///
/// Besides the shape of the restored batch, the column values are compared
/// against the inputs, so a decoder that produces the right shape but the wrong
/// content cannot pass.
#[test]
fn test_join_row_serialization() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("value", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["test"])),
            Arc::new(Int64Array::from(vec![42])),
        ],
    )
    .unwrap();

    let row = JoinRow::new(1000, b"key".to_vec(), &batch).unwrap();

    let restored_batch = row.to_batch().unwrap();
    assert_eq!(restored_batch.num_rows(), 1);
    assert_eq!(restored_batch.num_columns(), 2);

    // Shape alone does not prove the payload survived; check the values too.
    let id_col = restored_batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("column 0 should be restored as a Utf8 array");
    assert_eq!(id_col.value(0), "test");

    let value_col = restored_batch
        .column(1)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("column 1 should be restored as an Int64 array");
    assert_eq!(value_col.value(0), 42);
}
2771
/// Delivering the timers registered for a buffered event triggers state cleanup.
///
/// One order is processed with a 1 s time bound, then the timers returned by
/// `poll_timers(2001)` are handed to `on_timer`. The test passes when either the
/// store shrank or the `state_cleanups` metric is above zero.
#[test]
fn test_cleanup_timer() {
    let mut join_op = StreamJoinOperator::with_id(
        String::from("order_id"),
        String::from("order_id"),
        Duration::from_secs(1),
        JoinType::Inner,
        String::from("test_join"),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    let order = create_order_event(1000, "order_1", 100);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm_gen);
        join_op.process_side(&order, JoinSide::Left, &mut ctx);
    }

    // Processing the order must have written something to the store.
    let initial_state_len = state.len();
    assert!(initial_state_len > 0);

    let due_timers = timers.poll_timers(2001);
    assert!(!due_timers.is_empty());

    for registration in due_timers {
        let timer = Timer {
            key: registration.key.unwrap_or_default(),
            timestamp: registration.timestamp,
        };
        let mut ctx = create_test_context(&mut timers, &mut state, &mut wm_gen);
        join_op.on_timer(timer, &mut ctx);
    }

    let store_shrank = state.len() < initial_state_len;
    assert!(store_shrank || join_op.metrics().state_cleanups > 0);
}
2814
/// Metrics written into one operator survive a checkpoint/restore into another.
///
/// Both operators are built with identical arguments; only the first one has its
/// `left_events`, `right_events` and `matches` counters set before checkpointing.
#[test]
fn test_checkpoint_restore() {
    // Both sides of the round trip use the same construction arguments.
    let fresh_operator = || {
        StreamJoinOperator::with_id(
            String::from("order_id"),
            String::from("order_id"),
            Duration::from_secs(3600),
            JoinType::Inner,
            String::from("test_join"),
        )
    };

    let mut source = fresh_operator();
    source.metrics.left_events = 10;
    source.metrics.right_events = 5;
    source.metrics.matches = 3;

    let snapshot = source.checkpoint();

    let mut restored = fresh_operator();
    restored.restore(snapshot).unwrap();

    let metrics = restored.metrics();
    assert_eq!(metrics.left_events, 10);
    assert_eq!(metrics.right_events, 5);
    assert_eq!(metrics.matches, 3);
}
2848
/// `reset_metrics` returns previously set counters to zero.
#[test]
fn test_metrics_reset() {
    let mut join_op = StreamJoinOperator::new(
        String::from("order_id"),
        String::from("order_id"),
        Duration::from_secs(3600),
        JoinType::Inner,
    );

    // Seed non-zero counters so the reset has something to clear.
    join_op.metrics.left_events = 10;
    join_op.metrics.matches = 5;

    join_op.reset_metrics();

    let metrics = join_op.metrics();
    assert_eq!(metrics.left_events, 0);
    assert_eq!(metrics.matches, 0);
}
2866
/// A match is produced even when the right side arrives before the left side.
///
/// The payment (right) is processed first and is expected to emit no
/// `Output::Event`; the order (left) that follows is expected to emit exactly one,
/// leaving the `matches` metric at 1.
#[test]
fn test_bidirectional_join() {
    let mut join_op = StreamJoinOperator::with_id(
        String::from("order_id"),
        String::from("order_id"),
        Duration::from_secs(3600),
        JoinType::Inner,
        String::from("test_join"),
    );

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Right side first: nothing to pair with yet.
    let payment = create_payment_event(1000, "order_1", "paid");
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let outputs = join_op.process_side(&payment, JoinSide::Right, &mut ctx);
        let emitted = outputs
            .iter()
            .filter(|out| matches!(out, Output::Event(_)))
            .count();
        assert_eq!(emitted, 0);
    }

    // Left side second: pairs with the payment processed above.
    let order = create_order_event(1500, "order_1", 100);
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let outputs = join_op.process_side(&order, JoinSide::Left, &mut ctx);
        let emitted = outputs
            .iter()
            .filter(|out| matches!(out, Output::Event(_)))
            .count();
        assert_eq!(emitted, 1);
    }

    assert_eq!(join_op.metrics().matches, 1);
}
2911
/// Joins two events whose key column is `Int64` rather than a string.
///
/// Left and right events both carry key 42; processing the right event after the
/// left one is expected to emit one `Output::Event` and record one match.
#[test]
fn test_integer_key_join() {
    /// Builds a one-row event with `Int64` columns `key` and `value`.
    fn int_key_event(timestamp: i64, key: i64, value: i64) -> Event {
        let fields = vec![
            Field::new("key", DataType::Int64, false),
            Field::new("value", DataType::Int64, false),
        ];
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(Int64Array::from(vec![key])),
                Arc::new(Int64Array::from(vec![value])),
            ],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    let mut join_op = StreamJoinOperator::with_id(
        String::from("key"),
        String::from("key"),
        Duration::from_secs(3600),
        JoinType::Inner,
        String::from("test_join"),
    );

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    let left_event = int_key_event(1000, 42, 100);
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        join_op.process_side(&left_event, JoinSide::Left, &mut ctx);
    }

    let right_event = int_key_event(1500, 42, 200);
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let outputs = join_op.process_side(&right_event, JoinSide::Right, &mut ctx);
        let emitted = outputs
            .iter()
            .filter(|out| matches!(out, Output::Event(_)))
            .count();
        assert_eq!(emitted, 1);
    }

    assert_eq!(join_op.metrics().matches, 1);
}
2965
/// Checks the default, `Display` output and string parsing of `JoinRowEncoding`.
#[test]
fn test_f057_join_row_encoding_enum() {
    assert_eq!(JoinRowEncoding::default(), JoinRowEncoding::Compact);

    // Display output.
    assert_eq!(JoinRowEncoding::Compact.to_string(), "compact");
    assert_eq!(JoinRowEncoding::CpuFriendly.to_string(), "cpu_friendly");

    // Accepted spellings; both the underscore and the hyphen form parse.
    let accepted = [
        ("compact", JoinRowEncoding::Compact),
        ("cpu_friendly", JoinRowEncoding::CpuFriendly),
        ("cpu-friendly", JoinRowEncoding::CpuFriendly),
    ];
    for (text, expected) in accepted {
        assert_eq!(text.parse::<JoinRowEncoding>().unwrap(), expected);
    }

    assert!("invalid".parse::<JoinRowEncoding>().is_err());
}
2988
/// Every builder setter is reflected in the resulting `StreamJoinConfig`.
///
/// Durations are passed in seconds and compared against the config's `*_ms`
/// fields.
#[test]
fn test_f057_config_builder() {
    let time_bound = Duration::from_secs(3600);
    let idle_threshold = Duration::from_secs(120);
    let key_idle_threshold = Duration::from_secs(600);

    let built = StreamJoinConfig::builder()
        .left_key_column("order_id")
        .right_key_column("payment_id")
        .time_bound(time_bound)
        .join_type(JoinType::Left)
        .operator_id("test_join")
        .row_encoding(JoinRowEncoding::CpuFriendly)
        .asymmetric_compaction(true)
        .idle_threshold(idle_threshold)
        .per_key_tracking(true)
        .key_idle_threshold(key_idle_threshold)
        .build_side_pruning(true)
        .build_side(JoinSide::Left)
        .build()
        .unwrap();

    // Core join settings.
    assert_eq!(built.left_key_column, "order_id");
    assert_eq!(built.right_key_column, "payment_id");
    assert_eq!(built.time_bound_ms, 3_600_000);
    assert_eq!(built.join_type, JoinType::Left);
    assert_eq!(built.operator_id, Some(String::from("test_join")));

    // Optimization settings.
    assert_eq!(built.row_encoding, JoinRowEncoding::CpuFriendly);
    assert!(built.asymmetric_compaction);
    assert_eq!(built.idle_threshold_ms, 120_000);
    assert!(built.per_key_tracking);
    assert_eq!(built.key_idle_threshold_ms, 600_000);
    assert!(built.build_side_pruning);
    assert_eq!(built.build_side, Some(JoinSide::Left));
}
3020
/// `from_config` carries configuration through to the operator's accessors.
///
/// Only the row encoding is set explicitly; asymmetric compaction and per-key
/// tracking are expected to be enabled without being set here (presumably the
/// builder defaults).
#[test]
fn test_f057_from_config() {
    let join_config = StreamJoinConfig::builder()
        .left_key_column("key")
        .right_key_column("key")
        .time_bound(Duration::from_secs(60))
        .join_type(JoinType::Inner)
        .row_encoding(JoinRowEncoding::CpuFriendly)
        .build()
        .unwrap();

    let join_op = StreamJoinOperator::from_config(join_config);

    assert_eq!(join_op.row_encoding(), JoinRowEncoding::CpuFriendly);
    assert!(join_op.asymmetric_compaction_enabled());
    assert!(join_op.per_key_tracking_enabled());
}
3038
/// Round-trips a Utf8 / Int64 / Float64 row through the `CpuFriendly` encoding.
///
/// The restored batch must keep its shape and every column value; the float is
/// compared with a tolerance of 0.001.
#[test]
fn test_f057_cpu_friendly_encoding_roundtrip() {
    let fields = vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("value", DataType::Int64, false),
        Field::new("price", DataType::Float64, false),
    ];
    let source_batch = RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![
            Arc::new(StringArray::from(vec!["test_key"])),
            Arc::new(Int64Array::from(vec![42])),
            Arc::new(Float64Array::from(vec![99.99])),
        ],
    )
    .unwrap();

    let encoded = JoinRow::with_encoding(
        1000,
        b"key".to_vec(),
        &source_batch,
        JoinRowEncoding::CpuFriendly,
    )
    .unwrap();
    assert_eq!(encoded.encoding(), JoinRowEncoding::CpuFriendly);

    let decoded = encoded.to_batch().unwrap();
    assert_eq!(decoded.num_rows(), 1);
    assert_eq!(decoded.num_columns(), 3);

    let ids = decoded
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(ids.value(0), "test_key");

    let values = decoded
        .column(1)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(values.value(0), 42);

    let prices = decoded
        .column(2)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    let price_delta = (prices.value(0) - 99.99).abs();
    assert!(price_delta < 0.001);
}
3089
/// A row built with the explicit `Compact` encoding reports it and restores.
#[test]
fn test_f057_compact_encoding_still_works() {
    let fields = vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("value", DataType::Int64, false),
    ];
    let source_batch = RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![
            Arc::new(StringArray::from(vec!["test"])),
            Arc::new(Int64Array::from(vec![100])),
        ],
    )
    .unwrap();

    let encoded = JoinRow::with_encoding(
        1000,
        b"key".to_vec(),
        &source_batch,
        JoinRowEncoding::Compact,
    )
    .unwrap();
    assert_eq!(encoded.encoding(), JoinRowEncoding::Compact);

    let decoded = encoded.to_batch().unwrap();
    assert_eq!(decoded.num_rows(), 1);
}
3112
/// Exercises `SideStats` event recording and its idle check.
///
/// A fresh instance and one with recent events are expected to be non-idle; after
/// `events_this_window` is zeroed, a check at time 100000 with threshold 60000 is
/// expected to report idle.
#[test]
fn test_f057_side_stats_tracking() {
    let idle_threshold = 60_000;

    let mut side = SideStats::new();
    assert_eq!(side.events_received, 0);
    // No events recorded yet: not reported as idle.
    assert!(!side.is_idle(1000, idle_threshold));

    // Each recorded event bumps the counter and moves the last-event time.
    for (expected_count, event_time) in [(1, 1000), (2, 2000)] {
        side.record_event(event_time);
        assert_eq!(side.events_received, expected_count);
        assert_eq!(side.last_event_time, event_time);
    }

    assert!(!side.is_idle(2000, idle_threshold));
    assert!(!side.is_idle(50_000, idle_threshold));

    // Clear the window counter, then check well after the last event.
    side.events_this_window = 0;
    assert!(side.is_idle(100_000, idle_threshold));
}
3136
/// Exercises `KeyMetadata` counters and its idle check.
///
/// Construction counts as the first event and state entry; `record_event`
/// increments both, `decrement_entries` lowers only `state_entries`.
#[test]
fn test_f057_key_metadata_tracking() {
    let idle_threshold = 60_000;

    let mut key_meta = KeyMetadata::new(1000);
    assert_eq!(key_meta.last_activity, 1000);
    assert_eq!(key_meta.event_count, 1);
    assert_eq!(key_meta.state_entries, 1);

    key_meta.record_event(2000);
    assert_eq!(key_meta.last_activity, 2000);
    assert_eq!(key_meta.event_count, 2);
    assert_eq!(key_meta.state_entries, 2);

    key_meta.decrement_entries();
    assert_eq!(key_meta.state_entries, 1);

    // Idle only once enough time has passed since the last activity at 2000.
    assert!(!key_meta.is_idle(2000, idle_threshold));
    assert!(key_meta.is_idle(100_000, idle_threshold));
}
3155
/// With per-key tracking on, each distinct key processed is tracked once.
///
/// Three orders with different keys (timestamps 1000, 1100, 1200) are processed
/// on the left side; both `tracked_key_count()` and the `tracked_keys` metric are
/// expected to be 3.
#[test]
fn test_f057_per_key_tracking_in_operator() {
    let join_config = StreamJoinConfig::builder()
        .left_key_column("order_id")
        .right_key_column("order_id")
        .time_bound(Duration::from_secs(3600))
        .join_type(JoinType::Inner)
        .per_key_tracking(true)
        .build()
        .unwrap();

    let mut join_op = StreamJoinOperator::from_config(join_config);
    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Pair each key with a running i64 index so timestamps step by 100.
    for (step, key) in (0_i64..).zip(["order_1", "order_2", "order_3"]) {
        let event = create_order_event(1000 + step * 100, key, 100);
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        join_op.process_side(&event, JoinSide::Left, &mut ctx);
    }

    assert_eq!(join_op.tracked_key_count(), 3);
    assert_eq!(join_op.metrics().tracked_keys, 3);
}
3183
/// Each row encoding increments only its own encode counter.
///
/// The same event is processed once by a `Compact` operator and once by a
/// `CpuFriendly` operator, each with its own state store.
#[test]
fn test_f057_encoding_metrics() {
    // The two operators differ only in row encoding.
    let operator_with = |encoding: JoinRowEncoding| {
        StreamJoinOperator::from_config(
            StreamJoinConfig::builder()
                .left_key_column("order_id")
                .right_key_column("order_id")
                .time_bound(Duration::from_secs(3600))
                .join_type(JoinType::Inner)
                .row_encoding(encoding)
                .build()
                .unwrap(),
        )
    };

    let mut timer_service = TimerService::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);
    let event = create_order_event(1000, "order_1", 100);

    let mut compact_op = operator_with(JoinRowEncoding::Compact);
    let mut compact_store = InMemoryStore::new();
    {
        let mut ctx =
            create_test_context(&mut timer_service, &mut compact_store, &mut wm_gen);
        compact_op.process_side(&event, JoinSide::Left, &mut ctx);
    }
    assert_eq!(compact_op.metrics().compact_encodes, 1);
    assert_eq!(compact_op.metrics().cpu_friendly_encodes, 0);

    let mut cpu_op = operator_with(JoinRowEncoding::CpuFriendly);
    let mut cpu_store = InMemoryStore::new();
    {
        let mut ctx = create_test_context(&mut timer_service, &mut cpu_store, &mut wm_gen);
        cpu_op.process_side(&event, JoinSide::Left, &mut ctx);
    }
    assert_eq!(cpu_op.metrics().cpu_friendly_encodes, 1);
    assert_eq!(cpu_op.metrics().compact_encodes, 0);
}
3230
/// Checks `is_side_idle` for an active side, an untouched side, and a stale side.
///
/// Five left-side events are processed with `processing_time` set to the event
/// timestamp. At time 1500 neither side is expected to be idle (the right side
/// never received an event). After the left window counter is zeroed, the left
/// side is expected to be idle at time 100000.
#[test]
fn test_f057_asymmetric_compaction_detection() {
    let join_config = StreamJoinConfig::builder()
        .left_key_column("order_id")
        .right_key_column("order_id")
        .time_bound(Duration::from_secs(60))
        .join_type(JoinType::Inner)
        .asymmetric_compaction(true)
        .idle_threshold(Duration::from_secs(10))
        .build()
        .unwrap();

    let mut join_op = StreamJoinOperator::from_config(join_config);
    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    for step in 0..5 {
        // Event time and processing time advance together.
        let now = 1000 + step * 100;
        let event = create_order_event(now, "order_1", 100);
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        ctx.processing_time = now;
        join_op.process_side(&event, JoinSide::Left, &mut ctx);
    }

    assert!(!join_op.is_side_idle(JoinSide::Left, 1500));

    assert!(!join_op.is_side_idle(JoinSide::Right, 1500));

    join_op.left_stats.events_this_window = 0;
    assert!(join_op.is_side_idle(JoinSide::Left, 100_000));
}
3266
/// Checks `effective_build_side` with and without an explicit build side.
///
/// An explicitly configured `Right` build side is returned as-is. Without one,
/// and with the left side having received 100 events against 1000 on the right,
/// `Left` is expected.
#[test]
fn test_f057_effective_build_side_selection() {
    // Explicit build side in the config.
    let pinned_config = StreamJoinConfig::builder()
        .left_key_column("key")
        .right_key_column("key")
        .time_bound(Duration::from_secs(60))
        .join_type(JoinType::Inner)
        .build_side(JoinSide::Right)
        .build()
        .unwrap();

    let pinned_op = StreamJoinOperator::from_config(pinned_config);
    assert_eq!(pinned_op.effective_build_side(), JoinSide::Right);

    // No build side configured: the result depends on the per-side event counts.
    let auto_config = StreamJoinConfig::builder()
        .left_key_column("key")
        .right_key_column("key")
        .time_bound(Duration::from_secs(60))
        .join_type(JoinType::Inner)
        .build()
        .unwrap();

    let mut auto_op = StreamJoinOperator::from_config(auto_config);
    auto_op.left_stats.events_received = 100;
    auto_op.right_stats.events_received = 1000;

    assert_eq!(auto_op.effective_build_side(), JoinSide::Left);
}
3298
/// An inner join still matches when rows are stored with `CpuFriendly` encoding.
///
/// One order and one payment with the same key are processed; one
/// `Output::Event` and one match are expected, and `cpu_friendly_encodes` is
/// expected to be 2 (one per processed event).
#[test]
fn test_f057_join_with_cpu_friendly_encoding() {
    let join_config = StreamJoinConfig::builder()
        .left_key_column("order_id")
        .right_key_column("order_id")
        .time_bound(Duration::from_secs(3600))
        .join_type(JoinType::Inner)
        .row_encoding(JoinRowEncoding::CpuFriendly)
        .build()
        .unwrap();

    let mut join_op = StreamJoinOperator::from_config(join_config);
    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    let order = create_order_event(1000, "order_1", 100);
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        join_op.process_side(&order, JoinSide::Left, &mut ctx);
    }

    let payment = create_payment_event(2000, "order_1", "paid");
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let outputs = join_op.process_side(&payment, JoinSide::Right, &mut ctx);
        let emitted = outputs
            .iter()
            .filter(|out| matches!(out, Output::Event(_)))
            .count();
        assert_eq!(emitted, 1);
    }

    assert_eq!(join_op.metrics().matches, 1);
    assert_eq!(join_op.metrics().cpu_friendly_encodes, 2);
}
3339
/// Checkpoint/restore preserves metrics, per-side stats and both watermarks.
///
/// The source and the restored operator are built from identical configs; all
/// values are written directly into the source operator's fields before the
/// checkpoint is taken.
#[test]
fn test_f057_checkpoint_restore_with_optimization_state() {
    // Both operators share this configuration.
    let make_operator = || {
        StreamJoinOperator::from_config(
            StreamJoinConfig::builder()
                .left_key_column("key")
                .right_key_column("key")
                .time_bound(Duration::from_secs(60))
                .join_type(JoinType::Inner)
                .operator_id("test_join")
                .build()
                .unwrap(),
        )
    };

    let mut source = make_operator();

    // Metrics.
    source.metrics.left_events = 100;
    source.metrics.right_events = 50;
    source.metrics.matches = 25;
    source.metrics.cpu_friendly_encodes = 10;
    source.metrics.compact_encodes = 140;
    source.metrics.asymmetric_skips = 5;
    source.metrics.idle_key_cleanups = 3;
    source.metrics.build_side_prunes = 2;
    // Per-side statistics and watermarks.
    source.left_stats.events_received = 100;
    source.right_stats.events_received = 50;
    source.left_watermark = 5000;
    source.right_watermark = 4000;

    let snapshot = source.checkpoint();

    let mut restored = make_operator();
    restored.restore(snapshot).unwrap();

    let metrics = restored.metrics();
    assert_eq!(metrics.left_events, 100);
    assert_eq!(metrics.right_events, 50);
    assert_eq!(metrics.matches, 25);
    assert_eq!(metrics.cpu_friendly_encodes, 10);
    assert_eq!(metrics.compact_encodes, 140);
    assert_eq!(metrics.asymmetric_skips, 5);
    assert_eq!(metrics.idle_key_cleanups, 3);
    assert_eq!(metrics.build_side_prunes, 2);

    assert_eq!(restored.left_stats.events_received, 100);
    assert_eq!(restored.right_stats.events_received, 50);
    assert_eq!(restored.left_watermark, 5000);
    assert_eq!(restored.right_watermark, 4000);
}
3397
/// `should_skip_compaction` depends on the `asymmetric_compaction` flag.
///
/// The left side records one event at 1000 and has its window counter zeroed.
/// At time 100000 compaction is expected to be skipped while the flag is on, and
/// not skipped once the flag is turned off.
#[test]
fn test_f057_should_skip_compaction() {
    let join_config = StreamJoinConfig::builder()
        .left_key_column("key")
        .right_key_column("key")
        .time_bound(Duration::from_secs(60))
        .join_type(JoinType::Inner)
        .asymmetric_compaction(true)
        .idle_threshold(Duration::from_secs(10))
        .build()
        .unwrap();

    let mut join_op = StreamJoinOperator::from_config(join_config);
    let check_time = 100_000;

    join_op.left_stats.record_event(1000);
    join_op.left_stats.events_this_window = 0;
    assert!(join_op.should_skip_compaction(JoinSide::Left, check_time));

    // Same side state, flag disabled.
    join_op.asymmetric_compaction = false;
    assert!(!join_op.should_skip_compaction(JoinSide::Left, check_time));
}
3423
/// Round-trips an Int64 / Utf8 row through the `CpuFriendly` encoding and checks
/// both column values.
///
/// Despite the test name, the batch built here holds a single row. The column
/// order (integer first, string second) is the reverse of the other
/// CPU-friendly round-trip test.
#[test]
fn test_f057_multiple_rows_cpu_friendly() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // The schema is not needed afterwards, so it is moved rather than cloned.
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![1])),
            Arc::new(StringArray::from(vec!["Alice"])),
        ],
    )
    .unwrap();

    let row =
        JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::CpuFriendly)
            .unwrap();
    let restored = row.to_batch().unwrap();

    let id_col = restored
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    let name_col = restored
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    assert_eq!(id_col.value(0), 1);
    assert_eq!(name_col.value(0), "Alice");
}
3461}