// laminar_core/checkpoint/barrier.rs
1//! Checkpoint barrier protocol for distributed snapshots.
2//!
3//! This module implements the Chandy-Lamport style barrier protocol for
4//! consistent distributed checkpoints. Barriers flow through the dataflow
5//! graph alongside events, triggering state snapshots at each operator.
6//!
7//! ## Protocol Overview
8//!
9//! 1. The coordinator injects a [`CheckpointBarrier`] into all sources
10//! 2. Barriers propagate through operators via [`StreamMessage::Barrier`]
11//! 3. Operators with multiple inputs align barriers before snapshotting
12//! 4. Once all sinks acknowledge, the checkpoint is complete
13//!
14//! ## Fast Path
15//!
16//! The [`CheckpointBarrierInjector`] uses a packed `AtomicU64` command
17//! word for cross-thread signaling. The poll fast path (no pending barrier)
18//! is a single atomic load — typically < 10ns.
19
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22
/// Barrier flags — packed into the `flags` field.
pub mod flags {
    /// Plain barrier with no special behavior.
    pub const NONE: u64 = 0;
    /// Request a full snapshot instead of an incremental one.
    pub const FULL_SNAPSHOT: u64 = 0b0001;
    /// Final barrier before shutdown.
    pub const DRAIN: u64 = 0b0010;
    /// Cancel any in-progress checkpoint with this ID.
    pub const CANCEL: u64 = 0b0100;
    /// Barrier belongs to an unaligned checkpoint.
    ///
    /// Unaligned checkpoints snapshot in-flight channel data rather than
    /// blocking on barrier alignment — the checkpoint grows larger but
    /// completes faster under backpressure.
    pub const UNALIGNED: u64 = 0b1000;
}
40
/// A checkpoint barrier that flows through the dataflow graph.
///
/// This is a 24-byte `#[repr(C)]` value type (`Copy`) that can be cheaply
/// copied and embedded in channel messages. It carries the checkpoint
/// identity and behavior flags; it carries no payload of its own.
///
/// ## Layout (24 bytes, verified by the compile-time assert below)
///
/// | Field          | Offset | Size | Description                    |
/// |----------------|--------|------|--------------------------------|
/// | checkpoint_id  | 0      | 8    | Unique checkpoint ID           |
/// | epoch          | 8      | 8    | Monotonic epoch number         |
/// | flags          | 16     | 8    | Behavior flags (see [`flags`]) |
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct CheckpointBarrier {
    /// Unique identifier for this checkpoint.
    pub checkpoint_id: u64,
    /// Monotonically increasing epoch number.
    pub epoch: u64,
    /// Behavior flags (see [`flags`] module constants).
    pub flags: u64,
}

