Skip to main content

laminar_core/checkpoint/
barrier.rs

1//! Checkpoint barrier protocol.
2//!
3//! The coordinator injects barriers into sources via [`CheckpointBarrierInjector`].
4//! Sources deliver barriers alongside events. The fast path (no pending barrier)
5//! is a single `AtomicU64` load (~10ns).
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
/// Bit masks that may be OR-ed together in a barrier's `flags` field.
pub mod flags {
    /// Empty flag set: the barrier requests no special behavior.
    pub const NONE: u64 = 0b000;
    /// Bit 0: the barrier requires a full snapshot rather than an incremental one.
    pub const FULL_SNAPSHOT: u64 = 0b001;
    /// Bit 1: the barrier is the final one before shutdown.
    pub const DRAIN: u64 = 0b010;
    /// Bit 2: any in-progress checkpoint with this ID should be cancelled.
    pub const CANCEL: u64 = 0b100;
}
21
22/// A checkpoint barrier that flows through the dataflow graph.
23///
24/// This is a 24-byte `#[repr(C)]` value type that can be cheaply copied
25/// and embedded in channel messages. It carries the checkpoint identity
26/// and behavior flags.
27///
28/// ## Layout (24 bytes)
29///
30/// | Field          | Offset | Size | Description                |
31/// |----------------|--------|------|----------------------------|
32/// | checkpoint_id  | 0      | 8    | Unique checkpoint ID       |
33/// | epoch          | 8      | 8    | Monotonic epoch number     |
34/// | flags          | 16     | 8    | Behavior flags (see [`flags`]) |
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
36#[repr(C)]
37pub struct CheckpointBarrier {
38    /// Unique identifier for this checkpoint.
39    pub checkpoint_id: u64,
40    /// Monotonically increasing epoch number.
41    pub epoch: u64,
42    /// Behavior flags (see [`flags`] module constants).
43    pub flags: u64,
44}
45
46// Verify the struct is exactly 24 bytes as promised.
47const _: () = assert!(std::mem::size_of::<CheckpointBarrier>() == 24);
48
49impl CheckpointBarrier {
50    /// Create a new barrier with the given checkpoint ID and epoch.
51    #[must_use]
52    pub const fn new(checkpoint_id: u64, epoch: u64) -> Self {
53        Self {
54            checkpoint_id,
55            epoch,
56            flags: flags::NONE,
57        }
58    }
59
60    /// Create a barrier that requests a full snapshot.
61    #[must_use]
62    pub const fn full_snapshot(checkpoint_id: u64, epoch: u64) -> Self {
63        Self {
64            checkpoint_id,
65            epoch,
66            flags: flags::FULL_SNAPSHOT,
67        }
68    }
69
70    /// Check whether this barrier requests a full (non-incremental) snapshot.
71    #[must_use]
72    pub const fn is_full_snapshot(&self) -> bool {
73        self.flags & flags::FULL_SNAPSHOT != 0
74    }
75
76    /// Check whether this barrier signals drain/shutdown.
77    #[must_use]
78    pub const fn is_drain(&self) -> bool {
79        self.flags & flags::DRAIN != 0
80    }
81
82    /// Check whether this barrier cancels an in-progress checkpoint.
83    #[must_use]
84    pub const fn is_cancel(&self) -> bool {
85        self.flags & flags::CANCEL != 0
86    }
87}
88
89/// A message that flows through streaming channels.
90///
91/// Wraps user events with control messages (watermarks and barriers)
92/// in a single enum. Operators pattern-match on this to handle
93/// data vs. control flow.
94///
95/// ## Generic Parameter
96///
97/// `T` is the event payload type — typically `RecordBatch` or a
98/// domain-specific event struct.
99#[derive(Debug, Clone, PartialEq)]
100pub enum StreamMessage<T> {
101    /// A user data event.
102    Event(T),
103    /// A watermark indicating event-time progress (millis since epoch).
104    Watermark(i64),
105    /// A checkpoint barrier for consistent snapshots.
106    Barrier(CheckpointBarrier),
107}
108
109impl<T> StreamMessage<T> {
110    /// Returns `true` if this is a barrier message.
111    #[must_use]
112    pub const fn is_barrier(&self) -> bool {
113        matches!(self, Self::Barrier(_))
114    }
115
116    /// Returns `true` if this is a watermark message.
117    #[must_use]
118    pub const fn is_watermark(&self) -> bool {
119        matches!(self, Self::Watermark(_))
120    }
121
122    /// Returns `true` if this is a data event.
123    #[must_use]
124    pub const fn is_event(&self) -> bool {
125        matches!(self, Self::Event(_))
126    }
127
128    /// Extracts the barrier if this is a [`StreamMessage::Barrier`].
129    #[must_use]
130    pub const fn as_barrier(&self) -> Option<&CheckpointBarrier> {
131        match self {
132            Self::Barrier(b) => Some(b),
133            _ => None,
134        }
135    }
136}
137
138/// Cross-thread barrier injector for source operators.
139///
140/// The coordinator thread stores a packed barrier command via
141/// [`trigger`](Self::trigger). Source operators poll via
142/// [`BarrierPollHandle::poll`] on each iteration of their event loop.
143///
144/// ## Fast Path
145///
146/// The poll path is a single `AtomicU64::load(Relaxed)` — typically < 10ns.
147/// Only when a barrier is pending does the source perform a compare-exchange
148/// to claim it.
149#[derive(Debug)]
150pub struct CheckpointBarrierInjector {
151    /// Packed command: 0 = no pending, otherwise (`checkpoint_id` << 32 | flags).
152    cmd: Arc<AtomicU64>,
153    /// The epoch counter, incremented each time a barrier is triggered.
154    epoch: AtomicU64,
155}
156
157impl CheckpointBarrierInjector {
158    /// Create a new injector with no pending barrier.
159    #[must_use]
160    pub fn new() -> Self {
161        Self {
162            cmd: Arc::new(AtomicU64::new(0)),
163            epoch: AtomicU64::new(0),
164        }
165    }
166
167    /// Get a handle that source operators use to poll for barriers.
168    #[must_use]
169    pub fn handle(&self) -> BarrierPollHandle {
170        BarrierPollHandle {
171            cmd: Arc::clone(&self.cmd),
172        }
173    }
174
175    /// Trigger a new checkpoint barrier.
176    ///
177    /// The next [`BarrierPollHandle::poll`] call on any source will
178    /// observe this barrier and return it. If a previous barrier has
179    /// not been consumed, it is superseded — this is intentional for
180    /// the Chandy-Lamport protocol where only the latest checkpoint
181    /// matters.
182    ///
183    /// # Arguments
184    ///
185    /// * `checkpoint_id` - Unique checkpoint ID (must fit in 32 bits)
186    /// * `barrier_flags` - Barrier flags (must fit in 32 bits)
187    ///
188    /// # Panics
189    ///
190    /// Debug-asserts that `checkpoint_id` and `barrier_flags` fit in u32.
191    #[allow(clippy::cast_possible_truncation)]
192    pub fn trigger(&self, checkpoint_id: u64, barrier_flags: u64) {
193        debug_assert!(
194            u32::try_from(checkpoint_id).is_ok(),
195            "checkpoint_id {checkpoint_id} exceeds u32::MAX"
196        );
197        debug_assert!(
198            u32::try_from(barrier_flags).is_ok(),
199            "barrier_flags {barrier_flags:#x} exceeds u32::MAX"
200        );
201        // Pack checkpoint_id (upper 32) | flags (lower 32). 0 = no barrier.
202        let packed = (u64::from(checkpoint_id as u32) << 32) | u64::from(barrier_flags as u32);
203        self.cmd.store(packed, Ordering::Release);
204        self.epoch.fetch_add(1, Ordering::Relaxed);
205    }
206
207    /// Get the current epoch (number of barriers triggered).
208    #[must_use]
209    pub fn epoch(&self) -> u64 {
210        self.epoch.load(Ordering::Relaxed)
211    }
212}
213
214impl Default for CheckpointBarrierInjector {
215    fn default() -> Self {
216        Self::new()
217    }
218}
219
220/// Handle used by source operators to poll for pending barriers.
221///
222/// Cloned from [`CheckpointBarrierInjector::handle`] and stored in the
223/// source operator. The fast path is a single atomic load.
224#[derive(Debug, Clone)]
225pub struct BarrierPollHandle {
226    /// Shared packed command word.
227    cmd: Arc<AtomicU64>,
228}
229
230impl BarrierPollHandle {
231    /// Poll for a pending barrier.
232    ///
233    /// Returns `Some(CheckpointBarrier)` if a barrier is pending and
234    /// this call successfully claimed it (exactly-once delivery across
235    /// handles sharing the same injector). Returns `None` if no barrier
236    /// is pending or another handle already claimed it.
237    ///
238    /// The `epoch` parameter is supplied by the caller (typically the
239    /// source operator's current epoch) and is embedded in the returned
240    /// barrier. The injector does not encode the epoch in the atomic
241    /// command word — only checkpoint ID and flags are packed.
242    ///
243    /// ## Performance
244    ///
245    /// Fast path (no barrier): single `load(Relaxed)` — < 10ns.
246    /// Slow path (barrier pending): one `compare_exchange`.
247    #[must_use]
248    pub fn poll(&self, epoch: u64) -> Option<CheckpointBarrier> {
249        // Fast path: no barrier pending
250        let packed = self.cmd.load(Ordering::Relaxed);
251        if packed == 0 {
252            return None;
253        }
254
255        // Barrier pending — try to claim it with compare-exchange
256        if self
257            .cmd
258            .compare_exchange(packed, 0, Ordering::AcqRel, Ordering::Relaxed)
259            .is_ok()
260        {
261            Some(CheckpointBarrier {
262                checkpoint_id: packed >> 32,
263                epoch,
264                flags: packed & 0xFFFF_FFFF,
265            })
266        } else {
267            // Another thread claimed it first
268            None
269        }
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// The barrier must keep its documented 24-byte footprint.
    #[test]
    fn test_barrier_size() {
        let size = std::mem::size_of::<CheckpointBarrier>();
        assert_eq!(size, 24);
    }

    /// Flag predicates reflect exactly the bits that were set.
    #[test]
    fn test_barrier_flags() {
        let plain = CheckpointBarrier::new(1, 1);
        assert!(!plain.is_full_snapshot());
        assert!(!plain.is_drain());
        assert!(!plain.is_cancel());

        let snapshot = CheckpointBarrier::full_snapshot(1, 1);
        assert!(snapshot.is_full_snapshot());
        assert!(!snapshot.is_drain());

        let draining = CheckpointBarrier {
            flags: flags::DRAIN,
            ..CheckpointBarrier::new(1, 1)
        };
        assert!(draining.is_drain());
    }

    /// ID and flags survive the trip through the packed command word.
    #[test]
    fn test_barrier_roundtrip_via_injector() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        injector.trigger(42, flags::DRAIN);

        let received = handle.poll(0).expect("barrier should be available");
        assert_eq!(received.checkpoint_id, 42);
        assert_eq!(received.flags, flags::DRAIN);
        assert!(handle.poll(1).is_none(), "cleared after one poll");
    }

    /// Each variant answers `true` only to its own predicate.
    #[test]
    fn test_stream_message_variants() {
        let data: StreamMessage<String> = StreamMessage::Event("hello".into());
        assert!(data.is_event());
        assert!(!data.is_barrier());
        assert!(!data.is_watermark());

        let progress: StreamMessage<String> = StreamMessage::Watermark(1000);
        assert!(progress.is_watermark());

        let control: StreamMessage<String> = StreamMessage::Barrier(CheckpointBarrier::new(1, 1));
        assert!(control.is_barrier());
        let inner = control.as_barrier().unwrap();
        assert_eq!(inner.checkpoint_id, 1);
    }

    /// Polling before any trigger yields nothing.
    #[test]
    fn test_injector_poll_no_barrier() {
        let handle = CheckpointBarrierInjector::new().handle();

        // No barrier pending
        assert!(handle.poll(0).is_none());
    }

    /// A triggered barrier is returned once, then the slot is empty again.
    #[test]
    fn test_injector_trigger_and_poll() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // Trigger barrier
        injector.trigger(42, flags::FULL_SNAPSHOT);
        assert_eq!(injector.epoch(), 1);

        // Poll should return the barrier
        let received = handle.poll(1).unwrap();
        assert_eq!(received.checkpoint_id, 42);
        assert_eq!(received.epoch, 1);
        assert!(received.is_full_snapshot());

        // Second poll should return None (already claimed)
        assert!(handle.poll(1).is_none());
    }

    /// With two handles on one injector, exactly one of them claims the barrier.
    #[test]
    fn test_injector_multiple_handles() {
        let injector = CheckpointBarrierInjector::new();
        let handle1 = injector.handle();
        let handle2 = injector.handle();

        injector.trigger(1, flags::NONE);

        // Poll both handles in order and count how many obtained the barrier.
        let outcomes = [handle1.poll(1), handle2.poll(1)];
        let claimed = outcomes.iter().filter(|outcome| outcome.is_some()).count();

        // Exactly one should succeed
        assert_eq!(claimed, 1);
    }
}
373}