Skip to main content

laminar_connectors/websocket/
checkpoint.rs

1//! WebSocket source checkpoint types.
2//!
3//! WebSocket is non-replayable -- checkpoints capture state for best-effort recovery.
4//! On recovery, data gaps should be expected and logged.
5
6use serde::{Deserialize, Serialize};
7
8use crate::checkpoint::SourceCheckpoint;
9
10/// Checkpoint state for a WebSocket source connector.
11///
12/// Since WebSocket has no intrinsic offsets or replay capability,
13/// this checkpoint captures application-level state for best-effort recovery.
14/// On recovery, data gaps should be expected and logged.
15///
16/// The entire struct is serialized as JSON and stored under a single key
17/// (`"websocket_state"`) in the [`SourceCheckpoint`] offsets map.
18#[derive(Debug, Clone, Default, Serialize, Deserialize)]
19pub struct WebSocketSourceCheckpoint {
20    /// Application-level sequence number (if the WS server provides one).
21    pub last_sequence: Option<u64>,
22    /// Server-provided position token (if any -- some APIs support resume tokens).
23    pub position_token: Option<String>,
24    /// Last event timestamp processed (epoch millis).
25    pub last_event_time: Option<i64>,
26    /// Current watermark value.
27    pub watermark: i64,
28    /// Which URL was active at checkpoint time.
29    pub active_url: Option<String>,
30    /// Subscription messages to re-send on recovery.
31    pub subscriptions: Vec<String>,
32}
33
/// Offsets-map key under which the JSON-encoded WebSocket checkpoint state
/// is stored inside a [`SourceCheckpoint`].
const CHECKPOINT_KEY: &str = "websocket_state";
36
37impl WebSocketSourceCheckpoint {
38    /// Serializes this checkpoint into a [`SourceCheckpoint`].
39    ///
40    /// The entire struct is stored as a JSON string under the key
41    /// `"websocket_state"` in the checkpoint's offsets map.
42    ///
43    /// # Arguments
44    ///
45    /// * `epoch` - The epoch number for the checkpoint.
46    ///
47    #[must_use]
48    pub fn to_source_checkpoint(&self, epoch: u64) -> SourceCheckpoint {
49        let json = match serde_json::to_string(self) {
50            Ok(j) => j,
51            Err(e) => {
52                tracing::error!(error = %e, "failed to serialize WebSocket checkpoint state");
53                return SourceCheckpoint::new(epoch);
54            }
55        };
56        let mut cp = SourceCheckpoint::new(epoch);
57        cp.set_offset(CHECKPOINT_KEY, json);
58        cp
59    }
60
61    /// Deserializes a [`WebSocketSourceCheckpoint`] from a [`SourceCheckpoint`].
62    ///
63    /// Looks for the `"websocket_state"` key in the checkpoint's offsets map
64    /// and deserializes the JSON value. Returns a default (empty) checkpoint
65    /// if the key is missing or the JSON is malformed.
66    #[must_use]
67    pub fn from_source_checkpoint(cp: &SourceCheckpoint) -> Self {
68        cp.get_offset(CHECKPOINT_KEY)
69            .and_then(|json| serde_json::from_str(json).ok())
70            .unwrap_or_default()
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every field of `cp` holds its `Default` value.
    fn assert_is_default(cp: &WebSocketSourceCheckpoint) {
        assert!(cp.last_sequence.is_none());
        assert!(cp.position_token.is_none());
        assert!(cp.last_event_time.is_none());
        assert_eq!(cp.watermark, 0);
        assert!(cp.active_url.is_none());
        assert!(cp.subscriptions.is_empty());
    }

    /// Builds a source checkpoint whose WebSocket state slot holds `raw`.
    fn source_cp_with_state(raw: &str) -> SourceCheckpoint {
        let mut source_cp = SourceCheckpoint::new(1);
        source_cp.set_offset(CHECKPOINT_KEY, raw);
        source_cp
    }

    #[test]
    fn test_default_checkpoint() {
        assert_is_default(&WebSocketSourceCheckpoint::default());
    }

    #[test]
    fn test_roundtrip_empty() {
        let source_cp = WebSocketSourceCheckpoint::default().to_source_checkpoint(1);
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);
        assert_is_default(&restored);
    }

    #[test]
    fn test_roundtrip_full() {
        let ws_cp = WebSocketSourceCheckpoint {
            last_sequence: Some(42),
            position_token: Some("tok-abc-123".to_string()),
            last_event_time: Some(1_700_000_000_000),
            watermark: 1_699_999_999_000,
            active_url: Some("wss://feed.example.com/v1".to_string()),
            subscriptions: vec![
                r#"{"action":"subscribe","channel":"trades"}"#.to_string(),
                r#"{"action":"subscribe","channel":"orderbook"}"#.to_string(),
            ],
        };

        let source_cp = ws_cp.to_source_checkpoint(5);
        assert_eq!(source_cp.epoch(), 5);

        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);
        assert_eq!(restored.last_sequence, Some(42));
        assert_eq!(restored.position_token.as_deref(), Some("tok-abc-123"));
        assert_eq!(restored.last_event_time, Some(1_700_000_000_000));
        assert_eq!(restored.watermark, 1_699_999_999_000);
        assert_eq!(
            restored.active_url.as_deref(),
            Some("wss://feed.example.com/v1")
        );
        // Subscriptions must come back verbatim and in order.
        assert_eq!(restored.subscriptions, ws_cp.subscriptions);
    }

    #[test]
    fn test_from_empty_source_checkpoint() {
        let source_cp = SourceCheckpoint::new(0);
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);
        assert_is_default(&restored);
    }

    #[test]
    fn test_from_invalid_json() {
        let source_cp = source_cp_with_state("not valid json");
        // Malformed state must fall back to default rather than panic.
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);
        assert_is_default(&restored);
    }

    #[test]
    fn test_from_wrong_json_type() {
        // Valid JSON, but a string rather than an object.
        let source_cp = source_cp_with_state(r#""just a string""#);
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);
        assert_is_default(&restored);
    }

    #[test]
    fn test_missing_optional_fields_restore_as_none() {
        let source_cp = source_cp_with_state(r#"{"watermark":7,"subscriptions":["sub1"]}"#);
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);
        assert!(restored.last_sequence.is_none());
        assert!(restored.position_token.is_none());
        assert!(restored.last_event_time.is_none());
        assert!(restored.active_url.is_none());
        assert_eq!(restored.watermark, 7);
        assert_eq!(restored.subscriptions, vec!["sub1"]);
    }

    #[test]
    fn test_unrelated_offset_key_is_ignored() {
        let mut source_cp = SourceCheckpoint::new(1);
        source_cp.set_offset("some_other_key", r#"{"watermark":7,"subscriptions":[]}"#);
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);
        assert_is_default(&restored);
    }

    #[test]
    fn test_epoch_preserved() {
        let ws_cp = WebSocketSourceCheckpoint {
            watermark: 100,
            ..Default::default()
        };
        let source_cp = ws_cp.to_source_checkpoint(42);
        assert_eq!(source_cp.epoch(), 42);
    }

    #[test]
    fn test_checkpoint_key_in_offsets() {
        let ws_cp = WebSocketSourceCheckpoint {
            last_sequence: Some(99),
            ..Default::default()
        };
        let source_cp = ws_cp.to_source_checkpoint(1);
        let raw_json = source_cp
            .get_offset(CHECKPOINT_KEY)
            .expect("checkpoint state must be stored under CHECKPOINT_KEY");

        // Inspect the stored JSON structurally instead of by substring.
        let value: serde_json::Value =
            serde_json::from_str(raw_json).expect("stored checkpoint state must be valid JSON");
        assert_eq!(value["last_sequence"].as_u64(), Some(99));
        assert_eq!(value["watermark"].as_i64(), Some(0));
    }

    #[test]
    fn test_serde_json_roundtrip() {
        let ws_cp = WebSocketSourceCheckpoint {
            last_sequence: Some(7),
            position_token: None,
            last_event_time: Some(-1000),
            watermark: -500,
            active_url: Some("ws://localhost:8080".to_string()),
            subscriptions: vec!["sub1".to_string()],
        };

        let json = serde_json::to_string(&ws_cp).unwrap();
        let restored: WebSocketSourceCheckpoint = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.last_sequence, Some(7));
        assert!(restored.position_token.is_none());
        assert_eq!(restored.last_event_time, Some(-1000));
        assert_eq!(restored.watermark, -500);
        assert_eq!(restored.active_url.as_deref(), Some("ws://localhost:8080"));
        assert_eq!(restored.subscriptions, vec!["sub1"]);
    }
}