laminar_connectors/websocket/
protocol.rs1use serde::{Deserialize, Serialize};
8
9use super::sink_config::SinkFormat;
10
11#[derive(Debug, Deserialize)]
17#[serde(tag = "action")]
18pub enum ClientMessage {
19 #[serde(rename = "subscribe")]
21 Subscribe {
22 filter: Option<String>,
24 last_sequence: Option<u64>,
26 format: Option<SinkFormat>,
28 },
29 #[serde(rename = "unsubscribe")]
31 Unsubscribe {
32 subscription_id: String,
34 },
35 #[serde(rename = "ping")]
37 Ping,
38}
39
40#[derive(Debug, Clone, Serialize)]
46#[serde(tag = "type")]
47pub enum ServerMessage {
48 #[serde(rename = "data")]
50 Data {
51 subscription_id: String,
53 data: serde_json::Value,
55 sequence: u64,
57 watermark: Option<i64>,
59 },
60 #[serde(rename = "subscribed")]
62 Subscribed {
63 subscription_id: String,
65 },
66 #[serde(rename = "error")]
68 Error {
69 message: String,
71 },
72 #[serde(rename = "heartbeat")]
74 Heartbeat {
75 server_time: i64,
77 },
78 #[serde(rename = "backpressure")]
80 BackpressureWarning {
81 buffer_pct: u8,
83 },
84}
85
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes a client message from JSON text, panicking on malformed input.
    fn parse_client(raw: &str) -> ClientMessage {
        serde_json::from_str(raw).unwrap()
    }

    /// Encodes a server message into a JSON value tree for field-level checks.
    fn to_json(msg: &ServerMessage) -> serde_json::Value {
        serde_json::to_value(msg).unwrap()
    }

    // ── ClientMessage: deserialization ──

    /// A subscribe request carrying every optional field populates all of them.
    #[test]
    fn test_deserialize_subscribe_full() {
        let payload = r#"{
            "action": "subscribe",
            "filter": "symbol = 'AAPL'",
            "last_sequence": 42,
            "format": "Json"
        }"#;
        match parse_client(payload) {
            ClientMessage::Subscribe {
                filter,
                last_sequence,
                format,
            } => {
                assert_eq!(filter.as_deref(), Some("symbol = 'AAPL'"));
                assert_eq!(last_sequence, Some(42));
                assert!(format.is_some());
            }
            _ => panic!("expected Subscribe variant"),
        }
    }

    /// A subscribe request with only the tag leaves every optional field empty.
    #[test]
    fn test_deserialize_subscribe_minimal() {
        match parse_client(r#"{"action": "subscribe"}"#) {
            ClientMessage::Subscribe {
                filter,
                last_sequence,
                format,
            } => {
                assert!(filter.is_none());
                assert!(last_sequence.is_none());
                assert!(format.is_none());
            }
            _ => panic!("expected Subscribe variant"),
        }
    }

    /// An unsubscribe request exposes the subscription id it names.
    #[test]
    fn test_deserialize_unsubscribe() {
        let payload = r#"{"action": "unsubscribe", "subscription_id": "sub-123"}"#;
        match parse_client(payload) {
            ClientMessage::Unsubscribe { subscription_id } => {
                assert_eq!(subscription_id, "sub-123");
            }
            _ => panic!("expected Unsubscribe variant"),
        }
    }

    /// A bare ping decodes to the payload-free variant.
    #[test]
    fn test_deserialize_ping() {
        let parsed = parse_client(r#"{"action": "ping"}"#);
        assert!(matches!(parsed, ClientMessage::Ping));
    }

    /// An action tag that matches no variant is rejected.
    #[test]
    fn test_deserialize_unknown_action_fails() {
        let payload = r#"{"action": "unknown_action"}"#;
        assert!(serde_json::from_str::<ClientMessage>(payload).is_err());
    }

    // ── ServerMessage: serialization ──

    /// A data message emits its tag and every field, including the watermark.
    #[test]
    fn test_serialize_data() {
        let encoded = to_json(&ServerMessage::Data {
            subscription_id: "sub-1".to_string(),
            data: serde_json::json!({"price": 150.5}),
            sequence: 100,
            watermark: Some(1_700_000_000_000),
        });
        assert_eq!(encoded["type"], "data");
        assert_eq!(encoded["subscription_id"], "sub-1");
        assert_eq!(encoded["sequence"], 100);
        assert_eq!(encoded["watermark"], 1_700_000_000_000_i64);
        assert_eq!(encoded["data"]["price"], 150.5);
    }

    /// A data message without a watermark reads back as null at that key.
    #[test]
    fn test_serialize_data_no_watermark() {
        let encoded = to_json(&ServerMessage::Data {
            subscription_id: "sub-2".to_string(),
            data: serde_json::json!("hello"),
            sequence: 1,
            watermark: None,
        });
        assert_eq!(encoded["type"], "data");
        assert!(encoded["watermark"].is_null());
    }

    /// A subscription acknowledgement carries its tag and id.
    #[test]
    fn test_serialize_subscribed() {
        let encoded = to_json(&ServerMessage::Subscribed {
            subscription_id: "sub-abc".to_string(),
        });
        assert_eq!(encoded["type"], "subscribed");
        assert_eq!(encoded["subscription_id"], "sub-abc");
    }

    /// An error message carries its tag and text.
    #[test]
    fn test_serialize_error() {
        let encoded = to_json(&ServerMessage::Error {
            message: "invalid filter expression".to_string(),
        });
        assert_eq!(encoded["type"], "error");
        assert_eq!(encoded["message"], "invalid filter expression");
    }

    /// A heartbeat carries its tag and the server time.
    #[test]
    fn test_serialize_heartbeat() {
        let encoded = to_json(&ServerMessage::Heartbeat {
            server_time: 1_700_000_000_000,
        });
        assert_eq!(encoded["type"], "heartbeat");
        assert_eq!(encoded["server_time"], 1_700_000_000_000_i64);
    }

    /// The backpressure warning uses the shortened `backpressure` tag.
    #[test]
    fn test_serialize_backpressure_warning() {
        let encoded = to_json(&ServerMessage::BackpressureWarning { buffer_pct: 85 });
        assert_eq!(encoded["type"], "backpressure");
        assert_eq!(encoded["buffer_pct"], 85);
    }

    // ── ServerMessage: derived traits ──

    /// A clone serializes with the same field value as the message it came from.
    #[test]
    fn test_server_message_clone() {
        let original = ServerMessage::Heartbeat { server_time: 12345 };
        let copy = original.clone();
        assert_eq!(to_json(&copy)["server_time"], 12345);
    }

    /// The `Debug` rendering names the variant and includes the message text.
    #[test]
    fn test_server_message_debug() {
        let err_msg = ServerMessage::Error {
            message: "test".to_string(),
        };
        let rendered = format!("{err_msg:?}");
        assert!(rendered.contains("Error"));
        assert!(rendered.contains("test"));
    }
}