// laminar_core/checkpoint/barrier.rs
//! Checkpoint barrier protocol.
//!
//! The coordinator injects barriers into sources via [`CheckpointBarrierInjector`].
//! Sources deliver barriers alongside events. The fast path (no pending barrier)
//! is a single `AtomicU64` load (~10ns).

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
9
10/// Barrier flags — packed into the `flags` field.
/// Barrier flags — packed into the `flags` field of [`CheckpointBarrier`].
///
/// Flags are independent bits and may be OR-ed together. Note that the
/// atomic injector packs the flag word into 32 bits, so new flags should
/// stay within the low 32 bits.
pub mod flags {
    /// No special behavior.
    pub const NONE: u64 = 0;
    /// This barrier requires a full snapshot (not incremental).
    pub const FULL_SNAPSHOT: u64 = 1 << 0;
    /// This barrier is the final barrier before shutdown.
    pub const DRAIN: u64 = 1 << 1;
    /// Cancel any in-progress checkpoint with this ID.
    pub const CANCEL: u64 = 1 << 2;
}

/// A checkpoint barrier that flows through the dataflow graph.
///
/// A 24-byte `#[repr(C)]` value type that is cheap to copy and can be
/// embedded directly in channel messages. It carries the checkpoint
/// identity and behavior flags.
///
/// ## Layout (24 bytes)
///
/// | Field | Offset | Size | Description |
/// |----------------|--------|------|----------------------------|
/// | checkpoint_id | 0 | 8 | Unique checkpoint ID |
/// | epoch | 8 | 8 | Monotonic epoch number |
/// | flags | 16 | 8 | Behavior flags (see [`flags`]) |
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct CheckpointBarrier {
    /// Unique identifier for this checkpoint.
    pub checkpoint_id: u64,
    /// Monotonically increasing epoch number.
    pub epoch: u64,
    /// Behavior flags (see [`flags`] module constants).
    pub flags: u64,
}

// Compile-time guarantee that the documented 24-byte layout holds.
const _: () = assert!(std::mem::size_of::<CheckpointBarrier>() == 24);

impl CheckpointBarrier {
    /// Build a barrier with an explicit flag word (internal helper).
    const fn with_flags(checkpoint_id: u64, epoch: u64, flags: u64) -> Self {
        Self {
            checkpoint_id,
            epoch,
            flags,
        }
    }

    /// Returns `true` when `bit` is set in the flag word (internal helper).
    const fn has_flag(&self, bit: u64) -> bool {
        (self.flags & bit) != 0
    }

    /// Create a new barrier with the given checkpoint ID and epoch.
    #[must_use]
    pub const fn new(checkpoint_id: u64, epoch: u64) -> Self {
        Self::with_flags(checkpoint_id, epoch, flags::NONE)
    }

    /// Create a barrier that requests a full snapshot.
    #[must_use]
    pub const fn full_snapshot(checkpoint_id: u64, epoch: u64) -> Self {
        Self::with_flags(checkpoint_id, epoch, flags::FULL_SNAPSHOT)
    }

    /// Check whether this barrier requests a full (non-incremental) snapshot.
    #[must_use]
    pub const fn is_full_snapshot(&self) -> bool {
        self.has_flag(flags::FULL_SNAPSHOT)
    }

    /// Check whether this barrier signals drain/shutdown.
    #[must_use]
    pub const fn is_drain(&self) -> bool {
        self.has_flag(flags::DRAIN)
    }

