// laminar_core/checkpoint/barrier.rs

//! Checkpoint barrier protocol.
//!
//! The coordinator injects barriers into sources via [`CheckpointBarrierInjector`].
//! Sources deliver barriers alongside events. The fast path (no pending barrier)
//! is a single `AtomicU64` load (~10ns).
6
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
9
/// Bit flags stored in the `flags` field of `CheckpointBarrier`.
///
/// Every constant occupies its own bit, so flags are combined with `|` and
/// tested with `&`. Only the low 32 bits survive
/// `CheckpointBarrierInjector::trigger`, which packs the flags into the lower
/// half of a single `u64` command word.
pub mod flags {
    /// Plain barrier: no flag bits set.
    pub const NONE: u64 = 0;
    /// Take a full (non-incremental) snapshot at this barrier.
    pub const FULL_SNAPSHOT: u64 = 0b001;
    /// This is the final barrier before shutdown.
    pub const DRAIN: u64 = 0b010;
    /// Cancel any in-progress checkpoint that has this barrier's ID.
    pub const CANCEL: u64 = 0b100;
}
21
/// A checkpoint barrier travelling through the dataflow graph.
///
/// A plain 24-byte `#[repr(C)]` value: cheap to copy and to embed in channel
/// messages. It identifies the checkpoint and carries behavior flags.
///
/// ## Layout (24 bytes)
///
/// | Field           | Offset | Size | Description                     |
/// |-----------------|--------|------|---------------------------------|
/// | `checkpoint_id` | 0      | 8    | Unique checkpoint ID            |
/// | `epoch`         | 8      | 8    | Monotonic epoch number          |
/// | `flags`         | 16     | 8    | Behavior flags (see [`flags`])  |
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct CheckpointBarrier {
    /// Identifier that is unique per checkpoint.
    pub checkpoint_id: u64,
    /// Epoch number; expected to increase monotonically.
    pub epoch: u64,
    /// Bitwise OR of constants from the [`flags`] module.
    pub flags: u64,
}

