laminar_core/checkpoint/barrier.rs
1//! Checkpoint barrier protocol.
2//!
3//! The coordinator injects barriers into sources via [`CheckpointBarrierInjector`].
4//! Sources deliver barriers alongside events. The fast path (no pending barrier)
5//! is a single `AtomicU64` load (~10ns).
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
/// Bit flags stored in [`CheckpointBarrier::flags`].
///
/// Every constant occupies its own bit, so flags combine with `|` and are
/// tested with `&`.
pub mod flags {
    /// Empty flag set: an ordinary barrier with no special behavior.
    pub const NONE: u64 = 0b000;
    /// Request a full snapshot instead of an incremental one.
    pub const FULL_SNAPSHOT: u64 = 0b001;
    /// Marks the final barrier emitted before shutdown.
    pub const DRAIN: u64 = 0b010;
    /// Cancel the in-progress checkpoint that carries this barrier's ID.
    pub const CANCEL: u64 = 0b100;
}
21
/// Checkpoint barrier carried through the dataflow graph alongside events.
///
/// A plain `Copy` value with a fixed `#[repr(C)]` layout of three `u64`
/// words, so it can be embedded in channel messages and copied cheaply.
/// It identifies a checkpoint and carries that checkpoint's behavior flags.
///
/// ## Layout (24 bytes)
///
/// | Field           | Offset | Size | Description                    |
/// |-----------------|--------|------|--------------------------------|
/// | `checkpoint_id` | 0      | 8    | Unique checkpoint ID           |
/// | `epoch`         | 8      | 8    | Monotonic epoch number         |
/// | `flags`         | 16     | 8    | Behavior flags (see [`flags`]) |
#[repr(C)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct CheckpointBarrier {
    /// Identifier of the checkpoint this barrier belongs to.
    pub checkpoint_id: u64,
    /// Epoch number, expected to increase monotonically.
    pub epoch: u64,
    /// Bitmask built from the [`flags`] module constants.
    pub flags: u64,
}

