Skip to main content

laminar_connectors/websocket/
checkpoint.rs

1//! `WebSocket` source checkpoint types.
2//!
3//! `WebSocket` is non-replayable -- checkpoints capture state for best-effort recovery.
4//! On recovery, data gaps should be expected and logged.
5
6use serde::{Deserialize, Serialize};
7
8use crate::checkpoint::SourceCheckpoint;
9
10/// Checkpoint state for a WebSocket source connector.
11///
12/// Since WebSocket has no intrinsic offsets or replay capability,
13/// this checkpoint captures application-level state for best-effort recovery.
14/// On recovery, data gaps should be expected and logged.
15///
16/// The entire struct is serialized as JSON and stored under a single key
17/// (`"websocket_state"`) in the [`SourceCheckpoint`] offsets map.
18#[derive(Debug, Clone, Default, Serialize, Deserialize)]
19pub struct WebSocketSourceCheckpoint {
20    /// Application-level sequence number (if the WS server provides one).
21    pub last_sequence: Option<u64>,
22    /// Server-provided position token (if any -- some APIs support resume tokens).
23    pub position_token: Option<String>,
24    /// Last event timestamp processed (epoch millis).
25    pub last_event_time: Option<i64>,
26    /// Current watermark value.
27    pub watermark: i64,
28    /// Which URL was active at checkpoint time.
29    pub active_url: Option<String>,
30    /// Subscription messages to re-send on recovery.
31    pub subscriptions: Vec<String>,
32}
33
/// Offsets-map key under which the JSON-encoded WebSocket state is stored in
/// a [`SourceCheckpoint`].
const CHECKPOINT_KEY: &str = "websocket_state";
36
37impl WebSocketSourceCheckpoint {
38    /// Serializes this checkpoint into a [`SourceCheckpoint`]. The entire
39    /// struct is stored as a JSON string under `"websocket_state"` in the
40    /// checkpoint's offsets map.
41    #[must_use]
42    pub fn to_source_checkpoint(&self, epoch: u64) -> SourceCheckpoint {
43        let json = match serde_json::to_string(self) {
44            Ok(j) => j,
45            Err(e) => {
46                tracing::error!(error = %e, "failed to serialize WebSocket checkpoint state");
47                return SourceCheckpoint::new(epoch);
48            }
49        };
50        let mut cp = SourceCheckpoint::new(epoch);
51        cp.set_offset(CHECKPOINT_KEY, json);
52        cp
53    }
54
55    /// Deserializes a [`WebSocketSourceCheckpoint`] from a [`SourceCheckpoint`].
56    ///
57    /// Looks for the `"websocket_state"` key in the checkpoint's offsets map
58    /// and deserializes the JSON value. Returns a default (empty) checkpoint
59    /// if the key is missing or the JSON is malformed.
60    #[must_use]
61    pub fn from_source_checkpoint(cp: &SourceCheckpoint) -> Self {
62        cp.get_offset(CHECKPOINT_KEY)
63            .and_then(|json| serde_json::from_str(json).ok())
64            .unwrap_or_default()
65    }
66}
67
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` yields a checkpoint that carries no state at all.
    #[test]
    fn test_default_checkpoint() {
        let WebSocketSourceCheckpoint {
            last_sequence,
            position_token,
            last_event_time,
            watermark,
            active_url,
            subscriptions,
        } = WebSocketSourceCheckpoint::default();

        assert!(last_sequence.is_none());
        assert!(position_token.is_none());
        assert!(last_event_time.is_none());
        assert_eq!(watermark, 0);
        assert!(active_url.is_none());
        assert!(subscriptions.is_empty());
    }

    /// An empty state survives the trip through a `SourceCheckpoint`.
    #[test]
    fn test_roundtrip_empty() {
        let wrapped = WebSocketSourceCheckpoint::default().to_source_checkpoint(1);
        let WebSocketSourceCheckpoint {
            last_sequence,
            position_token,
            last_event_time,
            watermark,
            active_url,
            subscriptions,
        } = WebSocketSourceCheckpoint::from_source_checkpoint(&wrapped);

        assert_eq!(last_sequence, None);
        assert_eq!(position_token, None);
        assert_eq!(last_event_time, None);
        assert_eq!(watermark, 0);
        assert_eq!(active_url, None);
        assert!(subscriptions.is_empty());
    }

    /// A fully populated state survives the trip through a `SourceCheckpoint`.
    #[test]
    fn test_roundtrip_full() {
        let trades = String::from(r#"{"action":"subscribe","channel":"trades"}"#);
        let orderbook = String::from(r#"{"action":"subscribe","channel":"orderbook"}"#);
        let original = WebSocketSourceCheckpoint {
            last_sequence: Some(42),
            position_token: Some(String::from("tok-abc-123")),
            last_event_time: Some(1_700_000_000_000),
            watermark: 1_699_999_999_000,
            active_url: Some(String::from("wss://feed.example.com/v1")),
            subscriptions: vec![trades, orderbook],
        };

        let wrapped = original.to_source_checkpoint(5);
        assert_eq!(wrapped.epoch(), 5);

        let back = WebSocketSourceCheckpoint::from_source_checkpoint(&wrapped);
        assert_eq!(back.last_sequence, Some(42));
        assert_eq!(back.position_token.as_deref(), Some("tok-abc-123"));
        assert_eq!(back.last_event_time, Some(1_700_000_000_000));
        assert_eq!(back.watermark, 1_699_999_999_000);
        assert_eq!(back.active_url.as_deref(), Some("wss://feed.example.com/v1"));
        assert_eq!(back.subscriptions.len(), 2);
        assert!(back.subscriptions[0].contains("trades"));
        assert!(back.subscriptions[1].contains("orderbook"));
    }

    /// A checkpoint without the state key restores to the default state.
    #[test]
    fn test_from_empty_source_checkpoint() {
        let bare = SourceCheckpoint::new(0);
        let back = WebSocketSourceCheckpoint::from_source_checkpoint(&bare);
        assert!(back.last_sequence.is_none());
        assert_eq!(back.watermark, 0);
    }

    /// A malformed JSON payload restores to the default state.
    #[test]
    fn test_from_invalid_json() {
        let mut corrupt = SourceCheckpoint::new(1);
        corrupt.set_offset("websocket_state", "not valid json");
        let back = WebSocketSourceCheckpoint::from_source_checkpoint(&corrupt);
        // Falls back to the default state.
        assert!(back.last_sequence.is_none());
        assert_eq!(back.watermark, 0);
    }

    /// The epoch passed in is the epoch of the produced checkpoint.
    #[test]
    fn test_epoch_preserved() {
        let state = WebSocketSourceCheckpoint {
            watermark: 100,
            ..Default::default()
        };
        assert_eq!(state.to_source_checkpoint(42).epoch(), 42);
    }

    /// The serialized state is stored under the `"websocket_state"` key.
    #[test]
    fn test_checkpoint_key_in_offsets() {
        let state = WebSocketSourceCheckpoint {
            last_sequence: Some(99),
            ..Default::default()
        };
        let wrapped = state.to_source_checkpoint(1);
        let payload = wrapped
            .get_offset("websocket_state")
            .expect("state should be stored under the websocket_state key");
        assert!(payload.contains("99"));
    }

    /// Plain serde JSON encoding round-trips, including negative values.
    #[test]
    fn test_serde_json_roundtrip() {
        let original = WebSocketSourceCheckpoint {
            last_sequence: Some(7),
            position_token: None,
            last_event_time: Some(-1000),
            watermark: -500,
            active_url: Some(String::from("ws://localhost:8080")),
            subscriptions: vec![String::from("sub1")],
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: WebSocketSourceCheckpoint = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.last_sequence, Some(7));
        assert_eq!(decoded.last_event_time, Some(-1000));
        assert_eq!(decoded.watermark, -500);
        assert_eq!(decoded.subscriptions, vec!["sub1"]);
    }
}