Skip to main content

laminar_core/time/
partitioned_watermark.rs

1//! # Per-Partition Watermark Tracking
2//!
3//! Extends the watermark system to track watermarks per source partition rather
4//! than per source. This is critical for Kafka integration where each partition
5//! may have different event-time progress.
6//!
7//! ## Problem
8//!
9//! With per-source watermarks, a single slow partition blocks the entire source:
10//!
11//! ```text
12//! Source "orders" (Kafka topic with 4 partitions):
13//!   Partition 0: ████████████████ (active, ts: 10:05)
14//!   Partition 1: ████░░░░░░░░░░░░ (idle since 10:01)  ← BLOCKS ENTIRE SOURCE
15//!   Partition 2: ████████████████ (active, ts: 10:06)
16//!   Partition 3: ████████████████ (active, ts: 10:04)
17//!
18//! Source Watermark: stuck at 10:01 (because of Partition 1)
19//! ```
20//!
21//! ## Solution
22//!
23//! Track watermarks at partition granularity with per-partition idle detection:
24//!
25//! ```rust
26//! use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};
27//!
28//! let mut tracker = PartitionedWatermarkTracker::new();
29//!
30//! // Register Kafka source with 4 partitions
31//! tracker.register_source(0, 4);
32//!
33//! // Update individual partitions
34//! tracker.update_partition(PartitionId::new(0, 0), 5000);
35//! tracker.update_partition(PartitionId::new(0, 1), 3000);
36//! tracker.update_partition(PartitionId::new(0, 2), 4000);
37//! tracker.update_partition(PartitionId::new(0, 3), 4500);
38//!
39//! // Combined watermark is minimum (3000)
40//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
41//!
42//! // Mark partition 1 as idle
43//! tracker.mark_partition_idle(PartitionId::new(0, 1));
44//!
45//! // Now combined watermark advances to 4000 (min of active partitions)
46//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
47//! ```
48#![deny(clippy::disallowed_types)]
49
50use std::time::{Duration, Instant};
51
52use rustc_hash::FxHashMap;
53
54use super::Watermark;
55
/// Identifies a single partition of a single source.
///
/// The identity is the pair `(source_id, partition)`. For Kafka sources the
/// partition number corresponds to the Kafka partition.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct PartitionId {
    /// Index of the owning source in the source registry.
    pub source_id: usize,
    /// Partition number inside that source.
    pub partition: u32,
}

impl PartitionId {
    /// Builds the identifier for `partition` of source `source_id`.
    #[must_use]
    #[inline]
    pub const fn new(source_id: usize, partition: u32) -> Self {
        PartitionId {
            source_id,
            partition,
        }
    }
}

impl std::fmt::Display for PartitionId {
    /// Renders the identifier as `<source_id>:<partition>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}:{}", self.source_id, self.partition)
    }
}
85
/// Watermark bookkeeping for one partition.
///
/// Holds the partition's watermark together with the activity information
/// used for idle detection and an optional core assignment.
#[derive(Clone, Debug)]
pub struct PartitionWatermarkState {
    /// Current watermark for this partition (`i64::MIN` until first set).
    pub watermark: i64,
    /// Last event time seen (`i64::MIN` until first set).
    pub last_event_time: i64,
    /// Wall-clock instant of the last recorded activity.
    pub last_activity: Instant,
    /// `true` while the partition is marked idle.
    pub is_idle: bool,
    /// Core assignment (for thread-per-core routing), if any.
    pub assigned_core: Option<usize>,
}

impl PartitionWatermarkState {
    /// Creates an active state with no watermark and no core assignment.
    #[must_use]
    pub fn new() -> Self {
        Self::fresh(None)
    }

    /// Creates an active state with no watermark, assigned to `core_id`.
    #[must_use]
    pub fn with_core(core_id: usize) -> Self {
        Self::fresh(Some(core_id))
    }

    /// Shared constructor: everything starts "empty" and the activity clock
    /// starts at the moment of creation.
    fn fresh(assigned_core: Option<usize>) -> Self {
        Self {
            watermark: i64::MIN,
            last_event_time: i64::MIN,
            last_activity: Instant::now(),
            is_idle: false,
            assigned_core,
        }
    }
}

impl Default for PartitionWatermarkState {
    /// Same as [`PartitionWatermarkState::new`].
    fn default() -> Self {
        Self::new()
    }
}
134
/// Counters describing a partitioned watermark tracker.
#[derive(Clone, Debug, Default)]
pub struct PartitionedWatermarkMetrics {
    /// Number of partitions currently tracked.
    pub total_partitions: usize,
    /// Partitions that are currently active (not idle).
    pub active_partitions: usize,
    /// Partitions that are currently marked idle.
    pub idle_partitions: usize,
    /// How many times the combined watermark advanced.
    pub watermark_advances: u64,
    /// Partition rebalances (adds/removes).
    pub rebalances: u64,
}

