laminar_connectors/kafka/
watermarks.rs1use std::sync::atomic::{AtomicU64, Ordering};
49use std::time::{Duration, Instant};
50
/// Tracks per-partition event-time watermarks for a Kafka source and combines
/// them into a single source watermark.
///
/// A partition's watermark is the largest event time it has reported minus the
/// configured max out-of-orderness. The out-of-orderness is subtracted in
/// milliseconds, so event times are expected to be millisecond values. The
/// combined watermark is the minimum over partitions that are not idle and have
/// reported at least one event.
#[derive(Debug)]
pub struct KafkaWatermarkTracker {
    /// Identifier of the source this tracker belongs to.
    source_id: usize,
    /// Per-partition state, indexed by Kafka partition id.
    partition_watermarks: Vec<PartitionState>,
    /// Minimum watermark over contributing partitions; `i64::MIN` means "none".
    combined_watermark: i64,
    /// Inactivity span after which `check_idle_partitions` marks a partition idle.
    idle_timeout: Duration,
    /// Bounded lateness subtracted from each partition's max event time.
    max_out_of_orderness: Duration,
    /// Activity counters exposed through `metrics()`.
    metrics: WatermarkMetrics,
}

/// Watermark bookkeeping for a single partition.
#[derive(Debug, Clone)]
struct PartitionState {
    /// Partition watermark; `i64::MIN` until the first event arrives.
    watermark: i64,
    /// Largest event time observed; `i64::MIN` until the first event arrives.
    max_event_time: i64,
    /// Time of the most recent update (or of creation).
    last_activity: Instant,
    /// Idle partitions are excluded from the combined watermark.
    is_idle: bool,
}

impl PartitionState {
    /// Fresh, active state that has not observed any event yet.
    fn new() -> Self {
        Self {
            watermark: i64::MIN,
            max_event_time: i64::MIN,
            last_activity: Instant::now(),
            is_idle: false,
        }
    }

    /// State for a partition dropped via `remove_partition`.
    ///
    /// The state is idle and has no watermark or observed events. If the
    /// partition is updated again later, it starts from scratch instead of
    /// inheriting stale progress.
    fn released() -> Self {
        Self {
            is_idle: true,
            ..Self::new()
        }
    }
}

/// Counters describing watermark activity.
///
/// Every counter is incremented and loaded with `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct WatermarkMetrics {
    /// Number of `update_partition` calls with a non-negative partition id.
    pub updates: AtomicU64,
    /// Number of times the combined watermark increased from a defined value.
    pub advances: AtomicU64,
    /// Number of active -> idle transitions (`mark_idle` / `check_idle_partitions`).
    pub idle_transitions: AtomicU64,
    /// Number of idle -> active transitions triggered by `update_partition`.
    pub active_transitions: AtomicU64,
}

impl WatermarkMetrics {
    /// All counters start at zero.
    fn new() -> Self {
        Self::default()
    }

