// laminar_core/checkpoint/barrier.rs

//! Checkpoint barrier protocol for distributed snapshots.
//!
//! This module implements a Chandy-Lamport-style barrier protocol for
//! consistent distributed checkpoints. Barriers flow through the dataflow
//! graph alongside events, triggering state snapshots at each operator.
//!
//! ## Protocol Overview
//!
//! 1. The coordinator injects a [`CheckpointBarrier`] into all sources.
//! 2. Barriers propagate through operators via [`StreamMessage::Barrier`].
//! 3. Operators with multiple inputs align barriers before snapshotting.
//! 4. Once all sinks acknowledge, the checkpoint is complete.
//!
//! ## Fast Path
//!
//! The [`CheckpointBarrierInjector`] uses a packed `AtomicU64` command
//! word for cross-thread signaling. The poll fast path (no pending barrier)
//! is a single relaxed atomic load — typically < 10ns.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Bit flags stored in the `flags` field of a checkpoint barrier.
///
/// Each constant is an independent bit, so flags can be OR-ed together.
/// All of them live in the low 32 bits, which means they survive the
/// 32-bit flags half of the cross-thread barrier command word.
pub mod flags {
    /// The empty flag set: no special behavior.
    pub const NONE: u64 = 0;
    /// Ask for a full snapshot rather than an incremental one.
    pub const FULL_SNAPSHOT: u64 = 0b0001;
    /// Mark the final barrier emitted before shutdown.
    pub const DRAIN: u64 = 0b0010;
    /// Cancel any in-progress checkpoint that has the same ID.
    pub const CANCEL: u64 = 0b0100;
    /// Mark a barrier that belongs to an unaligned checkpoint.
    ///
    /// Unaligned checkpoints capture in-flight channel data instead of
    /// blocking on barrier alignment, trading larger checkpoint size for
    /// faster completion under backpressure.
    pub const UNALIGNED: u64 = 0b1000;
}

/// A checkpoint barrier that travels through the dataflow graph.
///
/// This is a plain 24-byte `#[repr(C)]` value. Because it is `Copy`, it can
/// be embedded directly in channel messages and duplicated freely. It
/// identifies the checkpoint and carries behavior flags.
///
/// ## Layout (24 bytes)
///
/// | Field           | Offset | Size | Description                    |
/// |-----------------|--------|------|--------------------------------|
/// | `checkpoint_id` | 0      | 8    | Unique checkpoint ID           |
/// | `epoch`         | 8      | 8    | Monotonic epoch number         |
/// | `flags`         | 16     | 8    | Behavior flags (see [`flags`]) |
#[repr(C)]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct CheckpointBarrier {
    /// Unique identifier for this checkpoint.
    pub checkpoint_id: u64,
    /// Monotonically increasing epoch number.
    pub epoch: u64,
    /// Bitwise OR of constants from the [`flags`] module.
    pub flags: u64,
}

// Compile-time guard: the documented 24-byte layout must hold.
const _: () = assert!(std::mem::size_of::<CheckpointBarrier>() == 24);