impl PartitionedWatermarkMetrics {
    /// Creates metrics with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            total_partitions: 0,
            active_partitions: 0,
            idle_partitions: 0,
            watermark_advances: 0,
            rebalances: 0,
        }
    }
}
157
158/// Errors that can occur in partitioned watermark operations.
159#[derive(Debug, Clone, thiserror::Error)]
160pub enum WatermarkError {
161    /// Partition not registered
162    #[error("Unknown partition: {0}")]
163    UnknownPartition(PartitionId),
164
165    /// Source not found
166    #[error("Source not found: {0}")]
167    SourceNotFound(usize),
168
169    /// Invalid partition number
170    #[error("Invalid partition {partition} for source {source_id} (max: {max_partition})")]
171    InvalidPartition {
172        /// Source ID
173        source_id: usize,
174        /// Requested partition
175        partition: u32,
176        /// Maximum valid partition
177        max_partition: u32,
178    },
179
180    /// Partition already exists
181    #[error("Partition already exists: {0}")]
182    PartitionExists(PartitionId),
183}
184
185/// Tracks watermarks across partitions within sources.
186///
187/// Extends `WatermarkTracker` to support partition-level granularity.
188/// The combined watermark is the minimum across all active partitions.
189///
190/// # Thread Safety
191///
192/// This tracker is NOT thread-safe. For thread-per-core architectures,
193/// use [`CoreWatermarkState`] for per-core tracking and coordinate
194/// via the runtime.
195///
196/// # Example
197///
198/// ```rust
199/// use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};
200///
201/// let mut tracker = PartitionedWatermarkTracker::new();
202///
203/// // Register Kafka source with 4 partitions
204/// tracker.register_source(0, 4);
205///
206/// // Update individual partitions
207/// tracker.update_partition(PartitionId::new(0, 0), 5000);
208/// tracker.update_partition(PartitionId::new(0, 1), 3000);
209/// tracker.update_partition(PartitionId::new(0, 2), 4000);
210/// tracker.update_partition(PartitionId::new(0, 3), 4500);
211///
212/// // Combined watermark is minimum (3000)
213/// assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
214///
215/// // Mark partition 1 as idle
216/// tracker.mark_partition_idle(PartitionId::new(0, 1));
217///
218/// // Now combined watermark advances to 4000 (min of active partitions)
219/// assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
220/// ```
221#[derive(Debug)]
222pub struct PartitionedWatermarkTracker {
223    /// Per-partition state, keyed by `PartitionId`
224    partitions: FxHashMap<PartitionId, PartitionWatermarkState>,
225
226    /// Number of partitions per source (for bounds checking)
227    source_partition_counts: Vec<usize>,
228
229    /// Combined watermark across all active partitions
230    combined_watermark: i64,
231
232    /// Idle timeout for automatic idle detection
233    idle_timeout: Duration,
234
235    /// Metrics
236    metrics: PartitionedWatermarkMetrics,
237}
238
239impl PartitionedWatermarkTracker {
240    /// Default idle timeout (30 seconds).
241    pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
242
243    /// Creates a new partitioned watermark tracker.
244    #[must_use]
245    pub fn new() -> Self {
246        Self {
247            partitions: FxHashMap::default(),
248            source_partition_counts: Vec::new(),
249            combined_watermark: i64::MIN,
250            idle_timeout: Self::DEFAULT_IDLE_TIMEOUT,
251            metrics: PartitionedWatermarkMetrics::new(),
252        }
253    }
254
255    /// Creates a new tracker with custom idle timeout.
256    #[must_use]
257    pub fn with_idle_timeout(idle_timeout: Duration) -> Self {
258        Self {
259            partitions: FxHashMap::default(),
260            source_partition_counts: Vec::new(),
261            combined_watermark: i64::MIN,
262            idle_timeout,
263            metrics: PartitionedWatermarkMetrics::new(),
264        }
265    }
266
267    /// Registers a source with the specified number of partitions.
268    ///
269    /// Creates partition state for each partition in the source.
270    /// Call this when a source is created or when Kafka partitions are assigned.
271    pub fn register_source(&mut self, source_id: usize, num_partitions: usize) {
272        // Expand source_partition_counts if needed
273        while self.source_partition_counts.len() <= source_id {
274            self.source_partition_counts.push(0);
275        }
276
277        self.source_partition_counts[source_id] = num_partitions;
278
279        // Create partition state for each partition
280        #[allow(clippy::cast_possible_truncation)]
281        // Partition count bounded by Kafka max (< u32::MAX)
282        for partition in 0..num_partitions {
283            let pid = PartitionId::new(source_id, partition as u32);
284            self.partitions.entry(pid).or_default();
285        }
286
287        self.update_metrics();
288    }
289
290    /// Adds a partition to a source (for Kafka rebalancing).
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if the partition already exists.
295    pub fn add_partition(&mut self, partition: PartitionId) -> Result<(), WatermarkError> {
296        if self.partitions.contains_key(&partition) {
297            return Err(WatermarkError::PartitionExists(partition));
298        }
299
300        // Expand source_partition_counts if needed
301        while self.source_partition_counts.len() <= partition.source_id {
302            self.source_partition_counts.push(0);
303        }
304
305        // Update partition count
306        let current_count = self.source_partition_counts[partition.source_id];
307        if partition.partition as usize >= current_count {
308            self.source_partition_counts[partition.source_id] = partition.partition as usize + 1;
309        }
310
311        self.partitions
312            .insert(partition, PartitionWatermarkState::new());
313        self.metrics.rebalances += 1;
314        self.update_metrics();
315
316        Ok(())
317    }
318
319    /// Removes a partition from tracking (for Kafka rebalancing).
320    ///
321    /// Returns the partition state if it existed.
322    pub fn remove_partition(&mut self, partition: PartitionId) -> Option<PartitionWatermarkState> {
323        let state = self.partitions.remove(&partition);
324        if state.is_some() {
325            self.metrics.rebalances += 1;
326            self.update_metrics();
327            // Recalculate combined watermark
328            self.recalculate_combined();
329        }
330        state
331    }
332
333    /// Updates the watermark for a specific partition.
334    ///
335    /// This also marks the partition as active and updates its last activity time.
336    ///
337    /// # Returns
338    ///
339    /// `Some(Watermark)` if the combined watermark advances.
340    #[inline]
341    pub fn update_partition(
342        &mut self,
343        partition: PartitionId,
344        watermark: i64,
345    ) -> Option<Watermark> {
346        if let Some(state) = self.partitions.get_mut(&partition) {
347            // Mark as active
348            if state.is_idle {
349                state.is_idle = false;
350                self.metrics.active_partitions += 1;
351                self.metrics.idle_partitions = self.metrics.idle_partitions.saturating_sub(1);
352            }
353            state.last_activity = Instant::now();
354
355            // Update watermark if it advances
356            if watermark > state.watermark {
357                state.watermark = watermark;
358                state.last_event_time = watermark;
359                return self.try_advance_combined();
360            }
361        }
362        None
363    }
364
365    /// Updates watermark from an event timestamp (applies bounded lateness).
366    ///
367    /// Convenience method that subtracts the configured lateness.
368    #[inline]
369    pub fn update_partition_from_event(
370        &mut self,
371        partition: PartitionId,
372        event_time: i64,
373        max_lateness: i64,
374    ) -> Option<Watermark> {
375        let watermark = event_time.saturating_sub(max_lateness);
376        self.update_partition(partition, watermark)
377    }
378
379    /// Marks a partition as idle, excluding it from watermark calculation.
380    ///
381    /// # Returns
382    ///
383    /// `Some(Watermark)` if marking idle causes the combined watermark to advance.
384    pub fn mark_partition_idle(&mut self, partition: PartitionId) -> Option<Watermark> {
385        if let Some(state) = self.partitions.get_mut(&partition) {
386            if !state.is_idle {
387                state.is_idle = true;
388                self.metrics.idle_partitions += 1;
389                self.metrics.active_partitions = self.metrics.active_partitions.saturating_sub(1);
390                return self.try_advance_combined();
391            }
392        }
393        None
394    }
395
396    /// Marks a partition as active again.
397    ///
398    /// Called automatically when `update_partition` is called, but can be
399    /// called explicitly to reactivate a partition before receiving events.
400    pub fn mark_partition_active(&mut self, partition: PartitionId) {
401        if let Some(state) = self.partitions.get_mut(&partition) {
402            if state.is_idle {
403                state.is_idle = false;
404                state.last_activity = Instant::now();
405                self.metrics.active_partitions += 1;
406                self.metrics.idle_partitions = self.metrics.idle_partitions.saturating_sub(1);
407            }
408        }
409    }
410
411    /// Checks for partitions that have been idle longer than the timeout.
412    ///
413    /// Should be called periodically from Ring 1.
414    ///
415    /// # Returns
416    ///
417    /// `Some(Watermark)` if marking idle partitions causes the combined watermark to advance.
418    pub fn check_idle_partitions(&mut self) -> Option<Watermark> {
419        let mut any_marked = false;
420
421        for state in self.partitions.values_mut() {
422            if !state.is_idle && state.last_activity.elapsed() >= self.idle_timeout {
423                state.is_idle = true;
424                any_marked = true;
425            }
426        }
427
428        if any_marked {
429            self.update_metrics();
430            self.try_advance_combined()
431        } else {
432            None
433        }
434    }
435
436    /// Returns the current combined watermark.
437    #[inline]
438    #[must_use]
439    pub fn current_watermark(&self) -> Option<Watermark> {
440        if self.combined_watermark == i64::MIN {
441            None
442        } else {
443            Some(Watermark::new(self.combined_watermark))
444        }
445    }
446
447    /// Returns the watermark for a specific partition.
448    #[must_use]
449    pub fn partition_watermark(&self, partition: PartitionId) -> Option<i64> {
450        self.partitions.get(&partition).map(|s| s.watermark)
451    }
452
453    /// Returns the watermark for a source (minimum across its partitions).
454    #[must_use]
455    pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
456        let mut min_watermark = i64::MAX;
457        let mut found = false;
458
459        for (pid, state) in &self.partitions {
460            if pid.source_id == source_id && !state.is_idle {
461                found = true;
462                min_watermark = min_watermark.min(state.watermark);
463            }
464        }
465
466        if found && min_watermark != i64::MAX {
467            Some(min_watermark)
468        } else {
469            None
470        }
471    }
472
473    /// Returns whether a partition is idle.
474    #[must_use]
475    pub fn is_partition_idle(&self, partition: PartitionId) -> bool {
476        self.partitions.get(&partition).is_some_and(|s| s.is_idle)
477    }
478
479    /// Returns the number of active partitions for a source.
480    #[must_use]
481    pub fn active_partition_count(&self, source_id: usize) -> usize {
482        self.partitions
483            .iter()
484            .filter(|(pid, state)| pid.source_id == source_id && !state.is_idle)
485            .count()
486    }
487
488    /// Returns the total number of partitions for a source.
489    #[must_use]
490    pub fn partition_count(&self, source_id: usize) -> usize {
491        self.source_partition_counts
492            .get(source_id)
493            .copied()
494            .unwrap_or(0)
495    }
496
497    /// Returns metrics.
498    #[must_use]
499    pub fn metrics(&self) -> &PartitionedWatermarkMetrics {
500        &self.metrics
501    }
502
503    /// Returns the number of sources registered.
504    #[must_use]
505    pub fn num_sources(&self) -> usize {
506        self.source_partition_counts.len()
507    }
508
509    /// Assigns a partition to a core (for thread-per-core routing).
510    pub fn assign_partition_to_core(&mut self, partition: PartitionId, core_id: usize) {
511        if let Some(state) = self.partitions.get_mut(&partition) {
512            state.assigned_core = Some(core_id);
513        }
514    }
515
516    /// Returns the core assignment for a partition.
517    #[must_use]
518    pub fn partition_core(&self, partition: PartitionId) -> Option<usize> {
519        self.partitions
520            .get(&partition)
521            .and_then(|s| s.assigned_core)
522    }
523
524    /// Returns all partitions assigned to a specific core.
525    #[must_use]
526    pub fn partitions_for_core(&self, core_id: usize) -> Vec<PartitionId> {
527        self.partitions
528            .iter()
529            .filter_map(|(pid, state)| {
530                if state.assigned_core == Some(core_id) {
531                    Some(*pid)
532                } else {
533                    None
534                }
535            })
536            .collect()
537    }
538
539    /// Returns the idle timeout.
540    #[must_use]
541    pub fn idle_timeout(&self) -> Duration {
542        self.idle_timeout
543    }
544
545    /// Sets the idle timeout.
546    pub fn set_idle_timeout(&mut self, timeout: Duration) {
547        self.idle_timeout = timeout;
548    }
549
550    /// Returns partition state for inspection.
551    #[must_use]
552    pub fn partition_state(&self, partition: PartitionId) -> Option<&PartitionWatermarkState> {
553        self.partitions.get(&partition)
554    }
555
556    /// Tries to advance the combined watermark.
557    fn try_advance_combined(&mut self) -> Option<Watermark> {
558        let new_combined = self.calculate_combined();
559
560        if new_combined > self.combined_watermark && new_combined != i64::MAX {
561            self.combined_watermark = new_combined;
562            self.metrics.watermark_advances += 1;
563            Some(Watermark::new(new_combined))
564        } else {
565            None
566        }
567    }
568
569    /// Recalculates the combined watermark from scratch.
570    fn recalculate_combined(&mut self) {
571        let new_combined = self.calculate_combined();
572        if new_combined != i64::MAX {
573            self.combined_watermark = new_combined;
574        }
575    }
576
577    /// Calculates the combined watermark.
578    fn calculate_combined(&self) -> i64 {
579        let mut min_watermark = i64::MAX;
580        let mut has_active = false;
581
582        for state in self.partitions.values() {
583            if !state.is_idle {
584                has_active = true;
585                min_watermark = min_watermark.min(state.watermark);
586            }
587        }
588
589        // If all partitions are idle, use the max watermark to allow progress
590        if !has_active {
591            min_watermark = self
592                .partitions
593                .values()
594                .map(|s| s.watermark)
595                .max()
596                .unwrap_or(i64::MIN);
597        }
598
599        min_watermark
600    }
601
602    /// Updates metrics counts.
603    fn update_metrics(&mut self) {
604        self.metrics.total_partitions = self.partitions.len();
605        self.metrics.idle_partitions = self.partitions.values().filter(|s| s.is_idle).count();
606        self.metrics.active_partitions =
607            self.metrics.total_partitions - self.metrics.idle_partitions;
608    }
609}
610
611impl Default for PartitionedWatermarkTracker {
612    fn default() -> Self {
613        Self::new()
614    }
615}
616
617/// Per-core partition watermark aggregator.
618///
619/// Each core tracks watermarks for its assigned partitions.
620/// The global tracker aggregates across cores.
621///
622/// This is used in thread-per-core architectures where each core
623/// processes a subset of partitions.
624#[derive(Debug)]
625pub struct CoreWatermarkState {
626    /// Partitions assigned to this core
627    assigned_partitions: Vec<PartitionId>,
628
629    /// Per-partition watermarks (parallel to `assigned_partitions`)
630    partition_watermarks: Vec<i64>,
631
632    /// Local watermark (minimum across assigned partitions)
633    local_watermark: i64,
634
635    /// Idle status for each partition (parallel to `assigned_partitions`)
636    idle_status: Vec<bool>,
637
638    /// Core ID
639    core_id: usize,
640}
641
642impl CoreWatermarkState {
643    /// Creates a new per-core watermark state.
644    #[must_use]
645    pub fn new(core_id: usize) -> Self {
646        Self {
647            assigned_partitions: Vec::new(),
648            partition_watermarks: Vec::new(),
649            local_watermark: i64::MIN,
650            idle_status: Vec::new(),
651            core_id,
652        }
653    }
654
655    /// Creates with pre-assigned partitions.
656    #[must_use]
657    pub fn with_partitions(core_id: usize, partitions: Vec<PartitionId>) -> Self {
658        let count = partitions.len();
659        Self {
660            assigned_partitions: partitions,
661            partition_watermarks: vec![i64::MIN; count],
662            local_watermark: i64::MIN,
663            idle_status: vec![false; count],
664            core_id,
665        }
666    }
667
668    /// Assigns a partition to this core.
669    pub fn assign_partition(&mut self, partition: PartitionId) {
670        if !self.assigned_partitions.contains(&partition) {
671            self.assigned_partitions.push(partition);
672            self.partition_watermarks.push(i64::MIN);
673            self.idle_status.push(false);
674        }
675    }
676
677    /// Removes a partition from this core.
678    pub fn remove_partition(&mut self, partition: PartitionId) -> bool {
679        if let Some(idx) = self
680            .assigned_partitions
681            .iter()
682            .position(|p| *p == partition)
683        {
684            self.assigned_partitions.swap_remove(idx);
685            self.partition_watermarks.swap_remove(idx);
686            self.idle_status.swap_remove(idx);
687            self.recalculate_local();
688            true
689        } else {
690            false
691        }
692    }
693
694    /// Updates a partition watermark on this core.
695    ///
696    /// # Returns
697    ///
698    /// `Some(i64)` with the new local watermark if it advances.
699    #[inline]
700    pub fn update_partition(&mut self, partition: PartitionId, watermark: i64) -> Option<i64> {
701        if let Some(idx) = self
702            .assigned_partitions
703            .iter()
704            .position(|p| *p == partition)
705        {
706            if watermark > self.partition_watermarks[idx] {
707                self.partition_watermarks[idx] = watermark;
708                self.idle_status[idx] = false;
709
710                // Check if local watermark advances
711                let new_local = self.calculate_local();
712                if new_local > self.local_watermark {
713                    self.local_watermark = new_local;
714                    return Some(new_local);
715                }
716            }
717        }
718        None
719    }
720
721    /// Marks a partition as idle on this core.
722    pub fn mark_idle(&mut self, partition: PartitionId) -> Option<i64> {
723        if let Some(idx) = self
724            .assigned_partitions
725            .iter()
726            .position(|p| *p == partition)
727        {
728            if !self.idle_status[idx] {
729                self.idle_status[idx] = true;
730
731                // Recalculate local watermark
732                let new_local = self.calculate_local();
733                if new_local > self.local_watermark {
734                    self.local_watermark = new_local;
735                    return Some(new_local);
736                }
737            }
738        }
739        None
740    }
741
742    /// Returns the local (per-core) watermark.
743    #[inline]
744    #[must_use]
745    pub fn local_watermark(&self) -> i64 {
746        self.local_watermark
747    }
748
749    /// Returns the core ID.
750    #[must_use]
751    pub fn core_id(&self) -> usize {
752        self.core_id
753    }
754
755    /// Returns the assigned partitions.
756    #[must_use]
757    pub fn assigned_partitions(&self) -> &[PartitionId] {
758        &self.assigned_partitions
759    }
760
761    /// Returns the number of assigned partitions.
762    #[must_use]
763    pub fn partition_count(&self) -> usize {
764        self.assigned_partitions.len()
765    }
766
767    /// Calculates the local watermark.
768    fn calculate_local(&self) -> i64 {
769        let mut min = i64::MAX;
770        let mut has_active = false;
771
772        for (idx, &wm) in self.partition_watermarks.iter().enumerate() {
773            if !self.idle_status[idx] {
774                has_active = true;
775                min = min.min(wm);
776            }
777        }
778
779        if !has_active {
780            // All idle - use max
781            self.partition_watermarks
782                .iter()
783                .copied()
784                .max()
785                .unwrap_or(i64::MIN)
786        } else if min == i64::MAX {
787            i64::MIN
788        } else {
789            min
790        }
791    }
792
793    /// Recalculates the local watermark from scratch.
794    fn recalculate_local(&mut self) {
795        self.local_watermark = self.calculate_local();
796    }
797}
798
799/// Collects watermarks from multiple cores and computes the global watermark.
800///
801/// This is used by the thread-per-core runtime coordinator to aggregate
802/// watermarks across all cores.
803#[derive(Debug)]
804pub struct GlobalWatermarkCollector {
805    /// Per-core watermarks
806    core_watermarks: Vec<i64>,
807
808    /// Global watermark (minimum across all cores)
809    global_watermark: i64,
810}
811
812impl GlobalWatermarkCollector {
813    /// Creates a new collector for the given number of cores.
814    #[must_use]
815    pub fn new(num_cores: usize) -> Self {
816        Self {
817            core_watermarks: vec![i64::MIN; num_cores],
818            global_watermark: i64::MIN,
819        }
820    }
821
822    /// Updates the watermark for a specific core.
823    ///
824    /// # Returns
825    ///
826    /// `Some(Watermark)` if the global watermark advances.
827    #[inline]
828    pub fn update_core(&mut self, core_id: usize, watermark: i64) -> Option<Watermark> {
829        if core_id < self.core_watermarks.len() {
830            self.core_watermarks[core_id] = watermark;
831
832            // Calculate new global minimum
833            let new_global = self
834                .core_watermarks
835                .iter()
836                .copied()
837                .min()
838                .unwrap_or(i64::MIN);
839
840            if new_global > self.global_watermark && new_global != i64::MIN {
841                self.global_watermark = new_global;
842                return Some(Watermark::new(new_global));
843            }
844        }
845        None
846    }
847
848    /// Returns the current global watermark.
849    #[must_use]
850    pub fn global_watermark(&self) -> Option<Watermark> {
851        if self.global_watermark == i64::MIN {
852            None
853        } else {
854            Some(Watermark::new(self.global_watermark))
855        }
856    }
857
858    /// Returns the watermark for a specific core.
859    #[must_use]
860    pub fn core_watermark(&self, core_id: usize) -> Option<i64> {
861        self.core_watermarks.get(core_id).copied()
862    }
863
864    /// Returns the number of cores.
865    #[must_use]
866    pub fn num_cores(&self) -> usize {
867        self.core_watermarks.len()
868    }
869}
870
871#[cfg(test)]
872mod tests {
873    use super::*;
874
875    #[test]
876    fn test_partition_id_creation() {
877        let pid = PartitionId::new(1, 3);
878        assert_eq!(pid.source_id, 1);
879        assert_eq!(pid.partition, 3);
880    }
881
882    #[test]
883    fn test_partition_id_equality() {
884        let p1 = PartitionId::new(1, 2);
885        let p2 = PartitionId::new(1, 2);
886        let p3 = PartitionId::new(1, 3);
887
888        assert_eq!(p1, p2);
889        assert_ne!(p1, p3);
890    }
891
892    #[test]
893    fn test_partition_id_display() {
894        let pid = PartitionId::new(2, 5);
895        assert_eq!(format!("{pid}"), "2:5");
896    }
897
898    #[test]
899    fn test_partitioned_tracker_single_partition_updates_watermark() {
900        let mut tracker = PartitionedWatermarkTracker::new();
901        tracker.register_source(0, 1);
902
903        let wm = tracker.update_partition(PartitionId::new(0, 0), 1000);
904        assert_eq!(wm, Some(Watermark::new(1000)));
905        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
906    }
907
908    #[test]
909    fn test_partitioned_tracker_multiple_partitions_uses_minimum() {
910        let mut tracker = PartitionedWatermarkTracker::new();
911        tracker.register_source(0, 4);
912
913        tracker.update_partition(PartitionId::new(0, 0), 5000);
914        tracker.update_partition(PartitionId::new(0, 1), 3000);
915        tracker.update_partition(PartitionId::new(0, 2), 4000);
916        tracker.update_partition(PartitionId::new(0, 3), 4500);
917
918        assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
919    }
920
921    #[test]
922    fn test_partitioned_tracker_idle_partition_excluded_from_min() {
923        let mut tracker = PartitionedWatermarkTracker::new();
924        tracker.register_source(0, 4);
925
926        tracker.update_partition(PartitionId::new(0, 0), 5000);
927        tracker.update_partition(PartitionId::new(0, 1), 1000); // Slow partition
928        tracker.update_partition(PartitionId::new(0, 2), 4000);
929        tracker.update_partition(PartitionId::new(0, 3), 4500);
930
931        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
932
933        // Mark slow partition as idle
934        let wm = tracker.mark_partition_idle(PartitionId::new(0, 1));
935        assert_eq!(wm, Some(Watermark::new(4000)));
936        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
937    }
938
939    #[test]
940    fn test_partitioned_tracker_all_idle_uses_max() {
941        let mut tracker = PartitionedWatermarkTracker::new();
942        tracker.register_source(0, 2);
943
944        tracker.update_partition(PartitionId::new(0, 0), 5000);
945        tracker.update_partition(PartitionId::new(0, 1), 3000);
946
947        tracker.mark_partition_idle(PartitionId::new(0, 0));
948        let wm = tracker.mark_partition_idle(PartitionId::new(0, 1));
949
950        // When all idle, use max to allow progress
951        assert_eq!(wm, Some(Watermark::new(5000)));
952    }
953
954    #[test]
955    fn test_partitioned_tracker_partition_reactivated_on_update() {
956        let mut tracker = PartitionedWatermarkTracker::new();
957        tracker.register_source(0, 2);
958
959        tracker.update_partition(PartitionId::new(0, 0), 5000);
960        tracker.update_partition(PartitionId::new(0, 1), 3000);
961
962        // Mark partition 1 idle
963        tracker.mark_partition_idle(PartitionId::new(0, 1));
964        assert!(tracker.is_partition_idle(PartitionId::new(0, 1)));
965        assert_eq!(tracker.current_watermark(), Some(Watermark::new(5000)));
966
967        // Update partition 1 - should reactivate it
968        tracker.update_partition(PartitionId::new(0, 1), 4000);
969        assert!(!tracker.is_partition_idle(PartitionId::new(0, 1)));
970
971        // Watermark should now be min of both (4000)
972        assert_eq!(tracker.current_watermark(), Some(Watermark::new(5000))); // Still 5000 because combined can't regress
973    }
974
975    #[test]
976    fn test_partitioned_tracker_add_partition_during_operation() {
977        let mut tracker = PartitionedWatermarkTracker::new();
978        tracker.register_source(0, 2);
979
980        tracker.update_partition(PartitionId::new(0, 0), 5000);
981        tracker.update_partition(PartitionId::new(0, 1), 4000);
982
983        // Add a new partition (Kafka rebalance)
984        tracker.add_partition(PartitionId::new(0, 2)).unwrap();
985
986        // New partition starts at MIN, so watermark shouldn't advance yet
987        assert_eq!(tracker.partition_count(0), 3);
988
989        // Update new partition
990        tracker.update_partition(PartitionId::new(0, 2), 3000);
991        // Watermark is now 3000 (min of all)
992        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000))); // Can't regress
993    }
994
995    #[test]
996    fn test_partitioned_tracker_remove_partition_recalculates_watermark() {
997        let mut tracker = PartitionedWatermarkTracker::new();
998        tracker.register_source(0, 3);
999
1000        tracker.update_partition(PartitionId::new(0, 0), 5000);
1001        tracker.update_partition(PartitionId::new(0, 1), 2000); // Slowest
1002        tracker.update_partition(PartitionId::new(0, 2), 4000);
1003
1004        assert_eq!(tracker.current_watermark(), Some(Watermark::new(2000)));
1005
1006        // Remove slowest partition (e.g., Kafka rebalance)
1007        let state = tracker.remove_partition(PartitionId::new(0, 1));
1008        assert!(state.is_some());
1009        assert_eq!(state.unwrap().watermark, 2000);
1010
1011        // Watermark advances to 4000 (min of remaining: 5000, 4000)
1012        // This is correct - removing a slow partition allows progress
1013        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
1014    }
1015
1016    #[test]
1017    fn test_partitioned_tracker_check_idle_marks_stale_partitions() {
1018        let mut tracker = PartitionedWatermarkTracker::with_idle_timeout(Duration::from_millis(10));
1019        tracker.register_source(0, 2);
1020
1021        tracker.update_partition(PartitionId::new(0, 0), 5000);
1022        tracker.update_partition(PartitionId::new(0, 1), 3000);
1023
1024        // Wait for timeout
1025        std::thread::sleep(Duration::from_millis(20));
1026
1027        // Only update partition 0
1028        tracker.update_partition(PartitionId::new(0, 0), 6000);
1029
1030        // Check for idle - partition 1 should be marked idle
1031        let wm = tracker.check_idle_partitions();
1032
1033        assert!(tracker.is_partition_idle(PartitionId::new(0, 1)));
1034        // Watermark should advance since idle partition is excluded
1035        assert!(wm.is_some() || tracker.current_watermark() == Some(Watermark::new(6000)));
1036    }
1037
1038    #[test]
1039    fn test_partitioned_tracker_source_watermark_aggregates_partitions() {
1040        let mut tracker = PartitionedWatermarkTracker::new();
1041        tracker.register_source(0, 2);
1042        tracker.register_source(1, 2);
1043
1044        tracker.update_partition(PartitionId::new(0, 0), 5000);
1045        tracker.update_partition(PartitionId::new(0, 1), 3000);
1046        tracker.update_partition(PartitionId::new(1, 0), 4000);
1047        tracker.update_partition(PartitionId::new(1, 1), 6000);
1048
1049        assert_eq!(tracker.source_watermark(0), Some(3000));
1050        assert_eq!(tracker.source_watermark(1), Some(4000));
1051    }
1052
1053    #[test]
1054    fn test_partitioned_tracker_metrics_accurate() {
1055        let mut tracker = PartitionedWatermarkTracker::new();
1056        tracker.register_source(0, 4);
1057
1058        tracker.update_partition(PartitionId::new(0, 0), 1000);
1059        tracker.update_partition(PartitionId::new(0, 1), 2000);
1060
1061        let metrics = tracker.metrics();
1062        assert_eq!(metrics.total_partitions, 4);
1063        assert_eq!(metrics.active_partitions, 4);
1064        assert_eq!(metrics.idle_partitions, 0);
1065
1066        tracker.mark_partition_idle(PartitionId::new(0, 2));
1067
1068        let metrics = tracker.metrics();
1069        assert_eq!(metrics.idle_partitions, 1);
1070        assert_eq!(metrics.active_partitions, 3);
1071    }
1072
1073    #[test]
1074    fn test_partitioned_tracker_core_assignment() {
1075        let mut tracker = PartitionedWatermarkTracker::new();
1076        tracker.register_source(0, 4);
1077
1078        tracker.assign_partition_to_core(PartitionId::new(0, 0), 0);
1079        tracker.assign_partition_to_core(PartitionId::new(0, 1), 0);
1080        tracker.assign_partition_to_core(PartitionId::new(0, 2), 1);
1081        tracker.assign_partition_to_core(PartitionId::new(0, 3), 1);
1082
1083        assert_eq!(tracker.partition_core(PartitionId::new(0, 0)), Some(0));
1084        assert_eq!(tracker.partition_core(PartitionId::new(0, 2)), Some(1));
1085
1086        let core0_partitions = tracker.partitions_for_core(0);
1087        assert_eq!(core0_partitions.len(), 2);
1088    }
1089
1090    #[test]
1091    fn test_partitioned_tracker_multiple_sources() {
1092        let mut tracker = PartitionedWatermarkTracker::new();
1093        tracker.register_source(0, 2);
1094        tracker.register_source(1, 3);
1095
1096        assert_eq!(tracker.num_sources(), 2);
1097        assert_eq!(tracker.partition_count(0), 2);
1098        assert_eq!(tracker.partition_count(1), 3);
1099    }
1100
1101    #[test]
1102    fn test_partitioned_tracker_update_from_event() {
1103        let mut tracker = PartitionedWatermarkTracker::new();
1104        tracker.register_source(0, 1);
1105
1106        // Event time 5000, max lateness 1000 -> watermark 4000
1107        let wm = tracker.update_partition_from_event(PartitionId::new(0, 0), 5000, 1000);
1108        assert_eq!(wm, Some(Watermark::new(4000)));
1109    }
1110
1111    #[test]
1112    fn test_partitioned_tracker_add_partition_error() {
1113        let mut tracker = PartitionedWatermarkTracker::new();
1114        tracker.register_source(0, 2);
1115
1116        // Adding existing partition should fail
1117        let result = tracker.add_partition(PartitionId::new(0, 0));
1118        assert!(matches!(result, Err(WatermarkError::PartitionExists(_))));
1119    }
1120
1121    #[test]
1122    fn test_core_watermark_state_creation() {
1123        let state = CoreWatermarkState::new(0);
1124        assert_eq!(state.core_id(), 0);
1125        assert_eq!(state.partition_count(), 0);
1126        assert_eq!(state.local_watermark(), i64::MIN);
1127    }
1128
1129    #[test]
1130    fn test_core_watermark_state_with_partitions() {
1131        let partitions = vec![PartitionId::new(0, 0), PartitionId::new(0, 1)];
1132        let state = CoreWatermarkState::with_partitions(1, partitions);
1133
1134        assert_eq!(state.core_id(), 1);
1135        assert_eq!(state.partition_count(), 2);
1136    }
1137
1138    #[test]
1139    fn test_core_watermark_state_update() {
1140        let mut state = CoreWatermarkState::with_partitions(
1141            0,
1142            vec![PartitionId::new(0, 0), PartitionId::new(0, 1)],
1143        );
1144
1145        // First update - local watermark is still MIN because partition 1 is at MIN
1146        let wm = state.update_partition(PartitionId::new(0, 0), 5000);
1147        assert!(wm.is_none()); // Can't advance - other partition still at MIN
1148        assert_eq!(state.local_watermark(), i64::MIN);
1149
1150        // Second update - now both partitions have values
1151        let wm = state.update_partition(PartitionId::new(0, 1), 3000);
1152        assert_eq!(wm, Some(3000)); // Local watermark advances to min of both
1153        assert_eq!(state.local_watermark(), 3000);
1154
1155        // Update partition 0 again - no change to local (still 3000)
1156        let wm = state.update_partition(PartitionId::new(0, 0), 6000);
1157        assert!(wm.is_none());
1158        assert_eq!(state.local_watermark(), 3000);
1159
1160        // Update partition 1 - local advances
1161        let wm = state.update_partition(PartitionId::new(0, 1), 4000);
1162        assert_eq!(wm, Some(4000));
1163        assert_eq!(state.local_watermark(), 4000);
1164    }
1165
1166    #[test]
1167    fn test_core_watermark_state_idle() {
1168        let mut state = CoreWatermarkState::with_partitions(
1169            0,
1170            vec![PartitionId::new(0, 0), PartitionId::new(0, 1)],
1171        );
1172
1173        state.update_partition(PartitionId::new(0, 0), 5000);
1174        state.update_partition(PartitionId::new(0, 1), 2000);
1175
1176        assert_eq!(state.local_watermark(), 2000);
1177
1178        // Mark slow partition idle
1179        let wm = state.mark_idle(PartitionId::new(0, 1));
1180        assert_eq!(wm, Some(5000));
1181        assert_eq!(state.local_watermark(), 5000);
1182    }
1183
1184    #[test]
1185    fn test_core_watermark_state_assign_remove() {
1186        let mut state = CoreWatermarkState::new(0);
1187
1188        state.assign_partition(PartitionId::new(0, 0));
1189        state.assign_partition(PartitionId::new(0, 1));
1190        assert_eq!(state.partition_count(), 2);
1191
1192        state.update_partition(PartitionId::new(0, 0), 5000);
1193        state.update_partition(PartitionId::new(0, 1), 3000);
1194        assert_eq!(state.local_watermark(), 3000);
1195
1196        // Remove slow partition
1197        let removed = state.remove_partition(PartitionId::new(0, 1));
1198        assert!(removed);
1199        assert_eq!(state.partition_count(), 1);
1200        assert_eq!(state.local_watermark(), 5000);
1201    }
1202
1203    #[test]
1204    fn test_global_collector_creation() {
1205        let collector = GlobalWatermarkCollector::new(4);
1206        assert_eq!(collector.num_cores(), 4);
1207        assert_eq!(collector.global_watermark(), None);
1208    }
1209
1210    #[test]
1211    fn test_global_collector_update() {
1212        let mut collector = GlobalWatermarkCollector::new(3);
1213
1214        collector.update_core(0, 5000);
1215        collector.update_core(1, 3000);
1216        let wm = collector.update_core(2, 4000);
1217
1218        // Global is min of all (3000)
1219        assert_eq!(wm, Some(Watermark::new(3000)));
1220        assert_eq!(collector.global_watermark(), Some(Watermark::new(3000)));
1221    }
1222
1223    #[test]
1224    fn test_global_collector_advancement() {
1225        let mut collector = GlobalWatermarkCollector::new(2);
1226
1227        collector.update_core(0, 5000);
1228        collector.update_core(1, 3000);
1229
1230        assert_eq!(collector.global_watermark(), Some(Watermark::new(3000)));
1231
1232        // Advance the slower core
1233        let wm = collector.update_core(1, 4000);
1234        assert_eq!(wm, Some(Watermark::new(4000)));
1235    }
1236
1237    #[test]
1238    fn test_global_collector_no_regression() {
1239        let mut collector = GlobalWatermarkCollector::new(2);
1240
1241        collector.update_core(0, 5000);
1242        collector.update_core(1, 4000);
1243
1244        // Try to go backwards (should not regress)
1245        let wm = collector.update_core(1, 3000);
1246        assert!(wm.is_none());
1247        // The core watermark is updated, but global doesn't regress
1248        assert_eq!(collector.core_watermark(1), Some(3000));
1249    }
1250}