Skip to main content

laminar_connectors/websocket/
mod.rs

1//! WebSocket source/sink connectors. Four modes: source-client (connect
2//! to a WS server), source-server (listen for clients), sink-server (fan
3//! out results to subscribers), sink-client (push to an external server).
4//!
5//! WebSocket is non-replayable — source connectors are at-most-once / best-effort;
6//! sinks have a bounded, best-effort replay buffer.
7
8pub mod backpressure;
9pub mod checkpoint;
10pub mod connection;
11pub mod fanout;
12pub mod metrics;
13pub mod parser;
14pub mod protocol;
15pub mod serializer;
16pub mod sink;
17pub mod sink_client;
18pub mod sink_config;
19pub mod sink_metrics;
20pub mod source;
21pub mod source_config;
22pub mod source_server;
23
24pub use backpressure::WsBackpressure;
25pub use checkpoint::WebSocketSourceCheckpoint;
26pub use metrics::WebSocketSourceMetrics;
27pub use protocol::{ClientMessage, ServerMessage};
28pub use sink::WebSocketSinkServer;
29pub use sink_client::WebSocketSinkClient;
30pub use sink_config::{SinkFormat, SinkMode, SlowClientPolicy, WebSocketSinkConfig};
31pub use sink_metrics::WebSocketSinkMetrics;
32pub use source::WebSocketSource;
33pub use source_config::{
34    EventTimeFormat, MessageFormat, ReconnectConfig, SourceMode, WebSocketSourceConfig,
35    WsAuthConfig,
36};
37pub use source_server::WebSocketSourceServer;
38
39use std::sync::Arc;
40
41use crate::config::{ConfigKeySpec, ConnectorInfo};
42use crate::registry::ConnectorRegistry;
43
44/// Registers the WebSocket source connector with the given registry.
45///
46/// After registration, the runtime can instantiate `WebSocketSource` by
47/// name when processing `CREATE SOURCE ... WITH (connector = 'websocket')`.
48pub fn register_websocket_source(registry: &ConnectorRegistry) {
49    let info = ConnectorInfo {
50        name: "websocket".to_string(),
51        display_name: "WebSocket Source".to_string(),
52        version: env!("CARGO_PKG_VERSION").to_string(),
53        is_source: true,
54        is_sink: false,
55        config_keys: websocket_source_config_keys(),
56    };
57
58    registry.register_source(
59        "websocket",
60        info,
61        Arc::new(|registry: Option<&prometheus::Registry>| {
62            use arrow_schema::{DataType, Field, Schema};
63
64            let default_schema = Arc::new(Schema::new(vec![
65                Field::new("key", DataType::Utf8, true),
66                Field::new("value", DataType::Utf8, false),
67            ]));
68            Box::new(WebSocketSource::new(
69                default_schema,
70                WebSocketSourceConfig::default(),
71                registry,
72            ))
73        }),
74    );
75}
76
77/// Registers the WebSocket sink connector with the given registry.
78///
79/// After registration, the runtime can instantiate `WebSocketSinkServer` by
80/// name when processing `CREATE SINK ... WITH (connector = 'websocket')`.
81pub fn register_websocket_sink(registry: &ConnectorRegistry) {
82    let info = ConnectorInfo {
83        name: "websocket".to_string(),
84        display_name: "WebSocket Sink".to_string(),
85        version: env!("CARGO_PKG_VERSION").to_string(),
86        is_source: false,
87        is_sink: true,
88        config_keys: websocket_sink_config_keys(),
89    };
90
91    registry.register_sink(
92        "websocket",
93        info,
94        Arc::new(|registry: Option<&prometheus::Registry>| {
95            use arrow_schema::{DataType, Field, Schema};
96
97            let default_schema = Arc::new(Schema::new(vec![
98                Field::new("key", DataType::Utf8, true),
99                Field::new("value", DataType::Utf8, false),
100            ]));
101            Box::new(WebSocketSinkServer::new(
102                default_schema,
103                WebSocketSinkConfig::default(),
104                registry,
105            ))
106        }),
107    );
108}
109
110fn websocket_source_config_keys() -> Vec<ConfigKeySpec> {
111    vec![
112        ConfigKeySpec::required("url", "WebSocket URL to connect to (ws:// or wss://)"),
113        ConfigKeySpec::optional("mode", "Operating mode (client/server)", "client"),
114        ConfigKeySpec::optional("format", "Message format (json/csv/binary)", "json"),
115        ConfigKeySpec::optional(
116            "subscribe.message",
117            "JSON subscription message to send after handshake",
118            "",
119        ),
120        ConfigKeySpec::optional("reconnect.enabled", "Enable automatic reconnection", "true"),
121        ConfigKeySpec::optional(
122            "reconnect.initial.delay.ms",
123            "Initial reconnect delay in ms",
124            "100",
125        ),
126        ConfigKeySpec::optional(
127            "reconnect.max.delay.ms",
128            "Maximum reconnect delay in ms",
129            "30000",
130        ),
131        ConfigKeySpec::optional("ping.interval.ms", "WebSocket ping interval in ms", "30000"),
132        ConfigKeySpec::optional("ping.timeout.ms", "Pong reply timeout in ms", "10000"),
133        ConfigKeySpec::optional("bind.address", "Socket address for server mode", ""),
134        ConfigKeySpec::optional(
135            "max.connections",
136            "Max concurrent connections (server mode)",
137            "1024",
138        ),
139        ConfigKeySpec::optional(
140            "on.backpressure",
141            "Backpressure strategy (block/drop)",
142            "block",
143        ),
144        ConfigKeySpec::optional(
145            "max.message.size",
146            "Max WebSocket message size in bytes",
147            "67108864",
148        ),
149        ConfigKeySpec::optional(
150            "event.time.field",
151            "JSON field path for event time extraction",
152            "",
153        ),
154        ConfigKeySpec::optional(
155            "event.time.format",
156            "Event time format (epoch_millis/iso8601)",
157            "",
158        ),
159        ConfigKeySpec::optional("auth.type", "Authentication type (bearer/basic/hmac)", ""),
160        ConfigKeySpec::optional("auth.token", "Bearer token for authentication", ""),
161    ]
162}
163
164fn websocket_sink_config_keys() -> Vec<ConfigKeySpec> {
165    vec![
166        ConfigKeySpec::required(
167            "bind.address",
168            "Socket address to bind (e.g., 0.0.0.0:8080)",
169        ),
170        ConfigKeySpec::optional("mode", "Operating mode (server/client)", "server"),
171        ConfigKeySpec::optional(
172            "format",
173            "Serialization format (json/jsonlines/arrow_ipc)",
174            "json",
175        ),
176        ConfigKeySpec::optional(
177            "max.connections",
178            "Max concurrent client connections",
179            "10000",
180        ),
181        ConfigKeySpec::optional(
182            "per.client.buffer",
183            "Per-client send buffer in bytes",
184            "262144",
185        ),
186        ConfigKeySpec::optional(
187            "slow.client.policy",
188            "Slow client policy (drop_oldest/disconnect)",
189            "drop_oldest",
190        ),
191        ConfigKeySpec::optional("ping.interval.ms", "Ping interval in ms", "30000"),
192        ConfigKeySpec::optional("ping.timeout.ms", "Pong timeout in ms", "10000"),
193        ConfigKeySpec::optional(
194            "replay.buffer.size",
195            "Messages to buffer for late joiners",
196            "",
197        ),
198        ConfigKeySpec::optional("url", "WebSocket URL for client mode", ""),
199        ConfigKeySpec::optional("auth.type", "Authentication type (bearer/basic/hmac)", ""),
200        ConfigKeySpec::optional("auth.token", "Bearer token for authentication", ""),
201    ]
202}