1#![deny(clippy::disallowed_types)]
49
50use std::time::{Duration, Instant};
51
52use rustc_hash::FxHashMap;
53
54use super::Watermark;
55
56#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
61pub struct PartitionId {
62 pub source_id: usize,
64 pub partition: u32,
66}
67
68impl PartitionId {
69 #[inline]
71 #[must_use]
72 pub const fn new(source_id: usize, partition: u32) -> Self {
73 Self {
74 source_id,
75 partition,
76 }
77 }
78}
79
80impl std::fmt::Display for PartitionId {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 write!(f, "{}:{}", self.source_id, self.partition)
83 }
84}
85
86#[derive(Debug, Clone)]
90pub struct PartitionWatermarkState {
91 pub watermark: i64,
93 pub last_event_time: i64,
95 pub last_activity: Instant,
97 pub is_idle: bool,
99 pub assigned_core: Option<usize>,
101}
102
103impl PartitionWatermarkState {
104 #[must_use]
106 pub fn new() -> Self {
107 Self {
108 watermark: i64::MIN,
109 last_event_time: i64::MIN,
110 last_activity: Instant::now(),
111 is_idle: false,
112 assigned_core: None,
113 }
114 }
115
116 #[must_use]
118 pub fn with_core(core_id: usize) -> Self {
119 Self {
120 watermark: i64::MIN,
121 last_event_time: i64::MIN,
122 last_activity: Instant::now(),
123 is_idle: false,
124 assigned_core: Some(core_id),
125 }
126 }
127}
128
129impl Default for PartitionWatermarkState {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135#[derive(Debug, Clone, Default)]
137pub struct PartitionedWatermarkMetrics {
138 pub total_partitions: usize,
140 pub active_partitions: usize,
142 pub idle_partitions: usize,
144 pub watermark_advances: u64,
146 pub rebalances: u64,
148}
149
150impl PartitionedWatermarkMetrics {
151 #[must_use]
153 pub fn new() -> Self {
154 Self::default()
155 }
156}
157
158#[derive(Debug, Clone, thiserror::Error)]
160pub enum WatermarkError {
161 #[error("Unknown partition: {0}")]
163 UnknownPartition(PartitionId),
164
165 #[error("Source not found: {0}")]
167 SourceNotFound(usize),
168
169 #[error("Invalid partition {partition} for source {source_id} (max: {max_partition})")]
171 InvalidPartition {
172 source_id: usize,
174 partition: u32,
176 max_partition: u32,
178 },
179
180 #[error("Partition already exists: {0}")]
182 PartitionExists(PartitionId),
183}
184
185#[derive(Debug)]
222pub struct PartitionedWatermarkTracker {
223 partitions: FxHashMap<PartitionId, PartitionWatermarkState>,
225
226 source_partition_counts: Vec<usize>,
228
229 combined_watermark: i64,
231
232 idle_timeout: Duration,
234
235 metrics: PartitionedWatermarkMetrics,
237}
238
239impl PartitionedWatermarkTracker {
240 pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
242
243 #[must_use]
245 pub fn new() -> Self {
246 Self {
247 partitions: FxHashMap::default(),
248 source_partition_counts: Vec::new(),
249 combined_watermark: i64::MIN,
250 idle_timeout: Self::DEFAULT_IDLE_TIMEOUT,
251 metrics: PartitionedWatermarkMetrics::new(),
252 }
253 }
254
255 #[must_use]
257 pub fn with_idle_timeout(idle_timeout: Duration) -> Self {
258 Self {
259 partitions: FxHashMap::default(),
260 source_partition_counts: Vec::new(),
261 combined_watermark: i64::MIN,
262 idle_timeout,
263 metrics: PartitionedWatermarkMetrics::new(),
264 }
265 }
266
267 pub fn register_source(&mut self, source_id: usize, num_partitions: usize) {
272 while self.source_partition_counts.len() <= source_id {
274 self.source_partition_counts.push(0);
275 }
276
277 self.source_partition_counts[source_id] = num_partitions;
278
279 #[allow(clippy::cast_possible_truncation)]
281 for partition in 0..num_partitions {
283 let pid = PartitionId::new(source_id, partition as u32);
284 self.partitions.entry(pid).or_default();
285 }
286
287 self.update_metrics();
288 }
289
290 pub fn add_partition(&mut self, partition: PartitionId) -> Result<(), WatermarkError> {
296 if self.partitions.contains_key(&partition) {
297 return Err(WatermarkError::PartitionExists(partition));
298 }
299
300 while self.source_partition_counts.len() <= partition.source_id {
302 self.source_partition_counts.push(0);
303 }
304
305 let current_count = self.source_partition_counts[partition.source_id];
307 if partition.partition as usize >= current_count {
308 self.source_partition_counts[partition.source_id] = partition.partition as usize + 1;
309 }
310
311 self.partitions
312 .insert(partition, PartitionWatermarkState::new());
313 self.metrics.rebalances += 1;
314 self.update_metrics();
315
316 Ok(())
317 }
318
319 pub fn remove_partition(&mut self, partition: PartitionId) -> Option<PartitionWatermarkState> {
323 let state = self.partitions.remove(&partition);
324 if state.is_some() {
325 self.metrics.rebalances += 1;
326 self.update_metrics();
327 self.recalculate_combined();
329 }
330 state
331 }
332
333 #[inline]
341 pub fn update_partition(
342 &mut self,
343 partition: PartitionId,
344 watermark: i64,
345 ) -> Option<Watermark> {
346 if let Some(state) = self.partitions.get_mut(&partition) {
347 if state.is_idle {
349 state.is_idle = false;
350 self.metrics.active_partitions += 1;
351 self.metrics.idle_partitions = self.metrics.idle_partitions.saturating_sub(1);
352 }
353 state.last_activity = Instant::now();
354
355 if watermark > state.watermark {
357 state.watermark = watermark;
358 state.last_event_time = watermark;
359 return self.try_advance_combined();
360 }
361 }
362 None
363 }
364
365 #[inline]
369 pub fn update_partition_from_event(
370 &mut self,
371 partition: PartitionId,
372 event_time: i64,
373 max_lateness: i64,
374 ) -> Option<Watermark> {
375 let watermark = event_time.saturating_sub(max_lateness);
376 self.update_partition(partition, watermark)
377 }
378
379 pub fn mark_partition_idle(&mut self, partition: PartitionId) -> Option<Watermark> {
385 if let Some(state) = self.partitions.get_mut(&partition) {
386 if !state.is_idle {
387 state.is_idle = true;
388 self.metrics.idle_partitions += 1;
389 self.metrics.active_partitions = self.metrics.active_partitions.saturating_sub(1);
390 return self.try_advance_combined();
391 }
392 }
393 None
394 }
395
396 pub fn mark_partition_active(&mut self, partition: PartitionId) {
401 if let Some(state) = self.partitions.get_mut(&partition) {
402 if state.is_idle {
403 state.is_idle = false;
404 state.last_activity = Instant::now();
405 self.metrics.active_partitions += 1;
406 self.metrics.idle_partitions = self.metrics.idle_partitions.saturating_sub(1);
407 }
408 }
409 }
410
411 pub fn check_idle_partitions(&mut self) -> Option<Watermark> {
419 let mut any_marked = false;
420
421 for state in self.partitions.values_mut() {
422 if !state.is_idle && state.last_activity.elapsed() >= self.idle_timeout {
423 state.is_idle = true;
424 any_marked = true;
425 }
426 }
427
428 if any_marked {
429 self.update_metrics();
430 self.try_advance_combined()
431 } else {
432 None
433 }
434 }
435
436 #[inline]
438 #[must_use]
439 pub fn current_watermark(&self) -> Option<Watermark> {
440 if self.combined_watermark == i64::MIN {
441 None
442 } else {
443 Some(Watermark::new(self.combined_watermark))
444 }
445 }
446
447 #[must_use]
449 pub fn partition_watermark(&self, partition: PartitionId) -> Option<i64> {
450 self.partitions.get(&partition).map(|s| s.watermark)
451 }
452
453 #[must_use]
455 pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
456 let mut min_watermark = i64::MAX;
457 let mut found = false;
458
459 for (pid, state) in &self.partitions {
460 if pid.source_id == source_id && !state.is_idle {
461 found = true;
462 min_watermark = min_watermark.min(state.watermark);
463 }
464 }
465
466 if found && min_watermark != i64::MAX {
467 Some(min_watermark)
468 } else {
469 None
470 }
471 }
472
473 #[must_use]
475 pub fn is_partition_idle(&self, partition: PartitionId) -> bool {
476 self.partitions.get(&partition).is_some_and(|s| s.is_idle)
477 }
478
479 #[must_use]
481 pub fn active_partition_count(&self, source_id: usize) -> usize {
482 self.partitions
483 .iter()
484 .filter(|(pid, state)| pid.source_id == source_id && !state.is_idle)
485 .count()
486 }
487
488 #[must_use]
490 pub fn partition_count(&self, source_id: usize) -> usize {
491 self.source_partition_counts
492 .get(source_id)
493 .copied()
494 .unwrap_or(0)
495 }
496
497 #[must_use]
499 pub fn metrics(&self) -> &PartitionedWatermarkMetrics {
500 &self.metrics
501 }
502
503 #[must_use]
505 pub fn num_sources(&self) -> usize {
506 self.source_partition_counts.len()
507 }
508
509 pub fn assign_partition_to_core(&mut self, partition: PartitionId, core_id: usize) {
511 if let Some(state) = self.partitions.get_mut(&partition) {
512 state.assigned_core = Some(core_id);
513 }
514 }
515
516 #[must_use]
518 pub fn partition_core(&self, partition: PartitionId) -> Option<usize> {
519 self.partitions
520 .get(&partition)
521 .and_then(|s| s.assigned_core)
522 }
523
524 #[must_use]
526 pub fn partitions_for_core(&self, core_id: usize) -> Vec<PartitionId> {
527 self.partitions
528 .iter()
529 .filter_map(|(pid, state)| {
530 if state.assigned_core == Some(core_id) {
531 Some(*pid)
532 } else {
533 None
534 }
535 })
536 .collect()
537 }
538
539 #[must_use]
541 pub fn idle_timeout(&self) -> Duration {
542 self.idle_timeout
543 }
544
545 pub fn set_idle_timeout(&mut self, timeout: Duration) {
547 self.idle_timeout = timeout;
548 }
549
550 #[must_use]
552 pub fn partition_state(&self, partition: PartitionId) -> Option<&PartitionWatermarkState> {
553 self.partitions.get(&partition)
554 }
555
556 fn try_advance_combined(&mut self) -> Option<Watermark> {
558 let new_combined = self.calculate_combined();
559
560 if new_combined > self.combined_watermark && new_combined != i64::MAX {
561 self.combined_watermark = new_combined;
562 self.metrics.watermark_advances += 1;
563 Some(Watermark::new(new_combined))
564 } else {
565 None
566 }
567 }
568
569 fn recalculate_combined(&mut self) {
571 let new_combined = self.calculate_combined();
572 if new_combined != i64::MAX {
573 self.combined_watermark = new_combined;
574 }
575 }
576
577 fn calculate_combined(&self) -> i64 {
579 let mut min_watermark = i64::MAX;
580 let mut has_active = false;
581
582 for state in self.partitions.values() {
583 if !state.is_idle {
584 has_active = true;
585 min_watermark = min_watermark.min(state.watermark);
586 }
587 }
588
589 if !has_active {
591 min_watermark = self
592 .partitions
593 .values()
594 .map(|s| s.watermark)
595 .max()
596 .unwrap_or(i64::MIN);
597 }
598
599 min_watermark
600 }
601
602 fn update_metrics(&mut self) {
604 self.metrics.total_partitions = self.partitions.len();
605 self.metrics.idle_partitions = self.partitions.values().filter(|s| s.is_idle).count();
606 self.metrics.active_partitions =
607 self.metrics.total_partitions - self.metrics.idle_partitions;
608 }
609}
610
611impl Default for PartitionedWatermarkTracker {
612 fn default() -> Self {
613 Self::new()
614 }
615}
616
617#[derive(Debug)]
625pub struct CoreWatermarkState {
626 assigned_partitions: Vec<PartitionId>,
628
629 partition_watermarks: Vec<i64>,
631
632 local_watermark: i64,
634
635 idle_status: Vec<bool>,
637
638 core_id: usize,
640}
641
642impl CoreWatermarkState {
643 #[must_use]
645 pub fn new(core_id: usize) -> Self {
646 Self {
647 assigned_partitions: Vec::new(),
648 partition_watermarks: Vec::new(),
649 local_watermark: i64::MIN,
650 idle_status: Vec::new(),
651 core_id,
652 }
653 }
654
655 #[must_use]
657 pub fn with_partitions(core_id: usize, partitions: Vec<PartitionId>) -> Self {
658 let count = partitions.len();
659 Self {
660 assigned_partitions: partitions,
661 partition_watermarks: vec![i64::MIN; count],
662 local_watermark: i64::MIN,
663 idle_status: vec![false; count],
664 core_id,
665 }
666 }
667
668 pub fn assign_partition(&mut self, partition: PartitionId) {
670 if !self.assigned_partitions.contains(&partition) {
671 self.assigned_partitions.push(partition);
672 self.partition_watermarks.push(i64::MIN);
673 self.idle_status.push(false);
674 }
675 }
676
677 pub fn remove_partition(&mut self, partition: PartitionId) -> bool {
679 if let Some(idx) = self
680 .assigned_partitions
681 .iter()
682 .position(|p| *p == partition)
683 {
684 self.assigned_partitions.swap_remove(idx);
685 self.partition_watermarks.swap_remove(idx);
686 self.idle_status.swap_remove(idx);
687 self.recalculate_local();
688 true
689 } else {
690 false
691 }
692 }
693
694 #[inline]
700 pub fn update_partition(&mut self, partition: PartitionId, watermark: i64) -> Option<i64> {
701 if let Some(idx) = self
702 .assigned_partitions
703 .iter()
704 .position(|p| *p == partition)
705 {
706 if watermark > self.partition_watermarks[idx] {
707 self.partition_watermarks[idx] = watermark;
708 self.idle_status[idx] = false;
709
710 let new_local = self.calculate_local();
712 if new_local > self.local_watermark {
713 self.local_watermark = new_local;
714 return Some(new_local);
715 }
716 }
717 }
718 None
719 }
720
721 pub fn mark_idle(&mut self, partition: PartitionId) -> Option<i64> {
723 if let Some(idx) = self
724 .assigned_partitions
725 .iter()
726 .position(|p| *p == partition)
727 {
728 if !self.idle_status[idx] {
729 self.idle_status[idx] = true;
730
731 let new_local = self.calculate_local();
733 if new_local > self.local_watermark {
734 self.local_watermark = new_local;
735 return Some(new_local);
736 }
737 }
738 }
739 None
740 }
741
742 #[inline]
744 #[must_use]
745 pub fn local_watermark(&self) -> i64 {
746 self.local_watermark
747 }
748
749 #[must_use]
751 pub fn core_id(&self) -> usize {
752 self.core_id
753 }
754
755 #[must_use]
757 pub fn assigned_partitions(&self) -> &[PartitionId] {
758 &self.assigned_partitions
759 }
760
761 #[must_use]
763 pub fn partition_count(&self) -> usize {
764 self.assigned_partitions.len()
765 }
766
767 fn calculate_local(&self) -> i64 {
769 let mut min = i64::MAX;
770 let mut has_active = false;
771
772 for (idx, &wm) in self.partition_watermarks.iter().enumerate() {
773 if !self.idle_status[idx] {
774 has_active = true;
775 min = min.min(wm);
776 }
777 }
778
779 if !has_active {
780 self.partition_watermarks
782 .iter()
783 .copied()
784 .max()
785 .unwrap_or(i64::MIN)
786 } else if min == i64::MAX {
787 i64::MIN
788 } else {
789 min
790 }
791 }
792
793 fn recalculate_local(&mut self) {
795 self.local_watermark = self.calculate_local();
796 }
797}
798
799#[derive(Debug)]
804pub struct GlobalWatermarkCollector {
805 core_watermarks: Vec<i64>,
807
808 global_watermark: i64,
810}
811
812impl GlobalWatermarkCollector {
813 #[must_use]
815 pub fn new(num_cores: usize) -> Self {
816 Self {
817 core_watermarks: vec![i64::MIN; num_cores],
818 global_watermark: i64::MIN,
819 }
820 }
821
822 #[inline]
828 pub fn update_core(&mut self, core_id: usize, watermark: i64) -> Option<Watermark> {
829 if core_id < self.core_watermarks.len() {
830 self.core_watermarks[core_id] = watermark;
831
832 let new_global = self
834 .core_watermarks
835 .iter()
836 .copied()
837 .min()
838 .unwrap_or(i64::MIN);
839
840 if new_global > self.global_watermark && new_global != i64::MIN {
841 self.global_watermark = new_global;
842 return Some(Watermark::new(new_global));
843 }
844 }
845 None
846 }
847
848 #[must_use]
850 pub fn global_watermark(&self) -> Option<Watermark> {
851 if self.global_watermark == i64::MIN {
852 None
853 } else {
854 Some(Watermark::new(self.global_watermark))
855 }
856 }
857
858 #[must_use]
860 pub fn core_watermark(&self, core_id: usize) -> Option<i64> {
861 self.core_watermarks.get(core_id).copied()
862 }
863
864 #[must_use]
866 pub fn num_cores(&self) -> usize {
867 self.core_watermarks.len()
868 }
869}
870
871#[cfg(test)]
872mod tests {
873 use super::*;
874
875 #[test]
876 fn test_partition_id_creation() {
877 let pid = PartitionId::new(1, 3);
878 assert_eq!(pid.source_id, 1);
879 assert_eq!(pid.partition, 3);
880 }
881
882 #[test]
883 fn test_partition_id_equality() {
884 let p1 = PartitionId::new(1, 2);
885 let p2 = PartitionId::new(1, 2);
886 let p3 = PartitionId::new(1, 3);
887
888 assert_eq!(p1, p2);
889 assert_ne!(p1, p3);
890 }
891
892 #[test]
893 fn test_partition_id_display() {
894 let pid = PartitionId::new(2, 5);
895 assert_eq!(format!("{pid}"), "2:5");
896 }
897
898 #[test]
899 fn test_partitioned_tracker_single_partition_updates_watermark() {
900 let mut tracker = PartitionedWatermarkTracker::new();
901 tracker.register_source(0, 1);
902
903 let wm = tracker.update_partition(PartitionId::new(0, 0), 1000);
904 assert_eq!(wm, Some(Watermark::new(1000)));
905 assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
906 }
907
908 #[test]
909 fn test_partitioned_tracker_multiple_partitions_uses_minimum() {
910 let mut tracker = PartitionedWatermarkTracker::new();
911 tracker.register_source(0, 4);
912
913 tracker.update_partition(PartitionId::new(0, 0), 5000);
914 tracker.update_partition(PartitionId::new(0, 1), 3000);
915 tracker.update_partition(PartitionId::new(0, 2), 4000);
916 tracker.update_partition(PartitionId::new(0, 3), 4500);
917
918 assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
919 }
920
921 #[test]
922 fn test_partitioned_tracker_idle_partition_excluded_from_min() {
923 let mut tracker = PartitionedWatermarkTracker::new();
924 tracker.register_source(0, 4);
925
926 tracker.update_partition(PartitionId::new(0, 0), 5000);
927 tracker.update_partition(PartitionId::new(0, 1), 1000); tracker.update_partition(PartitionId::new(0, 2), 4000);
929 tracker.update_partition(PartitionId::new(0, 3), 4500);
930
931 assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
932
933 let wm = tracker.mark_partition_idle(PartitionId::new(0, 1));
935 assert_eq!(wm, Some(Watermark::new(4000)));
936 assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
937 }
938
939 #[test]
940 fn test_partitioned_tracker_all_idle_uses_max() {
941 let mut tracker = PartitionedWatermarkTracker::new();
942 tracker.register_source(0, 2);
943
944 tracker.update_partition(PartitionId::new(0, 0), 5000);
945 tracker.update_partition(PartitionId::new(0, 1), 3000);
946
947 tracker.mark_partition_idle(PartitionId::new(0, 0));
948 let wm = tracker.mark_partition_idle(PartitionId::new(0, 1));
949
950 assert_eq!(wm, Some(Watermark::new(5000)));
952 }
953
954 #[test]
955 fn test_partitioned_tracker_partition_reactivated_on_update() {
956 let mut tracker = PartitionedWatermarkTracker::new();
957 tracker.register_source(0, 2);
958
959 tracker.update_partition(PartitionId::new(0, 0), 5000);
960 tracker.update_partition(PartitionId::new(0, 1), 3000);
961
962 tracker.mark_partition_idle(PartitionId::new(0, 1));
964 assert!(tracker.is_partition_idle(PartitionId::new(0, 1)));
965 assert_eq!(tracker.current_watermark(), Some(Watermark::new(5000)));
966
967 tracker.update_partition(PartitionId::new(0, 1), 4000);
969 assert!(!tracker.is_partition_idle(PartitionId::new(0, 1)));
970
971 assert_eq!(tracker.current_watermark(), Some(Watermark::new(5000))); }
974
975 #[test]
976 fn test_partitioned_tracker_add_partition_during_operation() {
977 let mut tracker = PartitionedWatermarkTracker::new();
978 tracker.register_source(0, 2);
979
980 tracker.update_partition(PartitionId::new(0, 0), 5000);
981 tracker.update_partition(PartitionId::new(0, 1), 4000);
982
983 tracker.add_partition(PartitionId::new(0, 2)).unwrap();
985
986 assert_eq!(tracker.partition_count(0), 3);
988
989 tracker.update_partition(PartitionId::new(0, 2), 3000);
991 assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000))); }
994
995 #[test]
996 fn test_partitioned_tracker_remove_partition_recalculates_watermark() {
997 let mut tracker = PartitionedWatermarkTracker::new();
998 tracker.register_source(0, 3);
999
1000 tracker.update_partition(PartitionId::new(0, 0), 5000);
1001 tracker.update_partition(PartitionId::new(0, 1), 2000); tracker.update_partition(PartitionId::new(0, 2), 4000);
1003
1004 assert_eq!(tracker.current_watermark(), Some(Watermark::new(2000)));
1005
1006 let state = tracker.remove_partition(PartitionId::new(0, 1));
1008 assert!(state.is_some());
1009 assert_eq!(state.unwrap().watermark, 2000);
1010
1011 assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
1014 }
1015
1016 #[test]
1017 fn test_partitioned_tracker_check_idle_marks_stale_partitions() {
1018 let mut tracker = PartitionedWatermarkTracker::with_idle_timeout(Duration::from_millis(10));
1019 tracker.register_source(0, 2);
1020
1021 tracker.update_partition(PartitionId::new(0, 0), 5000);
1022 tracker.update_partition(PartitionId::new(0, 1), 3000);
1023
1024 std::thread::sleep(Duration::from_millis(20));
1026
1027 tracker.update_partition(PartitionId::new(0, 0), 6000);
1029
1030 let wm = tracker.check_idle_partitions();
1032
1033 assert!(tracker.is_partition_idle(PartitionId::new(0, 1)));
1034 assert!(wm.is_some() || tracker.current_watermark() == Some(Watermark::new(6000)));
1036 }
1037
1038 #[test]
1039 fn test_partitioned_tracker_source_watermark_aggregates_partitions() {
1040 let mut tracker = PartitionedWatermarkTracker::new();
1041 tracker.register_source(0, 2);
1042 tracker.register_source(1, 2);
1043
1044 tracker.update_partition(PartitionId::new(0, 0), 5000);
1045 tracker.update_partition(PartitionId::new(0, 1), 3000);
1046 tracker.update_partition(PartitionId::new(1, 0), 4000);
1047 tracker.update_partition(PartitionId::new(1, 1), 6000);
1048
1049 assert_eq!(tracker.source_watermark(0), Some(3000));
1050 assert_eq!(tracker.source_watermark(1), Some(4000));
1051 }
1052
1053 #[test]
1054 fn test_partitioned_tracker_metrics_accurate() {
1055 let mut tracker = PartitionedWatermarkTracker::new();
1056 tracker.register_source(0, 4);
1057
1058 tracker.update_partition(PartitionId::new(0, 0), 1000);
1059 tracker.update_partition(PartitionId::new(0, 1), 2000);
1060
1061 let metrics = tracker.metrics();
1062 assert_eq!(metrics.total_partitions, 4);
1063 assert_eq!(metrics.active_partitions, 4);
1064 assert_eq!(metrics.idle_partitions, 0);
1065
1066 tracker.mark_partition_idle(PartitionId::new(0, 2));
1067
1068 let metrics = tracker.metrics();
1069 assert_eq!(metrics.idle_partitions, 1);
1070 assert_eq!(metrics.active_partitions, 3);
1071 }
1072
1073 #[test]
1074 fn test_partitioned_tracker_core_assignment() {
1075 let mut tracker = PartitionedWatermarkTracker::new();
1076 tracker.register_source(0, 4);
1077
1078 tracker.assign_partition_to_core(PartitionId::new(0, 0), 0);
1079 tracker.assign_partition_to_core(PartitionId::new(0, 1), 0);
1080 tracker.assign_partition_to_core(PartitionId::new(0, 2), 1);
1081 tracker.assign_partition_to_core(PartitionId::new(0, 3), 1);
1082
1083 assert_eq!(tracker.partition_core(PartitionId::new(0, 0)), Some(0));
1084 assert_eq!(tracker.partition_core(PartitionId::new(0, 2)), Some(1));
1085
1086 let core0_partitions = tracker.partitions_for_core(0);
1087 assert_eq!(core0_partitions.len(), 2);
1088 }
1089
1090 #[test]
1091 fn test_partitioned_tracker_multiple_sources() {
1092 let mut tracker = PartitionedWatermarkTracker::new();
1093 tracker.register_source(0, 2);
1094 tracker.register_source(1, 3);
1095
1096 assert_eq!(tracker.num_sources(), 2);
1097 assert_eq!(tracker.partition_count(0), 2);
1098 assert_eq!(tracker.partition_count(1), 3);
1099 }
1100
1101 #[test]
1102 fn test_partitioned_tracker_update_from_event() {
1103 let mut tracker = PartitionedWatermarkTracker::new();
1104 tracker.register_source(0, 1);
1105
1106 let wm = tracker.update_partition_from_event(PartitionId::new(0, 0), 5000, 1000);
1108 assert_eq!(wm, Some(Watermark::new(4000)));
1109 }
1110
1111 #[test]
1112 fn test_partitioned_tracker_add_partition_error() {
1113 let mut tracker = PartitionedWatermarkTracker::new();
1114 tracker.register_source(0, 2);
1115
1116 let result = tracker.add_partition(PartitionId::new(0, 0));
1118 assert!(matches!(result, Err(WatermarkError::PartitionExists(_))));
1119 }
1120
1121 #[test]
1122 fn test_core_watermark_state_creation() {
1123 let state = CoreWatermarkState::new(0);
1124 assert_eq!(state.core_id(), 0);
1125 assert_eq!(state.partition_count(), 0);
1126 assert_eq!(state.local_watermark(), i64::MIN);
1127 }
1128
1129 #[test]
1130 fn test_core_watermark_state_with_partitions() {
1131 let partitions = vec![PartitionId::new(0, 0), PartitionId::new(0, 1)];
1132 let state = CoreWatermarkState::with_partitions(1, partitions);
1133
1134 assert_eq!(state.core_id(), 1);
1135 assert_eq!(state.partition_count(), 2);
1136 }
1137
1138 #[test]
1139 fn test_core_watermark_state_update() {
1140 let mut state = CoreWatermarkState::with_partitions(
1141 0,
1142 vec![PartitionId::new(0, 0), PartitionId::new(0, 1)],
1143 );
1144
1145 let wm = state.update_partition(PartitionId::new(0, 0), 5000);
1147 assert!(wm.is_none()); assert_eq!(state.local_watermark(), i64::MIN);
1149
1150 let wm = state.update_partition(PartitionId::new(0, 1), 3000);
1152 assert_eq!(wm, Some(3000)); assert_eq!(state.local_watermark(), 3000);
1154
1155 let wm = state.update_partition(PartitionId::new(0, 0), 6000);
1157 assert!(wm.is_none());
1158 assert_eq!(state.local_watermark(), 3000);
1159
1160 let wm = state.update_partition(PartitionId::new(0, 1), 4000);
1162 assert_eq!(wm, Some(4000));
1163 assert_eq!(state.local_watermark(), 4000);
1164 }
1165
1166 #[test]
1167 fn test_core_watermark_state_idle() {
1168 let mut state = CoreWatermarkState::with_partitions(
1169 0,
1170 vec![PartitionId::new(0, 0), PartitionId::new(0, 1)],
1171 );
1172
1173 state.update_partition(PartitionId::new(0, 0), 5000);
1174 state.update_partition(PartitionId::new(0, 1), 2000);
1175
1176 assert_eq!(state.local_watermark(), 2000);
1177
1178 let wm = state.mark_idle(PartitionId::new(0, 1));
1180 assert_eq!(wm, Some(5000));
1181 assert_eq!(state.local_watermark(), 5000);
1182 }
1183
1184 #[test]
1185 fn test_core_watermark_state_assign_remove() {
1186 let mut state = CoreWatermarkState::new(0);
1187
1188 state.assign_partition(PartitionId::new(0, 0));
1189 state.assign_partition(PartitionId::new(0, 1));
1190 assert_eq!(state.partition_count(), 2);
1191
1192 state.update_partition(PartitionId::new(0, 0), 5000);
1193 state.update_partition(PartitionId::new(0, 1), 3000);
1194 assert_eq!(state.local_watermark(), 3000);
1195
1196 let removed = state.remove_partition(PartitionId::new(0, 1));
1198 assert!(removed);
1199 assert_eq!(state.partition_count(), 1);
1200 assert_eq!(state.local_watermark(), 5000);
1201 }
1202
1203 #[test]
1204 fn test_global_collector_creation() {
1205 let collector = GlobalWatermarkCollector::new(4);
1206 assert_eq!(collector.num_cores(), 4);
1207 assert_eq!(collector.global_watermark(), None);
1208 }
1209
1210 #[test]
1211 fn test_global_collector_update() {
1212 let mut collector = GlobalWatermarkCollector::new(3);
1213
1214 collector.update_core(0, 5000);
1215 collector.update_core(1, 3000);
1216 let wm = collector.update_core(2, 4000);
1217
1218 assert_eq!(wm, Some(Watermark::new(3000)));
1220 assert_eq!(collector.global_watermark(), Some(Watermark::new(3000)));
1221 }
1222
1223 #[test]
1224 fn test_global_collector_advancement() {
1225 let mut collector = GlobalWatermarkCollector::new(2);
1226
1227 collector.update_core(0, 5000);
1228 collector.update_core(1, 3000);
1229
1230 assert_eq!(collector.global_watermark(), Some(Watermark::new(3000)));
1231
1232 let wm = collector.update_core(1, 4000);
1234 assert_eq!(wm, Some(Watermark::new(4000)));
1235 }
1236
1237 #[test]
1238 fn test_global_collector_no_regression() {
1239 let mut collector = GlobalWatermarkCollector::new(2);
1240
1241 collector.update_core(0, 5000);
1242 collector.update_core(1, 4000);
1243
1244 let wm = collector.update_core(1, 3000);
1246 assert!(wm.is_none());
1247 assert_eq!(collector.core_watermark(1), Some(3000));
1249 }
1250}