// Compile-time guard for the 24-byte layout documented above.
const _: () = {
    assert!(
        std::mem::size_of::<CheckpointBarrier>() == 24,
        "CheckpointBarrier must be exactly 24 bytes"
    );
};
48
49impl CheckpointBarrier {
50 /// Create a new barrier with the given checkpoint ID and epoch.
51 #[must_use]
52 pub const fn new(checkpoint_id: u64, epoch: u64) -> Self {
53 Self {
54 checkpoint_id,
55 epoch,
56 flags: flags::NONE,
57 }
58 }
59
60 /// Create a barrier that requests a full snapshot.
61 #[must_use]
62 pub const fn full_snapshot(checkpoint_id: u64, epoch: u64) -> Self {
63 Self {
64 checkpoint_id,
65 epoch,
66 flags: flags::FULL_SNAPSHOT,
67 }
68 }
69
70 /// Check whether this barrier requests a full (non-incremental) snapshot.
71 #[must_use]
72 pub const fn is_full_snapshot(&self) -> bool {
73 self.flags & flags::FULL_SNAPSHOT != 0
74 }
75
76 /// Check whether this barrier signals drain/shutdown.
77 #[must_use]
78 pub const fn is_drain(&self) -> bool {
79 self.flags & flags::DRAIN != 0
80 }
81
82 /// Check whether this barrier cancels an in-progress checkpoint.
83 #[must_use]
84 pub const fn is_cancel(&self) -> bool {
85 self.flags & flags::CANCEL != 0
86 }
87}
88
89/// A message that flows through streaming channels.
90///
91/// Wraps user events with control messages (watermarks and barriers)
92/// in a single enum. Operators pattern-match on this to handle
93/// data vs. control flow.
94///
95/// ## Generic Parameter
96///
97/// `T` is the event payload type — typically `RecordBatch` or a
98/// domain-specific event struct.
99#[derive(Debug, Clone, PartialEq)]
100pub enum StreamMessage<T> {
101 /// A user data event.
102 Event(T),
103 /// A watermark indicating event-time progress (millis since epoch).
104 Watermark(i64),
105 /// A checkpoint barrier for consistent snapshots.
106 Barrier(CheckpointBarrier),
107}
108
109impl<T> StreamMessage<T> {
110 /// Returns `true` if this is a barrier message.
111 #[must_use]
112 pub const fn is_barrier(&self) -> bool {
113 matches!(self, Self::Barrier(_))
114 }
115
116 /// Returns `true` if this is a watermark message.
117 #[must_use]
118 pub const fn is_watermark(&self) -> bool {
119 matches!(self, Self::Watermark(_))
120 }
121
122 /// Returns `true` if this is a data event.
123 #[must_use]
124 pub const fn is_event(&self) -> bool {
125 matches!(self, Self::Event(_))
126 }
127
128 /// Extracts the barrier if this is a [`StreamMessage::Barrier`].
129 #[must_use]
130 pub const fn as_barrier(&self) -> Option<&CheckpointBarrier> {
131 match self {
132 Self::Barrier(b) => Some(b),
133 _ => None,
134 }
135 }
136}
137
138/// Cross-thread barrier injector for source operators.
139///
140/// The coordinator thread stores a packed barrier command via
141/// [`trigger`](Self::trigger). Source operators poll via
142/// [`BarrierPollHandle::poll`] on each iteration of their event loop.
143///
144/// ## Fast Path
145///
146/// The poll path is a single `AtomicU64::load(Relaxed)` — typically < 10ns.
147/// Only when a barrier is pending does the source perform a compare-exchange
148/// to claim it.
149#[derive(Debug)]
150pub struct CheckpointBarrierInjector {
151 /// Packed command: 0 = no pending, otherwise (`checkpoint_id` << 32 | flags).
152 cmd: Arc<AtomicU64>,
153 /// The epoch counter, incremented each time a barrier is triggered.
154 epoch: Arc<AtomicU64>,
155}
156
157impl CheckpointBarrierInjector {
158 /// Create a new injector with no pending barrier.
159 #[must_use]
160 pub fn new() -> Self {
161 Self {
162 cmd: Arc::new(AtomicU64::new(0)),
163 epoch: Arc::new(AtomicU64::new(0)),
164 }
165 }
166
167 /// Get a handle that source operators use to poll for barriers.
168 #[must_use]
169 pub fn handle(&self) -> BarrierPollHandle {
170 BarrierPollHandle {
171 cmd: Arc::clone(&self.cmd),
172 }
173 }
174
175 /// Trigger a new checkpoint barrier.
176 ///
177 /// The next [`BarrierPollHandle::poll`] call on any source will
178 /// observe this barrier and return it. If a previous barrier has
179 /// not been consumed, it is superseded — this is intentional for
180 /// the Chandy-Lamport protocol where only the latest checkpoint
181 /// matters.
182 ///
183 /// # Arguments
184 ///
185 /// * `checkpoint_id` - Unique checkpoint ID (must fit in 32 bits)
186 /// * `barrier_flags` - Barrier flags (must fit in 32 bits)
187 ///
188 /// # Panics
189 ///
190 /// Debug-asserts that `checkpoint_id` and `barrier_flags` fit in u32.
191 #[allow(clippy::cast_possible_truncation)]
192 pub fn trigger(&self, checkpoint_id: u64, barrier_flags: u64) {
193 debug_assert!(
194 u32::try_from(checkpoint_id).is_ok(),
195 "checkpoint_id {checkpoint_id} exceeds u32::MAX"
196 );
197 debug_assert!(
198 u32::try_from(barrier_flags).is_ok(),
199 "barrier_flags {barrier_flags:#x} exceeds u32::MAX"
200 );
201 // Pack checkpoint_id (upper 32) | flags (lower 32). 0 = no barrier.
202 let packed = (u64::from(checkpoint_id as u32) << 32) | u64::from(barrier_flags as u32);
203 self.cmd.store(packed, Ordering::Release);
204 self.epoch.fetch_add(1, Ordering::Relaxed);
205 }
206
207 /// Get the current epoch (number of barriers triggered).
208 #[must_use]
209 pub fn epoch(&self) -> u64 {
210 self.epoch.load(Ordering::Relaxed)
211 }
212}
213
214impl Default for CheckpointBarrierInjector {
215 fn default() -> Self {
216 Self::new()
217 }
218}
219
220impl Clone for CheckpointBarrierInjector {
221 fn clone(&self) -> Self {
222 Self {
223 cmd: Arc::clone(&self.cmd),
224 epoch: Arc::clone(&self.epoch),
225 }
226 }
227}
228
229/// Handle used by source operators to poll for pending barriers.
230///
231/// Cloned from [`CheckpointBarrierInjector::handle`] and stored in the
232/// source operator. The fast path is a single atomic load.
233#[derive(Debug, Clone)]
234pub struct BarrierPollHandle {
235 /// Shared packed command word.
236 cmd: Arc<AtomicU64>,
237}
238
239impl BarrierPollHandle {
240 /// Poll for a pending barrier.
241 ///
242 /// Returns `Some(CheckpointBarrier)` if a barrier is pending and
243 /// this call successfully claimed it (exactly-once delivery across
244 /// handles sharing the same injector). Returns `None` if no barrier
245 /// is pending or another handle already claimed it.
246 ///
247 /// The `epoch` parameter is supplied by the caller (typically the
248 /// source operator's current epoch) and is embedded in the returned
249 /// barrier. The injector does not encode the epoch in the atomic
250 /// command word — only checkpoint ID and flags are packed.
251 ///
252 /// ## Performance
253 ///
254 /// Fast path (no barrier): single `load(Relaxed)` — < 10ns.
255 /// Slow path (barrier pending): one `compare_exchange`.
256 #[must_use]
257 pub fn poll(&self, epoch: u64) -> Option<CheckpointBarrier> {
258 // Fast path: no barrier pending
259 let packed = self.cmd.load(Ordering::Relaxed);
260 if packed == 0 {
261 return None;
262 }
263
264 // Barrier pending — try to claim it with compare-exchange
265 if self
266 .cmd
267 .compare_exchange(packed, 0, Ordering::AcqRel, Ordering::Relaxed)
268 .is_ok()
269 {
270 Some(CheckpointBarrier {
271 checkpoint_id: packed >> 32,
272 epoch,
273 flags: packed & 0xFFFF_FFFF,
274 })
275 } else {
276 // Another thread claimed it first
277 None
278 }
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_barrier_size() {
        assert_eq!(std::mem::size_of::<CheckpointBarrier>(), 24);
    }

    #[test]
    fn test_barrier_flags() {
        let barrier = CheckpointBarrier::new(1, 1);
        assert!(!barrier.is_full_snapshot());
        assert!(!barrier.is_drain());
        assert!(!barrier.is_cancel());

        let full = CheckpointBarrier::full_snapshot(1, 1);
        assert!(full.is_full_snapshot());
        assert!(!full.is_drain());

        let drain = CheckpointBarrier {
            checkpoint_id: 1,
            epoch: 1,
            flags: flags::DRAIN,
        };
        assert!(drain.is_drain());
    }

    #[test]
    fn test_barrier_cancel_and_combined_flags() {
        let cancel = CheckpointBarrier {
            flags: flags::CANCEL,
            ..CheckpointBarrier::new(7, 3)
        };
        assert!(cancel.is_cancel());
        assert!(!cancel.is_drain());
        assert!(!cancel.is_full_snapshot());

        // Flags are independent bits and may be combined.
        let combined = CheckpointBarrier {
            flags: flags::FULL_SNAPSHOT | flags::DRAIN,
            ..CheckpointBarrier::new(7, 3)
        };
        assert!(combined.is_full_snapshot());
        assert!(combined.is_drain());
        assert!(!combined.is_cancel());
    }

    #[test]
    fn test_barrier_roundtrip_via_injector() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();
        injector.trigger(42, flags::DRAIN);
        let barrier = handle.poll(0).expect("barrier should be available");
        assert_eq!(barrier.checkpoint_id, 42);
        assert_eq!(barrier.flags, flags::DRAIN);
        assert!(handle.poll(1).is_none(), "cleared after one poll");
    }

    #[test]
    fn test_stream_message_variants() {
        let event: StreamMessage<String> = StreamMessage::Event("hello".into());
        assert!(event.is_event());
        assert!(!event.is_barrier());
        assert!(!event.is_watermark());
        assert!(event.as_barrier().is_none());

        let watermark: StreamMessage<String> = StreamMessage::Watermark(1000);
        assert!(watermark.is_watermark());
        assert!(watermark.as_barrier().is_none());

        let barrier: StreamMessage<String> = StreamMessage::Barrier(CheckpointBarrier::new(1, 1));
        assert!(barrier.is_barrier());
        assert_eq!(barrier.as_barrier().unwrap().checkpoint_id, 1);
    }

    #[test]
    fn test_injector_poll_no_barrier() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // No barrier pending
        assert!(handle.poll(0).is_none());
    }

    #[test]
    fn test_injector_trigger_and_poll() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        // Trigger barrier
        injector.trigger(42, flags::FULL_SNAPSHOT);
        assert_eq!(injector.epoch(), 1);

        // Poll should return the barrier
        let barrier = handle.poll(1).unwrap();
        assert_eq!(barrier.checkpoint_id, 42);
        assert_eq!(barrier.epoch, 1);
        assert!(barrier.is_full_snapshot());

        // Second poll should return None (already claimed)
        assert!(handle.poll(1).is_none());
    }

    #[test]
    fn test_injector_trigger_supersedes_unclaimed_barrier() {
        let injector = CheckpointBarrierInjector::new();
        let handle = injector.handle();

        injector.trigger(1, flags::NONE);
        injector.trigger(2, flags::FULL_SNAPSHOT);
        assert_eq!(injector.epoch(), 2);

        // Only the latest barrier is delivered.
        let barrier = handle.poll(2).expect("latest barrier should be pending");
        assert_eq!(barrier.checkpoint_id, 2);
        assert!(barrier.is_full_snapshot());
        assert!(
            handle.poll(2).is_none(),
            "superseded barrier must not be delivered"
        );
    }

    #[test]
    fn test_injector_multiple_handles() {
        let injector = CheckpointBarrierInjector::new();
        let handle1 = injector.handle();
        let handle2 = injector.handle();

        injector.trigger(1, flags::NONE);

        // Only one handle should claim it
        let r1 = handle1.poll(1);
        let r2 = handle2.poll(1);

        // Exactly one should succeed
        assert_ne!(
            r1.is_some(),
            r2.is_some(),
            "exactly one handle should claim the barrier"
        );
        let claimed = r1.or(r2).expect("one handle claimed the barrier");
        assert_eq!(claimed.checkpoint_id, 1);
    }
}