// laminar_connectors/kafka/watermarks.rs
use std::time::{Duration, Instant};
4
/// Tracks event-time watermarks for the partitions of one Kafka source.
///
/// Every partition keeps its own watermark: the largest event time seen on
/// that partition minus the configured out-of-orderness bound. The
/// source-level ("combined") watermark is the minimum over all partitions
/// that are currently active and have already produced a watermark, so idle
/// or removed partitions never hold it back.
///
/// Event times and watermarks are plain `i64` values. The out-of-orderness
/// bound is subtracted in milliseconds, so event times are interpreted on the
/// same millisecond scale.
#[derive(Debug)]
pub struct KafkaWatermarkTracker {
    /// Identifier of the source this tracker belongs to.
    source_id: usize,
    /// Per-partition state, indexed by partition id.
    partition_watermarks: Vec<PartitionState>,
    /// Cached combined watermark; `i64::MIN` means "no watermark available".
    combined_watermark: i64,
    /// Inactivity period after which `check_idle_partitions` marks a
    /// partition idle.
    idle_timeout: Duration,
    /// Bound subtracted from the maximum event time to derive a partition
    /// watermark.
    max_out_of_orderness: Duration,
    /// Counters describing tracker activity.
    metrics: WatermarkMetrics,
}

/// Watermark bookkeeping for a single partition slot.
#[derive(Debug, Clone)]
struct PartitionState {
    /// Current partition watermark; `i64::MIN` until the first event arrives.
    watermark: i64,
    /// Largest event time observed so far; `i64::MIN` until the first event.
    max_event_time: i64,
    /// Time of the most recent `update_partition` call for this slot.
    last_activity: Instant,
    /// Idle partitions are excluded from the combined watermark.
    is_idle: bool,
    /// Set by `remove_partition`. A removed slot is a tombstone that only
    /// keeps the indices of higher partitions stable; it is reset to a fresh
    /// state when the partition id shows up again.
    removed: bool,
}

impl PartitionState {
    /// Creates an active slot that has not seen any event yet.
    fn new() -> Self {
        Self {
            watermark: i64::MIN,
            max_event_time: i64::MIN,
            last_activity: Instant::now(),
            is_idle: false,
            removed: false,
        }
    }
}

/// Counters describing what a [`KafkaWatermarkTracker`] has done so far.
#[derive(Debug, Clone, Copy, Default)]
pub struct WatermarkMetrics {
    /// Number of accepted `update_partition` calls.
    pub updates: u64,
    /// Number of times the combined watermark moved forward from an already
    /// established value.
    pub advances: u64,
    /// Number of active -> idle transitions (explicit or by timeout).
    pub idle_transitions: u64,
    /// Number of idle -> active transitions.
    pub active_transitions: u64,
}

impl KafkaWatermarkTracker {
    /// Creates a tracker without partitions.
    ///
    /// `idle_timeout` is the inactivity period used by
    /// [`check_idle_partitions`](Self::check_idle_partitions). The
    /// out-of-orderness bound starts at 5 seconds and can be changed with
    /// [`with_max_out_of_orderness`](Self::with_max_out_of_orderness).
    #[must_use]
    pub fn new(source_id: usize, idle_timeout: Duration) -> Self {
        Self {
            source_id,
            partition_watermarks: Vec::new(),
            combined_watermark: i64::MIN,
            idle_timeout,
            max_out_of_orderness: Duration::from_secs(5),
            metrics: WatermarkMetrics::default(),
        }
    }

    /// Builder-style setter for the out-of-orderness bound.
    #[must_use]
    pub fn with_max_out_of_orderness(mut self, max_out_of_orderness: Duration) -> Self {
        self.max_out_of_orderness = max_out_of_orderness;
        self
    }

    /// Returns the identifier of the source this tracker belongs to.
    #[must_use]
    pub fn source_id(&self) -> usize {
        self.source_id
    }

    /// Makes the tracker hold exactly `num_partitions` slots (ids
    /// `0..num_partitions`).
    ///
    /// Missing slots are created as fresh, active partitions; slots beyond
    /// `num_partitions` are dropped. Slots in range that were previously
    /// removed are reset so that they are tracked again, matching
    /// [`add_partition`](Self::add_partition).
    pub fn register_partitions(&mut self, num_partitions: usize) {
        self.partition_watermarks
            .resize_with(num_partitions, PartitionState::new);
        for state in self.partition_watermarks.iter_mut().filter(|s| s.removed) {
            *state = PartitionState::new();
        }
        // Shrinking may have dropped the partition that determined the
        // minimum, so the cached combined watermark has to be refreshed.
        self.recompute_combined();
    }

