Skip to main content

laminar_connectors/websocket/
mod.rs

//! WebSocket source and sink connectors for LaminarDB.
//!
//! Provides four connector modes:
//! - **Source client**: Connects to an external WebSocket server (e.g., exchange feeds)
//! - **Source server**: Listens for incoming WebSocket connections (e.g., `IoT` sensors)
//! - **Sink server**: Fans out streaming query results to connected subscribers
//! - **Sink client**: Pushes streaming query output to an external WebSocket server
//!
//! # Delivery Guarantees
//!
//! WebSocket is a **non-replayable** transport. Source connectors provide
//! **at-most-once** or **best-effort** delivery — there are no offsets to
//! seek to on recovery. Sink connectors optionally support a replay buffer
//! for client-side resume, but this is bounded and best-effort.
//!
//! # Architecture
//!
//! All WebSocket I/O runs in Ring 2 (Tokio tasks). Parsed Arrow
//! `RecordBatch` data crosses to Ring 0 via bounded channels.
20
21pub mod backpressure;
22pub mod checkpoint;
23pub mod connection;
24pub mod fanout;
25pub mod metrics;
26pub mod parser;
27pub mod protocol;
28pub mod serializer;
29pub mod sink;
30pub mod sink_client;
31pub mod sink_config;
32pub mod sink_metrics;
33pub mod source;
34pub mod source_config;
35pub mod source_server;
36
37pub use backpressure::BackpressureStrategy;
38pub use checkpoint::WebSocketSourceCheckpoint;
39pub use metrics::WebSocketSourceMetrics;
40pub use protocol::{ClientMessage, ServerMessage};
41pub use sink::WebSocketSinkServer;
42pub use sink_client::WebSocketSinkClient;
43pub use sink_config::{SinkFormat, SinkMode, SlowClientPolicy, WebSocketSinkConfig};
44pub use sink_metrics::WebSocketSinkMetrics;
45pub use source::WebSocketSource;
46pub use source_config::{
47    EventTimeFormat, MessageFormat, ReconnectConfig, SourceMode, WebSocketSourceConfig,
48    WsAuthConfig,
49};
50pub use source_server::WebSocketSourceServer;
51
52use std::sync::Arc;
53
54use crate::config::{ConfigKeySpec, ConnectorInfo};
55use crate::registry::ConnectorRegistry;
56
57/// Registers the WebSocket source connector with the given registry.
58///
59/// After registration, the runtime can instantiate `WebSocketSource` by
60/// name when processing `CREATE SOURCE ... WITH (connector = 'websocket')`.
61pub fn register_websocket_source(registry: &ConnectorRegistry) {
62    let info = ConnectorInfo {
63        name: "websocket".to_string(),
64        display_name: "WebSocket Source".to_string(),
65        version: env!("CARGO_PKG_VERSION").to_string(),
66        is_source: true,
67        is_sink: false,
68        config_keys: websocket_source_config_keys(),
69    };
70
71    registry.register_source(
72        "websocket",
73        info,
74        Arc::new(|| {
75            use arrow_schema::{DataType, Field, Schema};
76
77            let default_schema = Arc::new(Schema::new(vec![
78                Field::new("key", DataType::Utf8, true),
79                Field::new("value", DataType::Utf8, false),
80            ]));
81            Box::new(WebSocketSource::new(
82                default_schema,
83                WebSocketSourceConfig::default(),
84            ))
85        }),
86    );
87}
88
89/// Registers the WebSocket sink connector with the given registry.
90///
91/// After registration, the runtime can instantiate `WebSocketSinkServer` by
92/// name when processing `CREATE SINK ... WITH (connector = 'websocket')`.
93pub fn register_websocket_sink(registry: &ConnectorRegistry) {
94    let info = ConnectorInfo {
95        name: "websocket".to_string(),
96        display_name: "WebSocket Sink".to_string(),
97        version: env!("CARGO_PKG_VERSION").to_string(),
98        is_source: false,
99        is_sink: true,
100        config_keys: websocket_sink_config_keys(),
101    };
102
103    registry.register_sink(
104        "websocket",
105        info,
106        Arc::new(|| {
107            use arrow_schema::{DataType, Field, Schema};
108
109            let default_schema = Arc::new(Schema::new(vec![
110                Field::new("key", DataType::Utf8, true),
111                Field::new("value", DataType::Utf8, false),
112            ]));
113            Box::new(WebSocketSinkServer::new(
114                default_schema,
115                WebSocketSinkConfig::default(),
116            ))
117        }),
118    );
119}
120
121fn websocket_source_config_keys() -> Vec<ConfigKeySpec> {
122    vec![
123        ConfigKeySpec::required("url", "WebSocket URL to connect to (ws:// or wss://)"),
124        ConfigKeySpec::optional("mode", "Operating mode (client/server)", "client"),
125        ConfigKeySpec::optional("format", "Message format (json/csv/binary)", "json"),
126        ConfigKeySpec::optional(
127            "subscribe.message",
128            "JSON subscription message to send after handshake",
129            "",
130        ),
131        ConfigKeySpec::optional("reconnect.enabled", "Enable automatic reconnection", "true"),
132        ConfigKeySpec::optional(
133            "reconnect.initial.delay.ms",
134            "Initial reconnect delay in ms",
135            "100",
136        ),
137        ConfigKeySpec::optional(
138            "reconnect.max.delay.ms",
139            "Maximum reconnect delay in ms",
140            "30000",
141        ),
142        ConfigKeySpec::optional("ping.interval.ms", "WebSocket ping interval in ms", "30000"),
143        ConfigKeySpec::optional("ping.timeout.ms", "Pong reply timeout in ms", "10000"),
144        ConfigKeySpec::optional("bind.address", "Socket address for server mode", ""),
145        ConfigKeySpec::optional(
146            "max.connections",
147            "Max concurrent connections (server mode)",
148            "1024",
149        ),
150        ConfigKeySpec::optional(
151            "on.backpressure",
152            "Backpressure strategy (block/drop)",
153            "block",
154        ),
155        ConfigKeySpec::optional(
156            "max.message.size",
157            "Max WebSocket message size in bytes",
158            "67108864",
159        ),
160        ConfigKeySpec::optional(
161            "event.time.field",
162            "JSON field path for event time extraction",
163            "",
164        ),
165        ConfigKeySpec::optional(
166            "event.time.format",
167            "Event time format (epoch_millis/iso8601)",
168            "",
169        ),
170        ConfigKeySpec::optional("auth.type", "Authentication type (bearer/basic/hmac)", ""),
171        ConfigKeySpec::optional("auth.token", "Bearer token for authentication", ""),
172    ]
173}
174
175fn websocket_sink_config_keys() -> Vec<ConfigKeySpec> {
176    vec![
177        ConfigKeySpec::required(
178            "bind.address",
179            "Socket address to bind (e.g., 0.0.0.0:8080)",
180        ),
181        ConfigKeySpec::optional("mode", "Operating mode (server/client)", "server"),
182        ConfigKeySpec::optional(
183            "format",
184            "Serialization format (json/jsonlines/arrow_ipc)",
185            "json",
186        ),
187        ConfigKeySpec::optional(
188            "max.connections",
189            "Max concurrent client connections",
190            "10000",
191        ),
192        ConfigKeySpec::optional(
193            "per.client.buffer",
194            "Per-client send buffer in bytes",
195            "262144",
196        ),
197        ConfigKeySpec::optional(
198            "slow.client.policy",
199            "Slow client policy (drop_oldest/disconnect)",
200            "drop_oldest",
201        ),
202        ConfigKeySpec::optional("ping.interval.ms", "Ping interval in ms", "30000"),
203        ConfigKeySpec::optional("ping.timeout.ms", "Pong timeout in ms", "10000"),
204        ConfigKeySpec::optional(
205            "replay.buffer.size",
206            "Messages to buffer for late joiners",
207            "",
208        ),
209        ConfigKeySpec::optional("url", "WebSocket URL for client mode", ""),
210        ConfigKeySpec::optional("auth.type", "Authentication type (bearer/basic/hmac)", ""),
211        ConfigKeySpec::optional("auth.token", "Bearer token for authentication", ""),
212    ]
213}