1use std::time::{Duration, Instant};
54
55use rustc_hash::FxHashMap;
56
57#[derive(Debug, Clone, Hash, Eq, PartialEq)]
59pub struct AlignmentGroupId(pub String);
60
61impl AlignmentGroupId {
62 #[must_use]
64 pub fn new(id: impl Into<String>) -> Self {
65 Self(id.into())
66 }
67
68 #[must_use]
70 pub fn as_str(&self) -> &str {
71 &self.0
72 }
73}
74
75impl std::fmt::Display for AlignmentGroupId {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 write!(f, "{}", self.0)
78 }
79}
80
81#[derive(Debug, Clone)]
83pub struct AlignmentGroupConfig {
84 pub group_id: AlignmentGroupId,
86 pub max_drift: Duration,
88 pub update_interval: Duration,
90 pub enforcement_mode: EnforcementMode,
92}
93
94impl AlignmentGroupConfig {
95 #[must_use]
97 pub fn new(group_id: impl Into<String>) -> Self {
98 Self {
99 group_id: AlignmentGroupId::new(group_id),
100 max_drift: Duration::from_secs(300), update_interval: Duration::from_secs(1),
102 enforcement_mode: EnforcementMode::Pause,
103 }
104 }
105
106 #[must_use]
108 pub fn with_max_drift(mut self, max_drift: Duration) -> Self {
109 self.max_drift = max_drift;
110 self
111 }
112
113 #[must_use]
115 pub fn with_update_interval(mut self, interval: Duration) -> Self {
116 self.update_interval = interval;
117 self
118 }
119
120 #[must_use]
122 pub fn with_enforcement_mode(mut self, mode: EnforcementMode) -> Self {
123 self.enforcement_mode = mode;
124 self
125 }
126}
127
128#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
130pub enum EnforcementMode {
131 #[default]
133 Pause,
134 WarnOnly,
136 DropExcess,
138}
139
140#[derive(Debug, Clone)]
142pub struct AlignmentSourceState {
143 pub source_id: usize,
145 pub watermark: i64,
147 pub is_paused: bool,
149 pub pause_start: Option<Instant>,
151 pub total_pause_time: Duration,
153 pub events_dropped_while_paused: u64,
155 pub last_activity: Instant,
157}
158
159impl AlignmentSourceState {
160 fn new(source_id: usize) -> Self {
162 Self {
163 source_id,
164 watermark: i64::MIN,
165 is_paused: false,
166 pause_start: None,
167 total_pause_time: Duration::ZERO,
168 events_dropped_while_paused: 0,
169 last_activity: Instant::now(),
170 }
171 }
172
173 fn pause(&mut self) {
175 if !self.is_paused {
176 self.is_paused = true;
177 self.pause_start = Some(Instant::now());
178 }
179 }
180
181 fn resume(&mut self) {
183 if self.is_paused {
184 self.is_paused = false;
185 if let Some(start) = self.pause_start.take() {
186 self.total_pause_time += start.elapsed();
187 }
188 }
189 }
190}
191
192#[derive(Debug, Clone, Default)]
194pub struct AlignmentGroupMetrics {
195 pub pause_events: u64,
197 pub resume_events: u64,
199 pub total_pause_time: Duration,
201 pub max_observed_drift: Duration,
203 pub current_drift: Duration,
205 pub events_dropped: u64,
207 pub warnings_emitted: u64,
209}
210
211impl AlignmentGroupMetrics {
212 #[must_use]
214 pub fn new() -> Self {
215 Self::default()
216 }
217
218 pub fn reset(&mut self) {
220 *self = Self::default();
221 }
222}
223
224#[derive(Debug, Clone, Copy, PartialEq, Eq)]
226pub enum AlignmentAction {
227 Continue,
229 Pause,
231 Resume,
233 Drop,
235 Warn {
237 drift_ms: i64,
239 },
240}
241
242#[derive(Debug)]
286pub struct WatermarkAlignmentGroup {
287 config: AlignmentGroupConfig,
289 sources: FxHashMap<usize, AlignmentSourceState>,
291 min_watermark: i64,
293 max_watermark: i64,
295 last_check: Instant,
297 metrics: AlignmentGroupMetrics,
299}
300
301impl WatermarkAlignmentGroup {
302 #[must_use]
304 pub fn new(config: AlignmentGroupConfig) -> Self {
305 Self {
306 config,
307 sources: FxHashMap::default(),
308 min_watermark: i64::MIN,
309 max_watermark: i64::MIN,
310 last_check: Instant::now(),
311 metrics: AlignmentGroupMetrics::new(),
312 }
313 }
314
315 #[must_use]
317 pub fn group_id(&self) -> &AlignmentGroupId {
318 &self.config.group_id
319 }
320
321 #[must_use]
323 pub fn config(&self) -> &AlignmentGroupConfig {
324 &self.config
325 }
326
327 pub fn register_source(&mut self, source_id: usize) {
329 self.sources
330 .entry(source_id)
331 .or_insert_with(|| AlignmentSourceState::new(source_id));
332 }
333
334 pub fn unregister_source(&mut self, source_id: usize) {
336 self.sources.remove(&source_id);
337 self.recalculate_bounds();
338 }
339
340 pub fn report_watermark(&mut self, source_id: usize, watermark: i64) -> AlignmentAction {
344 self.sources
346 .entry(source_id)
347 .or_insert_with(|| AlignmentSourceState::new(source_id));
348
349 let is_paused = self.sources.get(&source_id).is_some_and(|s| s.is_paused);
351
352 if is_paused {
353 match self.config.enforcement_mode {
354 EnforcementMode::Pause => {
355 let can_resume = self.can_resume_with_watermark(source_id, watermark);
357 if can_resume {
358 if let Some(source) = self.sources.get_mut(&source_id) {
359 source.watermark = watermark;
360 source.last_activity = Instant::now();
361 source.resume();
362 }
363 self.metrics.resume_events += 1;
364 self.recalculate_bounds();
365 return AlignmentAction::Resume;
366 }
367 return AlignmentAction::Pause;
368 }
369 EnforcementMode::DropExcess => {
370 if let Some(source) = self.sources.get_mut(&source_id) {
371 source.events_dropped_while_paused += 1;
372 }
373 self.metrics.events_dropped += 1;
374 return AlignmentAction::Drop;
375 }
376 EnforcementMode::WarnOnly => {
377 }
379 }
380 }
381
382 if let Some(source) = self.sources.get_mut(&source_id) {
384 source.watermark = watermark;
385 source.last_activity = Instant::now();
386 }
387
388 self.recalculate_bounds();
390
391 let current_drift = if self.min_watermark == i64::MIN || self.max_watermark == i64::MIN {
393 Duration::ZERO
394 } else {
395 let drift_ms = self.max_watermark.saturating_sub(self.min_watermark).max(0);
396 #[allow(clippy::cast_sign_loss)]
398 Duration::from_millis(drift_ms as u64)
399 };
400
401 self.metrics.current_drift = current_drift;
402 if current_drift > self.metrics.max_observed_drift {
403 self.metrics.max_observed_drift = current_drift;
404 }
405
406 if watermark == self.max_watermark && current_drift > self.config.max_drift {
408 match self.config.enforcement_mode {
409 EnforcementMode::Pause => {
410 if let Some(source) = self.sources.get_mut(&source_id) {
411 source.pause();
412 }
413 self.metrics.pause_events += 1;
414 AlignmentAction::Pause
415 }
416 EnforcementMode::WarnOnly => {
417 self.metrics.warnings_emitted += 1;
418 #[allow(clippy::cast_possible_truncation)]
420 let drift_ms = current_drift.as_millis().min(i64::MAX as u128) as i64;
421 AlignmentAction::Warn { drift_ms }
422 }
423 EnforcementMode::DropExcess => {
424 if let Some(source) = self.sources.get_mut(&source_id) {
425 source.is_paused = true; source.events_dropped_while_paused += 1;
427 }
428 self.metrics.events_dropped += 1;
429 AlignmentAction::Drop
430 }
431 }
432 } else {
433 AlignmentAction::Continue
434 }
435 }
436
437 #[must_use]
439 pub fn should_resume(&self, source_id: usize) -> bool {
440 let Some(source) = self.sources.get(&source_id) else {
441 return false;
442 };
443
444 if !source.is_paused {
445 return false;
446 }
447
448 self.can_resume_with_watermark(source_id, source.watermark)
449 }
450
451 fn can_resume_with_watermark(&self, source_id: usize, watermark: i64) -> bool {
453 let min_without_source = self
455 .sources
456 .iter()
457 .filter(|(&id, s)| id != source_id && !s.is_paused && s.watermark != i64::MIN)
458 .map(|(_, s)| s.watermark)
459 .min()
460 .unwrap_or(i64::MIN);
461
462 if min_without_source == i64::MIN {
463 return true; }
465
466 let drift_if_resumed = watermark.saturating_sub(min_without_source).max(0);
467 #[allow(clippy::cast_sign_loss)]
469 let drift_duration = Duration::from_millis(drift_if_resumed as u64);
470
471 drift_duration <= self.config.max_drift
472 }
473
474 #[must_use]
476 pub fn current_drift(&self) -> Duration {
477 self.metrics.current_drift
478 }
479
480 #[must_use]
482 pub fn is_paused(&self, source_id: usize) -> bool {
483 self.sources.get(&source_id).is_some_and(|s| s.is_paused)
484 }
485
486 #[must_use]
488 pub fn min_watermark(&self) -> i64 {
489 self.min_watermark
490 }
491
492 #[must_use]
494 pub fn max_watermark(&self) -> i64 {
495 self.max_watermark
496 }
497
498 #[must_use]
500 pub fn metrics(&self) -> &AlignmentGroupMetrics {
501 &self.metrics
502 }
503
504 #[must_use]
506 pub fn source_count(&self) -> usize {
507 self.sources.len()
508 }
509
510 #[must_use]
512 pub fn paused_source_count(&self) -> usize {
513 self.sources.values().filter(|s| s.is_paused).count()
514 }
515
516 #[must_use]
518 pub fn active_source_count(&self) -> usize {
519 self.sources.values().filter(|s| !s.is_paused).count()
520 }
521
522 pub fn check_alignment(&mut self) -> Vec<(usize, AlignmentAction)> {
527 if self.last_check.elapsed() < self.config.update_interval {
528 return Vec::new();
529 }
530
531 self.last_check = Instant::now();
532 let mut actions = Vec::new();
533
534 let paused_sources: Vec<usize> = self
536 .sources
537 .iter()
538 .filter(|(_, s)| s.is_paused)
539 .map(|(&id, _)| id)
540 .collect();
541
542 for source_id in paused_sources {
543 if self.should_resume(source_id) {
544 if let Some(source) = self.sources.get_mut(&source_id) {
545 source.resume();
546 self.metrics.resume_events += 1;
547 actions.push((source_id, AlignmentAction::Resume));
548 }
549 }
550 }
551
552 if !actions.is_empty() {
554 self.recalculate_bounds();
555 }
556
557 actions
558 }
559
560 #[must_use]
562 pub fn source_state(&self, source_id: usize) -> Option<&AlignmentSourceState> {
563 self.sources.get(&source_id)
564 }
565
566 fn recalculate_bounds(&mut self) {
568 let active_watermarks: Vec<i64> = self
569 .sources
570 .values()
571 .filter(|s| !s.is_paused && s.watermark != i64::MIN)
572 .map(|s| s.watermark)
573 .collect();
574
575 if active_watermarks.is_empty() {
576 self.min_watermark = i64::MIN;
577 self.max_watermark = i64::MIN;
578 } else {
579 self.min_watermark = *active_watermarks.iter().min().unwrap();
580 self.max_watermark = *active_watermarks.iter().max().unwrap();
581 }
582 }
583}
584
585#[derive(Debug, thiserror::Error)]
587pub enum AlignmentError {
588 #[error("source {0} not in any group")]
590 SourceNotInGroup(usize),
591
592 #[error("group '{0}' not found")]
594 GroupNotFound(String),
595
596 #[error("source {source_id} already in group '{group_id}'")]
598 SourceAlreadyInGroup {
599 source_id: usize,
601 group_id: String,
603 },
604}
605
606#[derive(Debug, Default)]
636pub struct AlignmentGroupCoordinator {
637 groups: FxHashMap<AlignmentGroupId, WatermarkAlignmentGroup>,
639 source_groups: FxHashMap<usize, AlignmentGroupId>,
641}
642
643impl AlignmentGroupCoordinator {
644 #[must_use]
646 pub fn new() -> Self {
647 Self::default()
648 }
649
650 pub fn add_group(&mut self, config: AlignmentGroupConfig) {
652 let group_id = config.group_id.clone();
653 self.groups
654 .insert(group_id, WatermarkAlignmentGroup::new(config));
655 }
656
657 pub fn remove_group(&mut self, group_id: &AlignmentGroupId) -> Option<WatermarkAlignmentGroup> {
659 self.source_groups.retain(|_, gid| gid != group_id);
661 self.groups.remove(group_id)
662 }
663
664 pub fn assign_source_to_group(
671 &mut self,
672 source_id: usize,
673 group_id: &AlignmentGroupId,
674 ) -> Result<(), AlignmentError> {
675 if let Some(existing_group) = self.source_groups.get(&source_id) {
677 if existing_group != group_id {
678 return Err(AlignmentError::SourceAlreadyInGroup {
679 source_id,
680 group_id: existing_group.0.clone(),
681 });
682 }
683 return Ok(());
685 }
686
687 let group = self
689 .groups
690 .get_mut(group_id)
691 .ok_or_else(|| AlignmentError::GroupNotFound(group_id.0.clone()))?;
692
693 group.register_source(source_id);
695 self.source_groups.insert(source_id, group_id.clone());
696
697 Ok(())
698 }
699
700 pub fn unassign_source(&mut self, source_id: usize) {
702 if let Some(group_id) = self.source_groups.remove(&source_id) {
703 if let Some(group) = self.groups.get_mut(&group_id) {
704 group.unregister_source(source_id);
705 }
706 }
707 }
708
709 pub fn report_watermark(
713 &mut self,
714 source_id: usize,
715 watermark: i64,
716 ) -> Option<AlignmentAction> {
717 let group_id = self.source_groups.get(&source_id)?;
718 let group = self.groups.get_mut(group_id)?;
719 Some(group.report_watermark(source_id, watermark))
720 }
721
722 pub fn check_all_alignments(&mut self) -> Vec<(usize, AlignmentAction)> {
726 let mut all_actions = Vec::new();
727 for group in self.groups.values_mut() {
728 all_actions.extend(group.check_alignment());
729 }
730 all_actions
731 }
732
733 #[must_use]
735 pub fn all_metrics(&self) -> FxHashMap<AlignmentGroupId, AlignmentGroupMetrics> {
736 self.groups
737 .iter()
738 .map(|(id, group)| (id.clone(), group.metrics().clone()))
739 .collect()
740 }
741
742 #[must_use]
744 pub fn group(&self, group_id: &AlignmentGroupId) -> Option<&WatermarkAlignmentGroup> {
745 self.groups.get(group_id)
746 }
747
748 pub fn group_mut(
750 &mut self,
751 group_id: &AlignmentGroupId,
752 ) -> Option<&mut WatermarkAlignmentGroup> {
753 self.groups.get_mut(group_id)
754 }
755
756 #[must_use]
758 pub fn source_group(&self, source_id: usize) -> Option<&AlignmentGroupId> {
759 self.source_groups.get(&source_id)
760 }
761
762 #[must_use]
764 pub fn group_count(&self) -> usize {
765 self.groups.len()
766 }
767
768 #[must_use]
770 pub fn total_source_count(&self) -> usize {
771 self.source_groups.len()
772 }
773
774 #[must_use]
776 pub fn should_resume(&self, source_id: usize) -> bool {
777 let Some(group_id) = self.source_groups.get(&source_id) else {
778 return false;
779 };
780 let Some(group) = self.groups.get(group_id) else {
781 return false;
782 };
783 group.should_resume(source_id)
784 }
785
786 #[must_use]
788 pub fn is_paused(&self, source_id: usize) -> bool {
789 let Some(group_id) = self.source_groups.get(&source_id) else {
790 return false;
791 };
792 let Some(group) = self.groups.get(group_id) else {
793 return false;
794 };
795 group.is_paused(source_id)
796 }
797}
798
799#[cfg(test)]
800mod tests {
801 use super::*;
802
803 #[test]
804 fn test_alignment_group_id() {
805 let id = AlignmentGroupId::new("test-group");
806 assert_eq!(id.as_str(), "test-group");
807 assert_eq!(format!("{id}"), "test-group");
808 }
809
810 #[test]
811 fn test_alignment_group_config_builder() {
812 let config = AlignmentGroupConfig::new("test")
813 .with_max_drift(Duration::from_secs(120))
814 .with_update_interval(Duration::from_millis(500))
815 .with_enforcement_mode(EnforcementMode::WarnOnly);
816
817 assert_eq!(config.group_id.as_str(), "test");
818 assert_eq!(config.max_drift, Duration::from_secs(120));
819 assert_eq!(config.update_interval, Duration::from_millis(500));
820 assert_eq!(config.enforcement_mode, EnforcementMode::WarnOnly);
821 }
822
823 #[test]
824 fn test_alignment_group_single_source_no_pause() {
825 let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
826 let mut group = WatermarkAlignmentGroup::new(config);
827
828 group.register_source(0);
829
830 let action = group.report_watermark(0, 100_000);
832 assert_eq!(action, AlignmentAction::Continue);
833 assert!(!group.is_paused(0));
834 }
835
836 #[test]
837 fn test_alignment_group_two_sources_fast_paused() {
838 let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60)); let mut group = WatermarkAlignmentGroup::new(config);
840
841 group.register_source(0);
842 group.register_source(1);
843
844 group.report_watermark(0, 0);
846 group.report_watermark(1, 0);
847
848 let action = group.report_watermark(0, 50_000); assert_eq!(action, AlignmentAction::Continue);
851 assert!(!group.is_paused(0));
852
853 let action = group.report_watermark(0, 70_000); assert_eq!(action, AlignmentAction::Pause);
856 assert!(group.is_paused(0));
857 }
858
859 #[test]
860 fn test_alignment_group_resume_when_slow_catches_up() {
861 let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
862 let mut group = WatermarkAlignmentGroup::new(config);
863
864 group.register_source(0);
865 group.register_source(1);
866
867 group.report_watermark(0, 0);
869 group.report_watermark(1, 0);
870
871 group.report_watermark(0, 100_000); assert!(group.is_paused(0));
874
875 group.report_watermark(1, 50_000); assert!(group.should_resume(0));
879 }
880
881 #[test]
882 fn test_alignment_group_warn_only_mode() {
883 let config = AlignmentGroupConfig::new("test")
884 .with_max_drift(Duration::from_secs(60))
885 .with_enforcement_mode(EnforcementMode::WarnOnly);
886 let mut group = WatermarkAlignmentGroup::new(config);
887
888 group.register_source(0);
889 group.register_source(1);
890
891 group.report_watermark(0, 0);
892 group.report_watermark(1, 0);
893
894 let action = group.report_watermark(0, 100_000);
896 match action {
897 AlignmentAction::Warn { drift_ms } => {
898 assert_eq!(drift_ms, 100_000); }
900 _ => panic!("Expected Warn action"),
901 }
902 assert!(!group.is_paused(0)); assert_eq!(group.metrics().warnings_emitted, 1);
904 }
905
906 #[test]
907 fn test_alignment_group_drop_excess_mode() {
908 let config = AlignmentGroupConfig::new("test")
909 .with_max_drift(Duration::from_secs(60))
910 .with_enforcement_mode(EnforcementMode::DropExcess);
911 let mut group = WatermarkAlignmentGroup::new(config);
912
913 group.register_source(0);
914 group.register_source(1);
915
916 group.report_watermark(0, 0);
917 group.report_watermark(1, 0);
918
919 let action = group.report_watermark(0, 100_000);
921 assert_eq!(action, AlignmentAction::Drop);
922 assert_eq!(group.metrics().events_dropped, 1);
923
924 let action = group.report_watermark(0, 110_000);
926 assert_eq!(action, AlignmentAction::Drop);
927 assert_eq!(group.metrics().events_dropped, 2);
928 }
929
930 #[test]
931 fn test_alignment_group_drift_calculation() {
932 let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(300)); let mut group = WatermarkAlignmentGroup::new(config);
934
935 group.register_source(0);
936 group.register_source(1);
937 group.register_source(2);
938
939 group.report_watermark(0, 100_000); group.report_watermark(1, 200_000); group.report_watermark(2, 150_000); assert_eq!(group.current_drift(), Duration::from_secs(100));
945 assert_eq!(group.min_watermark(), 100_000);
946 assert_eq!(group.max_watermark(), 200_000);
947 }
948
949 #[test]
950 fn test_alignment_group_metrics_accurate() {
951 let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
952 let mut group = WatermarkAlignmentGroup::new(config);
953
954 group.register_source(0);
955 group.register_source(1);
956
957 group.report_watermark(0, 0);
958 group.report_watermark(1, 0);
959
960 group.report_watermark(0, 100_000);
962 assert_eq!(group.metrics().pause_events, 1);
963
964 group.report_watermark(1, 50_000);
966
967 let _actions = group.check_alignment();
969 assert!(group.should_resume(0));
972 }
973
974 #[test]
975 fn test_alignment_group_unregister_source() {
976 let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
977 let mut group = WatermarkAlignmentGroup::new(config);
978
979 group.register_source(0);
980 group.register_source(1);
981
982 group.report_watermark(0, 100_000);
983 group.report_watermark(1, 50_000);
984
985 assert_eq!(group.source_count(), 2);
986
987 group.unregister_source(1);
988 assert_eq!(group.source_count(), 1);
989
990 assert_eq!(group.min_watermark(), 100_000);
993 assert_eq!(group.max_watermark(), 100_000);
994 }
995
996 #[test]
997 fn test_alignment_group_source_state() {
998 let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
999 let mut group = WatermarkAlignmentGroup::new(config);
1000
1001 group.register_source(0);
1002 group.report_watermark(0, 50_000);
1003
1004 let state = group.source_state(0).expect("source exists");
1005 assert_eq!(state.source_id, 0);
1006 assert_eq!(state.watermark, 50_000);
1007 assert!(!state.is_paused);
1008 }
1009
1010 #[test]
1011 fn test_coordinator_multiple_groups() {
1012 let mut coordinator = AlignmentGroupCoordinator::new();
1013
1014 let config1 = AlignmentGroupConfig::new("group1").with_max_drift(Duration::from_secs(60));
1015 let config2 = AlignmentGroupConfig::new("group2").with_max_drift(Duration::from_secs(120));
1016
1017 coordinator.add_group(config1);
1018 coordinator.add_group(config2);
1019
1020 assert_eq!(coordinator.group_count(), 2);
1021 }
1022
1023 #[test]
1024 fn test_coordinator_source_assignment() {
1025 let mut coordinator = AlignmentGroupCoordinator::new();
1026
1027 let config =
1028 AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
1029 coordinator.add_group(config);
1030
1031 let group_id = AlignmentGroupId::new("test-group");
1032
1033 coordinator
1035 .assign_source_to_group(0, &group_id)
1036 .expect("should succeed");
1037 coordinator
1038 .assign_source_to_group(1, &group_id)
1039 .expect("should succeed");
1040
1041 assert_eq!(coordinator.total_source_count(), 2);
1042 assert_eq!(coordinator.source_group(0), Some(&group_id));
1043 }
1044
1045 #[test]
1046 fn test_coordinator_source_already_in_group() {
1047 let mut coordinator = AlignmentGroupCoordinator::new();
1048
1049 let config1 = AlignmentGroupConfig::new("group1");
1050 let config2 = AlignmentGroupConfig::new("group2");
1051 coordinator.add_group(config1);
1052 coordinator.add_group(config2);
1053
1054 let group1 = AlignmentGroupId::new("group1");
1055 let group2 = AlignmentGroupId::new("group2");
1056
1057 coordinator
1058 .assign_source_to_group(0, &group1)
1059 .expect("should succeed");
1060
1061 let result = coordinator.assign_source_to_group(0, &group2);
1063 assert!(matches!(
1064 result,
1065 Err(AlignmentError::SourceAlreadyInGroup { .. })
1066 ));
1067
1068 let result = coordinator.assign_source_to_group(0, &group1);
1070 assert!(result.is_ok());
1071 }
1072
1073 #[test]
1074 fn test_coordinator_group_not_found() {
1075 let mut coordinator = AlignmentGroupCoordinator::new();
1076
1077 let result = coordinator.assign_source_to_group(0, &AlignmentGroupId::new("nonexistent"));
1078 assert!(matches!(result, Err(AlignmentError::GroupNotFound(_))));
1079 }
1080
1081 #[test]
1082 fn test_coordinator_report_watermark() {
1083 let mut coordinator = AlignmentGroupCoordinator::new();
1084
1085 let config =
1086 AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
1087 coordinator.add_group(config);
1088
1089 let group_id = AlignmentGroupId::new("test-group");
1090 coordinator.assign_source_to_group(0, &group_id).unwrap();
1091 coordinator.assign_source_to_group(1, &group_id).unwrap();
1092
1093 let action = coordinator.report_watermark(0, 0);
1095 assert_eq!(action, Some(AlignmentAction::Continue));
1096
1097 let action = coordinator.report_watermark(1, 0);
1098 assert_eq!(action, Some(AlignmentAction::Continue));
1099
1100 let action = coordinator.report_watermark(99, 0);
1102 assert_eq!(action, None);
1103 }
1104
1105 #[test]
1106 fn test_coordinator_unassign_source() {
1107 let mut coordinator = AlignmentGroupCoordinator::new();
1108
1109 let config = AlignmentGroupConfig::new("test-group");
1110 coordinator.add_group(config);
1111
1112 let group_id = AlignmentGroupId::new("test-group");
1113 coordinator.assign_source_to_group(0, &group_id).unwrap();
1114
1115 assert_eq!(coordinator.total_source_count(), 1);
1116
1117 coordinator.unassign_source(0);
1118 assert_eq!(coordinator.total_source_count(), 0);
1119 assert_eq!(coordinator.source_group(0), None);
1120 }
1121
1122 #[test]
1123 fn test_coordinator_remove_group() {
1124 let mut coordinator = AlignmentGroupCoordinator::new();
1125
1126 let config = AlignmentGroupConfig::new("test-group");
1127 coordinator.add_group(config);
1128
1129 let group_id = AlignmentGroupId::new("test-group");
1130 coordinator.assign_source_to_group(0, &group_id).unwrap();
1131 coordinator.assign_source_to_group(1, &group_id).unwrap();
1132
1133 assert_eq!(coordinator.group_count(), 1);
1134 assert_eq!(coordinator.total_source_count(), 2);
1135
1136 coordinator.remove_group(&group_id);
1137
1138 assert_eq!(coordinator.group_count(), 0);
1139 assert_eq!(coordinator.total_source_count(), 0);
1140 }
1141
1142 #[test]
1143 fn test_coordinator_is_paused() {
1144 let mut coordinator = AlignmentGroupCoordinator::new();
1145
1146 let config =
1147 AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
1148 coordinator.add_group(config);
1149
1150 let group_id = AlignmentGroupId::new("test-group");
1151 coordinator.assign_source_to_group(0, &group_id).unwrap();
1152 coordinator.assign_source_to_group(1, &group_id).unwrap();
1153
1154 coordinator.report_watermark(0, 0);
1155 coordinator.report_watermark(1, 0);
1156
1157 coordinator.report_watermark(0, 100_000);
1159 assert!(coordinator.is_paused(0));
1160 assert!(!coordinator.is_paused(1));
1161 }
1162
1163 #[test]
1164 fn test_coordinator_all_metrics() {
1165 let mut coordinator = AlignmentGroupCoordinator::new();
1166
1167 let config1 = AlignmentGroupConfig::new("group1");
1168 let config2 = AlignmentGroupConfig::new("group2");
1169 coordinator.add_group(config1);
1170 coordinator.add_group(config2);
1171
1172 let metrics = coordinator.all_metrics();
1173 assert_eq!(metrics.len(), 2);
1174 assert!(metrics.contains_key(&AlignmentGroupId::new("group1")));
1175 assert!(metrics.contains_key(&AlignmentGroupId::new("group2")));
1176 }
1177
1178 #[test]
1179 fn test_alignment_group_empty() {
1180 let config = AlignmentGroupConfig::new("test");
1181 let group = WatermarkAlignmentGroup::new(config);
1182
1183 assert_eq!(group.source_count(), 0);
1184 assert_eq!(group.min_watermark(), i64::MIN);
1185 assert_eq!(group.max_watermark(), i64::MIN);
1186 }
1187
1188 #[test]
1189 fn test_alignment_group_all_paused() {
1190 let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(10));
1191 let mut group = WatermarkAlignmentGroup::new(config);
1192
1193 group.register_source(0);
1194 group.register_source(1);
1195
1196 group.report_watermark(0, 0);
1197 group.report_watermark(1, 0);
1198
1199 group.report_watermark(0, 100_000);
1202 assert!(group.is_paused(0));
1203
1204 assert!(!group.is_paused(1));
1207 }
1208
1209 #[test]
1210 fn test_alignment_group_negative_watermarks() {
1211 let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
1212 let mut group = WatermarkAlignmentGroup::new(config);
1213
1214 group.register_source(0);
1215 group.register_source(1);
1216
1217 group.report_watermark(0, -100_000);
1219 group.report_watermark(1, -50_000);
1220
1221 assert_eq!(group.min_watermark(), -100_000);
1222 assert_eq!(group.max_watermark(), -50_000);
1223 assert_eq!(group.current_drift(), Duration::from_secs(50));
1224 }
1225
1226 #[test]
1227 fn test_alignment_source_state_pause_resume_tracking() {
1228 let mut state = AlignmentSourceState::new(0);
1229
1230 assert!(!state.is_paused);
1231 assert!(state.pause_start.is_none());
1232
1233 state.pause();
1234 assert!(state.is_paused);
1235 assert!(state.pause_start.is_some());
1236
1237 std::thread::sleep(Duration::from_millis(1));
1239
1240 state.resume();
1241 assert!(!state.is_paused);
1242 assert!(state.pause_start.is_none());
1243 assert!(state.total_pause_time > Duration::ZERO);
1244 }
1245
1246 #[test]
1247 fn test_alignment_group_check_alignment_interval() {
1248 let config = AlignmentGroupConfig::new("test")
1249 .with_max_drift(Duration::from_secs(60))
1250 .with_update_interval(Duration::from_millis(100));
1251 let mut group = WatermarkAlignmentGroup::new(config);
1252
1253 group.register_source(0);
1254 group.register_source(1);
1255
1256 group.report_watermark(0, 0);
1257 group.report_watermark(1, 0);
1258 group.report_watermark(0, 100_000); let immediate_actions = group.check_alignment();
1262 assert!(immediate_actions.is_empty());
1263
1264 std::thread::sleep(Duration::from_millis(110));
1266
1267 group.report_watermark(1, 50_000); let actions = group.check_alignment();
1270 assert!(actions
1272 .iter()
1273 .any(|(id, action)| *id == 0 && *action == AlignmentAction::Resume));
1274 }
1275}