    /// Starts tracking `partition`, growing the slot table if necessary.
    ///
    /// Negative ids are ignored. An id that is already tracked is left
    /// untouched; an id that was removed earlier is reset to a fresh state.
    pub fn add_partition(&mut self, partition: i32) {
        let Some(idx) = Self::slot_index(partition) else {
            return;
        };
        if idx >= self.partition_watermarks.len() {
            self.partition_watermarks
                .resize_with(idx + 1, PartitionState::new);
        } else if self.partition_watermarks[idx].removed {
            self.partition_watermarks[idx] = PartitionState::new();
        }
    }

    /// Stops tracking `partition`.
    ///
    /// The slot is turned into an idle tombstone whose watermark reads
    /// `i64::MAX`, so it can never hold back the combined watermark while the
    /// indices of higher partitions stay valid. Tombstones at the end of the
    /// slot table are dropped. Partitions that are merely idle keep their
    /// slot and their state. Negative ids are ignored.
    pub fn remove_partition(&mut self, partition: i32) {
        let Some(idx) = Self::slot_index(partition) else {
            return;
        };
        if let Some(state) = self.partition_watermarks.get_mut(idx) {
            state.removed = true;
            state.is_idle = true;
            state.watermark = i64::MAX;
        }
        // Only removed slots may be truncated: an idle partition can resume
        // and must then continue from its previous watermark.
        while self.partition_watermarks.last().is_some_and(|s| s.removed) {
            self.partition_watermarks.pop();
        }
        self.recompute_combined();
    }

    /// Records an event with timestamp `event_time` on `partition`.
    ///
    /// Unknown partition ids are added on the fly and a previously removed
    /// partition starts over with a fresh state. An idle partition becomes
    /// active again. The partition watermark only ever moves forward.
    ///
    /// Returns `true` when the combined watermark advanced from an already
    /// established value; establishing the very first combined watermark
    /// returns `false`. Negative partition ids are ignored and return `false`.
    pub fn update_partition(&mut self, partition: i32, event_time: i64) -> bool {
        let Some(idx) = Self::slot_index(partition) else {
            return false;
        };
        if idx >= self.partition_watermarks.len() {
            self.partition_watermarks
                .resize_with(idx + 1, PartitionState::new);
        }
        // Durations too large for an i64 millisecond count saturate.
        let out_of_order_ms =
            i64::try_from(self.max_out_of_orderness.as_millis()).unwrap_or(i64::MAX);

        let state = &mut self.partition_watermarks[idx];
        if state.removed {
            // The partition was re-assigned after a removal. Without this
            // reset the tombstone's `i64::MAX` watermark would stick forever.
            *state = PartitionState::new();
        }
        state.last_activity = Instant::now();
        self.metrics.updates += 1;

        if state.is_idle {
            state.is_idle = false;
            self.metrics.active_transitions += 1;
        }

        if event_time > state.max_event_time {
            state.max_event_time = event_time;
            let candidate = event_time.saturating_sub(out_of_order_ms);
            state.watermark = state.watermark.max(candidate);
        }

        self.recompute_combined()
    }

    /// Marks `partition` idle so that it no longer holds back the combined
    /// watermark. Unknown, negative or already idle partitions are ignored.
    pub fn mark_idle(&mut self, partition: i32) {
        let Some(idx) = Self::slot_index(partition) else {
            return;
        };
        let Some(state) = self.partition_watermarks.get_mut(idx) else {
            return;
        };
        if state.is_idle {
            return;
        }
        state.is_idle = true;
        self.metrics.idle_transitions += 1;
        self.recompute_combined();
    }

    /// Marks every active partition idle whose last update is older than the
    /// idle timeout and refreshes the combined watermark if anything changed.
    pub fn check_idle_partitions(&mut self) {
        let now = Instant::now();
        let timeout = self.idle_timeout;
        let mut newly_idle: u64 = 0;

        for state in self.partition_watermarks.iter_mut().filter(|s| !s.is_idle) {
            if now.duration_since(state.last_activity) > timeout {
                state.is_idle = true;
                newly_idle += 1;
            }
        }

        if newly_idle > 0 {
            self.metrics.idle_transitions += newly_idle;
            self.recompute_combined();
        }
    }

