Skip to main content

laminar_core/time/
alignment_group.rs

//! # Watermark Alignment Groups
//!
//! Coordinates watermark progress across multiple sources to prevent unbounded state growth
//! when sources have different processing speeds.
//!
//! ## Problem
//!
//! When sources have different processing speeds, fast sources can cause:
//! - Excessive state growth (buffering data waiting for slow sources)
//! - Memory pressure on downstream operators
//! - Uneven resource utilization
//!
//! ## Solution
//!
//! Alignment groups enforce a maximum watermark drift between sources. When a source
//! gets too far ahead, it is paused until slower sources catch up.
//!
//! ## Example
//!
//! ```rust
//! use laminar_core::time::{
//!     WatermarkAlignmentGroup, AlignmentGroupConfig, AlignmentGroupId,
//!     EnforcementMode, AlignmentAction,
//! };
//! use std::time::Duration;
//!
//! let config = AlignmentGroupConfig {
//!     group_id: AlignmentGroupId("orders-payments".to_string()),
//!     max_drift: Duration::from_secs(300), // 5 minutes
//!     update_interval: Duration::from_secs(1),
//!     enforcement_mode: EnforcementMode::Pause,
//! };
//!
//! let mut group = WatermarkAlignmentGroup::new(config);
//!
//! // Register sources
//! group.register_source(0); // orders stream
//! group.register_source(1); // payments stream
//!
//! // Initialize both sources with watermarks
//! group.report_watermark(0, 0);
//! group.report_watermark(1, 0);
//!
//! // Report watermarks (values are in milliseconds)
//! let action = group.report_watermark(0, 10_000); // orders at 10 s
//! assert_eq!(action, AlignmentAction::Continue);
//!
//! // Source 0 jumps far ahead (>300s drift from source 1 at 0)
//! let action = group.report_watermark(0, 310_000); // orders at 310 s (5 min 10 s)
//! assert_eq!(action, AlignmentAction::Pause); // Too far ahead of source 1!
//! ```
52
53use std::time::{Duration, Instant};
54
55use rustc_hash::FxHashMap;
56
/// Name of an alignment group, wrapped in a newtype so it is not mixed up
/// with other strings.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct AlignmentGroupId(pub String);

impl AlignmentGroupId {
    /// Builds a group ID from anything that converts into a `String`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let name: String = id.into();
        Self(name)
    }

    /// Borrows the underlying group name.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for AlignmentGroupId {
    /// Writes the raw group name with no decoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
80
81/// Configuration for a watermark alignment group.
82#[derive(Debug, Clone)]
83pub struct AlignmentGroupConfig {
84    /// Group identifier.
85    pub group_id: AlignmentGroupId,
86    /// Maximum allowed drift between fastest and slowest source.
87    pub max_drift: Duration,
88    /// How often to check alignment (wall clock).
89    pub update_interval: Duration,
90    /// Whether to pause sources or just emit warnings.
91    pub enforcement_mode: EnforcementMode,
92}
93
94impl AlignmentGroupConfig {
95    /// Creates a new configuration with defaults.
96    #[must_use]
97    pub fn new(group_id: impl Into<String>) -> Self {
98        Self {
99            group_id: AlignmentGroupId::new(group_id),
100            max_drift: Duration::from_secs(300), // 5 minutes default
101            update_interval: Duration::from_secs(1),
102            enforcement_mode: EnforcementMode::Pause,
103        }
104    }
105
106    /// Sets the maximum allowed drift.
107    #[must_use]
108    pub fn with_max_drift(mut self, max_drift: Duration) -> Self {
109        self.max_drift = max_drift;
110        self
111    }
112
113    /// Sets the update interval.
114    #[must_use]
115    pub fn with_update_interval(mut self, interval: Duration) -> Self {
116        self.update_interval = interval;
117        self
118    }
119
120    /// Sets the enforcement mode.
121    #[must_use]
122    pub fn with_enforcement_mode(mut self, mode: EnforcementMode) -> Self {
123        self.enforcement_mode = mode;
124        self
125    }
126}
127
/// Enforcement mode for alignment groups.
///
/// Selects what happens to the source holding the group's maximum watermark
/// once the group's drift exceeds the configured `max_drift`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EnforcementMode {
    /// Pause fast sources (recommended for production).
    ///
    /// This is the default mode.
    #[default]
    Pause,
    /// Emit warnings but don't pause (for monitoring).
    WarnOnly,
    /// Drop events from fast sources that exceed drift.
    DropExcess,
}
139
/// Bookkeeping kept for one source that belongs to an alignment group.
#[derive(Debug, Clone)]
pub struct AlignmentSourceState {
    /// ID of the source this record describes.
    pub source_id: usize,
    /// Latest watermark (milliseconds since epoch); starts at `i64::MIN`.
    pub watermark: i64,
    /// `true` while the source is paused.
    pub is_paused: bool,
    /// Moment the current pause began, kept for metrics.
    pub pause_start: Option<Instant>,
    /// Accumulated duration of all finished pauses.
    pub total_pause_time: Duration,
    /// Count of events dropped while paused (used by `DropExcess` mode).
    pub events_dropped_while_paused: u64,
    /// Moment of the most recent activity recorded for this source.
    pub last_activity: Instant,
}

impl AlignmentSourceState {
    /// Builds the initial, un-paused state for `source_id` with no watermark
    /// reported yet and `last_activity` set to now.
    fn new(source_id: usize) -> Self {
        let created_at = Instant::now();
        Self {
            source_id,
            watermark: i64::MIN,
            is_paused: false,
            pause_start: None,
            total_pause_time: Duration::ZERO,
            events_dropped_while_paused: 0,
            last_activity: created_at,
        }
    }

