laminar_connectors/websocket/
checkpoint.rs1use serde::{Deserialize, Serialize};
7
8use crate::checkpoint::SourceCheckpoint;
9
10#[derive(Debug, Clone, Default, Serialize, Deserialize)]
19pub struct WebSocketSourceCheckpoint {
20 pub last_sequence: Option<u64>,
22 pub position_token: Option<String>,
24 pub last_event_time: Option<i64>,
26 pub watermark: i64,
28 pub active_url: Option<String>,
30 pub subscriptions: Vec<String>,
32}
33
34const CHECKPOINT_KEY: &str = "websocket_state";
36
37impl WebSocketSourceCheckpoint {
38 #[must_use]
48 pub fn to_source_checkpoint(&self, epoch: u64) -> SourceCheckpoint {
49 let json = match serde_json::to_string(self) {
50 Ok(j) => j,
51 Err(e) => {
52 tracing::error!(error = %e, "failed to serialize WebSocket checkpoint state");
53 return SourceCheckpoint::new(epoch);
54 }
55 };
56 let mut cp = SourceCheckpoint::new(epoch);
57 cp.set_offset(CHECKPOINT_KEY, json);
58 cp
59 }
60
61 #[must_use]
67 pub fn from_source_checkpoint(cp: &SourceCheckpoint) -> Self {
68 cp.get_offset(CHECKPOINT_KEY)
69 .and_then(|json| serde_json::from_str(json).ok())
70 .unwrap_or_default()
71 }
72}
73
#[cfg(test)]
mod tests {
    use super::*;

    /// A default-constructed checkpoint holds no state.
    #[test]
    fn test_default_checkpoint() {
        let state = WebSocketSourceCheckpoint::default();
        assert_eq!(state.last_sequence, None);
        assert_eq!(state.position_token, None);
        assert_eq!(state.last_event_time, None);
        assert_eq!(state.watermark, 0);
        assert_eq!(state.active_url, None);
        assert!(state.subscriptions.is_empty());
    }

    /// A default checkpoint survives a trip through `SourceCheckpoint` unchanged.
    #[test]
    fn test_roundtrip_empty() {
        let packed = WebSocketSourceCheckpoint::default().to_source_checkpoint(1);
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&packed);

        assert!(restored.last_sequence.is_none());
        assert!(restored.position_token.is_none());
        assert!(restored.last_event_time.is_none());
        assert_eq!(restored.watermark, 0);
        assert!(restored.active_url.is_none());
        assert!(restored.subscriptions.is_empty());
    }

    /// Every populated field survives a trip through `SourceCheckpoint`.
    #[test]
    fn test_roundtrip_full() {
        let original = WebSocketSourceCheckpoint {
            last_sequence: Some(42),
            position_token: Some(String::from("tok-abc-123")),
            last_event_time: Some(1_700_000_000_000),
            watermark: 1_699_999_999_000,
            active_url: Some(String::from("wss://feed.example.com/v1")),
            subscriptions: vec![
                String::from(r#"{"action":"subscribe","channel":"trades"}"#),
                String::from(r#"{"action":"subscribe","channel":"orderbook"}"#),
            ],
        };

        let packed = original.to_source_checkpoint(5);
        assert_eq!(packed.epoch(), 5);

        let WebSocketSourceCheckpoint {
            last_sequence,
            position_token,
            last_event_time,
            watermark,
            active_url,
            subscriptions,
        } = WebSocketSourceCheckpoint::from_source_checkpoint(&packed);

        assert_eq!(last_sequence, Some(42));
        assert_eq!(position_token.as_deref(), Some("tok-abc-123"));
        assert_eq!(last_event_time, Some(1_700_000_000_000));
        assert_eq!(watermark, 1_699_999_999_000);
        assert_eq!(active_url.as_deref(), Some("wss://feed.example.com/v1"));
        assert_eq!(subscriptions.len(), 2);
        assert!(subscriptions[0].contains("trades"));
        assert!(subscriptions[1].contains("orderbook"));
    }

    /// A checkpoint with no stored offsets restores to defaults.
    #[test]
    fn test_from_empty_source_checkpoint() {
        let empty = SourceCheckpoint::new(0);
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&empty);
        assert_eq!(restored.last_sequence, None);
        assert_eq!(restored.watermark, 0);
    }

    /// A stored value that is not valid JSON restores to defaults instead of panicking.
    #[test]
    fn test_from_invalid_json() {
        let mut corrupted = SourceCheckpoint::new(1);
        corrupted.set_offset("websocket_state", "not valid json");
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&corrupted);
        assert_eq!(restored.last_sequence, None);
        assert_eq!(restored.watermark, 0);
    }

    /// The epoch passed to `to_source_checkpoint` is the epoch of the result.
    #[test]
    fn test_epoch_preserved() {
        let state = WebSocketSourceCheckpoint {
            watermark: 100,
            ..Default::default()
        };
        assert_eq!(state.to_source_checkpoint(42).epoch(), 42);
    }

    /// The state is stored under the literal key `"websocket_state"`.
    ///
    /// The key is spelled out here rather than taken from the constant so that
    /// an accidental rename of the persisted key makes this test fail.
    #[test]
    fn test_checkpoint_key_in_offsets() {
        let state = WebSocketSourceCheckpoint {
            last_sequence: Some(99),
            ..Default::default()
        };
        let packed = state.to_source_checkpoint(1);
        let raw_json = packed
            .get_offset("websocket_state")
            .expect("state should be stored under the checkpoint key");
        assert!(raw_json.contains("99"));
    }

    /// Direct serde_json encoding and decoding preserves values, including negatives.
    #[test]
    fn test_serde_json_roundtrip() {
        let original = WebSocketSourceCheckpoint {
            last_sequence: Some(7),
            position_token: None,
            last_event_time: Some(-1000),
            watermark: -500,
            active_url: Some(String::from("ws://localhost:8080")),
            subscriptions: vec![String::from("sub1")],
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: WebSocketSourceCheckpoint = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.last_sequence, Some(7));
        assert_eq!(decoded.last_event_time, Some(-1000));
        assert_eq!(decoded.watermark, -500);
        assert_eq!(decoded.subscriptions, vec!["sub1"]);
    }
}