1use super::{
55 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
56 TimerKey,
57};
58use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
59use arrow_schema::{DataType, Field, Schema, SchemaRef};
60use rkyv::{
61 rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
62};
63use rustc_hash::FxHashMap;
64use smallvec::SmallVec;
65use std::collections::BTreeMap;
66use std::sync::atomic::{AtomicU64, Ordering};
67use std::sync::Arc;
68use std::time::Duration;
69
/// Direction in which the join searches the right side for a row to pair
/// with a left event, relative to the left event's timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AsofDirection {
    /// Latest right row whose timestamp is `<=` the left timestamp (default).
    #[default]
    Backward,
    /// Earliest right row whose timestamp is `>=` the left timestamp.
    Forward,
    /// Whichever of the backward/forward candidates is closer in time;
    /// on a tie the backward candidate is chosen.
    Nearest,
}
82
/// Join semantics applied when a left event has no matching right row.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AsofJoinType {
    /// Left events without a match produce no output (default).
    #[default]
    Inner,
    /// Left events without a match are still emitted.
    Left,
}

impl AsofJoinType {
    /// Returns `true` when left events lacking a match must still be emitted.
    #[must_use]
    pub fn emits_unmatched(&self) -> bool {
        match self {
            Self::Left => true,
            Self::Inner => false,
        }
    }
}
100
/// Configuration for an ASOF join operator.
#[derive(Debug, Clone)]
pub struct AsofJoinConfig {
    /// Name of the column whose value is used as the join key on both sides.
    pub key_column: String,
    /// Name of the left-side time column.
    /// NOTE(review): not read by the operator code in this file, which uses
    /// the event's own timestamp — confirm where this is consumed.
    pub left_time_column: String,
    /// Name of the right-side time column.
    /// NOTE(review): not read by the operator code in this file, which uses
    /// the event's own timestamp — confirm where this is consumed.
    pub right_time_column: String,
    /// Which side of the left timestamp to search for a match.
    pub direction: AsofDirection,
    /// Maximum allowed time distance between matched rows; `None` means
    /// no limit (see `tolerance_ms`).
    pub tolerance: Option<Duration>,
    /// Whether unmatched left events are emitted.
    pub join_type: AsofJoinType,
    /// Explicit operator identifier; when `None` the operator generates one.
    pub operator_id: Option<String>,
}
119
120impl AsofJoinConfig {
121 #[must_use]
123 pub fn builder() -> AsofJoinConfigBuilder {
124 AsofJoinConfigBuilder::default()
125 }
126
127 #[must_use]
129 #[allow(clippy::cast_possible_truncation)] pub fn tolerance_ms(&self) -> i64 {
131 self.tolerance.map_or(i64::MAX, |d| d.as_millis() as i64)
132 }
133}
134
/// Builder for [`AsofJoinConfig`]; every field starts unset.
///
/// `key_column`, `left_time_column` and `right_time_column` must be set
/// before `build`; the remaining fields fall back to defaults or `None`.
#[derive(Debug, Default)]
pub struct AsofJoinConfigBuilder {
    // Required: `build` fails if missing.
    key_column: Option<String>,
    // Required: `build` fails if missing.
    left_time_column: Option<String>,
    // Required: `build` fails if missing.
    right_time_column: Option<String>,
    // Optional: defaults to `AsofDirection::default()`.
    direction: Option<AsofDirection>,
    // Optional: `None` means no tolerance limit.
    tolerance: Option<Duration>,
    // Optional: defaults to `AsofJoinType::default()`.
    join_type: Option<AsofJoinType>,
    // Optional: passed through unchanged.
    operator_id: Option<String>,
}
146
147impl AsofJoinConfigBuilder {
148 #[must_use]
150 pub fn key_column(mut self, column: String) -> Self {
151 self.key_column = Some(column);
152 self
153 }
154
155 #[must_use]
157 pub fn left_time_column(mut self, column: String) -> Self {
158 self.left_time_column = Some(column);
159 self
160 }
161
162 #[must_use]
164 pub fn right_time_column(mut self, column: String) -> Self {
165 self.right_time_column = Some(column);
166 self
167 }
168
169 #[must_use]
171 pub fn direction(mut self, direction: AsofDirection) -> Self {
172 self.direction = Some(direction);
173 self
174 }
175
176 #[must_use]
178 pub fn tolerance(mut self, tolerance: Duration) -> Self {
179 self.tolerance = Some(tolerance);
180 self
181 }
182
183 #[must_use]
185 pub fn join_type(mut self, join_type: AsofJoinType) -> Self {
186 self.join_type = Some(join_type);
187 self
188 }
189
190 #[must_use]
192 pub fn operator_id(mut self, id: String) -> Self {
193 self.operator_id = Some(id);
194 self
195 }
196
197 pub fn build(self) -> Result<AsofJoinConfig, OperatorError> {
204 Ok(AsofJoinConfig {
205 key_column: self
206 .key_column
207 .ok_or_else(|| OperatorError::ConfigError("key_column is required".into()))?,
208 left_time_column: self
209 .left_time_column
210 .ok_or_else(|| OperatorError::ConfigError("left_time_column is required".into()))?,
211 right_time_column: self.right_time_column.ok_or_else(|| {
212 OperatorError::ConfigError("right_time_column is required".into())
213 })?,
214 direction: self.direction.unwrap_or_default(),
215 tolerance: self.tolerance,
216 join_type: self.join_type.unwrap_or_default(),
217 operator_id: self.operator_id,
218 })
219 }
220}
221
/// First byte of every timer key registered by the ASOF join operator;
/// `on_timer` only reacts to timers whose key starts with this byte.
const ASOF_TIMER_PREFIX: u8 = 0x50;