    /// Marks the source as paused and records when the pause began.
    ///
    /// Does nothing when the source is already paused, so the original start
    /// time is preserved.
    fn pause(&mut self) {
        if self.is_paused {
            return;
        }
        self.pause_start = Some(Instant::now());
        self.is_paused = true;
    }

    /// Clears the paused flag and adds the elapsed pause to `total_pause_time`.
    ///
    /// Does nothing when the source is not paused. If the flag was set without
    /// a recorded start time, only the flag is cleared.
    fn resume(&mut self) {
        if !self.is_paused {
            return;
        }
        self.is_paused = false;
        if let Some(began) = self.pause_start.take() {
            self.total_pause_time += began.elapsed();
        }
    }
}
191
/// Counters and gauges collected for one alignment group.
#[derive(Debug, Clone, Default)]
pub struct AlignmentGroupMetrics {
    /// How many times a source was paused.
    pub pause_events: u64,
    /// How many times a source was resumed.
    pub resume_events: u64,
    /// Pause time summed over all sources.
    pub total_pause_time: Duration,
    /// Largest drift seen so far.
    pub max_observed_drift: Duration,
    /// Most recently computed drift.
    pub current_drift: Duration,
    /// Events dropped (relevant to `DropExcess` mode).
    pub events_dropped: u64,
    /// Warnings emitted (relevant to `WarnOnly` mode).
    pub warnings_emitted: u64,
}