    /// Copies the current counter values.
    ///
    /// Each counter is loaded independently, so the snapshot is not atomic
    /// across fields.
    #[must_use]
    pub fn snapshot(&self) -> WatermarkMetricsSnapshot {
        WatermarkMetricsSnapshot {
            updates: self.updates.load(Ordering::Relaxed),
            advances: self.advances.load(Ordering::Relaxed),
            idle_transitions: self.idle_transitions.load(Ordering::Relaxed),
            active_transitions: self.active_transitions.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of [`WatermarkMetrics`].
#[derive(Debug, Clone, Default)]
pub struct WatermarkMetricsSnapshot {
    /// See [`WatermarkMetrics::updates`].
    pub updates: u64,
    /// See [`WatermarkMetrics::advances`].
    pub advances: u64,
    /// See [`WatermarkMetrics::idle_transitions`].
    pub idle_transitions: u64,
    /// See [`WatermarkMetrics::active_transitions`].
    pub active_transitions: u64,
}

impl KafkaWatermarkTracker {
    /// Creates a tracker with no partitions.
    ///
    /// The max out-of-orderness defaults to 5 seconds. `idle_timeout` is the
    /// inactivity span after which `check_idle_partitions` marks a partition
    /// idle.
    #[must_use]
    pub fn new(source_id: usize, idle_timeout: Duration) -> Self {
        Self {
            source_id,
            partition_watermarks: Vec::new(),
            combined_watermark: i64::MIN,
            idle_timeout,
            max_out_of_orderness: Duration::from_secs(5),
            metrics: WatermarkMetrics::new(),
        }
    }

    /// Sets the bounded lateness subtracted from each partition's max event time.
    #[must_use]
    pub fn with_max_out_of_orderness(mut self, max_out_of_orderness: Duration) -> Self {
        self.max_out_of_orderness = max_out_of_orderness;
        self
    }

    /// Identifier of the source this tracker belongs to.
    #[must_use]
    pub fn source_id(&self) -> usize {
        self.source_id
    }

    /// Resizes the partition table to exactly `num_partitions` entries.
    ///
    /// New entries start active with no watermark. Shrinking drops the
    /// highest-numbered partitions, so the combined watermark is recomputed.
    /// Otherwise a dropped partition would keep holding it back.
    pub fn register_partitions(&mut self, num_partitions: usize) {
        self.partition_watermarks
            .resize_with(num_partitions, PartitionState::new);
        self.recompute_combined();
    }

    /// Ensures `partition` has an entry.
    ///
    /// Negative ids are ignored, and an existing entry is left untouched.
    pub fn add_partition(&mut self, partition: i32) {
        if let Some(idx) = Self::index_of(partition) {
            self.ensure_slot(idx);
        }
    }

    /// Stops `partition` from contributing to the combined watermark.
    ///
    /// The entry is kept and reported as idle, but its watermark and max event
    /// time are cleared. If the partition later receives events again, it
    /// rebuilds its watermark from them. Negative ids are ignored.
    pub fn remove_partition(&mut self, partition: i32) {
        let Some(idx) = Self::index_of(partition) else {
            return;
        };
        if let Some(state) = self.partition_watermarks.get_mut(idx) {
            // Reset rather than pin the watermark to `i64::MAX`. A pinned value
            // would survive re-activation, making the partition ignore later
            // events and never hold back the combined watermark again.
            *state = PartitionState::released();
        }
        self.recompute_combined();
    }

    /// Records an event with timestamp `event_time` on `partition`.
    ///
    /// Unknown partitions are added on the fly, and an idle partition becomes
    /// active again. The partition watermark only moves forward: events at or
    /// below the partition's max event time leave it unchanged.
    ///
    /// Returns `true` if the combined watermark increased from a previously
    /// defined value; the very first combined watermark is not reported as an
    /// advance. Negative partition ids are ignored and return `false`.
    pub fn update_partition(&mut self, partition: i32, event_time: i64) -> bool {
        let Some(idx) = Self::index_of(partition) else {
            return false;
        };
        self.ensure_slot(idx);
        // Saturate an absurdly large lateness instead of failing.
        let out_of_order_ms =
            i64::try_from(self.max_out_of_orderness.as_millis()).unwrap_or(i64::MAX);

        let state = &mut self.partition_watermarks[idx];
        state.last_activity = Instant::now();
        self.metrics.updates.fetch_add(1, Ordering::Relaxed);

        if state.is_idle {
            state.is_idle = false;
            self.metrics
                .active_transitions
                .fetch_add(1, Ordering::Relaxed);
        }

        if event_time > state.max_event_time {
            state.max_event_time = event_time;
            let candidate = event_time.saturating_sub(out_of_order_ms);
            state.watermark = state.watermark.max(candidate);
        }

        self.recompute_combined()
    }

    /// Marks `partition` idle so it no longer holds back the combined watermark.
    ///
    /// This is a no-op for negative, unknown, or already-idle partitions.
    pub fn mark_idle(&mut self, partition: i32) {
        let Some(idx) = Self::index_of(partition) else {
            return;
        };
        match self.partition_watermarks.get_mut(idx) {
            Some(state) if !state.is_idle => state.is_idle = true,
            _ => return,
        }
        self.metrics
            .idle_transitions
            .fetch_add(1, Ordering::Relaxed);
        self.recompute_combined();
    }

    /// Marks idle every active partition whose last activity is older than
    /// the idle timeout.
    ///
    /// The combined watermark is recomputed only if at least one partition
    /// changed.
    pub fn check_idle_partitions(&mut self) {
        let now = Instant::now();
        let timeout = self.idle_timeout;
        let mut newly_idle: u64 = 0;

        for state in self
            .partition_watermarks
            .iter_mut()
            .filter(|s| !s.is_idle && now.duration_since(s.last_activity) > timeout)
        {
            state.is_idle = true;
            newly_idle += 1;
        }

        if newly_idle > 0 {
            self.metrics
                .idle_transitions
                .fetch_add(newly_idle, Ordering::Relaxed);
            self.recompute_combined();
        }
    }

    /// Combined watermark, or `None` if no active partition has one yet.
    #[must_use]
    pub fn current_watermark(&self) -> Option<i64> {
        if self.combined_watermark == i64::MIN {
            None
        } else {
            Some(self.combined_watermark)
        }
    }

    /// Number of partitions not currently marked idle.
    #[must_use]
    pub fn active_partition_count(&self) -> usize {
        self.partition_watermarks
            .iter()
            .filter(|s| !s.is_idle)
            .count()
    }

    /// Number of partitions currently marked idle (including removed ones).
    #[must_use]
    pub fn idle_partition_count(&self) -> usize {
        self.partition_watermarks
            .iter()
            .filter(|s| s.is_idle)
            .count()
    }

    /// Size of the partition table.
    #[must_use]
    pub fn partition_count(&self) -> usize {
        self.partition_watermarks.len()
    }

    /// Activity counters for this tracker.
    #[must_use]
    pub fn metrics(&self) -> &WatermarkMetrics {
        &self.metrics
    }

    /// Watermark of a single partition.
    ///
    /// Returns `None` if the id is negative, unknown, or has no watermark yet.
    #[must_use]
    pub fn partition_watermark(&self, partition: i32) -> Option<i64> {
        let idx = Self::index_of(partition)?;
        self.partition_watermarks
            .get(idx)
            .map(|s| s.watermark)
            .filter(|&w| w != i64::MIN)
    }

    /// Whether `partition` is known and currently marked idle.
    #[must_use]
    pub fn is_partition_idle(&self, partition: i32) -> bool {
        Self::index_of(partition)
            .and_then(|idx| self.partition_watermarks.get(idx))
            .is_some_and(|s| s.is_idle)
    }

    /// Converts a Kafka partition id into a table index (`None` if negative).
    fn index_of(partition: i32) -> Option<usize> {
        usize::try_from(partition).ok()
    }

    /// Grows the table with fresh partitions so that `idx` is a valid index.
    fn ensure_slot(&mut self, idx: usize) {
        if idx >= self.partition_watermarks.len() {
            self.partition_watermarks
                .resize_with(idx + 1, PartitionState::new);
        }
    }

    /// Recomputes the combined watermark over active partitions that have one.
    ///
    /// Returns `true`, and bumps the `advances` counter, only when the value
    /// increased from a previously defined watermark.
    fn recompute_combined(&mut self) -> bool {
        let previous = self.combined_watermark;
        self.combined_watermark = self
            .partition_watermarks
            .iter()
            .filter(|s| !s.is_idle && s.watermark != i64::MIN)
            .map(|s| s.watermark)
            .min()
            .unwrap_or(i64::MIN);

        let advanced = previous != i64::MIN && self.combined_watermark > previous;
        if advanced {
            self.metrics.advances.fetch_add(1, Ordering::Relaxed);
        }
        advanced
    }
}

impl Default for KafkaWatermarkTracker {
    /// Source 0 with a 30-second idle timeout and the default 5-second
    /// out-of-orderness.
    fn default() -> Self {
        Self::new(0, Duration::from_secs(30))
    }
}
380
/// Strategy for handling watermark drift beyond `KafkaAlignmentConfig::max_drift`.
///
/// It is selected via `KafkaAlignmentConfig::mode`. NOTE(review): the
/// per-variant meanings below are inferred from the variant names; confirm
/// them against the code that consumes this mode.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum KafkaAlignmentMode {
    /// Presumably pause consumption until drift shrinks. This is the default.
    #[default]
    Pause,
    /// Presumably log a warning but keep consuming.
    WarnOnly,
    /// Presumably drop records that exceed the allowed drift.
    DropExcess,
}
392
// String conversions for `KafkaAlignmentMode`, generated by the project's
// `str_enum!` macro. Each variant lists the strings it is matched from
// (e.g. "warn-only", "warnonly" and "warn" all map to `WarnOnly`).
// NOTE(review): the meaning of `lowercase_udash` and `String`, whether the
// first alias is the canonical display form, and the use of
// "invalid alignment mode" as the parse-error text are presumed from the
// macro's arguments; confirm against the macro definition.
str_enum!(KafkaAlignmentMode, lowercase_udash, String, "invalid alignment mode",
    Pause => "pause";
    WarnOnly => "warn-only", "warnonly", "warn";
    DropExcess => "drop-excess", "dropexcess", "drop"
);
398
399#[derive(Debug, Clone)]
404pub struct KafkaAlignmentConfig {
405 pub group_id: String,
407 pub max_drift: Duration,
409 pub mode: KafkaAlignmentMode,
411}
412
413impl KafkaAlignmentConfig {
414 #[must_use]
416 pub fn new(group_id: impl Into<String>) -> Self {
417 Self {
418 group_id: group_id.into(),
419 max_drift: Duration::from_secs(300), mode: KafkaAlignmentMode::Pause,
421 }
422 }
423
424 #[must_use]
426 pub fn with_max_drift(mut self, max_drift: Duration) -> Self {
427 self.max_drift = max_drift;
428 self
429 }
430
431 #[must_use]
433 pub fn with_mode(mut self, mode: KafkaAlignmentMode) -> Self {
434 self.mode = mode;
435 self
436 }
437}
438
439impl Default for KafkaAlignmentConfig {
440 fn default() -> Self {
441 Self::new("default")
442 }
443}
444
/// Outcome of an alignment check.
///
/// NOTE(review): the variant meanings below are inferred from their names;
/// confirm them against the code that produces this value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AlignmentCheckResult {
    /// No action required.
    Continue,
    /// Consumption should be paused.
    Pause,
    /// Previously paused consumption may resume.
    Resume,
    /// Excess records should be dropped.
    Drop,
}
457
#[cfg(test)]
mod tests {
    use super::*;