/// Process-wide counter used to generate `asof_join_<n>` operator IDs when
/// the configuration does not supply one.
static ASOF_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Join key as raw bytes (string bytes or little-endian `i64` bytes);
/// the `SmallVec` holds up to 24 bytes inline.
type AsofKey = SmallVec<[u8; 24]>;
231
/// A buffered right-side event: its timestamp plus a shared handle to the
/// record batch it arrived in.
#[derive(Debug, Clone)]
pub struct AsofRow {
    /// Event timestamp used for ASOF matching.
    pub timestamp: i64,
    // Shared so that cloning a row only bumps a reference count.
    batch: Arc<RecordBatch>,
}
243
244impl AsofRow {
245 fn new(timestamp: i64, batch: &Arc<RecordBatch>) -> Self {
249 Self {
250 timestamp,
251 batch: Arc::clone(batch),
252 }
253 }
254
255 #[must_use]
257 pub fn batch(&self) -> &RecordBatch {
258 &self.batch
259 }
260}
261
/// Checkpoint form of [`AsofRow`]: the timestamp plus the record batch
/// encoded to bytes by `crate::serialization::serialize_batch_stream`.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
struct SerializableAsofRow {
    // Copied verbatim from `AsofRow::timestamp`.
    timestamp: i64,
    // Encoded record batch bytes.
    data: Vec<u8>,
}
269
270impl SerializableAsofRow {
271 fn from_row(row: &AsofRow) -> Result<Self, OperatorError> {
272 let data = crate::serialization::serialize_batch_stream(&row.batch)?;
273 Ok(Self {
274 timestamp: row.timestamp,
275 data,
276 })
277 }
278
279 fn to_row(&self) -> Result<AsofRow, OperatorError> {
280 let batch = crate::serialization::deserialize_batch_stream(&self.data)?;
281 Ok(AsofRow {
282 timestamp: self.timestamp,
283 batch: Arc::new(batch),
284 })
285 }
286}
287
288#[derive(Debug, Clone, Default)]
292pub struct KeyState {
293 pub events: BTreeMap<i64, SmallVec<[AsofRow; 1]>>,
296 pub min_timestamp: i64,
298 pub max_timestamp: i64,
300}
301
302impl KeyState {
303 #[must_use]
305 pub fn new() -> Self {
306 Self {
307 events: BTreeMap::new(),
308 min_timestamp: i64::MAX,
309 max_timestamp: i64::MIN,
310 }
311 }
312
313 pub fn insert(&mut self, row: AsofRow) {
315 let ts = row.timestamp;
316 self.events.entry(ts).or_default().push(row);
317 self.min_timestamp = self.min_timestamp.min(ts);
318 self.max_timestamp = self.max_timestamp.max(ts);
319 }
320
321 #[must_use]
323 pub fn len(&self) -> usize {
324 self.events.values().map(SmallVec::len).sum()
325 }
326
327 #[must_use]
329 pub fn is_empty(&self) -> bool {
330 self.events.is_empty()
331 }
332
333 pub fn cleanup_before(&mut self, threshold: i64) {
335 self.events = self.events.split_off(&threshold);
336 self.min_timestamp = self.events.keys().next().copied().unwrap_or(i64::MAX);
337 }
338}
339
/// Checkpoint form of [`KeyState`]: the timestamp buckets flattened into a
/// list of `(timestamp, rows)` pairs, plus the tracked bounds.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
struct SerializableKeyState {
    // One entry per timestamp bucket, in the map's iteration order.
    events: Vec<(i64, Vec<SerializableAsofRow>)>,
    // Copied verbatim from `KeyState::min_timestamp`.
    min_timestamp: i64,
    // Copied verbatim from `KeyState::max_timestamp`.
    max_timestamp: i64,
}
347
348impl SerializableKeyState {
349 fn from_key_state(state: &KeyState) -> Result<Self, OperatorError> {
351 let events = state
352 .events
353 .iter()
354 .map(|(ts, rows)| {
355 let ser_rows: Result<Vec<_>, _> =
356 rows.iter().map(SerializableAsofRow::from_row).collect();
357 ser_rows.map(|r| (*ts, r))
358 })
359 .collect::<Result<Vec<_>, _>>()?;
360 Ok(Self {
361 events,
362 min_timestamp: state.min_timestamp,
363 max_timestamp: state.max_timestamp,
364 })
365 }
366
367 fn to_key_state(&self) -> Result<KeyState, OperatorError> {
369 let mut events = BTreeMap::new();
370 for (ts, rows) in &self.events {
371 let asof_rows: Result<SmallVec<[AsofRow; 1]>, _> =
372 rows.iter().map(SerializableAsofRow::to_row).collect();
373 events.insert(*ts, asof_rows?);
374 }
375 Ok(KeyState {
376 events,
377 min_timestamp: self.min_timestamp,
378 max_timestamp: self.max_timestamp,
379 })
380 }
381}
382
/// Counters describing the work done by an ASOF join operator.
#[derive(Debug, Clone, Default)]
pub struct AsofJoinMetrics {
    /// Left events processed.
    pub left_events: u64,
    /// Right events processed.
    pub right_events: u64,
    /// Left events for which a right row was found.
    pub matches: u64,
    /// Left events without a match, counted only for joins that emit them.
    pub unmatched_left: u64,
    /// Matches that satisfied the tolerance (incremented with `matches`).
    pub within_tolerance: u64,
    /// Candidates rejected for exceeding the tolerance.
    /// NOTE(review): not incremented by the operator code in this file.
    pub outside_tolerance: u64,
    /// Right events that arrived with a timestamp below the watermark.
    pub late_events: u64,
    /// Number of buffered rows removed by state cleanup.
    pub state_cleanups: u64,
}