// Compile-time guard: the 24-byte layout promised in the doc table must hold.
const _: () = assert!(std::mem::size_of::<CheckpointBarrier>() == 24);
67
68impl CheckpointBarrier {
69 /// Create a new barrier with the given checkpoint ID and epoch.
70 #[must_use]
71 pub const fn new(checkpoint_id: u64, epoch: u64) -> Self {
72 Self {
73 checkpoint_id,
74 epoch,
75 flags: flags::NONE,
76 }
77 }
78
79 /// Create a barrier that requests a full snapshot.
80 #[must_use]
81 pub const fn full_snapshot(checkpoint_id: u64, epoch: u64) -> Self {
82 Self {
83 checkpoint_id,
84 epoch,
85 flags: flags::FULL_SNAPSHOT,
86 }
87 }
88
89 /// Check whether this barrier requests a full (non-incremental) snapshot.
90 #[must_use]
91 pub const fn is_full_snapshot(&self) -> bool {
92 self.flags & flags::FULL_SNAPSHOT != 0
93 }
94
95 /// Check whether this barrier signals drain/shutdown.
96 #[must_use]
97 pub const fn is_drain(&self) -> bool {
98 self.flags & flags::DRAIN != 0
99 }
100
101 /// Check whether this barrier cancels an in-progress checkpoint.
102 #[must_use]
103 pub const fn is_cancel(&self) -> bool {
104 self.flags & flags::CANCEL != 0
105 }
106
107 /// Check whether this barrier is from an unaligned checkpoint.
108 #[must_use]
109 pub const fn is_unaligned(&self) -> bool {
110 self.flags & flags::UNALIGNED != 0
111 }
112}
113
/// A message that flows through streaming channels.
///
/// Wraps user events with control messages (watermarks and barriers)
/// in a single enum. Operators pattern-match on this to handle
/// data vs. control flow.
///
/// ## Generic Parameter
///
/// `T` is the event payload type — typically `RecordBatch` or a
/// domain-specific event struct. Only the `Event` variant carries `T`;
/// the control variants are payload-independent.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamMessage<T> {
    /// A user data event.
    Event(T),
    /// A watermark indicating event-time progress (millis since epoch).
    Watermark(i64),
    /// A checkpoint barrier for consistent snapshots (see [`CheckpointBarrier`]).
    Barrier(CheckpointBarrier),
}
133
134impl<T> StreamMessage<T> {
135 /// Returns `true` if this is a barrier message.
136 #[must_use]
137 pub const fn is_barrier(&self) -> bool {
138 matches!(self, Self::Barrier(_))
139 }
140
141 /// Returns `true` if this is a watermark message.
142 #[must_use]
143 pub const fn is_watermark(&self) -> bool {
144 matches!(self, Self::Watermark(_))
145 }
146
147 /// Returns `true` if this is a data event.
148 #[must_use]
149 pub const fn is_event(&self) -> bool {
150 matches!(self, Self::Event(_))
151 }
152
153 /// Extracts the barrier if this is a [`StreamMessage::Barrier`].
154 #[must_use]
155 pub const fn as_barrier(&self) -> Option<&CheckpointBarrier> {
156 match self {
157 Self::Barrier(b) => Some(b),
158 _ => None,
159 }
160 }
161}
162
/// Packed barrier command for cross-thread signaling.
///
/// Combines a `checkpoint_id` (upper 32 bits) and flags (lower 32 bits)
/// into one `u64` suitable for an atomic store. The value 0 is reserved
/// to mean "no pending barrier".
///
/// ## Encoding
///
/// ```text
/// [ checkpoint_id (32) | flags (32) ]
///   bits 63..32          bits 31..0
/// ```
#[inline]
const fn pack_barrier_cmd(checkpoint_id: u32, flags: u32) -> u64 {
    let high = (checkpoint_id as u64) << 32;
    let low = flags as u64;
    high | low
}
179
/// Unpack a barrier command into (`checkpoint_id`, flags).
#[inline]
#[allow(clippy::cast_possible_truncation)] // truncation to the low/high 32 bits is the point
const fn unpack_barrier_cmd(packed: u64) -> (u32, u32) {
    ((packed >> 32) as u32, (packed & 0xFFFF_FFFF) as u32)
}
188
/// Cross-thread barrier injector for source operators.
///
/// The coordinator thread stores a packed barrier command via
/// [`trigger`](Self::trigger). Source operators poll via
/// [`BarrierPollHandle::poll`] on each iteration of their event loop.
///
/// ## Fast Path
///
/// The poll path is a single `AtomicU64::load(Relaxed)` — typically < 10ns.
/// Only when a barrier is pending does the source perform a compare-exchange
/// to claim it.
#[derive(Debug)]
pub struct CheckpointBarrierInjector {
    /// Packed command word, shared with every [`BarrierPollHandle`] via `Arc`:
    /// 0 = no pending barrier, otherwise (`checkpoint_id` << 32 | flags).
    cmd: Arc<AtomicU64>,
    /// The epoch counter, incremented each time a barrier is triggered.
    /// Note: not packed into `cmd` — poll callers supply their own epoch.
    epoch: AtomicU64,
}
207
208impl CheckpointBarrierInjector {
209 /// Create a new injector with no pending barrier.
210 #[must_use]
211 pub fn new() -> Self {
212 Self {
213 cmd: Arc::new(AtomicU64::new(0)),
214 epoch: AtomicU64::new(0),
215 }
216 }
217
218 /// Get a handle that source operators use to poll for barriers.
219 #[must_use]
220 pub fn handle(&self) -> BarrierPollHandle {
221 BarrierPollHandle {
222 cmd: Arc::clone(&self.cmd),
223 }
224 }
225
226 /// Trigger a new checkpoint barrier.
227 ///
228 /// The next [`BarrierPollHandle::poll`] call on any source will
229 /// observe this barrier and return it. If a previous barrier has
230 /// not been consumed, it is superseded — this is intentional for
231 /// the Chandy-Lamport protocol where only the latest checkpoint
232 /// matters.
233 ///
234 /// # Arguments
235 ///
236 /// * `checkpoint_id` - Unique checkpoint ID (must fit in 32 bits)
237 /// * `barrier_flags` - Barrier flags (must fit in 32 bits)
238 ///
239 /// # Panics
240 ///
241 /// Debug-asserts that `checkpoint_id` and `barrier_flags` fit in u32.
242 #[allow(clippy::cast_possible_truncation)]
243 pub fn trigger(&self, checkpoint_id: u64, barrier_flags: u64) {
244 debug_assert!(
245 u32::try_from(checkpoint_id).is_ok(),
246 "checkpoint_id {checkpoint_id} exceeds u32::MAX"
247 );
248 debug_assert!(
249 u32::try_from(barrier_flags).is_ok(),
250 "barrier_flags {barrier_flags:#x} exceeds u32::MAX"
251 );
252 let packed = pack_barrier_cmd(checkpoint_id as u32, barrier_flags as u32);
253 self.cmd.store(packed, Ordering::Release);
254 self.epoch.fetch_add(1, Ordering::Relaxed);
255 }
256
257 /// Get the current epoch (number of barriers triggered).
258 #[must_use]
259 pub fn epoch(&self) -> u64 {
260 self.epoch.load(Ordering::Relaxed)
261 }
262}
263
impl Default for CheckpointBarrierInjector {
    /// Equivalent to [`CheckpointBarrierInjector::new`]: no pending barrier, epoch 0.
    fn default() -> Self {
        Self::new()
    }
}
269
/// Handle used by source operators to poll for pending barriers.
///
/// Cloned from [`CheckpointBarrierInjector::handle`] and stored in the
/// source operator. The fast path is a single atomic load. Clones share
/// the same command word, so a triggered barrier is claimed by at most
/// one handle.
#[derive(Debug, Clone)]
pub struct BarrierPollHandle {
    /// Shared packed command word (0 = no pending barrier).
    cmd: Arc<AtomicU64>,
}
279
280impl BarrierPollHandle {
281 /// Poll for a pending barrier.
282 ///
283 /// Returns `Some(CheckpointBarrier)` if a barrier is pending and
284 /// this call successfully claimed it (exactly-once delivery across
285 /// handles sharing the same injector). Returns `None` if no barrier
286 /// is pending or another handle already claimed it.
287 ///
288 /// The `epoch` parameter is supplied by the caller (typically the
289 /// source operator's current epoch) and is embedded in the returned
290 /// barrier. The injector does not encode the epoch in the atomic
291 /// command word — only checkpoint ID and flags are packed.
292 ///
293 /// ## Performance
294 ///
295 /// Fast path (no barrier): single `load(Relaxed)` — < 10ns.
296 /// Slow path (barrier pending): one `compare_exchange`.
297 #[must_use]
298 pub fn poll(&self, epoch: u64) -> Option<CheckpointBarrier> {
299 // Fast path: no barrier pending
300 let packed = self.cmd.load(Ordering::Relaxed);
301 if packed == 0 {
302 return None;
303 }
304
305 // Barrier pending — try to claim it with compare-exchange
306 if self
307 .cmd
308 .compare_exchange(packed, 0, Ordering::AcqRel, Ordering::Relaxed)
309 .is_ok()
310 {
311 let (checkpoint_id, barrier_flags) = unpack_barrier_cmd(packed);
312 Some(CheckpointBarrier {
313 checkpoint_id: u64::from(checkpoint_id),
314 epoch,
315 flags: u64::from(barrier_flags),
316 })
317 } else {
318 // Another thread claimed it first
319 None
320 }
321 }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    // Layout guarantee: the doc table promises a 24-byte #[repr(C)] struct.
    #[test]
    fn test_barrier_size() {
        assert_eq!(std::mem::size_of::<CheckpointBarrier>(), 24);
    }

    #[test]
    fn test_barrier_flags() {
        let barrier = CheckpointBarrier::new(1, 1);
        assert!(!barrier.is_full_snapshot());
        assert!(!barrier.is_drain());
        assert!(!barrier.is_cancel());

        let full = CheckpointBarrier::full_snapshot(1, 1);
        assert!(full.is_full_snapshot());
        assert!(!full.is_drain());

        let drain = CheckpointBarrier {
            checkpoint_id: 1,
            epoch: 1,
            flags: flags::DRAIN,
        };
        assert!(drain.is_drain());
    }

    // Round-trip of the packed command encoding used by the injector.
    #[test]
    fn test_pack_unpack_barrier_cmd() {
        let packed = pack_barrier_cmd(42, 0x03);
        let (id, flags) = unpack_barrier_cmd(packed);
        assert_eq!(id, 42);
        assert_eq!(flags, 0x03);

        // Zero encodes "no command"
        let (id, flags) = unpack_barrier_cmd(0);
        assert_eq!(id, 0);
        assert_eq!(flags, 0);
    }

    #[test]
    fn test_stream_message_variants() {
        let event: StreamMessage<String> = StreamMessage::Event("hello".into());
        assert!(event.is_event());
        assert!(!event.is_barrier());
        assert!(!event.is_watermark());

        let watermark: StreamMessage<String> = StreamMessage::Watermark(1000);
        assert!(watermark.is_watermark());

        let barrier: StreamMessage<String> = StreamMessage::Barrier(CheckpointBarrier::new(1, 1));
        assert!(barrier.is_barrier());
        assert_eq!(barrier.as_barrier().unwrap().checkpoint_id, 1);
    }

    #[test]
    fn test_injector_poll_no_barrier() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // No barrier pending
        assert!(handle.poll(0).is_none());
    }

    #[test]
    fn test_injector_trigger_and_poll() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // Trigger barrier
        injector.trigger(42, flags::FULL_SNAPSHOT);
        assert_eq!(injector.epoch(), 1);

        // Poll should return the barrier; the epoch comes from the poll
        // argument, not the command word.
        let barrier = handle.poll(1).unwrap();
        assert_eq!(barrier.checkpoint_id, 42);
        assert_eq!(barrier.epoch, 1);
        assert!(barrier.is_full_snapshot());

        // Second poll should return None (already claimed)
        assert!(handle.poll(1).is_none());
    }

    #[test]
    fn test_injector_multiple_handles() {
        let injector = CheckpointBarrierInjector::new();
        let handle1 = injector.handle();
        let handle2 = injector.handle();

        injector.trigger(1, flags::NONE);

        // Only one handle should claim it
        let r1 = handle1.poll(1);
        let r2 = handle2.poll(1);

        // Exactly one should succeed: the first assert rules out both-None,
        // the conditional rules out both-Some.
        assert!(r1.is_some() || r2.is_some());
        if r1.is_some() {
            assert!(r2.is_none());
        }
    }
}