    /// Returns the combined watermark, or `None` when no active partition has
    /// produced a watermark.
    #[must_use]
    pub fn current_watermark(&self) -> Option<i64> {
        (self.combined_watermark != i64::MIN).then_some(self.combined_watermark)
    }

    /// Number of slots that are not idle.
    #[must_use]
    pub fn active_partition_count(&self) -> usize {
        self.partition_watermarks
            .iter()
            .filter(|s| !s.is_idle)
            .count()
    }

    /// Number of idle slots; removed slots that still exist count as idle.
    #[must_use]
    pub fn idle_partition_count(&self) -> usize {
        self.partition_watermarks
            .iter()
            .filter(|s| s.is_idle)
            .count()
    }

    /// Total number of slots, including idle and removed ones.
    #[must_use]
    pub fn partition_count(&self) -> usize {
        self.partition_watermarks.len()
    }

    /// Returns the activity counters.
    #[must_use]
    pub fn metrics(&self) -> &WatermarkMetrics {
        &self.metrics
    }

    /// Returns the watermark of `partition`.
    ///
    /// `None` is returned for negative or unknown ids and for partitions that
    /// have not produced a watermark yet. A removed partition whose slot
    /// still exists reports `i64::MAX`.
    #[must_use]
    pub fn partition_watermark(&self, partition: i32) -> Option<i64> {
        let idx = Self::slot_index(partition)?;
        self.partition_watermarks
            .get(idx)
            .map(|s| s.watermark)
            .filter(|&wm| wm != i64::MIN)
    }

    /// Returns `true` when `partition` has a slot that is currently idle.
    #[must_use]
    pub fn is_partition_idle(&self, partition: i32) -> bool {
        Self::slot_index(partition)
            .and_then(|idx| self.partition_watermarks.get(idx))
            .is_some_and(|s| s.is_idle)
    }

    /// Converts a Kafka partition id into a slot index; negative ids yield
    /// `None`.
    fn slot_index(partition: i32) -> Option<usize> {
        usize::try_from(partition).ok()
    }

    /// Recomputes the cached combined watermark from the partition slots.
    ///
    /// Returns `true` (and bumps `metrics.advances`) only when the watermark
    /// moved forward from an already established value.
    fn recompute_combined(&mut self) -> bool {
        let previous = self.combined_watermark;

        self.combined_watermark = self
            .partition_watermarks
            .iter()
            .filter(|s| !s.is_idle && s.watermark != i64::MIN)
            .map(|s| s.watermark)
            .min()
            .unwrap_or(i64::MIN);

        let advanced = previous != i64::MIN && self.combined_watermark > previous;
        if advanced {
            self.metrics.advances += 1;
        }
        advanced
    }
}