68impl CheckpointBarrier {
69    /// Create a new barrier with the given checkpoint ID and epoch.
70    #[must_use]
71    pub const fn new(checkpoint_id: u64, epoch: u64) -> Self {
72        Self {
73            checkpoint_id,
74            epoch,
75            flags: flags::NONE,
76        }
77    }
78
79    /// Create a barrier that requests a full snapshot.
80    #[must_use]
81    pub const fn full_snapshot(checkpoint_id: u64, epoch: u64) -> Self {
82        Self {
83            checkpoint_id,
84            epoch,
85            flags: flags::FULL_SNAPSHOT,
86        }
87    }
88
89    /// Check whether this barrier requests a full (non-incremental) snapshot.
90    #[must_use]
91    pub const fn is_full_snapshot(&self) -> bool {
92        self.flags & flags::FULL_SNAPSHOT != 0
93    }
94
95    /// Check whether this barrier signals drain/shutdown.
96    #[must_use]
97    pub const fn is_drain(&self) -> bool {
98        self.flags & flags::DRAIN != 0
99    }
100
101    /// Check whether this barrier cancels an in-progress checkpoint.
102    #[must_use]
103    pub const fn is_cancel(&self) -> bool {
104        self.flags & flags::CANCEL != 0
105    }
106
107    /// Check whether this barrier is from an unaligned checkpoint.
108    #[must_use]
109    pub const fn is_unaligned(&self) -> bool {
110        self.flags & flags::UNALIGNED != 0
111    }
112}
113
114/// A message that flows through streaming channels.
115///
116/// Wraps user events with control messages (watermarks and barriers)
117/// in a single enum. Operators pattern-match on this to handle
118/// data vs. control flow.
119///
120/// ## Generic Parameter
121///
122/// `T` is the event payload type — typically `RecordBatch` or a
123/// domain-specific event struct.
124#[derive(Debug, Clone, PartialEq)]
125pub enum StreamMessage<T> {
126    /// A user data event.
127    Event(T),
128    /// A watermark indicating event-time progress (millis since epoch).
129    Watermark(i64),
130    /// A checkpoint barrier for consistent snapshots.
131    Barrier(CheckpointBarrier),
132}
133
134impl<T> StreamMessage<T> {
135    /// Returns `true` if this is a barrier message.
136    #[must_use]
137    pub const fn is_barrier(&self) -> bool {
138        matches!(self, Self::Barrier(_))
139    }
140
141    /// Returns `true` if this is a watermark message.
142    #[must_use]
143    pub const fn is_watermark(&self) -> bool {
144        matches!(self, Self::Watermark(_))
145    }
146
147    /// Returns `true` if this is a data event.
148    #[must_use]
149    pub const fn is_event(&self) -> bool {
150        matches!(self, Self::Event(_))
151    }
152
153    /// Extracts the barrier if this is a [`StreamMessage::Barrier`].
154    #[must_use]
155    pub const fn as_barrier(&self) -> Option<&CheckpointBarrier> {
156        match self {
157            Self::Barrier(b) => Some(b),
158            _ => None,
159        }
160    }
161}
162
/// Packs a barrier command into a single `u64` for cross-thread signaling.
///
/// The `checkpoint_id` occupies the upper 32 bits and `flags` the lower 32
/// bits, so the whole command can be published with one atomic store. The
/// packed value `0` is reserved to mean "no pending barrier". Note that
/// `pack_barrier_cmd(0, 0)` also yields `0`, so that input is
/// indistinguishable from "nothing pending".
///
/// ## Encoding
///
/// ```text
/// [  checkpoint_id (32)  |  flags (32)  ]
///       bits 63..32          bits 31..0
/// ```
#[inline]
const fn pack_barrier_cmd(checkpoint_id: u32, flags: u32) -> u64 {
    // Widening casts are lossless; `u64::from` cannot be called in a `const fn`.
    let high = (checkpoint_id as u64) << 32;
    let low = flags as u64;
    high | low
}

/// Splits a packed barrier command back into (`checkpoint_id`, flags).
///
/// Inverse of `pack_barrier_cmd`: the upper 32 bits become the checkpoint
/// ID and the lower 32 bits become the flags.
#[inline]
// Truncation is intentional: each half is deliberately narrowed to 32 bits.
#[allow(clippy::cast_possible_truncation)]
const fn unpack_barrier_cmd(packed: u64) -> (u32, u32) {
    const LOW_HALF: u64 = 0xFFFF_FFFF;
    ((packed >> 32) as u32, (packed & LOW_HALF) as u32)
}

/// Cross-thread barrier injector for source operators.
///
/// A coordinator thread publishes a packed barrier command with
/// [`trigger`](Self::trigger). Source operators pick it up by calling
/// [`BarrierPollHandle::poll`] on every iteration of their event loop.
///
/// ## Fast Path
///
/// While no barrier is pending, polling costs one `AtomicU64::load(Relaxed)`
/// (typically < 10ns). An atomic read-modify-write happens only when a
/// barrier is actually pending, in order to claim it.
#[derive(Debug)]
pub struct CheckpointBarrierInjector {
    /// Shared command word: `0` when idle, otherwise
    /// (`checkpoint_id` << 32 | flags). Shared with every poll handle.
    cmd: Arc<AtomicU64>,
    /// Number of barriers triggered so far.
    epoch: AtomicU64,
}