impl AlignmentGroupMetrics {
    /// Returns metrics with every counter at zero and every duration empty.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Puts every field back to its default value.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
223
/// Action to take for a source based on alignment.
///
/// Returned by `WatermarkAlignmentGroup::report_watermark` for the reporting
/// source, and in `(source_id, action)` pairs by
/// `WatermarkAlignmentGroup::check_alignment`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlignmentAction {
    /// Continue processing normally.
    Continue,
    /// Pause this source (too far ahead).
    ///
    /// Also returned for a source that is already paused and still may not
    /// resume.
    Pause,
    /// Resume this source (caught up).
    Resume,
    /// Drop this event (`DropExcess` mode).
    Drop,
    /// Warning only (`WarnOnly` mode).
    Warn {
        /// Current drift in milliseconds.
        ///
        /// Capped at `i64::MAX` when the drift does not fit.
        drift_ms: i64,
    },
}
241
/// Manages watermark alignment across sources in a group.
///
/// Tracks watermarks from multiple sources and enforces that no source
/// gets too far ahead of others. When a source exceeds the maximum drift,
/// it is paused (or warned/dropped depending on enforcement mode) until
/// slower sources catch up.
///
/// Sources that are paused, or whose watermark is still `i64::MIN` (nothing
/// reported yet), are left out when the group's minimum and maximum
/// watermarks are recomputed.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{
///     WatermarkAlignmentGroup, AlignmentGroupConfig, AlignmentGroupId,
///     EnforcementMode, AlignmentAction,
/// };
/// use std::time::Duration;
///
/// let config = AlignmentGroupConfig {
///     group_id: AlignmentGroupId("join-group".to_string()),
///     max_drift: Duration::from_secs(60), // 1 minute
///     update_interval: Duration::from_millis(100),
///     enforcement_mode: EnforcementMode::Pause,
/// };
///
/// let mut group = WatermarkAlignmentGroup::new(config);
/// group.register_source(0);
/// group.register_source(1);
///
/// // Both sources start at 0
/// group.report_watermark(0, 0);
/// group.report_watermark(1, 0);
///
/// // Source 0 advances within limit
/// let action = group.report_watermark(0, 50_000); // 50 seconds
/// assert_eq!(action, AlignmentAction::Continue);
///
/// // Source 0 advances beyond limit (>60s drift)
/// let action = group.report_watermark(0, 70_000); // 70 seconds
/// assert_eq!(action, AlignmentAction::Pause);
///
/// // Source 1 catches up
/// group.report_watermark(1, 30_000); // 30 seconds
/// assert!(group.should_resume(0)); // Drift now 40s < 60s
/// ```
#[derive(Debug)]
pub struct WatermarkAlignmentGroup {
    /// Configuration.
    config: AlignmentGroupConfig,
    /// Per-source state, keyed by source ID.
    sources: FxHashMap<usize, AlignmentSourceState>,
    /// Current minimum watermark in the group; `i64::MIN` when no active
    /// source has reported a watermark.
    min_watermark: i64,
    /// Current maximum watermark in the group; `i64::MIN` when no active
    /// source has reported a watermark.
    max_watermark: i64,
    /// Last alignment check time; used to rate-limit `check_alignment` to
    /// `config.update_interval`.
    last_check: Instant,
    /// Metrics.
    metrics: AlignmentGroupMetrics,
}
300
301impl WatermarkAlignmentGroup {
302    /// Creates a new alignment group.
303    #[must_use]
304    pub fn new(config: AlignmentGroupConfig) -> Self {
305        Self {
306            config,
307            sources: FxHashMap::default(),
308            min_watermark: i64::MIN,
309            max_watermark: i64::MIN,
310            last_check: Instant::now(),
311            metrics: AlignmentGroupMetrics::new(),
312        }
313    }
314
315    /// Returns the group ID.
316    #[must_use]
317    pub fn group_id(&self) -> &AlignmentGroupId {
318        &self.config.group_id
319    }
320
321    /// Returns the configuration.
322    #[must_use]
323    pub fn config(&self) -> &AlignmentGroupConfig {
324        &self.config
325    }
326
327    /// Registers a source with this alignment group.
328    pub fn register_source(&mut self, source_id: usize) {
329        self.sources
330            .entry(source_id)
331            .or_insert_with(|| AlignmentSourceState::new(source_id));
332    }
333
334    /// Removes a source from this alignment group.
335    pub fn unregister_source(&mut self, source_id: usize) {
336        self.sources.remove(&source_id);
337        self.recalculate_bounds();
338    }
339
340    /// Reports a watermark update from a source.
341    ///
342    /// Returns the action the source should take.
343    pub fn report_watermark(&mut self, source_id: usize, watermark: i64) -> AlignmentAction {
344        // Ensure source exists
345        self.sources
346            .entry(source_id)
347            .or_insert_with(|| AlignmentSourceState::new(source_id));
348
349        // Check if source is paused and handle accordingly
350        let is_paused = self.sources.get(&source_id).is_some_and(|s| s.is_paused);
351
352        if is_paused {
353            match self.config.enforcement_mode {
354                EnforcementMode::Pause => {
355                    // Check if we can resume (need to check before mutating)
356                    let can_resume = self.can_resume_with_watermark(source_id, watermark);
357                    if can_resume {
358                        if let Some(source) = self.sources.get_mut(&source_id) {
359                            source.watermark = watermark;
360                            source.last_activity = Instant::now();
361                            source.resume();
362                        }
363                        self.metrics.resume_events += 1;
364                        self.recalculate_bounds();
365                        return AlignmentAction::Resume;
366                    }
367                    return AlignmentAction::Pause;
368                }
369                EnforcementMode::DropExcess => {
370                    if let Some(source) = self.sources.get_mut(&source_id) {
371                        source.events_dropped_while_paused += 1;
372                    }
373                    self.metrics.events_dropped += 1;
374                    return AlignmentAction::Drop;
375                }
376                EnforcementMode::WarnOnly => {
377                    // WarnOnly doesn't actually pause, so continue to update watermark
378                }
379            }
380        }
381
382        // Update source watermark
383        if let Some(source) = self.sources.get_mut(&source_id) {
384            source.watermark = watermark;
385            source.last_activity = Instant::now();
386        }
387
388        // Recalculate min/max
389        self.recalculate_bounds();
390
391        // Calculate current drift
392        let current_drift = if self.min_watermark == i64::MIN || self.max_watermark == i64::MIN {
393            Duration::ZERO
394        } else {
395            let drift_ms = self.max_watermark.saturating_sub(self.min_watermark).max(0);
396            // SAFETY: drift_ms is guaranteed to be >= 0 due to max(0)
397            #[allow(clippy::cast_sign_loss)]
398            Duration::from_millis(drift_ms as u64)
399        };
400
401        self.metrics.current_drift = current_drift;
402        if current_drift > self.metrics.max_observed_drift {
403            self.metrics.max_observed_drift = current_drift;
404        }
405
406        // Check if this source is the one causing excessive drift
407        if watermark == self.max_watermark && current_drift > self.config.max_drift {
408            match self.config.enforcement_mode {
409                EnforcementMode::Pause => {
410                    if let Some(source) = self.sources.get_mut(&source_id) {
411                        source.pause();
412                    }
413                    self.metrics.pause_events += 1;
414                    AlignmentAction::Pause
415                }
416                EnforcementMode::WarnOnly => {
417                    self.metrics.warnings_emitted += 1;
418                    // Cap drift_ms to i64::MAX for extremely large drifts
419                    #[allow(clippy::cast_possible_truncation)]
420                    let drift_ms = current_drift.as_millis().min(i64::MAX as u128) as i64;
421                    AlignmentAction::Warn { drift_ms }
422                }
423                EnforcementMode::DropExcess => {
424                    if let Some(source) = self.sources.get_mut(&source_id) {
425                        source.is_paused = true; // Mark as paused for tracking
426                        source.events_dropped_while_paused += 1;
427                    }
428                    self.metrics.events_dropped += 1;
429                    AlignmentAction::Drop
430                }
431            }
432        } else {
433            AlignmentAction::Continue
434        }
435    }
436
437    /// Checks if a paused source should resume.
438    #[must_use]
439    pub fn should_resume(&self, source_id: usize) -> bool {
440        let Some(source) = self.sources.get(&source_id) else {
441            return false;
442        };
443
444        if !source.is_paused {
445            return false;
446        }
447
448        self.can_resume_with_watermark(source_id, source.watermark)
449    }
450
451    /// Checks if a source can resume with a given watermark.
452    fn can_resume_with_watermark(&self, source_id: usize, watermark: i64) -> bool {
453        // Calculate what the min would be without this source
454        let min_without_source = self
455            .sources
456            .iter()
457            .filter(|(&id, s)| id != source_id && !s.is_paused && s.watermark != i64::MIN)
458            .map(|(_, s)| s.watermark)
459            .min()
460            .unwrap_or(i64::MIN);
461
462        if min_without_source == i64::MIN {
463            return true; // No other active sources
464        }
465
466        let drift_if_resumed = watermark.saturating_sub(min_without_source).max(0);
467        // SAFETY: drift_if_resumed is guaranteed to be >= 0 due to max(0)
468        #[allow(clippy::cast_sign_loss)]
469        let drift_duration = Duration::from_millis(drift_if_resumed as u64);
470
471        drift_duration <= self.config.max_drift
472    }
473
474    /// Returns the current drift (max - min watermark).
475    #[must_use]
476    pub fn current_drift(&self) -> Duration {
477        self.metrics.current_drift
478    }
479
480    /// Returns whether a specific source is paused.
481    #[must_use]
482    pub fn is_paused(&self, source_id: usize) -> bool {
483        self.sources.get(&source_id).is_some_and(|s| s.is_paused)
484    }
485
486    /// Returns the minimum watermark in the group.
487    #[must_use]
488    pub fn min_watermark(&self) -> i64 {
489        self.min_watermark
490    }
491
492    /// Returns the maximum watermark in the group.
493    #[must_use]
494    pub fn max_watermark(&self) -> i64 {
495        self.max_watermark
496    }
497
498    /// Returns metrics for this group.
499    #[must_use]
500    pub fn metrics(&self) -> &AlignmentGroupMetrics {
501        &self.metrics
502    }
503
504    /// Returns the number of registered sources.
505    #[must_use]
506    pub fn source_count(&self) -> usize {
507        self.sources.len()
508    }
509
510    /// Returns the number of paused sources.
511    #[must_use]
512    pub fn paused_source_count(&self) -> usize {
513        self.sources.values().filter(|s| s.is_paused).count()
514    }
515
516    /// Returns the number of active (non-paused) sources.
517    #[must_use]
518    pub fn active_source_count(&self) -> usize {
519        self.sources.values().filter(|s| !s.is_paused).count()
520    }
521
522    /// Performs periodic alignment check.
523    ///
524    /// Should be called from Ring 1 at the configured `update_interval`.
525    /// Returns list of (`source_id`, action) pairs.
526    pub fn check_alignment(&mut self) -> Vec<(usize, AlignmentAction)> {
527        if self.last_check.elapsed() < self.config.update_interval {
528            return Vec::new();
529        }
530
531        self.last_check = Instant::now();
532        let mut actions = Vec::new();
533
534        // Check each paused source to see if it can resume
535        let paused_sources: Vec<usize> = self
536            .sources
537            .iter()
538            .filter(|(_, s)| s.is_paused)
539            .map(|(&id, _)| id)
540            .collect();
541
542        for source_id in paused_sources {
543            if self.should_resume(source_id) {
544                if let Some(source) = self.sources.get_mut(&source_id) {
545                    source.resume();
546                    self.metrics.resume_events += 1;
547                    actions.push((source_id, AlignmentAction::Resume));
548                }
549            }
550        }
551
552        // Recalculate bounds after resumptions
553        if !actions.is_empty() {
554            self.recalculate_bounds();
555        }
556
557        actions
558    }
559
560    /// Returns the state of a specific source.
561    #[must_use]
562    pub fn source_state(&self, source_id: usize) -> Option<&AlignmentSourceState> {
563        self.sources.get(&source_id)
564    }
565
566    /// Recalculates min/max watermarks from active sources.
567    fn recalculate_bounds(&mut self) {
568        let active_watermarks: Vec<i64> = self
569            .sources
570            .values()
571            .filter(|s| !s.is_paused && s.watermark != i64::MIN)
572            .map(|s| s.watermark)
573            .collect();
574
575        if active_watermarks.is_empty() {
576            self.min_watermark = i64::MIN;
577            self.max_watermark = i64::MIN;
578        } else {
579            self.min_watermark = *active_watermarks.iter().min().unwrap();
580            self.max_watermark = *active_watermarks.iter().max().unwrap();
581        }
582    }
583}
584
/// Error type for alignment group operations.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute.
#[derive(Debug, thiserror::Error)]
pub enum AlignmentError {
    /// Source not registered in any group.
    ///
    /// Carries the source ID.
    #[error("source {0} not in any group")]
    SourceNotInGroup(usize),

