laminar_core/streaming/
checkpoint.rs1use std::fmt;
4
5#[derive(Debug, Clone, Default)]
7pub struct StreamCheckpointConfig {
8 pub interval_ms: Option<u64>,
10 pub data_dir: Option<std::path::PathBuf>,
12 pub max_retained: Option<usize>,
14 pub alignment_timeout_ms: Option<u64>,
17 pub max_in_flight_epochs: Option<u64>,
21 pub max_staged_bytes: Option<u64>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
28pub enum CheckpointError {
29 Disabled,
31 DataDirRequired,
33 NoCheckpoint,
35 Timeout,
37 InvalidConfig(String),
39 IoError(String),
41}
42
43impl fmt::Display for CheckpointError {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 match self {
46 Self::Disabled => write!(f, "checkpointing is disabled"),
47 Self::DataDirRequired => write!(f, "data directory is required"),
48 Self::NoCheckpoint => write!(f, "no checkpoint available"),
49 Self::Timeout => write!(f, "checkpoint operation timed out"),
50 Self::InvalidConfig(msg) => write!(f, "invalid checkpoint config: {msg}"),
51 Self::IoError(msg) => write!(f, "checkpoint I/O error: {msg}"),
52 }
53 }
54}
55
56impl std::error::Error for CheckpointError {}
57
58#[cfg(test)]
59mod tests {
60 use super::*;
61
62 #[test]
63 fn test_default_config() {
64 let config = StreamCheckpointConfig::default();
65 assert!(config.interval_ms.is_none());
66 assert!(config.data_dir.is_none());
67 assert!(config.max_retained.is_none());
68 assert!(config.alignment_timeout_ms.is_none());
69 }
70}