Skip to main content

laminar_connectors/
checkpoint.rs

1//! Connector checkpoint types.
2//!
3//! Checkpoints capture the position of a source connector so it can
4//! resume from where it left off after a restart.
5#![allow(clippy::disallowed_types)] // cold path: connector checkpoint
6
7use std::collections::HashMap;
8
/// Resume position of a source connector, tagged with the checkpoint epoch
/// it was captured for.
///
/// Positions are stored as free-form string key/value pairs so that each
/// connector can pick its own encoding, for example:
/// - Kafka: `{"partition-0": "1234", "partition-1": "5678"}`
/// - `PostgreSQL` CDC: `{"lsn": "0/1234ABCD"}`
/// - File: `{"path": "/data/file.csv", "offset": "4096"}`
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SourceCheckpoint {
    /// Connector-defined position entries (for example partition -> offset).
    offsets: HashMap<String, String>,

    /// Checkpoint epoch these offsets were captured for.
    epoch: u64,

    /// Free-form key/value annotations stored alongside the offsets.
    metadata: HashMap<String, String>,
}
27
28impl SourceCheckpoint {
29    /// Creates an empty checkpoint.
30    #[must_use]
31    pub fn new(epoch: u64) -> Self {
32        Self {
33            offsets: HashMap::new(),
34            epoch,
35            metadata: HashMap::new(),
36        }
37    }
38
39    /// Creates a checkpoint with the given offsets.
40    #[must_use]
41    pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
42        Self {
43            offsets,
44            epoch,
45            metadata: HashMap::new(),
46        }
47    }
48
49    /// Sets an offset value.
50    pub fn set_offset(&mut self, key: impl Into<String>, value: impl Into<String>) {
51        self.offsets.insert(key.into(), value.into());
52    }
53
54    /// Gets an offset value.
55    #[must_use]
56    pub fn get_offset(&self, key: &str) -> Option<&str> {
57        self.offsets.get(key).map(String::as_str)
58    }
59
60    /// Returns all offsets.
61    #[must_use]
62    pub fn offsets(&self) -> &HashMap<String, String> {
63        &self.offsets
64    }
65
66    /// Returns the epoch number.
67    #[must_use]
68    pub fn epoch(&self) -> u64 {
69        self.epoch
70    }
71
72    /// Sets metadata on the checkpoint.
73    pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
74        self.metadata.insert(key.into(), value.into());
75    }
76
77    /// Gets metadata from the checkpoint.
78    #[must_use]
79    pub fn get_metadata(&self, key: &str) -> Option<&str> {
80        self.metadata.get(key).map(String::as_str)
81    }
82
83    /// Returns all metadata.
84    #[must_use]
85    pub fn metadata(&self) -> &HashMap<String, String> {
86        &self.metadata
87    }
88
89    /// Returns `true` if the checkpoint has any offsets.
90    #[must_use]
91    pub fn is_empty(&self) -> bool {
92        self.offsets.is_empty()
93    }
94}
95
96/// Runtime-managed checkpoint that wraps a source checkpoint with
97/// additional tracking information.
98#[derive(Debug, Clone)]
99pub struct RuntimeCheckpoint {
100    /// The source connector's checkpoint.
101    pub source: SourceCheckpoint,
102
103    /// Timestamp when this checkpoint was created (millis since epoch).
104    pub timestamp_ms: u64,
105
106    /// Name of the connector instance.
107    pub connector_name: String,
108}
109
110impl RuntimeCheckpoint {
111    /// Creates a new runtime checkpoint.
112    #[must_use]
113    pub fn new(
114        source: SourceCheckpoint,
115        connector_name: impl Into<String>,
116        timestamp_ms: u64,
117    ) -> Self {
118        Self {
119            source,
120            timestamp_ms,
121            connector_name: connector_name.into(),
122        }
123    }
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_source_checkpoint_basic() {
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset("partition-0", "1234");
        cp.set_offset("partition-1", "5678");

        assert_eq!(cp.epoch(), 1);
        assert_eq!(cp.get_offset("partition-0"), Some("1234"));
        assert_eq!(cp.get_offset("partition-1"), Some("5678"));
        assert_eq!(cp.get_offset("partition-2"), None);
        assert!(!cp.is_empty());
    }

    #[test]
    fn test_source_checkpoint_with_offsets() {
        let mut offsets = HashMap::new();
        offsets.insert("lsn".to_string(), "0/1234ABCD".to_string());

        let cp = SourceCheckpoint::with_offsets(5, offsets);
        assert_eq!(cp.epoch(), 5);
        assert_eq!(cp.get_offset("lsn"), Some("0/1234ABCD"));
    }

    #[test]
    fn test_source_checkpoint_metadata() {
        let mut cp = SourceCheckpoint::new(1);
        cp.set_metadata("connector", "kafka");
        cp.set_metadata("topic", "events");

        assert_eq!(cp.get_metadata("connector"), Some("kafka"));
        assert_eq!(cp.get_metadata("topic"), Some("events"));
        assert_eq!(cp.get_metadata("missing"), None);
        assert_eq!(cp.metadata().len(), 2);
    }

    #[test]
    fn test_runtime_checkpoint() {
        let source = SourceCheckpoint::new(3);
        let rt = RuntimeCheckpoint::new(source, "my-kafka-source", 1_700_000_000_000);

        assert_eq!(rt.source.epoch(), 3);
        assert_eq!(rt.connector_name, "my-kafka-source");
        assert_eq!(rt.timestamp_ms, 1_700_000_000_000);
    }

    #[test]
    fn test_empty_checkpoint() {
        let cp = SourceCheckpoint::new(0);
        assert!(cp.is_empty());
    }

    /// A second `set_offset` for the same key replaces the earlier value
    /// instead of adding a new entry.
    #[test]
    fn test_set_offset_overwrites_existing_key() {
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset("partition-0", "1");
        cp.set_offset("partition-0", "2");

        assert_eq!(cp.get_offset("partition-0"), Some("2"));
        assert_eq!(cp.offsets().len(), 1);
    }

    /// `is_empty` only looks at offsets; metadata alone does not make a
    /// checkpoint non-empty.
    #[test]
    fn test_metadata_does_not_affect_is_empty() {
        let mut cp = SourceCheckpoint::new(1);
        cp.set_metadata("connector", "kafka");

        assert!(cp.is_empty());
        assert!(cp.offsets().is_empty());
        assert_eq!(cp.metadata().len(), 1);
    }

    /// The derived `Default` and `new(0)` produce the same checkpoint.
    #[test]
    fn test_default_equals_new_zero() {
        assert_eq!(SourceCheckpoint::default(), SourceCheckpoint::new(0));
    }

    /// `with_offsets` keeps the supplied map verbatim and starts with no
    /// metadata.
    #[test]
    fn test_with_offsets_starts_without_metadata() {
        let offsets: HashMap<String, String> =
            std::iter::once(("lsn".to_string(), "0/1".to_string())).collect();
        let cp = SourceCheckpoint::with_offsets(2, offsets.clone());

        assert_eq!(cp.offsets(), &offsets);
        assert!(cp.metadata().is_empty());
        assert!(!cp.is_empty());
    }
}