impl AsofJoinMetrics {
    /// Creates a metrics record with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            left_events: 0,
            right_events: 0,
            matches: 0,
            unmatched_left: 0,
            within_tolerance: 0,
            outside_tolerance: 0,
            late_events: 0,
            state_cleanups: 0,
        }
    }

    /// Sets every counter back to zero.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
416
/// Streaming ASOF join operator.
///
/// Right-side events are buffered per join key; each left event is matched
/// against the buffered rows for its key according to the configured
/// direction and tolerance.
pub struct AsofJoinOperator {
    // Join configuration supplied at construction.
    config: AsofJoinConfig,
    // Identifier stored in checkpoints and checked on restore.
    operator_id: String,
    // Buffered right-side rows, keyed by join-key bytes.
    right_state: FxHashMap<AsofKey, KeyState>,
    // Last watermark received; starts at `i64::MIN`.
    watermark: i64,
    // Operational counters.
    metrics: AsofJoinMetrics,
    // Joined schema; built once both input schemas have been observed.
    output_schema: Option<SchemaRef>,
    // Schema of the first left event seen.
    left_schema: Option<SchemaRef>,
    // Schema of the first right event seen.
    right_schema: Option<SchemaRef>,
    // Cached index of the key column in left batches.
    left_key_index: Option<usize>,
    // Cached index of the key column in right batches.
    right_key_index: Option<usize>,
}
455
456impl AsofJoinOperator {
457 #[must_use]
459 pub fn new(config: AsofJoinConfig) -> Self {
460 let operator_id = config.operator_id.clone().unwrap_or_else(|| {
461 let num = ASOF_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
462 format!("asof_join_{num}")
463 });
464
465 Self {
466 config,
467 operator_id,
468 right_state: FxHashMap::default(),
469 watermark: i64::MIN,
470 metrics: AsofJoinMetrics::new(),
471 output_schema: None,
472 left_schema: None,
473 right_schema: None,
474 left_key_index: None,
475 right_key_index: None,
476 }
477 }
478
479 #[must_use]
481 pub fn with_id(mut config: AsofJoinConfig, operator_id: String) -> Self {
482 config.operator_id = Some(operator_id);
483 Self::new(config)
484 }
485
486 #[must_use]
488 pub fn config(&self) -> &AsofJoinConfig {
489 &self.config
490 }
491
492 #[must_use]
494 pub fn metrics(&self) -> &AsofJoinMetrics {
495 &self.metrics
496 }
497
498 pub fn reset_metrics(&mut self) {
500 self.metrics.reset();
501 }
502
503 #[must_use]
505 pub fn watermark(&self) -> i64 {
506 self.watermark
507 }
508
509 #[must_use]
511 pub fn state_size(&self) -> usize {
512 self.right_state.values().map(KeyState::len).sum()
513 }
514
515 pub fn process_left(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
517 self.metrics.left_events += 1;
518
519 if self.left_schema.is_none() {
521 self.left_schema = Some(event.data.schema());
522 self.update_output_schema();
523 }
524
525 let mut output = OutputVec::new();
526
527 let Some(key_value) = Self::extract_key(
529 &event.data,
530 &self.config.key_column,
531 &mut self.left_key_index,
532 ) else {
533 return output;
534 };
535
536 let left_timestamp = event.timestamp;
538
539 let match_result = self.find_match(&key_value, left_timestamp);
541
542 match match_result {
543 Some(matched_row) => {
544 self.metrics.matches += 1;
545 self.metrics.within_tolerance += 1;
546
547 if let Some(joined) = self.create_joined_event(event, &matched_row) {
549 output.push(Output::Event(joined));
550 }
551 }
552 None => {
553 if self.config.join_type.emits_unmatched() {
554 self.metrics.unmatched_left += 1;
555 if let Some(unmatched) = self.create_unmatched_event(event) {
556 output.push(Output::Event(unmatched));
557 }
558 }
559 }
560 }
561
562 output
563 }
564
565 pub fn process_right(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
567 self.metrics.right_events += 1;
568
569 if self.right_schema.is_none() {
571 self.right_schema = Some(event.data.schema());
572 self.update_output_schema();
573 }
574
575 let output = OutputVec::new();
576
577 let Some(key_value) = Self::extract_key(
579 &event.data,
580 &self.config.key_column,
581 &mut self.right_key_index,
582 ) else {
583 return output;
584 };
585
586 if self.watermark > i64::MIN && event.timestamp < self.watermark {
588 self.metrics.late_events += 1;
589 }
591
592 let row = AsofRow::new(event.timestamp, &event.data);
594
595 let cleanup_time = self.calculate_cleanup_time(event.timestamp);
597
598 let key_state = self.right_state.entry(key_value).or_default();
599 key_state.insert(row);
600
601 let timer_key = Self::make_cleanup_timer_key(&key_state.max_timestamp.to_be_bytes());
603 ctx.timers
604 .register_timer(cleanup_time, Some(timer_key), Some(ctx.operator_index));
605
606 output
607 }
608
609 fn find_match(&self, key: &[u8], left_timestamp: i64) -> Option<AsofRow> {
611 let key_state = self.right_state.get(key)?;
612
613 match self.config.direction {
614 AsofDirection::Backward => self.find_backward_match(key_state, left_timestamp),
615 AsofDirection::Forward => self.find_forward_match(key_state, left_timestamp),
616 AsofDirection::Nearest => self.find_nearest_match(key_state, left_timestamp),
617 }
618 }
619
620 fn find_backward_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
622 let (ts, rows) = key_state.events.range(..=left_timestamp).next_back()?;
624
625 let diff = left_timestamp - ts;
627 if diff > self.config.tolerance_ms() {
628 return None;
629 }
630
631 rows.last().cloned()
633 }
634
635 fn find_forward_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
637 let (ts, rows) = key_state.events.range(left_timestamp..).next()?;
639
640 let diff = ts - left_timestamp;
642 if diff > self.config.tolerance_ms() {
643 return None;
644 }
645
646 rows.first().cloned()
648 }
649
650 fn find_nearest_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
652 let before = key_state.events.range(..=left_timestamp).next_back();
653 let after = key_state.events.range(left_timestamp..).next();
654
655 let candidate = match (before, after) {
656 (Some((ts_before, rows_before)), Some((ts_after, rows_after))) => {
657 let diff_before = left_timestamp - ts_before;
658 let diff_after = ts_after - left_timestamp;
659 if diff_before <= diff_after {
660 Some((diff_before, rows_before.last()?.clone()))
661 } else {
662 Some((diff_after, rows_after.first()?.clone()))
663 }
664 }
665 (Some((ts, rows)), None) => {
666 let diff = left_timestamp - ts;
667 Some((diff, rows.last()?.clone()))
668 }
669 (None, Some((ts, rows))) => {
670 let diff = ts - left_timestamp;
671 Some((diff, rows.first()?.clone()))
672 }
673 (None, None) => None,
674 };
675
676 let (diff, row) = candidate?;
677
678 if diff > self.config.tolerance_ms() {
680 return None;
681 }
682
683 Some(row)
684 }
685
686 fn calculate_cleanup_time(&self, timestamp: i64) -> i64 {
688 let tolerance_ms = self.config.tolerance_ms();
689 match self.config.direction {
690 AsofDirection::Backward | AsofDirection::Nearest => {
693 if tolerance_ms == i64::MAX {
694 i64::MAX
695 } else {
696 timestamp.saturating_add(tolerance_ms)
697 }
698 }
699 AsofDirection::Forward => timestamp,
701 }
702 }
703
704 pub fn on_watermark(&mut self, watermark: i64, _ctx: &mut OperatorContext) -> OutputVec {
706 self.watermark = watermark;
707 self.cleanup_state(watermark);
708 OutputVec::new()
709 }
710
711 fn cleanup_state(&mut self, watermark: i64) {
713 let tolerance_ms = self.config.tolerance_ms();
714
715 let threshold = match self.config.direction {
716 AsofDirection::Backward | AsofDirection::Nearest => {
717 if tolerance_ms == i64::MAX {
718 i64::MIN } else {
720 watermark.saturating_sub(tolerance_ms)
721 }
722 }
723 AsofDirection::Forward => watermark,
724 };
725
726 if threshold == i64::MIN {
727 return;
728 }
729
730 let initial_count: usize = self.right_state.values().map(KeyState::len).sum();
731
732 for key_state in self.right_state.values_mut() {
733 key_state.cleanup_before(threshold);
734 }
735
736 self.right_state.retain(|_, v| !v.is_empty());
738
739 let final_count: usize = self.right_state.values().map(KeyState::len).sum();
740 if final_count < initial_count {
741 self.metrics.state_cleanups += (initial_count - final_count) as u64;
742 }
743 }
744
745 fn extract_key(
749 batch: &RecordBatch,
750 column_name: &str,
751 cached_index: &mut Option<usize>,
752 ) -> Option<AsofKey> {
753 let column_index = if let Some(idx) = *cached_index {
754 idx
755 } else {
756 let idx = batch.schema().index_of(column_name).ok()?;
757 *cached_index = Some(idx);
758 idx
759 };
760 let column = batch.column(column_index);
761
762 if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
763 if string_array.is_empty() || string_array.is_null(0) {
764 return None;
765 }
766 return Some(AsofKey::from_slice(string_array.value(0).as_bytes()));
767 }
768
769 if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
770 if int_array.is_empty() || int_array.is_null(0) {
771 return None;
772 }
773 return Some(AsofKey::from_slice(&int_array.value(0).to_le_bytes()));
774 }
775
776 None
777 }
778
779 fn make_cleanup_timer_key(key_suffix: &[u8]) -> TimerKey {
781 let mut key = TimerKey::new();
782 key.push(ASOF_TIMER_PREFIX);
783 key.extend_from_slice(key_suffix);
784 key
785 }
786
787 fn update_output_schema(&mut self) {
789 if let (Some(left), Some(right)) = (&self.left_schema, &self.right_schema) {
790 let mut fields: Vec<Field> =
791 Vec::with_capacity(left.fields().len() + right.fields().len());
792 fields.extend(left.fields().iter().map(|f| f.as_ref().clone()));
793
794 for field in right.fields() {
796 let name = if left.field_with_name(field.name()).is_ok() {
797 format!("right_{}", field.name())
798 } else {
799 field.name().clone()
800 };
801 fields.push(Field::new(
802 name,
803 field.data_type().clone(),
804 true, ));
806 }
807
808 self.output_schema = Some(Arc::new(Schema::new(fields)));
809 }
810 }
811
812 fn create_joined_event(&self, left_event: &Event, right_row: &AsofRow) -> Option<Event> {
814 let schema = self.output_schema.as_ref()?;
815
816 let left_cols = left_event.data.columns();
817 let right_cols = right_row.batch().columns();
818 let mut columns: Vec<ArrayRef> = Vec::with_capacity(left_cols.len() + right_cols.len());
819 columns.extend_from_slice(left_cols);
820 for column in right_cols {
821 columns.push(Arc::clone(column));
822 }
823
824 let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
825
826 Some(Event::new(left_event.timestamp, joined_batch))
827 }
828
829 fn create_unmatched_event(&self, left_event: &Event) -> Option<Event> {
831 let schema = self.output_schema.as_ref()?;
832 let right_schema = self.right_schema.as_ref()?;
833
834 let num_rows = left_event.data.num_rows();
835 let left_cols = left_event.data.columns();
836 let mut columns: Vec<ArrayRef> =
837 Vec::with_capacity(left_cols.len() + right_schema.fields().len());
838 columns.extend_from_slice(left_cols);
839
840 for field in right_schema.fields() {
842 columns.push(Self::create_null_array(field.data_type(), num_rows));
843 }
844
845 let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
846
847 Some(Event::new(left_event.timestamp, joined_batch))
848 }
849
850 fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
852 match data_type {
853 DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
854 DataType::Float64 => {
855 use arrow_array::Float64Array;
856 Arc::new(Float64Array::from(vec![None; num_rows])) as ArrayRef
857 }
858 _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
860 }
861 }
862}
863
864impl Operator for AsofJoinOperator {
865 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
866 self.process_left(event, ctx)
868 }
869
870 fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
871 if timer.key.first() == Some(&ASOF_TIMER_PREFIX) {
873 self.cleanup_state(timer.timestamp);
874 }
875 OutputVec::new()
876 }
877
878 fn checkpoint(&self) -> OperatorState {
879 let mut keys_dropped: u64 = 0;
880 let mut state_entries: Vec<(Vec<u8>, SerializableKeyState)> =
881 Vec::with_capacity(self.right_state.len());
882 for (k, v) in &self.right_state {
883 match SerializableKeyState::from_key_state(v) {
884 Ok(s) => state_entries.push((k.to_vec(), s)),
885 Err(e) => {
886 keys_dropped += 1;
887 tracing::error!(
888 operator_id = %self.operator_id,
889 error = %e,
890 "[LDB-6013] ASOF join key failed serialization during checkpoint — \
891 this key will be MISSING after recovery"
892 );
893 }
894 }
895 }
896 if keys_dropped > 0 {
897 tracing::error!(
898 operator_id = %self.operator_id,
899 keys_dropped,
900 keys_total = self.right_state.len(),
901 "[LDB-6013] ASOF join checkpoint lost state — \
902 {keys_dropped} keys could not be serialized"
903 );
904 }
905
906 let checkpoint_data = (
907 self.watermark,
908 self.metrics.left_events,
909 self.metrics.right_events,
910 self.metrics.matches,
911 self.metrics.unmatched_left,
912 state_entries,
913 );
914
915 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
916 .map(|v| v.to_vec())
917 .unwrap_or_default();
918
919 OperatorState {
920 operator_id: self.operator_id.clone(),
921 data,
922 }
923 }
924
925 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
926 type CheckpointData = (
927 i64,
928 u64,
929 u64,
930 u64,
931 u64,
932 Vec<(Vec<u8>, SerializableKeyState)>,
933 );
934
935 if state.operator_id != self.operator_id {
936 return Err(OperatorError::StateAccessFailed(format!(
937 "Operator ID mismatch: expected {}, got {}",
938 self.operator_id, state.operator_id
939 )));
940 }
941
942 let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
943 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
944 let (watermark, left_events, right_events, matches, unmatched_left, state_entries) =
945 rkyv::deserialize::<CheckpointData, RkyvError>(archived)
946 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
947
948 self.watermark = watermark;
949 self.metrics.left_events = left_events;
950 self.metrics.right_events = right_events;
951 self.metrics.matches = matches;
952 self.metrics.unmatched_left = unmatched_left;
953
954 self.right_state.clear();
956 for (key, serializable) in state_entries {
957 let key_state = serializable.to_key_state()?;
958 self.right_state
959 .insert(AsofKey::from_slice(&key), key_state);
960 }
961
962 Ok(())
963 }
964}
965
966#[cfg(test)]
967mod tests {
968 use super::*;
969 use crate::state::{InMemoryStore, StateStore};
970 use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
971 use arrow_array::Float64Array;
972 use arrow_schema::{DataType, Field, Schema};
973
974 fn create_trade_event(timestamp: i64, symbol: &str, price: f64) -> Event {
976 let schema = Arc::new(Schema::new(vec![
977 Field::new("symbol", DataType::Utf8, false),
978 Field::new("price", DataType::Float64, false),
979 ]));
980 let batch = RecordBatch::try_new(
981 schema,
982 vec![
983 Arc::new(StringArray::from(vec![symbol])),
984 Arc::new(Float64Array::from(vec![price])),
985 ],
986 )
987 .unwrap();
988 Event::new(timestamp, batch)
989 }
990
991 fn create_quote_event(timestamp: i64, symbol: &str, bid: f64, ask: f64) -> Event {
993 let schema = Arc::new(Schema::new(vec![
994 Field::new("symbol", DataType::Utf8, false),
995 Field::new("bid", DataType::Float64, false),
996 Field::new("ask", DataType::Float64, false),
997 ]));
998 let batch = RecordBatch::try_new(
999 schema,
1000 vec![
1001 Arc::new(StringArray::from(vec![symbol])),
1002 Arc::new(Float64Array::from(vec![bid])),
1003 Arc::new(Float64Array::from(vec![ask])),
1004 ],
1005 )
1006 .unwrap();
1007 Event::new(timestamp, batch)
1008 }
1009
1010 fn create_test_context<'a>(
1011 timers: &'a mut TimerService,
1012 state: &'a mut dyn StateStore,
1013 watermark_gen: &'a mut dyn WatermarkGenerator,
1014 ) -> OperatorContext<'a> {
1015 OperatorContext {
1016 event_time: 0,
1017 processing_time: 0,
1018 timers,
1019 state,
1020 watermark_generator: watermark_gen,
1021 operator_index: 0,
1022 }
1023 }
1024
/// The default search direction is backward.
#[test]
fn test_asof_direction_default() {
    let direction = AsofDirection::default();
    assert_eq!(direction, AsofDirection::Backward);
}
1029
/// Only LEFT joins emit left rows that found no partner.
#[test]
fn test_asof_join_type_properties() {
    let inner_emits = AsofJoinType::Inner.emits_unmatched();
    let left_emits = AsofJoinType::Left.emits_unmatched();
    assert!(!inner_emits);
    assert!(left_emits);
}
1035
/// Every value passed to the builder ends up in the built configuration.
#[test]
fn test_config_builder() {
    let builder = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(5))
        .join_type(AsofJoinType::Left)
        .operator_id("test_op".to_string());
    let config = builder.build().unwrap();

    assert_eq!(config.key_column, "symbol");
    assert_eq!(config.left_time_column, "trade_time");
    assert_eq!(config.right_time_column, "quote_time");
    assert_eq!(config.direction, AsofDirection::Backward);
    assert_eq!(config.tolerance, Some(Duration::from_secs(5)));
    assert_eq!(config.join_type, AsofJoinType::Left);
    // 5 seconds expressed in milliseconds.
    assert_eq!(config.tolerance_ms(), 5000);
}
1057
/// Backward join: a trade at t=1000 is paired with one of the earlier
/// quotes (t=900, t=950) and the joined row carries all five columns.
#[test]
fn test_backward_asof_basic() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // First quote at t=900.
    let quote = create_quote_event(900, "AAPL", 150.0, 151.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote, &mut ctx);
    }

    // Second quote at t=950.
    let quote2 = create_quote_event(950, "AAPL", 152.0, 153.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote2, &mut ctx);
    }

    // Trade at t=1000.
    let trade = create_trade_event(1000, "AAPL", 152.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
    assert_eq!(operator.metrics().matches, 1);

    if let Some(Output::Event(event)) = outputs.first() {
        // 2 trade columns + 3 quote columns.
        assert_eq!(event.data.num_columns(), 5);
    }
}
1111
/// Forward join: a trade at t=1000 is paired with one of the later quotes
/// (t=1050, t=1100).
#[test]
fn test_forward_asof_basic() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Forward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Both quotes lie after the trade.
    let quote1 = create_quote_event(1050, "AAPL", 150.0, 151.0);
    let quote2 = create_quote_event(1100, "AAPL", 152.0, 153.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote1, &mut ctx);
        operator.process_right(&quote2, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
    assert_eq!(operator.metrics().matches, 1);
}
1155
/// Nearest join: with quotes at t=990 and t=1020, a trade at t=1000
/// produces exactly one joined event.
#[test]
fn test_nearest_asof() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Nearest)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // One quote 10 ms before the trade, one 20 ms after it.
    let quote_before = create_quote_event(990, "AAPL", 150.0, 151.0);
    let quote_after = create_quote_event(1020, "AAPL", 152.0, 153.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote_before, &mut ctx);
        operator.process_right(&quote_after, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
}
1199
/// A quote 100 ms older than the trade is rejected when the tolerance is
/// only 50 ms, so an inner join emits nothing.
#[test]
fn test_tolerance_exceeded() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(50))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Quote at t=900: 100 ms before the trade.
    let quote = create_quote_event(900, "AAPL", 150.0, 151.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(outputs.len(), 0);
    assert_eq!(operator.metrics().matches, 0);
}
1236
/// A quote 50 ms older than the trade is accepted when the tolerance is
/// 100 ms, and the match is counted as within tolerance.
#[test]
fn test_tolerance_within() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(100))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Quote at t=950: 50 ms before the trade.
    let quote = create_quote_event(950, "AAPL", 150.0, 151.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
    assert_eq!(operator.metrics().within_tolerance, 1);
}
1278
/// With no right-side rows buffered, an inner join emits nothing for a
/// left event and records no match.
#[test]
fn test_no_match_empty_state() {
    let builder = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .join_type(AsofJoinType::Inner);
    let config = builder.build().unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
    let outputs = operator.process_left(&trade, &mut ctx);

    assert_eq!(outputs.len(), 0);
    assert_eq!(operator.metrics().matches, 0);
}
1306
/// Rows are matched per key: an AAPL trade and a GOOG trade each pair with
/// the quote for their own symbol.
#[test]
fn test_multiple_keys() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // One quote per symbol.
    let aapl_quote = create_quote_event(950, "AAPL", 150.0, 151.0);
    let goog_quote = create_quote_event(960, "GOOG", 2800.0, 2801.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&aapl_quote, &mut ctx);
        operator.process_right(&goog_quote, &mut ctx);
    }

    let aapl_trade = create_trade_event(1000, "AAPL", 150.5);
    let aapl_outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&aapl_trade, &mut ctx)
    };
    let aapl_events = aapl_outputs
        .iter()
        .filter(|o| matches!(o, Output::Event(_)))
        .count();
    assert_eq!(aapl_events, 1);

    let goog_trade = create_trade_event(1000, "GOOG", 2800.5);
    let goog_outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&goog_trade, &mut ctx)
    };
    let goog_events = goog_outputs
        .iter()
        .filter(|o| matches!(o, Output::Event(_)))
        .count();
    assert_eq!(goog_events, 1);

    assert_eq!(operator.metrics().matches, 2);
}
1363
/// Two quotes for the same key sharing timestamp 950 still produce exactly one
/// joined event for a later trade.
#[test]
fn test_multiple_events_same_timestamp() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut join_op = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Both quotes carry the same key and the same timestamp; only prices differ.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let first_quote = create_quote_event(950, "AAPL", 150.0, 151.0);
        join_op.process_right(&first_quote, &mut ctx);
        let second_quote = create_quote_event(950, "AAPL", 150.5, 151.5);
        join_op.process_right(&second_quote, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let emitted = join_op.process_left(
        &trade,
        &mut create_test_context(&mut timer_service, &mut store, &mut wm_gen),
    );

    let event_count = emitted
        .iter()
        .filter(|out| matches!(out, Output::Event(_)))
        .count();
    assert_eq!(event_count, 1);
}
1405
/// A left join emits the left row even when no right row matches: a trade for a
/// symbol with no buffered quote yields one event, bumps `unmatched_left`, and
/// the emitted batch has five columns.
#[test]
fn test_left_outer_join() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(50))
        .join_type(AsofJoinType::Left)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Feed one AAPL quote and one AAPL trade first.
    // NOTE(review): presumably this lets the operator observe both input
    // schemas before the unmatched trade arrives — confirm against the operator.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&create_quote_event(990, "AAPL", 150.0, 151.0), &mut ctx);
    }
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&create_trade_event(1000, "AAPL", 150.5), &mut ctx);
    }

    // No GOOG quote was ever buffered, so this trade cannot match.
    let trade = create_trade_event(2000, "GOOG", 2800.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
    assert_eq!(operator.metrics().unmatched_left, 1);

    // Locate the emitted event explicitly so the column check can never be
    // skipped silently (an `if let` on `outputs.first()` would pass vacuously
    // if the first output were not an event).
    let unmatched_event = outputs
        .iter()
        .find_map(|o| match o {
            Output::Event(event) => Some(event),
            _ => None,
        })
        .expect("left join must emit an event for the unmatched trade");
    assert_eq!(unmatched_event.data.num_columns(), 5);
}
1456
/// An inner join with a 50 ms tolerance and nothing buffered on the right side
/// produces no output at all for a left event.
#[test]
fn test_inner_join_no_output() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(50))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut join_op = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // The right side is empty, so the inner join has nothing to pair with.
    let unmatched_trade = create_trade_event(1000, "AAPL", 150.5);
    let emitted = join_op.process_left(
        &unmatched_trade,
        &mut create_test_context(&mut timer_service, &mut store, &mut wm_gen),
    );

    assert!(emitted.is_empty());
}
1485
/// After two quotes are buffered and the watermark advances to 1100, either
/// the buffered state has shrunk or the cleanup counter has been incremented.
#[test]
fn test_state_cleanup() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(100))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut join_op = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Buffer two quotes for the same key.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let older_quote = create_quote_event(900, "AAPL", 150.0, 151.0);
        join_op.process_right(&older_quote, &mut ctx);
        let newer_quote = create_quote_event(950, "AAPL", 152.0, 153.0);
        join_op.process_right(&newer_quote, &mut ctx);
    }
    assert_eq!(join_op.state_size(), 2);

    // Move the watermark beyond both quote timestamps.
    join_op.on_watermark(
        1100,
        &mut create_test_context(&mut timer_service, &mut store, &mut wm_gen),
    );

    // Accept either observable sign of cleanup. The counter is only consulted
    // when the state did not shrink, mirroring a short-circuit `||`.
    let cleanup_observed = if join_op.state_size() < 2 {
        true
    } else {
        join_op.metrics().state_cleanups > 0
    };
    assert!(cleanup_observed);
}
1522
/// A right-side event whose timestamp is behind the operator's watermark is
/// counted as late but is still retained in join state.
#[test]
fn test_late_event_still_stored() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Put the watermark ahead of the quote's timestamp so the quote arrives late.
    operator.watermark = 1000;

    let quote = create_quote_event(500, "AAPL", 150.0, 151.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        // Pass the quote by reference; the previous text here was a corrupted
        // `&quote` that did not compile.
        operator.process_right(&quote, &mut ctx);
    }

    // The event is flagged as late and still stored.
    assert_eq!(operator.metrics().late_events, 1);
    assert_eq!(operator.state_size(), 1);
}
1555
/// A checkpoint taken from one operator and restored into a fresh operator
/// built from the same config carries over metrics, watermark and state size.
#[test]
fn test_checkpoint_restore() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut source_op =
        AsofJoinOperator::with_id(join_config.clone(), String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Populate right-side state with two quotes.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let older_quote = create_quote_event(900, "AAPL", 150.0, 151.0);
        source_op.process_right(&older_quote, &mut ctx);
        let newer_quote = create_quote_event(950, "AAPL", 152.0, 153.0);
        source_op.process_right(&newer_quote, &mut ctx);
    }

    // Set recognisable values directly so the round trip can be verified.
    source_op.metrics.left_events = 10;
    source_op.metrics.matches = 5;
    source_op.watermark = 800;

    let snapshot = source_op.checkpoint();

    let mut restored_op = AsofJoinOperator::with_id(join_config, String::from("test_asof"));
    restored_op.restore(snapshot).unwrap();

    assert_eq!(restored_op.metrics().left_events, 10);
    assert_eq!(restored_op.metrics().matches, 5);
    assert_eq!(restored_op.watermark(), 800);
    assert_eq!(restored_op.state_size(), 2);
}
1599
/// The joined output schema contains the left `price` column plus the right
/// side's `right_symbol`, `bid` and `ask` columns.
#[test]
fn test_schema_composition() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Buffer one quote so the trade below has a match.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&create_quote_event(950, "AAPL", 150.0, 151.0), &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(outputs.len(), 1);

    // Fail loudly if the single output is not an event; an `if let` here would
    // skip every schema assertion and let the test pass vacuously.
    let event = match outputs.first() {
        Some(Output::Event(event)) => event,
        _ => panic!("expected the single output to be a joined event"),
    };
    let schema = event.data.schema();

    // Column contributed by the left (trade) side.
    assert!(schema.field_with_name("price").is_ok());

    // Columns contributed by the right (quote) side; the key column shows up
    // under the name `right_symbol`.
    assert!(schema.field_with_name("right_symbol").is_ok());
    assert!(schema.field_with_name("bid").is_ok());
    assert!(schema.field_with_name("ask").is_ok());
}
1645
/// Processing two quotes followed by two trades is reflected in the
/// `right_events`, `left_events`, `matches` and `within_tolerance` counters.
#[test]
fn test_metrics_tracking() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut join_op = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Right side: two quotes.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let first_quote = create_quote_event(900, "AAPL", 150.0, 151.0);
        join_op.process_right(&first_quote, &mut ctx);
        let second_quote = create_quote_event(950, "AAPL", 152.0, 153.0);
        join_op.process_right(&second_quote, &mut ctx);
    }

    // Left side: two trades, both later than the buffered quotes.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let first_trade = create_trade_event(1000, "AAPL", 150.5);
        join_op.process_left(&first_trade, &mut ctx);
        let second_trade = create_trade_event(1100, "AAPL", 151.5);
        join_op.process_left(&second_trade, &mut ctx);
    }

    assert_eq!(join_op.metrics().right_events, 2);
    assert_eq!(join_op.metrics().left_events, 2);
    assert_eq!(join_op.metrics().matches, 2);
    assert_eq!(join_op.metrics().within_tolerance, 2);
}
1682
/// Exercises `KeyState` directly: insertion tracks length and min/max
/// timestamps, and `cleanup_before(150)` leaves only the row at timestamp 200.
#[test]
fn test_key_state_operations() {
    let mut per_key = KeyState::new();
    assert!(per_key.is_empty());

    // Row contents are irrelevant here; an empty batch keeps the fixture small.
    let empty_batch = Arc::new(RecordBatch::new_empty(Arc::new(Schema::empty())));
    let earlier_row = AsofRow::new(100, &empty_batch);
    let later_row = AsofRow::new(200, &empty_batch);
    per_key.insert(earlier_row);
    per_key.insert(later_row);

    assert_eq!(per_key.len(), 2);
    assert_eq!(per_key.min_timestamp, 100);
    assert_eq!(per_key.max_timestamp, 200);

    // A cutoff between the two timestamps leaves only the later row.
    per_key.cleanup_before(150);
    assert_eq!(per_key.len(), 1);
    assert_eq!(per_key.min_timestamp, 200);
}
1705
/// Round-trips an `AsofRow` through `SerializableAsofRow` and checks that the
/// row count, column count and timestamp survive.
#[test]
fn test_asof_row_serialization() {
    let row_schema = Arc::new(Schema::new(vec![
        Field::new("symbol", DataType::Utf8, false),
        Field::new("value", DataType::Float64, false),
    ]));
    // Single-row batch: one string column and one float column.
    let single_row = RecordBatch::try_new(
        row_schema,
        vec![
            Arc::new(StringArray::from(vec!["AAPL"])),
            Arc::new(Float64Array::from(vec![150.5])),
        ],
    )
    .unwrap();
    let batch = Arc::new(single_row);

    let original_row = AsofRow::new(1000, &batch);

    // Convert to the serializable form and back again.
    let round_tripped = SerializableAsofRow::from_row(&original_row)
        .unwrap()
        .to_row()
        .unwrap();

    assert_eq!(round_tripped.batch().num_rows(), 1);
    assert_eq!(round_tripped.batch().num_columns(), 2);
    assert_eq!(round_tripped.timestamp, 1000);
}
1732
/// `AsofJoinMetrics::reset` zeroes counters that were previously non-zero.
#[test]
fn test_metrics_reset() {
    let mut counters = AsofJoinMetrics::new();

    // Give two counters non-zero values so the reset is observable.
    counters.left_events = 100;
    counters.matches = 50;

    counters.reset();

    assert_eq!(counters.left_events, 0);
    assert_eq!(counters.matches, 0);
}
1744
/// Without a configured tolerance, `tolerance_ms()` reports `i64::MAX` and a
/// trade far later than the only buffered quote still produces one event.
#[test]
fn test_unlimited_tolerance() {
    // No `.tolerance(..)` call: the join is unbounded in time.
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    assert_eq!(join_config.tolerance_ms(), i64::MAX);

    let mut join_op = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    let early_quote = create_quote_event(100, "AAPL", 150.0, 151.0);
    join_op.process_right(
        &early_quote,
        &mut create_test_context(&mut timer_service, &mut store, &mut wm_gen),
    );

    // The trade is at 1_000_000 while the quote is at 100.
    let distant_trade = create_trade_event(1_000_000, "AAPL", 150.5);
    let emitted = join_op.process_left(
        &distant_trade,
        &mut create_test_context(&mut timer_service, &mut store, &mut wm_gen),
    );

    let event_count = emitted
        .iter()
        .filter(|out| matches!(out, Output::Event(_)))
        .count();
    assert_eq!(event_count, 1);
}
1786}