Skip to main content

laminar_connectors/websocket/
protocol.rs

//! Subscription wire protocol for WebSocket sink server mode.
//!
//! Defines the JSON message types exchanged between the WebSocket sink server
//! and its connected clients. Messages are tagged with `"action"` (client)
//! or `"type"` (server) for serde dispatch.
6
7use serde::{Deserialize, Serialize};
8
9use super::sink_config::SinkFormat;
10
11/// Messages sent from a client to the WebSocket sink server.
12///
13/// Clients use these messages to manage subscriptions and send
14/// application-level pings. The `action` field in the JSON object
15/// determines which variant is deserialized.
16#[derive(Debug, Deserialize)]
17#[serde(tag = "action")]
18pub enum ClientMessage {
19    /// Subscribe to streaming query results.
20    #[serde(rename = "subscribe")]
21    Subscribe {
22        /// Optional filter expression to apply server-side.
23        filter: Option<String>,
24        /// If provided, server attempts to replay from this sequence.
25        last_sequence: Option<u64>,
26        /// Preferred serialization format for data messages.
27        format: Option<SinkFormat>,
28    },
29    /// Unsubscribe from an active subscription.
30    #[serde(rename = "unsubscribe")]
31    Unsubscribe {
32        /// The subscription ID returned in the `Subscribed` response.
33        subscription_id: String,
34    },
35    /// Client ping (application-level keepalive).
36    #[serde(rename = "ping")]
37    Ping,
38}
39
40/// Messages sent from the WebSocket sink server to clients.
41///
42/// The `type` field in the JSON object determines which variant is
43/// serialized. Data messages include monotonically increasing sequence
44/// numbers for client-side gap detection.
45#[derive(Debug, Clone, Serialize)]
46#[serde(tag = "type")]
47pub enum ServerMessage {
48    /// Data delivery containing query results.
49    #[serde(rename = "data")]
50    Data {
51        /// Subscription this data belongs to.
52        subscription_id: String,
53        /// The payload (format depends on client's requested `SinkFormat`).
54        data: serde_json::Value,
55        /// Monotonically increasing sequence number for gap detection.
56        sequence: u64,
57        /// Current watermark (epoch millis), if available.
58        watermark: Option<i64>,
59    },
60    /// Subscription confirmed -- sent after a successful `Subscribe` request.
61    #[serde(rename = "subscribed")]
62    Subscribed {
63        /// Unique identifier for this subscription.
64        subscription_id: String,
65    },
66    /// Error message sent to the client.
67    #[serde(rename = "error")]
68    Error {
69        /// Human-readable error description.
70        message: String,
71    },
72    /// Periodic heartbeat from the server.
73    #[serde(rename = "heartbeat")]
74    Heartbeat {
75        /// Server timestamp (epoch millis).
76        server_time: i64,
77    },
78    /// Backpressure warning indicating the server's send buffer is filling up.
79    #[serde(rename = "backpressure")]
80    BackpressureWarning {
81        /// Percentage of the send buffer currently used (0-100).
82        buffer_pct: u8,
83    },
84}
85
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `raw` into a [`ClientMessage`], panicking if it is rejected.
    fn parse_client(raw: &str) -> ClientMessage {
        serde_json::from_str(raw).unwrap()
    }

    /// Serializes `msg` into a JSON value tree for field-level assertions.
    fn to_json(msg: &ServerMessage) -> serde_json::Value {
        serde_json::to_value(msg).unwrap()
    }

    // ── ClientMessage: deserialization ──

    /// A subscribe request with every optional field populated.
    #[test]
    fn test_deserialize_subscribe_full() {
        let raw = r#"{
            "action": "subscribe",
            "filter": "symbol = 'AAPL'",
            "last_sequence": 42,
            "format": "Json"
        }"#;
        match parse_client(raw) {
            ClientMessage::Subscribe {
                filter,
                last_sequence,
                format,
            } => {
                assert_eq!(filter.as_deref(), Some("symbol = 'AAPL'"));
                assert_eq!(last_sequence, Some(42));
                assert!(format.is_some());
            }
            _ => panic!("expected Subscribe variant"),
        }
    }

    /// A subscribe request carrying nothing but the action tag.
    #[test]
    fn test_deserialize_subscribe_minimal() {
        match parse_client(r#"{"action": "subscribe"}"#) {
            ClientMessage::Subscribe {
                filter,
                last_sequence,
                format,
            } => {
                assert!(filter.is_none());
                assert!(last_sequence.is_none());
                assert!(format.is_none());
            }
            _ => panic!("expected Subscribe variant"),
        }
    }

    /// An unsubscribe request must surface the subscription id unchanged.
    #[test]
    fn test_deserialize_unsubscribe() {
        match parse_client(r#"{"action": "unsubscribe", "subscription_id": "sub-123"}"#) {
            ClientMessage::Unsubscribe { subscription_id } => {
                assert_eq!(subscription_id, "sub-123");
            }
            _ => panic!("expected Unsubscribe variant"),
        }
    }

    /// A ping carries no fields beyond the action tag.
    #[test]
    fn test_deserialize_ping() {
        let parsed = parse_client(r#"{"action": "ping"}"#);
        assert!(matches!(parsed, ClientMessage::Ping));
    }

    /// An unrecognized action tag must be rejected rather than ignored.
    #[test]
    fn test_deserialize_unknown_action_fails() {
        let outcome: Result<ClientMessage, _> =
            serde_json::from_str(r#"{"action": "unknown_action"}"#);
        assert!(outcome.is_err());
    }

    // ── ServerMessage: serialization ──

    /// A data message exposes its tag and all four fields.
    #[test]
    fn test_serialize_data() {
        let json = to_json(&ServerMessage::Data {
            subscription_id: String::from("sub-1"),
            data: serde_json::json!({"price": 150.5}),
            sequence: 100,
            watermark: Some(1_700_000_000_000),
        });
        assert_eq!(json["type"], "data");
        assert_eq!(json["subscription_id"], "sub-1");
        assert_eq!(json["sequence"], 100);
        assert_eq!(json["watermark"], 1_700_000_000_000_i64);
        assert_eq!(json["data"]["price"], 150.5);
    }

    /// Without a watermark, looking the key up yields JSON null.
    #[test]
    fn test_serialize_data_no_watermark() {
        let json = to_json(&ServerMessage::Data {
            subscription_id: String::from("sub-2"),
            data: serde_json::json!("hello"),
            sequence: 1,
            watermark: None,
        });
        assert_eq!(json["type"], "data");
        assert!(json["watermark"].is_null());
    }

    /// The subscription confirmation echoes the assigned id.
    #[test]
    fn test_serialize_subscribed() {
        let json = to_json(&ServerMessage::Subscribed {
            subscription_id: String::from("sub-abc"),
        });
        assert_eq!(json["type"], "subscribed");
        assert_eq!(json["subscription_id"], "sub-abc");
    }

    /// Error messages carry their description under `message`.
    #[test]
    fn test_serialize_error() {
        let json = to_json(&ServerMessage::Error {
            message: String::from("invalid filter expression"),
        });
        assert_eq!(json["type"], "error");
        assert_eq!(json["message"], "invalid filter expression");
    }

    /// Heartbeats carry the server timestamp under `server_time`.
    #[test]
    fn test_serialize_heartbeat() {
        let json = to_json(&ServerMessage::Heartbeat {
            server_time: 1_700_000_000_000,
        });
        assert_eq!(json["type"], "heartbeat");
        assert_eq!(json["server_time"], 1_700_000_000_000_i64);
    }

    /// The backpressure warning uses the short `backpressure` tag on the wire.
    #[test]
    fn test_serialize_backpressure_warning() {
        let json = to_json(&ServerMessage::BackpressureWarning { buffer_pct: 85 });
        assert_eq!(json["type"], "backpressure");
        assert_eq!(json["buffer_pct"], 85);
    }

    /// A cloned message serializes with the same field value as its source.
    #[test]
    fn test_server_message_clone() {
        let original = ServerMessage::Heartbeat { server_time: 12345 };
        let duplicate = original.clone();
        let json = to_json(&duplicate);
        assert_eq!(json["server_time"], 12345);
    }

    /// The `Debug` rendering names the variant and includes its payload.
    #[test]
    fn test_server_message_debug() {
        let failure = ServerMessage::Error {
            message: String::from("test"),
        };
        let rendered = format!("{failure:?}");
        assert!(rendered.contains("Error"));
        assert!(rendered.contains("test"));
    }
}