impl Default for KafkaWatermarkTracker {
    /// Tracker for source `0` with a 30 second idle timeout.
    fn default() -> Self {
        Self::new(0, Duration::from_secs(30))
    }
}
306
/// Unit tests for `KafkaWatermarkTracker`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a tracker with a 30 s idle timeout and a 1 s
    /// out-of-orderness bound, so watermarks are `event_time - 1000`.
    fn tracker_with_one_second_bound() -> KafkaWatermarkTracker {
        KafkaWatermarkTracker::new(0, Duration::from_secs(30))
            .with_max_out_of_orderness(Duration::from_secs(1))
    }

    #[test]
    fn test_tracker_new() {
        let tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        assert_eq!(tracker.source_id(), 0);
        assert_eq!(tracker.partition_count(), 0);
        assert!(tracker.current_watermark().is_none());
    }

    #[test]
    fn test_register_partitions() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(4);
        assert_eq!(tracker.partition_count(), 4);
        assert_eq!(tracker.active_partition_count(), 4);
        assert_eq!(tracker.idle_partition_count(), 0);
    }

    #[test]
    fn test_update_partition() {
        let mut tracker = tracker_with_one_second_bound();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        // Combined watermark is the minimum: min(4000, 2000).
        assert_eq!(tracker.current_watermark(), Some(2000));
    }

    #[test]
    fn test_idle_partition() {
        let mut tracker = tracker_with_one_second_bound();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        tracker.mark_idle(1);

        // Only partition 0 contributes once partition 1 is idle.
        assert_eq!(tracker.current_watermark(), Some(4000));
        assert_eq!(tracker.active_partition_count(), 1);
        assert_eq!(tracker.idle_partition_count(), 1);
    }

    #[test]
    fn test_resume_from_idle() {
        let mut tracker = tracker_with_one_second_bound();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.mark_idle(1);
        assert_eq!(tracker.active_partition_count(), 1);

        // New data re-activates the partition and it constrains the minimum.
        tracker.update_partition(1, 4000);
        assert_eq!(tracker.active_partition_count(), 2);
        assert_eq!(tracker.current_watermark(), Some(3000));
    }

    #[test]
    fn test_add_partition_dynamically() {
        let mut tracker = tracker_with_one_second_bound();

        tracker.update_partition(0, 5000);
        tracker.add_partition(5);
        tracker.update_partition(5, 3000);

        // Slots 0..=5 exist; the untouched ones have no watermark yet and
        // therefore do not take part in the minimum.
        assert_eq!(tracker.partition_count(), 6);
        assert_eq!(tracker.current_watermark(), Some(2000));
    }

    #[test]
    fn test_remove_partition() {
        let mut tracker = tracker_with_one_second_bound();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        tracker.remove_partition(1);

        assert_eq!(tracker.current_watermark(), Some(4000));
    }

    #[test]
    fn test_partition_watermark() {
        let mut tracker = tracker_with_one_second_bound();
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        assert_eq!(tracker.partition_watermark(0), Some(4000));
        assert_eq!(tracker.partition_watermark(1), Some(2000));
        assert!(tracker.partition_watermark(99).is_none());
    }

    #[test]
    fn test_metrics() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);
        tracker.mark_idle(1);
        tracker.update_partition(1, 4000);

        let m = tracker.metrics();
        assert_eq!(m.updates, 3);
        assert_eq!(m.idle_transitions, 1);
        assert_eq!(m.active_transitions, 1);
    }

    #[test]
    fn test_all_partitions_idle() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);
        tracker.mark_idle(0);
        tracker.mark_idle(1);

        assert!(tracker.current_watermark().is_none());
    }

    #[test]
    fn test_remove_partition_truncates_trailing() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(4);
        tracker.update_partition(0, 1000);
        tracker.update_partition(1, 1000);
        tracker.update_partition(3, 2000);

        tracker.remove_partition(3);
        assert_eq!(tracker.partition_count(), 3, "trailing idle p3 truncated");
        tracker.remove_partition(2);
        assert_eq!(tracker.partition_count(), 2, "trailing idle p2 truncated");

        tracker.register_partitions(4);
        tracker.update_partition(3, 2000);
        tracker.remove_partition(1);
        assert_eq!(
            tracker.partition_count(),
            4,
            "middle idle does not truncate"
        );
    }

    #[test]
    fn test_negative_partition_is_ignored() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));

        tracker.add_partition(-1);
        assert!(!tracker.update_partition(-1, 5000));
        tracker.mark_idle(-1);
        tracker.remove_partition(-1);

        assert_eq!(tracker.partition_count(), 0);
        assert_eq!(tracker.metrics().updates, 0);
        assert!(tracker.partition_watermark(-1).is_none());
        assert!(!tracker.is_partition_idle(-1));
        assert!(tracker.current_watermark().is_none());
    }

    #[test]
    fn test_update_partition_reports_advances() {
        let mut tracker = tracker_with_one_second_bound();

        // Establishing the first watermark is not reported as an advance.
        assert!(!tracker.update_partition(0, 5000));
        assert!(tracker.update_partition(0, 6000));
        // A late event neither lowers nor advances the watermark.
        assert!(!tracker.update_partition(0, 5500));

        assert_eq!(tracker.current_watermark(), Some(5000));
        assert_eq!(tracker.metrics().advances, 1);
        assert_eq!(tracker.metrics().updates, 3);
    }

    #[test]
    fn test_check_idle_partitions_times_out() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_millis(1))
            .with_max_out_of_orderness(Duration::from_secs(1));
        tracker.register_partitions(2);
        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        // Let both partitions exceed the 1 ms idle timeout.
        std::thread::sleep(Duration::from_millis(5));
        tracker.check_idle_partitions();

        assert_eq!(tracker.idle_partition_count(), 2);
        assert_eq!(tracker.metrics().idle_transitions, 2);
        assert!(tracker.current_watermark().is_none());

        tracker.update_partition(0, 6000);
        assert!(!tracker.is_partition_idle(0));
        assert_eq!(tracker.current_watermark(), Some(5000));
    }
}