laminar_connectors/websocket/
checkpoint.rs1use serde::{Deserialize, Serialize};
7
8use crate::checkpoint::SourceCheckpoint;
9
10#[derive(Debug, Clone, Default, Serialize, Deserialize)]
19pub struct WebSocketSourceCheckpoint {
20 pub last_sequence: Option<u64>,
22 pub position_token: Option<String>,
24 pub last_event_time: Option<i64>,
26 pub watermark: i64,
28 pub active_url: Option<String>,
30 pub subscriptions: Vec<String>,
32}
33
/// Offset key under which the JSON-serialized `WebSocketSourceCheckpoint` is
/// stored in (and read back from) a `SourceCheckpoint`.
const CHECKPOINT_KEY: &str = "websocket_state";
36
37impl WebSocketSourceCheckpoint {
38 #[must_use]
42 pub fn to_source_checkpoint(&self, epoch: u64) -> SourceCheckpoint {
43 let json = match serde_json::to_string(self) {
44 Ok(j) => j,
45 Err(e) => {
46 tracing::error!(error = %e, "failed to serialize WebSocket checkpoint state");
47 return SourceCheckpoint::new(epoch);
48 }
49 };
50 let mut cp = SourceCheckpoint::new(epoch);
51 cp.set_offset(CHECKPOINT_KEY, json);
52 cp
53 }
54
55 #[must_use]
61 pub fn from_source_checkpoint(cp: &SourceCheckpoint) -> Self {
62 cp.get_offset(CHECKPOINT_KEY)
63 .and_then(|json| serde_json::from_str(json).ok())
64 .unwrap_or_default()
65 }
66}
67
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every field of `cp` holds its `Default` value.
    fn assert_is_default(cp: &WebSocketSourceCheckpoint) {
        assert_eq!(cp.last_sequence, None);
        assert_eq!(cp.position_token, None);
        assert_eq!(cp.last_event_time, None);
        assert_eq!(cp.watermark, 0);
        assert_eq!(cp.active_url, None);
        assert!(cp.subscriptions.is_empty());
    }

    /// A default checkpoint has no position information and a zero watermark.
    #[test]
    fn test_default_checkpoint() {
        assert_is_default(&WebSocketSourceCheckpoint::default());
    }

    /// A default checkpoint survives the trip through `SourceCheckpoint`.
    #[test]
    fn test_roundtrip_empty() {
        let source_cp = WebSocketSourceCheckpoint::default().to_source_checkpoint(1);
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);

        assert_is_default(&restored);
    }

    /// Every populated field is restored exactly, including subscription order.
    #[test]
    fn test_roundtrip_full() {
        let ws_cp = WebSocketSourceCheckpoint {
            last_sequence: Some(42),
            position_token: Some("tok-abc-123".to_string()),
            last_event_time: Some(1_700_000_000_000),
            watermark: 1_699_999_999_000,
            active_url: Some("wss://feed.example.com/v1".to_string()),
            subscriptions: vec![
                r#"{"action":"subscribe","channel":"trades"}"#.to_string(),
                r#"{"action":"subscribe","channel":"orderbook"}"#.to_string(),
            ],
        };

        let source_cp = ws_cp.to_source_checkpoint(5);
        assert_eq!(source_cp.epoch(), 5);

        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);
        assert_eq!(restored.last_sequence, Some(42));
        assert_eq!(restored.position_token.as_deref(), Some("tok-abc-123"));
        assert_eq!(restored.last_event_time, Some(1_700_000_000_000));
        assert_eq!(restored.watermark, 1_699_999_999_000);
        assert_eq!(
            restored.active_url.as_deref(),
            Some("wss://feed.example.com/v1")
        );
        // Exact comparison: both content and order must be preserved.
        assert_eq!(restored.subscriptions, ws_cp.subscriptions);
    }

    /// A `SourceCheckpoint` without the WebSocket key restores to defaults.
    #[test]
    fn test_from_empty_source_checkpoint() {
        let source_cp = SourceCheckpoint::new(0);
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);

        assert_is_default(&restored);
    }

    /// Unparseable state under the key falls back to defaults instead of failing.
    #[test]
    fn test_from_invalid_json() {
        let mut source_cp = SourceCheckpoint::new(1);
        source_cp.set_offset(CHECKPOINT_KEY, "not valid json");
        let restored = WebSocketSourceCheckpoint::from_source_checkpoint(&source_cp);

        assert_is_default(&restored);
    }

    /// The epoch passed to `to_source_checkpoint` is carried by the result.
    #[test]
    fn test_epoch_preserved() {
        let ws_cp = WebSocketSourceCheckpoint {
            watermark: 100,
            ..Default::default()
        };

        assert_eq!(ws_cp.to_source_checkpoint(42).epoch(), 42);
    }

    /// The serialized state is stored under `CHECKPOINT_KEY`.
    #[test]
    fn test_checkpoint_key_in_offsets() {
        let ws_cp = WebSocketSourceCheckpoint {
            last_sequence: Some(99),
            ..Default::default()
        };
        let source_cp = ws_cp.to_source_checkpoint(1);

        let raw_json = source_cp
            .get_offset(CHECKPOINT_KEY)
            .expect("serialized state should be stored under CHECKPOINT_KEY");
        // serde_json's compact output has no whitespace around `:`.
        assert!(raw_json.contains(r#""last_sequence":99"#));
    }

    /// Plain serde JSON round-trip, including `None` and negative values.
    #[test]
    fn test_serde_json_roundtrip() {
        let ws_cp = WebSocketSourceCheckpoint {
            last_sequence: Some(7),
            position_token: None,
            last_event_time: Some(-1000),
            watermark: -500,
            active_url: Some("ws://localhost:8080".to_string()),
            subscriptions: vec!["sub1".to_string()],
        };

        let json = serde_json::to_string(&ws_cp).unwrap();
        let restored: WebSocketSourceCheckpoint = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.last_sequence, Some(7));
        assert_eq!(restored.position_token, None);
        assert_eq!(restored.last_event_time, Some(-1000));
        assert_eq!(restored.watermark, -500);
        assert_eq!(restored.active_url.as_deref(), Some("ws://localhost:8080"));
        assert_eq!(restored.subscriptions, vec!["sub1"]);
    }
}