Skip to main content

laminar_connectors/
checkpoint.rs

//! Source checkpoints: the minimal state needed to resume a connector
//! where it left off after a restart (Kafka offsets, CDC LSN/GTID, etc.).
3#![allow(clippy::disallowed_types)] // cold path: connector checkpoint
4
5use std::collections::HashMap;
6
/// Resumable position of a source connector.
///
/// The position is kept as free-form string key-value pairs, which lets a
/// single type describe many kinds of sources, for example:
/// - Kafka: `{"partition-0": "1234", "partition-1": "5678"}`
/// - `PostgreSQL` CDC: `{"lsn": "0/1234ABCD"}`
/// - File: `{"path": "/data/file.csv", "offset": "4096"}`
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SourceCheckpoint {
    /// Offset entries; keys and values are defined by the connector.
    offsets: HashMap<String, String>,

    /// Epoch this checkpoint is associated with.
    epoch: u64,

    /// Auxiliary key-value annotations attached to the checkpoint.
    metadata: HashMap<String, String>,
}

impl SourceCheckpoint {
    /// Builds a checkpoint for `epoch` with no offsets and no metadata.
    #[must_use]
    pub fn new(epoch: u64) -> Self {
        Self::with_offsets(epoch, HashMap::new())
    }

    /// Builds a checkpoint for `epoch` that starts out with `offsets` and
    /// no metadata.
    #[must_use]
    pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
        let metadata = HashMap::new();
        Self {
            offsets,
            epoch,
            metadata,
        }
    }

    /// Stores `value` under `key` in the offsets, replacing any value
    /// previously stored for that key.
    pub fn set_offset(&mut self, key: impl Into<String>, value: impl Into<String>) {
        let (key, value) = (key.into(), value.into());
        self.offsets.insert(key, value);
    }

    /// Looks up the offset stored under `key`, or `None` if there is none.
    #[must_use]
    pub fn get_offset(&self, key: &str) -> Option<&str> {
        let value = self.offsets.get(key)?;
        Some(value.as_str())
    }

    /// Borrows the complete offset map.
    #[must_use]
    pub fn offsets(&self) -> &HashMap<String, String> {
        &self.offsets
    }

    /// Epoch this checkpoint belongs to.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Stores `value` under `key` in the metadata, replacing any value
    /// previously stored for that key.
    pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        let (key, value) = (key.into(), value.into());
        self.metadata.insert(key, value);
    }

    /// Looks up the metadata stored under `key`, or `None` if there is none.
    #[must_use]
    pub fn get_metadata(&self, key: &str) -> Option<&str> {
        let value = self.metadata.get(key)?;
        Some(value.as_str())
    }

    /// Borrows the complete metadata map.
    #[must_use]
    pub fn metadata(&self) -> &HashMap<String, String> {
        &self.metadata
    }

    /// Reports whether no offsets are recorded; metadata is not considered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.offsets().is_empty()
    }
}
93
94/// Runtime-managed checkpoint that wraps a source checkpoint with
95/// additional tracking information.
96#[derive(Debug, Clone)]
97pub struct RuntimeCheckpoint {
98    /// The source connector's checkpoint.
99    pub source: SourceCheckpoint,
100
101    /// Timestamp when this checkpoint was created (millis since epoch).
102    pub timestamp_ms: u64,
103
104    /// Name of the connector instance.
105    pub connector_name: String,
106}
107
108impl RuntimeCheckpoint {
109    /// Creates a new runtime checkpoint.
110    #[must_use]
111    pub fn new(
112        source: SourceCheckpoint,
113        connector_name: impl Into<String>,
114        timestamp_ms: u64,
115    ) -> Self {
116        Self {
117            source,
118            timestamp_ms,
119            connector_name: connector_name.into(),
120        }
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// Offsets that were set can be read back; unknown keys yield `None`.
    #[test]
    fn test_source_checkpoint_basic() {
        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("partition-0", "1234");
        checkpoint.set_offset("partition-1", "5678");

        assert_eq!(checkpoint.epoch(), 1);

        let first = checkpoint.get_offset("partition-0");
        let second = checkpoint.get_offset("partition-1");
        let missing = checkpoint.get_offset("partition-2");
        assert_eq!(first, Some("1234"));
        assert_eq!(second, Some("5678"));
        assert_eq!(missing, None);

        assert!(!checkpoint.is_empty());
    }

    /// A checkpoint built from a prepared map exposes that map's entries.
    #[test]
    fn test_source_checkpoint_with_offsets() {
        let initial = HashMap::from([("lsn".to_string(), "0/1234ABCD".to_string())]);

        let checkpoint = SourceCheckpoint::with_offsets(5, initial);
        assert_eq!(checkpoint.epoch(), 5);
        assert_eq!(checkpoint.get_offset("lsn"), Some("0/1234ABCD"));
    }

    /// Metadata entries that were set can be read back by key.
    #[test]
    fn test_source_checkpoint_metadata() {
        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_metadata("connector", "kafka");
        checkpoint.set_metadata("topic", "events");

        let connector = checkpoint.get_metadata("connector");
        let topic = checkpoint.get_metadata("topic");
        assert_eq!(connector, Some("kafka"));
        assert_eq!(topic, Some("events"));
    }

    /// The runtime wrapper stores the source, name and timestamp it was given.
    #[test]
    fn test_runtime_checkpoint() {
        let created_at_ms = 1_700_000_000_000;
        let runtime =
            RuntimeCheckpoint::new(SourceCheckpoint::new(3), "my-kafka-source", created_at_ms);

        assert_eq!(runtime.source.epoch(), 3);
        assert_eq!(runtime.connector_name, "my-kafka-source");
        assert_eq!(runtime.timestamp_ms, 1_700_000_000_000);
    }

    /// A freshly created checkpoint reports itself as empty.
    #[test]
    fn test_empty_checkpoint() {
        let checkpoint = SourceCheckpoint::new(0);
        assert!(checkpoint.is_empty());
    }
}