Skip to main content

laminar_core/streaming/
checkpoint.rs

//! Streaming checkpoint configuration.
2
3use std::fmt;
4
/// Configuration for streaming checkpoints.
///
/// Every field is an `Option`; `None` selects the behavior documented on
/// that field. The derived [`Default`] leaves all fields as `None`. This
/// type only stores the settings — the fallback values quoted below are
/// not encoded here.
#[derive(Debug, Clone, Default)]
pub struct StreamCheckpointConfig {
    /// Checkpoint interval in milliseconds. `None` = manual only.
    pub interval_ms: Option<u64>,
    /// Directory for persisting checkpoints. `None` = in-memory only.
    pub data_dir: Option<std::path::PathBuf>,
    /// Maximum number of retained checkpoints. `None` = default (3).
    pub max_retained: Option<usize>,
    /// Barrier-alignment timeout in milliseconds at fan-in operators.
    /// `None` = default (`30_000`).
    pub alignment_timeout_ms: Option<u64>,
    /// Max epochs admitted between capture and restorable (the upload
    /// backlog). `None` = default (4). Exactly-once pipelines are
    /// capped at 1 regardless.
    pub max_in_flight_epochs: Option<u64>,
    /// Cap on captured-state bytes held by in-flight epochs; admission
    /// pauses at the cap. `None` = default (512 MiB).
    pub max_staged_bytes: Option<u64>,
}
25
/// Errors from checkpoint operations.
///
/// All payloads are plain `String`s so the enum can derive `Clone`,
/// `PartialEq` and `Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CheckpointError {
    /// Checkpointing is disabled.
    Disabled,
    /// A data directory is required.
    DataDirRequired,
    /// No checkpoint available for restore.
    NoCheckpoint,
    /// Operation timed out.
    Timeout,
    /// Invalid configuration; the payload describes what is wrong.
    InvalidConfig(String),
    /// I/O error (stored as string for Clone/PartialEq).
    IoError(String),
}
42
43impl fmt::Display for CheckpointError {
44    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45        match self {
46            Self::Disabled => write!(f, "checkpointing is disabled"),
47            Self::DataDirRequired => write!(f, "data directory is required"),
48            Self::NoCheckpoint => write!(f, "no checkpoint available"),
49            Self::Timeout => write!(f, "checkpoint operation timed out"),
50            Self::InvalidConfig(msg) => write!(f, "invalid checkpoint config: {msg}"),
51            Self::IoError(msg) => write!(f, "checkpoint I/O error: {msg}"),
52        }
53    }
54}
55
56impl std::error::Error for CheckpointError {}
57
#[cfg(test)]
mod tests {
    use super::*;

    /// A default config must leave every setting unset (`None`).
    #[test]
    fn test_default_config() {
        // Exhaustive destructuring (no `..`): adding a field to
        // `StreamCheckpointConfig` fails to compile here until this test
        // is extended to cover it.
        let StreamCheckpointConfig {
            interval_ms,
            data_dir,
            max_retained,
            alignment_timeout_ms,
            max_in_flight_epochs,
            max_staged_bytes,
        } = StreamCheckpointConfig::default();

        assert!(interval_ms.is_none());
        assert!(data_dir.is_none());
        assert!(max_retained.is_none());
        assert!(alignment_timeout_ms.is_none());
        assert!(max_in_flight_epochs.is_none());
        assert!(max_staged_bytes.is_none());
    }
}