    /// Check whether this barrier cancels an in-progress checkpoint.
    #[must_use]
    pub const fn is_cancel(&self) -> bool {
        self.has_flag(flags::CANCEL)
    }
}
88
/// A message that flows through streaming channels.
///
/// Wraps user events with control messages (watermarks and barriers)
/// in a single enum. Operators pattern-match on this to handle
/// data vs. control flow.
///
/// ## Generic Parameter
///
/// `T` is the event payload type — typically `RecordBatch` or a
/// domain-specific event struct.
///
/// Only `PartialEq` is derived (not `Eq`/`Copy`) — presumably because `T`
/// may be a float-bearing or non-`Copy` payload; confirm before tightening.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamMessage<T> {
    /// A user data event.
    Event(T),
    /// A watermark indicating event-time progress (millis since epoch).
    Watermark(i64),
    /// A checkpoint barrier for consistent snapshots.
    Barrier(CheckpointBarrier),
}
108
109impl<T> StreamMessage<T> {
110 /// Returns `true` if this is a barrier message.
111 #[must_use]
112 pub const fn is_barrier(&self) -> bool {
113 matches!(self, Self::Barrier(_))
114 }
115
116 /// Returns `true` if this is a watermark message.
117 #[must_use]
118 pub const fn is_watermark(&self) -> bool {
119 matches!(self, Self::Watermark(_))
120 }
121
122 /// Returns `true` if this is a data event.
123 #[must_use]
124 pub const fn is_event(&self) -> bool {
125 matches!(self, Self::Event(_))
126 }
127
128 /// Extracts the barrier if this is a [`StreamMessage::Barrier`].
129 #[must_use]
130 pub const fn as_barrier(&self) -> Option<&CheckpointBarrier> {
131 match self {
132 Self::Barrier(b) => Some(b),
133 _ => None,
134 }
135 }
136}
137
/// Cross-thread barrier injector for source operators.
///
/// The coordinator thread stores a packed barrier command via
/// [`trigger`](Self::trigger). Source operators poll via
/// [`BarrierPollHandle::poll`] on each iteration of their event loop.
///
/// ## Fast Path
///
/// The poll path is a single `AtomicU64::load(Relaxed)` — typically < 10ns.
/// Only when a barrier is pending does the source perform a compare-exchange
/// to claim it.
#[derive(Debug)]
pub struct CheckpointBarrierInjector {
    /// Packed command: 0 = no pending, otherwise (`checkpoint_id` << 32 | flags).
    /// Shared (via `Arc`) with every [`BarrierPollHandle`] cloned from this
    /// injector, so exactly one handle can claim a pending command.
    cmd: Arc<AtomicU64>,
    /// The epoch counter, incremented each time a barrier is triggered.
    /// Injector-local: handles do not see this counter, only the command word.
    epoch: AtomicU64,
}
156
157impl CheckpointBarrierInjector {
158 /// Create a new injector with no pending barrier.
159 #[must_use]
160 pub fn new() -> Self {
161 Self {
162 cmd: Arc::new(AtomicU64::new(0)),
163 epoch: AtomicU64::new(0),
164 }
165 }
166
167 /// Get a handle that source operators use to poll for barriers.
168 #[must_use]
169 pub fn handle(&self) -> BarrierPollHandle {
170 BarrierPollHandle {
171 cmd: Arc::clone(&self.cmd),
172 }
173 }
174
175 /// Trigger a new checkpoint barrier.
176 ///
177 /// The next [`BarrierPollHandle::poll`] call on any source will
178 /// observe this barrier and return it. If a previous barrier has
179 /// not been consumed, it is superseded — this is intentional for
180 /// the Chandy-Lamport protocol where only the latest checkpoint
181 /// matters.
182 ///
183 /// # Arguments
184 ///
185 /// * `checkpoint_id` - Unique checkpoint ID (must fit in 32 bits)
186 /// * `barrier_flags` - Barrier flags (must fit in 32 bits)
187 ///
188 /// # Panics
189 ///
190 /// Debug-asserts that `checkpoint_id` and `barrier_flags` fit in u32.
191 #[allow(clippy::cast_possible_truncation)]
192 pub fn trigger(&self, checkpoint_id: u64, barrier_flags: u64) {
193 debug_assert!(
194 u32::try_from(checkpoint_id).is_ok(),
195 "checkpoint_id {checkpoint_id} exceeds u32::MAX"
196 );
197 debug_assert!(
198 u32::try_from(barrier_flags).is_ok(),
199 "barrier_flags {barrier_flags:#x} exceeds u32::MAX"
200 );
201 // Pack checkpoint_id (upper 32) | flags (lower 32). 0 = no barrier.
202 let packed = (u64::from(checkpoint_id as u32) << 32) | u64::from(barrier_flags as u32);
203 self.cmd.store(packed, Ordering::Release);
204 self.epoch.fetch_add(1, Ordering::Relaxed);
205 }
206
207 /// Get the current epoch (number of barriers triggered).
208 #[must_use]
209 pub fn epoch(&self) -> u64 {
210 self.epoch.load(Ordering::Relaxed)
211 }
212}
213
214impl Default for CheckpointBarrierInjector {
215 fn default() -> Self {
216 Self::new()
217 }
218}
219
/// Handle used by source operators to poll for pending barriers.
///
/// Cloned from [`CheckpointBarrierInjector::handle`] and stored in the
/// source operator. The fast path is a single atomic load.
///
/// Cloning a handle shares the same underlying command word (`Arc`), so
/// a pending barrier is delivered to exactly one handle.
#[derive(Debug, Clone)]
pub struct BarrierPollHandle {
    /// Shared packed command word: 0 = no pending barrier, otherwise
    /// (`checkpoint_id` << 32 | flags), as packed by the injector.
    cmd: Arc<AtomicU64>,
}
229
230impl BarrierPollHandle {
231 /// Poll for a pending barrier.
232 ///
233 /// Returns `Some(CheckpointBarrier)` if a barrier is pending and
234 /// this call successfully claimed it (exactly-once delivery across
235 /// handles sharing the same injector). Returns `None` if no barrier
236 /// is pending or another handle already claimed it.
237 ///
238 /// The `epoch` parameter is supplied by the caller (typically the
239 /// source operator's current epoch) and is embedded in the returned
240 /// barrier. The injector does not encode the epoch in the atomic
241 /// command word — only checkpoint ID and flags are packed.
242 ///
243 /// ## Performance
244 ///
245 /// Fast path (no barrier): single `load(Relaxed)` — < 10ns.
246 /// Slow path (barrier pending): one `compare_exchange`.
247 #[must_use]
248 pub fn poll(&self, epoch: u64) -> Option<CheckpointBarrier> {
249 // Fast path: no barrier pending
250 let packed = self.cmd.load(Ordering::Relaxed);
251 if packed == 0 {
252 return None;
253 }
254
255 // Barrier pending — try to claim it with compare-exchange
256 if self
257 .cmd
258 .compare_exchange(packed, 0, Ordering::AcqRel, Ordering::Relaxed)
259 .is_ok()
260 {
261 Some(CheckpointBarrier {
262 checkpoint_id: packed >> 32,
263 epoch,
264 flags: packed & 0xFFFF_FFFF,
265 })
266 } else {
267 // Another thread claimed it first
268 None
269 }
270 }
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// The barrier must stay exactly 24 bytes (mirrors the const assertion).
    #[test]
    fn test_barrier_size() {
        assert_eq!(std::mem::size_of::<CheckpointBarrier>(), 24);
    }

    /// Flag predicates reflect exactly the bits set at construction.
    #[test]
    fn test_barrier_flags() {
        // `new` sets no flags at all.
        let barrier = CheckpointBarrier::new(1, 1);
        assert!(!barrier.is_full_snapshot());
        assert!(!barrier.is_drain());
        assert!(!barrier.is_cancel());

        // `full_snapshot` sets only FULL_SNAPSHOT.
        let full = CheckpointBarrier::full_snapshot(1, 1);
        assert!(full.is_full_snapshot());
        assert!(!full.is_drain());

        // Flags can also be set via struct literal.
        let drain = CheckpointBarrier {
            checkpoint_id: 1,
            epoch: 1,
            flags: flags::DRAIN,
        };
        assert!(drain.is_drain());
    }

    /// ID and flags survive the pack/unpack through the atomic command word.
    #[test]
    fn test_barrier_roundtrip_via_injector() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();
        injector.trigger(42, flags::DRAIN);
        let barrier = handle.poll(0).expect("barrier should be available");
        assert_eq!(barrier.checkpoint_id, 42);
        assert_eq!(barrier.flags, flags::DRAIN);
        // A claimed barrier is consumed; subsequent polls see nothing.
        assert!(handle.poll(1).is_none(), "cleared after one poll");
    }

    /// The three predicates and `as_barrier` discriminate the enum variants.
    #[test]
    fn test_stream_message_variants() {
        let event: StreamMessage<String> = StreamMessage::Event("hello".into());
        assert!(event.is_event());
        assert!(!event.is_barrier());
        assert!(!event.is_watermark());

        let watermark: StreamMessage<String> = StreamMessage::Watermark(1000);
        assert!(watermark.is_watermark());

        let barrier: StreamMessage<String> = StreamMessage::Barrier(CheckpointBarrier::new(1, 1));
        assert!(barrier.is_barrier());
        assert_eq!(barrier.as_barrier().unwrap().checkpoint_id, 1);
    }

    /// A fresh injector has nothing pending.
    #[test]
    fn test_injector_poll_no_barrier() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // No barrier pending
        assert!(handle.poll(0).is_none());
    }

    /// Trigger publishes exactly one claimable barrier and bumps the epoch.
    #[test]
    fn test_injector_trigger_and_poll() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // Trigger barrier
        injector.trigger(42, flags::FULL_SNAPSHOT);
        assert_eq!(injector.epoch(), 1);

        // Poll should return the barrier; the epoch comes from the caller,
        // not from the injector's counter.
        let barrier = handle.poll(1).unwrap();
        assert_eq!(barrier.checkpoint_id, 42);
        assert_eq!(barrier.epoch, 1);
        assert!(barrier.is_full_snapshot());

        // Second poll should return None (already claimed)
        assert!(handle.poll(1).is_none());
    }

    /// Exactly one of two handles sharing an injector claims the barrier.
    #[test]
    fn test_injector_multiple_handles() {
        let injector = CheckpointBarrierInjector::new();
        let handle1 = injector.handle();
        let handle2 = injector.handle();

        injector.trigger(1, flags::NONE);

        // Only one handle should claim it
        let r1 = handle1.poll(1);
        let r2 = handle2.poll(1);

        // Exactly one should succeed (sequential polls: the second always
        // sees the cleared command word).
        assert!(r1.is_some() || r2.is_some());
        if r1.is_some() {
            assert!(r2.is_none());
        }
    }
}
373}