208impl CheckpointBarrierInjector {
209    /// Create a new injector with no pending barrier.
210    #[must_use]
211    pub fn new() -> Self {
212        Self {
213            cmd: Arc::new(AtomicU64::new(0)),
214            epoch: AtomicU64::new(0),
215        }
216    }
217
218    /// Get a handle that source operators use to poll for barriers.
219    #[must_use]
220    pub fn handle(&self) -> BarrierPollHandle {
221        BarrierPollHandle {
222            cmd: Arc::clone(&self.cmd),
223        }
224    }
225
226    /// Trigger a new checkpoint barrier.
227    ///
228    /// The next [`BarrierPollHandle::poll`] call on any source will
229    /// observe this barrier and return it. If a previous barrier has
230    /// not been consumed, it is superseded — this is intentional for
231    /// the Chandy-Lamport protocol where only the latest checkpoint
232    /// matters.
233    ///
234    /// # Arguments
235    ///
236    /// * `checkpoint_id` - Unique checkpoint ID (must fit in 32 bits)
237    /// * `barrier_flags` - Barrier flags (must fit in 32 bits)
238    ///
239    /// # Panics
240    ///
241    /// Debug-asserts that `checkpoint_id` and `barrier_flags` fit in u32.
242    #[allow(clippy::cast_possible_truncation)]
243    pub fn trigger(&self, checkpoint_id: u64, barrier_flags: u64) {
244        debug_assert!(
245            u32::try_from(checkpoint_id).is_ok(),
246            "checkpoint_id {checkpoint_id} exceeds u32::MAX"
247        );
248        debug_assert!(
249            u32::try_from(barrier_flags).is_ok(),
250            "barrier_flags {barrier_flags:#x} exceeds u32::MAX"
251        );
252        let packed = pack_barrier_cmd(checkpoint_id as u32, barrier_flags as u32);
253        self.cmd.store(packed, Ordering::Release);
254        self.epoch.fetch_add(1, Ordering::Relaxed);
255    }
256
257    /// Get the current epoch (number of barriers triggered).
258    #[must_use]
259    pub fn epoch(&self) -> u64 {
260        self.epoch.load(Ordering::Relaxed)
261    }
262}
263
264impl Default for CheckpointBarrierInjector {
265    fn default() -> Self {
266        Self::new()
267    }
268}
269
/// Source-side handle for polling pending barriers.
///
/// Obtained from [`CheckpointBarrierInjector::handle`] and kept by the
/// source operator. Clones are cheap and share the same command word. When
/// no barrier is pending, polling is a single atomic load.
#[derive(Clone, Debug)]
pub struct BarrierPollHandle {
    /// Command word shared with the injector and every other handle.
    cmd: Arc<AtomicU64>,
}