    /// Group not found.
    ///
    /// Carries the ID of the group that was looked up.
    #[error("group '{0}' not found")]
    GroupNotFound(String),

    /// Source already assigned to another group.
    #[error("source {source_id} already in group '{group_id}'")]
    SourceAlreadyInGroup {
        /// The source ID.
        source_id: usize,
        /// The group ID the source is currently assigned to.
        group_id: String,
    },
}
605
/// Manages multiple alignment groups.
///
/// Provides a single coordination point for all alignment groups in the system.
/// Routes watermark updates to the appropriate group based on source assignment.
///
/// A source belongs to at most one group at a time; watermark reports for
/// sources that are not assigned to any group yield `None`.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{
///     AlignmentGroupCoordinator, AlignmentGroupConfig, AlignmentGroupId,
///     EnforcementMode, AlignmentAction,
/// };
/// use std::time::Duration;
///
/// let mut coordinator = AlignmentGroupCoordinator::new();
///
/// // Create a group for orders-payments join
/// let config = AlignmentGroupConfig::new("orders-payments")
///     .with_max_drift(Duration::from_secs(300));
/// coordinator.add_group(config);
///
/// // Assign sources to the group
/// coordinator.assign_source_to_group(0, &AlignmentGroupId::new("orders-payments")).unwrap();
/// coordinator.assign_source_to_group(1, &AlignmentGroupId::new("orders-payments")).unwrap();
///
/// // Report watermarks
/// let action = coordinator.report_watermark(0, 10_000);
/// assert_eq!(action, Some(AlignmentAction::Continue));
/// ```
#[derive(Debug, Default)]
pub struct AlignmentGroupCoordinator {
    /// Groups by ID.
    groups: FxHashMap<AlignmentGroupId, WatermarkAlignmentGroup>,
    /// Source to group mapping (a source can be in one group).
    ///
    /// Keyed by source ID; the value is the ID of the group the source is
    /// assigned to.
    source_groups: FxHashMap<usize, AlignmentGroupId>,
}
642
643impl AlignmentGroupCoordinator {
644    /// Creates a new coordinator.
645    #[must_use]
646    pub fn new() -> Self {
647        Self::default()
648    }
649
650    /// Adds an alignment group.
651    pub fn add_group(&mut self, config: AlignmentGroupConfig) {
652        let group_id = config.group_id.clone();
653        self.groups
654            .insert(group_id, WatermarkAlignmentGroup::new(config));
655    }
656
657    /// Removes an alignment group.
658    pub fn remove_group(&mut self, group_id: &AlignmentGroupId) -> Option<WatermarkAlignmentGroup> {
659        // Remove source mappings for this group
660        self.source_groups.retain(|_, gid| gid != group_id);
661        self.groups.remove(group_id)
662    }
663
664    /// Assigns a source to a group.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if the group doesn't exist or the source is already
669    /// assigned to another group.
670    pub fn assign_source_to_group(
671        &mut self,
672        source_id: usize,
673        group_id: &AlignmentGroupId,
674    ) -> Result<(), AlignmentError> {
675        // Check if source is already in a group
676        if let Some(existing_group) = self.source_groups.get(&source_id) {
677            if existing_group != group_id {
678                return Err(AlignmentError::SourceAlreadyInGroup {
679                    source_id,
680                    group_id: existing_group.0.clone(),
681                });
682            }
683            // Already in this group, nothing to do
684            return Ok(());
685        }
686
687        // Check if group exists
688        let group = self
689            .groups
690            .get_mut(group_id)
691            .ok_or_else(|| AlignmentError::GroupNotFound(group_id.0.clone()))?;
692
693        // Register source with group
694        group.register_source(source_id);
695        self.source_groups.insert(source_id, group_id.clone());
696
697        Ok(())
698    }
699
700    /// Removes a source from its group.
701    pub fn unassign_source(&mut self, source_id: usize) {
702        if let Some(group_id) = self.source_groups.remove(&source_id) {
703            if let Some(group) = self.groups.get_mut(&group_id) {
704                group.unregister_source(source_id);
705            }
706        }
707    }
708
709    /// Reports a watermark update.
710    ///
711    /// Returns the action for the source, or `None` if source not in any group.
712    pub fn report_watermark(
713        &mut self,
714        source_id: usize,
715        watermark: i64,
716    ) -> Option<AlignmentAction> {
717        let group_id = self.source_groups.get(&source_id)?;
718        let group = self.groups.get_mut(group_id)?;
719        Some(group.report_watermark(source_id, watermark))
720    }
721
722    /// Checks alignment for all groups.
723    ///
724    /// Returns all resume/pause actions across all groups.
725    pub fn check_all_alignments(&mut self) -> Vec<(usize, AlignmentAction)> {
726        let mut all_actions = Vec::new();
727        for group in self.groups.values_mut() {
728            all_actions.extend(group.check_alignment());
729        }
730        all_actions
731    }
732
733    /// Returns metrics for all groups.
734    #[must_use]
735    pub fn all_metrics(&self) -> FxHashMap<AlignmentGroupId, AlignmentGroupMetrics> {
736        self.groups
737            .iter()
738            .map(|(id, group)| (id.clone(), group.metrics().clone()))
739            .collect()
740    }
741
742    /// Returns a reference to a specific group.
743    #[must_use]
744    pub fn group(&self, group_id: &AlignmentGroupId) -> Option<&WatermarkAlignmentGroup> {
745        self.groups.get(group_id)
746    }
747
748    /// Returns a mutable reference to a specific group.
749    pub fn group_mut(
750        &mut self,
751        group_id: &AlignmentGroupId,
752    ) -> Option<&mut WatermarkAlignmentGroup> {
753        self.groups.get_mut(group_id)
754    }
755
756    /// Returns the group ID for a source.
757    #[must_use]
758    pub fn source_group(&self, source_id: usize) -> Option<&AlignmentGroupId> {
759        self.source_groups.get(&source_id)
760    }
761
762    /// Returns the number of groups.
763    #[must_use]
764    pub fn group_count(&self) -> usize {
765        self.groups.len()
766    }
767
768    /// Returns the total number of sources across all groups.
769    #[must_use]
770    pub fn total_source_count(&self) -> usize {
771        self.source_groups.len()
772    }
773
774    /// Checks if a source should resume.
775    #[must_use]
776    pub fn should_resume(&self, source_id: usize) -> bool {
777        let Some(group_id) = self.source_groups.get(&source_id) else {
778            return false;
779        };
780        let Some(group) = self.groups.get(group_id) else {
781            return false;
782        };
783        group.should_resume(source_id)
784    }
785
786    /// Checks if a source is paused.
787    #[must_use]
788    pub fn is_paused(&self, source_id: usize) -> bool {
789        let Some(group_id) = self.source_groups.get(&source_id) else {
790            return false;
791        };
792        let Some(group) = self.groups.get(group_id) else {
793            return false;
794        };
795        group.is_paused(source_id)
796    }
797}
798
799#[cfg(test)]
800mod tests {
801    use super::*;
802
803    #[test]
804    fn test_alignment_group_id() {
805        let id = AlignmentGroupId::new("test-group");
806        assert_eq!(id.as_str(), "test-group");
807        assert_eq!(format!("{id}"), "test-group");
808    }
809
810    #[test]
811    fn test_alignment_group_config_builder() {
812        let config = AlignmentGroupConfig::new("test")
813            .with_max_drift(Duration::from_secs(120))
814            .with_update_interval(Duration::from_millis(500))
815            .with_enforcement_mode(EnforcementMode::WarnOnly);
816
817        assert_eq!(config.group_id.as_str(), "test");
818        assert_eq!(config.max_drift, Duration::from_secs(120));
819        assert_eq!(config.update_interval, Duration::from_millis(500));
820        assert_eq!(config.enforcement_mode, EnforcementMode::WarnOnly);
821    }
822
823    #[test]
824    fn test_alignment_group_single_source_no_pause() {
825        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
826        let mut group = WatermarkAlignmentGroup::new(config);
827
828        group.register_source(0);
829
830        // Single source should never be paused
831        let action = group.report_watermark(0, 100_000);
832        assert_eq!(action, AlignmentAction::Continue);
833        assert!(!group.is_paused(0));
834    }
835
836    #[test]
837    fn test_alignment_group_two_sources_fast_paused() {
838        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60)); // 60 second max drift
839        let mut group = WatermarkAlignmentGroup::new(config);
840
841        group.register_source(0);
842        group.register_source(1);
843
844        // Both start at 0
845        group.report_watermark(0, 0);
846        group.report_watermark(1, 0);
847
848        // Source 0 advances within limit
849        let action = group.report_watermark(0, 50_000); // 50 seconds
850        assert_eq!(action, AlignmentAction::Continue);
851        assert!(!group.is_paused(0));
852
853        // Source 0 advances beyond limit
854        let action = group.report_watermark(0, 70_000); // 70 seconds, drift > 60
855        assert_eq!(action, AlignmentAction::Pause);
856        assert!(group.is_paused(0));
857    }
858
859    #[test]
860    fn test_alignment_group_resume_when_slow_catches_up() {
861        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
862        let mut group = WatermarkAlignmentGroup::new(config);
863
864        group.register_source(0);
865        group.register_source(1);
866
867        // Source 0 at 0, source 1 at 0
868        group.report_watermark(0, 0);
869        group.report_watermark(1, 0);
870
871        // Source 0 jumps ahead and gets paused
872        group.report_watermark(0, 100_000); // 100 seconds
873        assert!(group.is_paused(0));
874
875        // Source 1 catches up
876        group.report_watermark(1, 50_000); // 50 seconds
877                                           // Drift is now 100 - 50 = 50 seconds, within limit
878        assert!(group.should_resume(0));
879    }
880
881    #[test]
882    fn test_alignment_group_warn_only_mode() {
883        let config = AlignmentGroupConfig::new("test")
884            .with_max_drift(Duration::from_secs(60))
885            .with_enforcement_mode(EnforcementMode::WarnOnly);
886        let mut group = WatermarkAlignmentGroup::new(config);
887
888        group.register_source(0);
889        group.register_source(1);
890
891        group.report_watermark(0, 0);
892        group.report_watermark(1, 0);
893
894        // Source 0 exceeds drift but only warns
895        let action = group.report_watermark(0, 100_000);
896        match action {
897            AlignmentAction::Warn { drift_ms } => {
898                assert_eq!(drift_ms, 100_000); // 100 seconds drift
899            }
900            _ => panic!("Expected Warn action"),
901        }
902        assert!(!group.is_paused(0)); // Not actually paused
903        assert_eq!(group.metrics().warnings_emitted, 1);
904    }
905
906    #[test]
907    fn test_alignment_group_drop_excess_mode() {
908        let config = AlignmentGroupConfig::new("test")
909            .with_max_drift(Duration::from_secs(60))
910            .with_enforcement_mode(EnforcementMode::DropExcess);
911        let mut group = WatermarkAlignmentGroup::new(config);
912
913        group.register_source(0);
914        group.register_source(1);
915
916        group.report_watermark(0, 0);
917        group.report_watermark(1, 0);
918
919        // Source 0 exceeds drift and gets dropped
920        let action = group.report_watermark(0, 100_000);
921        assert_eq!(action, AlignmentAction::Drop);
922        assert_eq!(group.metrics().events_dropped, 1);
923
924        // Subsequent events from source 0 are also dropped
925        let action = group.report_watermark(0, 110_000);
926        assert_eq!(action, AlignmentAction::Drop);
927        assert_eq!(group.metrics().events_dropped, 2);
928    }
929
930    #[test]
931    fn test_alignment_group_drift_calculation() {
932        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(300)); // 5 minutes
933        let mut group = WatermarkAlignmentGroup::new(config);
934
935        group.register_source(0);
936        group.register_source(1);
937        group.register_source(2);
938
939        group.report_watermark(0, 100_000); // 100s
940        group.report_watermark(1, 200_000); // 200s
941        group.report_watermark(2, 150_000); // 150s
942
943        // Drift should be max - min = 200 - 100 = 100 seconds
944        assert_eq!(group.current_drift(), Duration::from_secs(100));
945        assert_eq!(group.min_watermark(), 100_000);
946        assert_eq!(group.max_watermark(), 200_000);
947    }
948
949    #[test]
950    fn test_alignment_group_metrics_accurate() {
951        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
952        let mut group = WatermarkAlignmentGroup::new(config);
953
954        group.register_source(0);
955        group.register_source(1);
956
957        group.report_watermark(0, 0);
958        group.report_watermark(1, 0);
959
960        // Pause source 0
961        group.report_watermark(0, 100_000);
962        assert_eq!(group.metrics().pause_events, 1);
963
964        // Source 1 catches up
965        group.report_watermark(1, 50_000);
966
967        // Check alignment should resume source 0
968        let _actions = group.check_alignment();
969        // Note: check_alignment only runs if update_interval has passed
970        // For this test, we check should_resume directly
971        assert!(group.should_resume(0));
972    }
973
974    #[test]
975    fn test_alignment_group_unregister_source() {
976        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
977        let mut group = WatermarkAlignmentGroup::new(config);
978
979        group.register_source(0);
980        group.register_source(1);
981
982        group.report_watermark(0, 100_000);
983        group.report_watermark(1, 50_000);
984
985        assert_eq!(group.source_count(), 2);
986
987        group.unregister_source(1);
988        assert_eq!(group.source_count(), 1);
989
990        // After removing source 1, only source 0 remains
991        // so drift should be 0 (single source)
992        assert_eq!(group.min_watermark(), 100_000);
993        assert_eq!(group.max_watermark(), 100_000);
994    }
995
996    #[test]
997    fn test_alignment_group_source_state() {
998        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
999        let mut group = WatermarkAlignmentGroup::new(config);
1000
1001        group.register_source(0);
1002        group.report_watermark(0, 50_000);
1003
1004        let state = group.source_state(0).expect("source exists");
1005        assert_eq!(state.source_id, 0);
1006        assert_eq!(state.watermark, 50_000);
1007        assert!(!state.is_paused);
1008    }
1009
1010    #[test]
1011    fn test_coordinator_multiple_groups() {
1012        let mut coordinator = AlignmentGroupCoordinator::new();
1013
1014        let config1 = AlignmentGroupConfig::new("group1").with_max_drift(Duration::from_secs(60));
1015        let config2 = AlignmentGroupConfig::new("group2").with_max_drift(Duration::from_secs(120));
1016
1017        coordinator.add_group(config1);
1018        coordinator.add_group(config2);
1019
1020        assert_eq!(coordinator.group_count(), 2);
1021    }
1022
1023    #[test]
1024    fn test_coordinator_source_assignment() {
1025        let mut coordinator = AlignmentGroupCoordinator::new();
1026
1027        let config =
1028            AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
1029        coordinator.add_group(config);
1030
1031        let group_id = AlignmentGroupId::new("test-group");
1032
1033        // Assign sources
1034        coordinator
1035            .assign_source_to_group(0, &group_id)
1036            .expect("should succeed");
1037        coordinator
1038            .assign_source_to_group(1, &group_id)
1039            .expect("should succeed");
1040
1041        assert_eq!(coordinator.total_source_count(), 2);
1042        assert_eq!(coordinator.source_group(0), Some(&group_id));
1043    }
1044
1045    #[test]
1046    fn test_coordinator_source_already_in_group() {
1047        let mut coordinator = AlignmentGroupCoordinator::new();
1048
1049        let config1 = AlignmentGroupConfig::new("group1");
1050        let config2 = AlignmentGroupConfig::new("group2");
1051        coordinator.add_group(config1);
1052        coordinator.add_group(config2);
1053
1054        let group1 = AlignmentGroupId::new("group1");
1055        let group2 = AlignmentGroupId::new("group2");
1056
1057        coordinator
1058            .assign_source_to_group(0, &group1)
1059            .expect("should succeed");
1060
1061        // Try to assign to different group
1062        let result = coordinator.assign_source_to_group(0, &group2);
1063        assert!(matches!(
1064            result,
1065            Err(AlignmentError::SourceAlreadyInGroup { .. })
1066        ));
1067
1068        // Assigning to same group should be fine
1069        let result = coordinator.assign_source_to_group(0, &group1);
1070        assert!(result.is_ok());
1071    }
1072
1073    #[test]
1074    fn test_coordinator_group_not_found() {
1075        let mut coordinator = AlignmentGroupCoordinator::new();
1076
1077        let result = coordinator.assign_source_to_group(0, &AlignmentGroupId::new("nonexistent"));
1078        assert!(matches!(result, Err(AlignmentError::GroupNotFound(_))));
1079    }
1080
1081    #[test]
1082    fn test_coordinator_report_watermark() {
1083        let mut coordinator = AlignmentGroupCoordinator::new();
1084
1085        let config =
1086            AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
1087        coordinator.add_group(config);
1088
1089        let group_id = AlignmentGroupId::new("test-group");
1090        coordinator.assign_source_to_group(0, &group_id).unwrap();
1091        coordinator.assign_source_to_group(1, &group_id).unwrap();
1092
1093        // Report watermarks
1094        let action = coordinator.report_watermark(0, 0);
1095        assert_eq!(action, Some(AlignmentAction::Continue));
1096
1097        let action = coordinator.report_watermark(1, 0);
1098        assert_eq!(action, Some(AlignmentAction::Continue));
1099
1100        // Source not in any group
1101        let action = coordinator.report_watermark(99, 0);
1102        assert_eq!(action, None);
1103    }
1104
1105    #[test]
1106    fn test_coordinator_unassign_source() {
1107        let mut coordinator = AlignmentGroupCoordinator::new();
1108
1109        let config = AlignmentGroupConfig::new("test-group");
1110        coordinator.add_group(config);
1111
1112        let group_id = AlignmentGroupId::new("test-group");
1113        coordinator.assign_source_to_group(0, &group_id).unwrap();
1114
1115        assert_eq!(coordinator.total_source_count(), 1);
1116
1117        coordinator.unassign_source(0);
1118        assert_eq!(coordinator.total_source_count(), 0);
1119        assert_eq!(coordinator.source_group(0), None);
1120    }
1121
1122    #[test]
1123    fn test_coordinator_remove_group() {
1124        let mut coordinator = AlignmentGroupCoordinator::new();
1125
1126        let config = AlignmentGroupConfig::new("test-group");
1127        coordinator.add_group(config);
1128
1129        let group_id = AlignmentGroupId::new("test-group");
1130        coordinator.assign_source_to_group(0, &group_id).unwrap();
1131        coordinator.assign_source_to_group(1, &group_id).unwrap();
1132
1133        assert_eq!(coordinator.group_count(), 1);
1134        assert_eq!(coordinator.total_source_count(), 2);
1135
1136        coordinator.remove_group(&group_id);
1137
1138        assert_eq!(coordinator.group_count(), 0);
1139        assert_eq!(coordinator.total_source_count(), 0);
1140    }
1141
1142    #[test]
1143    fn test_coordinator_is_paused() {
1144        let mut coordinator = AlignmentGroupCoordinator::new();
1145
1146        let config =
1147            AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
1148        coordinator.add_group(config);
1149
1150        let group_id = AlignmentGroupId::new("test-group");
1151        coordinator.assign_source_to_group(0, &group_id).unwrap();
1152        coordinator.assign_source_to_group(1, &group_id).unwrap();
1153
1154        coordinator.report_watermark(0, 0);
1155        coordinator.report_watermark(1, 0);
1156
1157        // Source 0 exceeds drift
1158        coordinator.report_watermark(0, 100_000);
1159        assert!(coordinator.is_paused(0));
1160        assert!(!coordinator.is_paused(1));
1161    }
1162
1163    #[test]
1164    fn test_coordinator_all_metrics() {
1165        let mut coordinator = AlignmentGroupCoordinator::new();
1166
1167        let config1 = AlignmentGroupConfig::new("group1");
1168        let config2 = AlignmentGroupConfig::new("group2");
1169        coordinator.add_group(config1);
1170        coordinator.add_group(config2);
1171
1172        let metrics = coordinator.all_metrics();
1173        assert_eq!(metrics.len(), 2);
1174        assert!(metrics.contains_key(&AlignmentGroupId::new("group1")));
1175        assert!(metrics.contains_key(&AlignmentGroupId::new("group2")));
1176    }
1177
1178    #[test]
1179    fn test_alignment_group_empty() {
1180        let config = AlignmentGroupConfig::new("test");
1181        let group = WatermarkAlignmentGroup::new(config);
1182
1183        assert_eq!(group.source_count(), 0);
1184        assert_eq!(group.min_watermark(), i64::MIN);
1185        assert_eq!(group.max_watermark(), i64::MIN);
1186    }
1187
1188    #[test]
1189    fn test_alignment_group_all_paused() {
1190        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(10));
1191        let mut group = WatermarkAlignmentGroup::new(config);
1192
1193        group.register_source(0);
1194        group.register_source(1);
1195
1196        group.report_watermark(0, 0);
1197        group.report_watermark(1, 0);
1198
1199        // Pause both sources by having them exceed drift relative to each other
1200        // Source 0 jumps far ahead
1201        group.report_watermark(0, 100_000);
1202        assert!(group.is_paused(0));
1203
1204        // Source 1 is now the min, so drift calculation only includes it
1205        // Source 1 can't get paused because it's the slowest
1206        assert!(!group.is_paused(1));
1207    }
1208
1209    #[test]
1210    fn test_alignment_group_negative_watermarks() {
1211        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
1212        let mut group = WatermarkAlignmentGroup::new(config);
1213
1214        group.register_source(0);
1215        group.register_source(1);
1216
1217        // Use negative timestamps (before epoch)
1218        group.report_watermark(0, -100_000);
1219        group.report_watermark(1, -50_000);
1220
1221        assert_eq!(group.min_watermark(), -100_000);
1222        assert_eq!(group.max_watermark(), -50_000);
1223        assert_eq!(group.current_drift(), Duration::from_secs(50));
1224    }
1225
1226    #[test]
1227    fn test_alignment_source_state_pause_resume_tracking() {
1228        let mut state = AlignmentSourceState::new(0);
1229
1230        assert!(!state.is_paused);
1231        assert!(state.pause_start.is_none());
1232
1233        state.pause();
1234        assert!(state.is_paused);
1235        assert!(state.pause_start.is_some());
1236
1237        // Wait a tiny bit
1238        std::thread::sleep(Duration::from_millis(1));
1239
1240        state.resume();
1241        assert!(!state.is_paused);
1242        assert!(state.pause_start.is_none());
1243        assert!(state.total_pause_time > Duration::ZERO);
1244    }
1245
1246    #[test]
1247    fn test_alignment_group_check_alignment_interval() {
1248        let config = AlignmentGroupConfig::new("test")
1249            .with_max_drift(Duration::from_secs(60))
1250            .with_update_interval(Duration::from_millis(100));
1251        let mut group = WatermarkAlignmentGroup::new(config);
1252
1253        group.register_source(0);
1254        group.register_source(1);
1255
1256        group.report_watermark(0, 0);
1257        group.report_watermark(1, 0);
1258        group.report_watermark(0, 100_000); // Paused
1259
1260        // Immediate check should return empty (interval not elapsed)
1261        let immediate_actions = group.check_alignment();
1262        assert!(immediate_actions.is_empty());
1263
1264        // Wait for interval
1265        std::thread::sleep(Duration::from_millis(110));
1266
1267        // Now check should potentially return actions
1268        group.report_watermark(1, 50_000); // Slow source catches up
1269        let actions = group.check_alignment();
1270        // Should have a Resume action for source 0
1271        assert!(actions
1272            .iter()
1273            .any(|(id, action)| *id == 0 && *action == AlignmentAction::Resume));
1274    }
1275}