// Compile-time guard: the layout table above promises exactly 24 bytes.
const _: () = assert!(24 == std::mem::size_of::<CheckpointBarrier>());
48
49impl CheckpointBarrier {
50    /// Create a new barrier with the given checkpoint ID and epoch.
51    #[must_use]
52    pub const fn new(checkpoint_id: u64, epoch: u64) -> Self {
53        Self {
54            checkpoint_id,
55            epoch,
56            flags: flags::NONE,
57        }
58    }
59
60    /// Create a barrier that requests a full snapshot.
61    #[must_use]
62    pub const fn full_snapshot(checkpoint_id: u64, epoch: u64) -> Self {
63        Self {
64            checkpoint_id,
65            epoch,
66            flags: flags::FULL_SNAPSHOT,
67        }
68    }
69
70    /// Check whether this barrier requests a full (non-incremental) snapshot.
71    #[must_use]
72    pub const fn is_full_snapshot(&self) -> bool {
73        self.flags & flags::FULL_SNAPSHOT != 0
74    }
75
76    /// Check whether this barrier signals drain/shutdown.
77    #[must_use]
78    pub const fn is_drain(&self) -> bool {
79        self.flags & flags::DRAIN != 0
80    }
81
82    /// Check whether this barrier cancels an in-progress checkpoint.
83    #[must_use]
84    pub const fn is_cancel(&self) -> bool {
85        self.flags & flags::CANCEL != 0
86    }
87}
88
89/// A message that flows through streaming channels.
90///
91/// Wraps user events with control messages (watermarks and barriers)
92/// in a single enum. Operators pattern-match on this to handle
93/// data vs. control flow.
94///
95/// ## Generic Parameter
96///
97/// `T` is the event payload type — typically `RecordBatch` or a
98/// domain-specific event struct.
99#[derive(Debug, Clone, PartialEq)]
100pub enum StreamMessage<T> {
101    /// A user data event.
102    Event(T),
103    /// A watermark indicating event-time progress (millis since epoch).
104    Watermark(i64),
105    /// A checkpoint barrier for consistent snapshots.
106    Barrier(CheckpointBarrier),
107}
108
109impl<T> StreamMessage<T> {
110    /// Returns `true` if this is a barrier message.
111    #[must_use]
112    pub const fn is_barrier(&self) -> bool {
113        matches!(self, Self::Barrier(_))
114    }
115
116    /// Returns `true` if this is a watermark message.
117    #[must_use]
118    pub const fn is_watermark(&self) -> bool {
119        matches!(self, Self::Watermark(_))
120    }
121
122    /// Returns `true` if this is a data event.
123    #[must_use]
124    pub const fn is_event(&self) -> bool {
125        matches!(self, Self::Event(_))
126    }
127
128    /// Extracts the barrier if this is a [`StreamMessage::Barrier`].
129    #[must_use]
130    pub const fn as_barrier(&self) -> Option<&CheckpointBarrier> {
131        match self {
132            Self::Barrier(b) => Some(b),
133            _ => None,
134        }
135    }
136}
137
/// Cross-thread barrier injector for source operators.
///
/// The coordinator publishes a packed barrier command with
/// [`trigger`](Self::trigger). Sources check for it on every event-loop
/// iteration through [`BarrierPollHandle::poll`].
///
/// ## Fast Path
///
/// Polling costs one `AtomicU64::load(Relaxed)`, typically < 10ns. The
/// compare-exchange that claims the command runs only when a barrier is
/// actually pending.
#[derive(Debug)]
pub struct CheckpointBarrierInjector {
    /// Packed command word shared with the poll handles.
    ///
    /// `0` means "nothing pending"; any other value is
    /// `(checkpoint_id << 32) | flags`. Because `0` is reserved, a barrier with
    /// checkpoint ID `0` and no flags cannot be represented.
    cmd: Arc<AtomicU64>,
    /// Count of barriers triggered so far (the current epoch).
    epoch: Arc<AtomicU64>,
}
156
157impl CheckpointBarrierInjector {
158    /// Create a new injector with no pending barrier.
159    #[must_use]
160    pub fn new() -> Self {
161        Self {
162            cmd: Arc::new(AtomicU64::new(0)),
163            epoch: Arc::new(AtomicU64::new(0)),
164        }
165    }
166
167    /// Get a handle that source operators use to poll for barriers.
168    #[must_use]
169    pub fn handle(&self) -> BarrierPollHandle {
170        BarrierPollHandle {
171            cmd: Arc::clone(&self.cmd),
172        }
173    }
174
175    /// Trigger a new checkpoint barrier.
176    ///
177    /// The next [`BarrierPollHandle::poll`] call on any source will
178    /// observe this barrier and return it. If a previous barrier has
179    /// not been consumed, it is superseded — this is intentional for
180    /// the Chandy-Lamport protocol where only the latest checkpoint
181    /// matters.
182    ///
183    /// # Arguments
184    ///
185    /// * `checkpoint_id` - Unique checkpoint ID (must fit in 32 bits)
186    /// * `barrier_flags` - Barrier flags (must fit in 32 bits)
187    ///
188    /// # Panics
189    ///
190    /// Debug-asserts that `checkpoint_id` and `barrier_flags` fit in u32.
191    #[allow(clippy::cast_possible_truncation)]
192    pub fn trigger(&self, checkpoint_id: u64, barrier_flags: u64) {
193        debug_assert!(
194            u32::try_from(checkpoint_id).is_ok(),
195            "checkpoint_id {checkpoint_id} exceeds u32::MAX"
196        );
197        debug_assert!(
198            u32::try_from(barrier_flags).is_ok(),
199            "barrier_flags {barrier_flags:#x} exceeds u32::MAX"
200        );
201        // Pack checkpoint_id (upper 32) | flags (lower 32). 0 = no barrier.
202        let packed = (u64::from(checkpoint_id as u32) << 32) | u64::from(barrier_flags as u32);
203        self.cmd.store(packed, Ordering::Release);
204        self.epoch.fetch_add(1, Ordering::Relaxed);
205    }
206
207    /// Get the current epoch (number of barriers triggered).
208    #[must_use]
209    pub fn epoch(&self) -> u64 {
210        self.epoch.load(Ordering::Relaxed)
211    }
212}
213
214impl Default for CheckpointBarrierInjector {
215    fn default() -> Self {
216        Self::new()
217    }
218}
219
220impl Clone for CheckpointBarrierInjector {
221    fn clone(&self) -> Self {
222        Self {
223            cmd: Arc::clone(&self.cmd),
224            epoch: Arc::clone(&self.epoch),
225        }
226    }
227}
228
/// Poll handle that a source operator holds to pick up pending barriers.
///
/// Obtained from [`CheckpointBarrierInjector::handle`] and stored inside the
/// source operator. When no barrier is pending, checking costs a single
/// atomic load.
#[derive(Clone, Debug)]
pub struct BarrierPollHandle {
    /// Packed command word shared with the injector that created this handle.
    cmd: Arc<AtomicU64>,
}
238
239impl BarrierPollHandle {
240    /// Poll for a pending barrier.
241    ///
242    /// Returns `Some(CheckpointBarrier)` if a barrier is pending and
243    /// this call successfully claimed it (exactly-once delivery across
244    /// handles sharing the same injector). Returns `None` if no barrier
245    /// is pending or another handle already claimed it.
246    ///
247    /// The `epoch` parameter is supplied by the caller (typically the
248    /// source operator's current epoch) and is embedded in the returned
249    /// barrier. The injector does not encode the epoch in the atomic
250    /// command word — only checkpoint ID and flags are packed.
251    ///
252    /// ## Performance
253    ///
254    /// Fast path (no barrier): single `load(Relaxed)` — < 10ns.
255    /// Slow path (barrier pending): one `compare_exchange`.
256    #[must_use]
257    pub fn poll(&self, epoch: u64) -> Option<CheckpointBarrier> {
258        // Fast path: no barrier pending
259        let packed = self.cmd.load(Ordering::Relaxed);
260        if packed == 0 {
261            return None;
262        }
263
264        // Barrier pending — try to claim it with compare-exchange
265        if self
266            .cmd
267            .compare_exchange(packed, 0, Ordering::AcqRel, Ordering::Relaxed)
268            .is_ok()
269        {
270            Some(CheckpointBarrier {
271                checkpoint_id: packed >> 32,
272                epoch,
273                flags: packed & 0xFFFF_FFFF,
274            })
275        } else {
276            // Another thread claimed it first
277            None
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_barrier_size() {
        assert_eq!(std::mem::size_of::<CheckpointBarrier>(), 24);
    }

    #[test]
    fn test_barrier_flags() {
        let barrier = CheckpointBarrier::new(1, 1);
        assert!(!barrier.is_full_snapshot());
        assert!(!barrier.is_drain());
        assert!(!barrier.is_cancel());

        let full = CheckpointBarrier::full_snapshot(1, 1);
        assert!(full.is_full_snapshot());
        assert!(!full.is_drain());
        assert!(!full.is_cancel());

        let drain = CheckpointBarrier {
            checkpoint_id: 1,
            epoch: 1,
            flags: flags::DRAIN,
        };
        assert!(drain.is_drain());
        assert!(!drain.is_full_snapshot());

        let cancel = CheckpointBarrier {
            checkpoint_id: 1,
            epoch: 1,
            flags: flags::CANCEL,
        };
        assert!(cancel.is_cancel());
        assert!(!cancel.is_drain());
    }

    #[test]
    fn test_barrier_roundtrip_via_injector() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();
        injector.trigger(42, flags::DRAIN);
        let barrier = handle.poll(0).expect("barrier should be available");
        assert_eq!(barrier.checkpoint_id, 42);
        assert_eq!(barrier.flags, flags::DRAIN);
        assert!(handle.poll(1).is_none(), "cleared after one poll");
    }

    #[test]
    fn test_combined_flags_roundtrip() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();
        let all = flags::FULL_SNAPSHOT | flags::DRAIN | flags::CANCEL;
        injector.trigger(7, all);
        let barrier = handle.poll(3).expect("barrier should be available");
        assert_eq!(barrier.checkpoint_id, 7);
        assert_eq!(barrier.epoch, 3);
        assert_eq!(barrier.flags, all);
        assert!(barrier.is_full_snapshot() && barrier.is_drain() && barrier.is_cancel());
    }

    #[test]
    fn test_max_u32_checkpoint_id_roundtrip() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();
        injector.trigger(u64::from(u32::MAX), flags::NONE);
        let barrier = handle.poll(1).expect("barrier should be available");
        assert_eq!(barrier.checkpoint_id, u64::from(u32::MAX));
        assert_eq!(barrier.flags, flags::NONE);
    }

    #[test]
    fn test_stream_message_variants() {
        let event: StreamMessage<String> = StreamMessage::Event("hello".into());
        assert!(event.is_event());
        assert!(!event.is_barrier());
        assert!(!event.is_watermark());
        assert!(event.as_barrier().is_none());

        let watermark: StreamMessage<String> = StreamMessage::Watermark(1000);
        assert!(watermark.is_watermark());
        assert!(!watermark.is_event());
        assert!(!watermark.is_barrier());
        assert!(watermark.as_barrier().is_none());

        let barrier: StreamMessage<String> = StreamMessage::Barrier(CheckpointBarrier::new(1, 1));
        assert!(barrier.is_barrier());
        assert!(!barrier.is_event());
        assert_eq!(barrier.as_barrier().unwrap().checkpoint_id, 1);
    }

    #[test]
    fn test_injector_poll_no_barrier() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // No barrier pending
        assert!(handle.poll(0).is_none());
    }

    #[test]
    fn test_default_matches_new() {
        let injector = CheckpointBarrierInjector::default();
        assert_eq!(injector.epoch(), 0);
        assert!(injector.handle().poll(0).is_none());
    }

    #[test]
    fn test_injector_trigger_and_poll() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // Trigger barrier
        injector.trigger(42, flags::FULL_SNAPSHOT);
        assert_eq!(injector.epoch(), 1);

        // Poll should return the barrier
        let barrier = handle.poll(1).unwrap();
        assert_eq!(barrier.checkpoint_id, 42);
        assert_eq!(barrier.epoch, 1);
        assert!(barrier.is_full_snapshot());

        // Second poll should return None (already claimed)
        assert!(handle.poll(1).is_none());
    }

    #[test]
    fn test_trigger_supersedes_pending_barrier() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        injector.trigger(1, flags::NONE);
        injector.trigger(2, flags::FULL_SNAPSHOT);
        assert_eq!(injector.epoch(), 2);

        let barrier = handle.poll(2).expect("latest barrier should be pending");
        assert_eq!(barrier.checkpoint_id, 2);
        assert!(barrier.is_full_snapshot());
        assert!(handle.poll(2).is_none(), "superseded barrier is not delivered");
    }

    #[test]
    fn test_cloned_injector_shares_state() {
        let injector = CheckpointBarrierInjector::new();
        let clone = injector.clone();
        let handle = injector.handle();

        clone.trigger(5, flags::NONE);
        assert_eq!(injector.epoch(), 1);
        assert_eq!(handle.poll(1).map(|b| b.checkpoint_id), Some(5));
    }

    #[test]
    fn test_injector_multiple_handles() {
        let injector = CheckpointBarrierInjector::new();
        let handle1 = injector.handle();
        let handle2 = injector.handle();

        injector.trigger(1, flags::NONE);

        let r1 = handle1.poll(1);
        let r2 = handle2.poll(1);

        // Exactly one handle claims it: never both, never neither.
        assert!(
            r1.is_some() != r2.is_some(),
            "exactly one handle must claim the barrier"
        );
    }

    #[test]
    fn test_concurrent_handles_claim_exactly_once() {
        let injector = CheckpointBarrierInjector::new();
        injector.trigger(9, flags::NONE);

        let workers: Vec<_> = (0..4)
            .map(|_| {
                let handle = injector.handle();
                std::thread::spawn(move || handle.poll(1))
            })
            .collect();

        let claimed: Vec<CheckpointBarrier> = workers
            .into_iter()
            .filter_map(|worker| worker.join().expect("poll thread panicked"))
            .collect();

        assert_eq!(claimed.len(), 1, "exactly one handle must claim the barrier");
        assert_eq!(claimed[0].checkpoint_id, 9);
    }
}