280impl BarrierPollHandle {
281    /// Poll for a pending barrier.
282    ///
283    /// Returns `Some(CheckpointBarrier)` if a barrier is pending and
284    /// this call successfully claimed it (exactly-once delivery across
285    /// handles sharing the same injector). Returns `None` if no barrier
286    /// is pending or another handle already claimed it.
287    ///
288    /// The `epoch` parameter is supplied by the caller (typically the
289    /// source operator's current epoch) and is embedded in the returned
290    /// barrier. The injector does not encode the epoch in the atomic
291    /// command word — only checkpoint ID and flags are packed.
292    ///
293    /// ## Performance
294    ///
295    /// Fast path (no barrier): single `load(Relaxed)` — < 10ns.
296    /// Slow path (barrier pending): one `compare_exchange`.
297    #[must_use]
298    pub fn poll(&self, epoch: u64) -> Option<CheckpointBarrier> {
299        // Fast path: no barrier pending
300        let packed = self.cmd.load(Ordering::Relaxed);
301        if packed == 0 {
302            return None;
303        }
304
305        // Barrier pending — try to claim it with compare-exchange
306        if self
307            .cmd
308            .compare_exchange(packed, 0, Ordering::AcqRel, Ordering::Relaxed)
309            .is_ok()
310        {
311            let (checkpoint_id, barrier_flags) = unpack_barrier_cmd(packed);
312            Some(CheckpointBarrier {
313                checkpoint_id: u64::from(checkpoint_id),
314                epoch,
315                flags: u64::from(barrier_flags),
316            })
317        } else {
318            // Another thread claimed it first
319            None
320        }
321    }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_barrier_size() {
        assert_eq!(std::mem::size_of::<CheckpointBarrier>(), 24);
    }

    #[test]
    fn test_barrier_flags() {
        let barrier = CheckpointBarrier::new(1, 1);
        assert!(!barrier.is_full_snapshot());
        assert!(!barrier.is_drain());
        assert!(!barrier.is_cancel());
        assert!(!barrier.is_unaligned());

        let full = CheckpointBarrier::full_snapshot(1, 1);
        assert!(full.is_full_snapshot());
        assert!(!full.is_drain());

        let drain = CheckpointBarrier {
            checkpoint_id: 1,
            epoch: 1,
            flags: flags::DRAIN,
        };
        assert!(drain.is_drain());
    }

    #[test]
    fn test_combined_flags() {
        // Flags are independent bits; each predicate must see only its own bit.
        let barrier = CheckpointBarrier {
            checkpoint_id: 7,
            epoch: 3,
            flags: flags::CANCEL | flags::UNALIGNED,
        };
        assert!(barrier.is_cancel());
        assert!(barrier.is_unaligned());
        assert!(!barrier.is_full_snapshot());
        assert!(!barrier.is_drain());
    }

    #[test]
    fn test_pack_unpack_barrier_cmd() {
        let packed = pack_barrier_cmd(42, 0x03);
        let (id, flags) = unpack_barrier_cmd(packed);
        assert_eq!(id, 42);
        assert_eq!(flags, 0x03);

        // Zero encodes "no command"
        let (id, flags) = unpack_barrier_cmd(0);
        assert_eq!(id, 0);
        assert_eq!(flags, 0);

        // The full 32-bit range of both halves round-trips.
        let packed = pack_barrier_cmd(u32::MAX, u32::MAX);
        assert_eq!(unpack_barrier_cmd(packed), (u32::MAX, u32::MAX));
    }

    #[test]
    fn test_stream_message_variants() {
        let event: StreamMessage<String> = StreamMessage::Event("hello".into());
        assert!(event.is_event());
        assert!(!event.is_barrier());
        assert!(!event.is_watermark());
        assert!(event.as_barrier().is_none());

        let watermark: StreamMessage<String> = StreamMessage::Watermark(1000);
        assert!(watermark.is_watermark());
        assert!(watermark.as_barrier().is_none());

        let barrier: StreamMessage<String> = StreamMessage::Barrier(CheckpointBarrier::new(1, 1));
        assert!(barrier.is_barrier());
        assert_eq!(barrier.as_barrier().unwrap().checkpoint_id, 1);
    }

    #[test]
    fn test_injector_poll_no_barrier() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // No barrier pending
        assert!(handle.poll(0).is_none());
    }

    #[test]
    fn test_injector_trigger_and_poll() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // Trigger barrier
        injector.trigger(42, flags::FULL_SNAPSHOT);
        assert_eq!(injector.epoch(), 1);

        // Poll should return the barrier
        let barrier = handle.poll(1).unwrap();
        assert_eq!(barrier.checkpoint_id, 42);
        assert_eq!(barrier.epoch, 1);
        assert!(barrier.is_full_snapshot());

        // Second poll should return None (already claimed)
        assert!(handle.poll(1).is_none());
    }

    #[test]
    fn test_injector_multiple_handles() {
        let injector = CheckpointBarrierInjector::new();
        let handle1 = injector.handle();
        let handle2 = injector.handle();

        injector.trigger(1, flags::NONE);

        let r1 = handle1.poll(1);
        let r2 = handle2.poll(1);

        // Exactly one handle claims the barrier.
        let claimed = [r1, r2].iter().filter(|r| r.is_some()).count();
        assert_eq!(claimed, 1);
    }

    #[test]
    fn test_trigger_supersedes_unclaimed_barrier() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        injector.trigger(1, flags::NONE);
        injector.trigger(2, flags::DRAIN);
        assert_eq!(injector.epoch(), 2);

        // Only the latest barrier is delivered; the first one is gone.
        let barrier = handle.poll(2).unwrap();
        assert_eq!(barrier.checkpoint_id, 2);
        assert!(barrier.is_drain());
        assert!(handle.poll(2).is_none());
    }

    #[test]
    fn test_concurrent_poll_claims_exactly_once() {
        const POLLERS: usize = 8;
        let injector = CheckpointBarrierInjector::new();
        injector.trigger(5, flags::NONE);

        // Each thread owns its own handle clone; all race to claim the barrier.
        let workers: Vec<_> = (0..POLLERS)
            .map(|_| {
                let handle = injector.handle();
                std::thread::spawn(move || handle.poll(1))
            })
            .collect();
        let claimed: Vec<CheckpointBarrier> = workers
            .into_iter()
            .filter_map(|w| w.join().expect("poller thread panicked"))
            .collect();

        assert_eq!(claimed.len(), 1);
        assert_eq!(claimed[0].checkpoint_id, 5);
        assert_eq!(claimed[0].epoch, 1);
    }
}