    /// Tracker used by most tests: 30 s idle timeout, 1 s out-of-orderness.
    fn lagged_tracker() -> KafkaWatermarkTracker {
        KafkaWatermarkTracker::new(0, Duration::from_secs(30))
            .with_max_out_of_orderness(Duration::from_millis(1000))
    }

    #[test]
    fn test_tracker_new() {
        let tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        assert_eq!(tracker.source_id(), 0);
        assert_eq!(tracker.partition_count(), 0);
        assert!(tracker.current_watermark().is_none());
    }

    #[test]
    fn test_register_partitions() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(4);
        assert_eq!(tracker.partition_count(), 4);
        assert_eq!(tracker.active_partition_count(), 4);
        assert_eq!(tracker.idle_partition_count(), 0);
    }

    #[test]
    fn test_update_partition() {
        let mut tracker = lagged_tracker();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        // The slowest partition (3000 - 1000) defines the combined watermark.
        assert_eq!(tracker.current_watermark(), Some(2000));
    }

    #[test]
    fn test_idle_partition() {
        let mut tracker = lagged_tracker();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);
        tracker.mark_idle(1);

        // With partition 1 idle, only partition 0 (5000 - 1000) contributes.
        assert_eq!(tracker.current_watermark(), Some(4000));
        assert_eq!(tracker.active_partition_count(), 1);
        assert_eq!(tracker.idle_partition_count(), 1);
    }

    #[test]
    fn test_resume_from_idle() {
        let mut tracker = lagged_tracker();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.mark_idle(1);
        assert_eq!(tracker.active_partition_count(), 1);

        // An update re-activates the idle partition, which now holds the
        // combined watermark back.
        tracker.update_partition(1, 4000);
        assert_eq!(tracker.active_partition_count(), 2);
        assert_eq!(tracker.current_watermark(), Some(3000));
    }

    #[test]
    fn test_add_partition_dynamically() {
        let mut tracker = lagged_tracker();

        tracker.update_partition(0, 5000);
        tracker.add_partition(5);
        tracker.update_partition(5, 3000);

        assert_eq!(tracker.partition_count(), 6);
        assert_eq!(tracker.current_watermark(), Some(2000));
    }

    #[test]
    fn test_remove_partition() {
        let mut tracker = lagged_tracker();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);
        tracker.remove_partition(1);

        assert_eq!(tracker.current_watermark(), Some(4000));
    }

    #[test]
    fn test_partition_watermark() {
        let mut tracker = lagged_tracker();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        assert_eq!(tracker.partition_watermark(0), Some(4000));
        assert_eq!(tracker.partition_watermark(1), Some(2000));
        assert!(tracker.partition_watermark(99).is_none());
    }

    #[test]
    fn test_negative_partition_ids_are_ignored() {
        let mut tracker = lagged_tracker();

        assert!(!tracker.update_partition(-1, 5000));
        tracker.add_partition(-1);
        tracker.mark_idle(-1);
        tracker.remove_partition(-1);

        assert_eq!(tracker.partition_count(), 0);
        assert!(tracker.partition_watermark(-1).is_none());
        assert!(!tracker.is_partition_idle(-1));
        assert!(tracker.current_watermark().is_none());
        assert_eq!(tracker.metrics().snapshot().updates, 0);
    }

    #[test]
    fn test_metrics() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);
        tracker.mark_idle(1);
        tracker.update_partition(1, 4000);

        let snapshot = tracker.metrics().snapshot();
        assert_eq!(snapshot.updates, 3);
        assert_eq!(snapshot.idle_transitions, 1);
        assert_eq!(snapshot.active_transitions, 1);
    }

    #[test]
    fn test_alignment_mode_parsing() {
        assert_eq!(
            "pause".parse::<KafkaAlignmentMode>().unwrap(),
            KafkaAlignmentMode::Pause
        );
        assert_eq!(
            "warn-only".parse::<KafkaAlignmentMode>().unwrap(),
            KafkaAlignmentMode::WarnOnly
        );
        assert_eq!(
            "drop-excess".parse::<KafkaAlignmentMode>().unwrap(),
            KafkaAlignmentMode::DropExcess
        );
        assert!("invalid".parse::<KafkaAlignmentMode>().is_err());
    }

    #[test]
    fn test_alignment_config() {
        let config = KafkaAlignmentConfig::new("test-group")
            .with_max_drift(Duration::from_secs(60))
            .with_mode(KafkaAlignmentMode::WarnOnly);

        assert_eq!(config.group_id, "test-group");
        assert_eq!(config.max_drift, Duration::from_secs(60));
        assert_eq!(config.mode, KafkaAlignmentMode::WarnOnly);
    }

    #[test]
    fn test_alignment_config_defaults() {
        let config = KafkaAlignmentConfig::default();

        assert_eq!(config.group_id, "default");
        assert_eq!(config.max_drift, Duration::from_secs(300));
        assert_eq!(config.mode, KafkaAlignmentMode::Pause);
    }

    #[test]
    fn test_all_partitions_idle() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);
        tracker.mark_idle(0);
        tracker.mark_idle(1);

        // With no active partition there is nothing to take a minimum over.
        assert!(tracker.current_watermark().is_none());
    }
}