laminar_connectors/websocket/
mod.rs1pub mod backpressure;
22pub mod checkpoint;
23pub mod connection;
24pub mod fanout;
25pub mod metrics;
26pub mod parser;
27pub mod protocol;
28pub mod serializer;
29pub mod sink;
30pub mod sink_client;
31pub mod sink_config;
32pub mod sink_metrics;
33pub mod source;
34pub mod source_config;
35pub mod source_server;
36
37pub use backpressure::BackpressureStrategy;
38pub use checkpoint::WebSocketSourceCheckpoint;
39pub use metrics::WebSocketSourceMetrics;
40pub use protocol::{ClientMessage, ServerMessage};
41pub use sink::WebSocketSinkServer;
42pub use sink_client::WebSocketSinkClient;
43pub use sink_config::{SinkFormat, SinkMode, SlowClientPolicy, WebSocketSinkConfig};
44pub use sink_metrics::WebSocketSinkMetrics;
45pub use source::WebSocketSource;
46pub use source_config::{
47 EventTimeFormat, MessageFormat, ReconnectConfig, SourceMode, WebSocketSourceConfig,
48 WsAuthConfig,
49};
50pub use source_server::WebSocketSourceServer;
51
52use std::sync::Arc;
53
54use crate::config::{ConfigKeySpec, ConnectorInfo};
55use crate::registry::ConnectorRegistry;
56
57pub fn register_websocket_source(registry: &ConnectorRegistry) {
62 let info = ConnectorInfo {
63 name: "websocket".to_string(),
64 display_name: "WebSocket Source".to_string(),
65 version: env!("CARGO_PKG_VERSION").to_string(),
66 is_source: true,
67 is_sink: false,
68 config_keys: websocket_source_config_keys(),
69 };
70
71 registry.register_source(
72 "websocket",
73 info,
74 Arc::new(|| {
75 use arrow_schema::{DataType, Field, Schema};
76
77 let default_schema = Arc::new(Schema::new(vec![
78 Field::new("key", DataType::Utf8, true),
79 Field::new("value", DataType::Utf8, false),
80 ]));
81 Box::new(WebSocketSource::new(
82 default_schema,
83 WebSocketSourceConfig::default(),
84 ))
85 }),
86 );
87}
88
89pub fn register_websocket_sink(registry: &ConnectorRegistry) {
94 let info = ConnectorInfo {
95 name: "websocket".to_string(),
96 display_name: "WebSocket Sink".to_string(),
97 version: env!("CARGO_PKG_VERSION").to_string(),
98 is_source: false,
99 is_sink: true,
100 config_keys: websocket_sink_config_keys(),
101 };
102
103 registry.register_sink(
104 "websocket",
105 info,
106 Arc::new(|| {
107 use arrow_schema::{DataType, Field, Schema};
108
109 let default_schema = Arc::new(Schema::new(vec![
110 Field::new("key", DataType::Utf8, true),
111 Field::new("value", DataType::Utf8, false),
112 ]));
113 Box::new(WebSocketSinkServer::new(
114 default_schema,
115 WebSocketSinkConfig::default(),
116 ))
117 }),
118 );
119}
120
121fn websocket_source_config_keys() -> Vec<ConfigKeySpec> {
122 vec![
123 ConfigKeySpec::required("url", "WebSocket URL to connect to (ws:// or wss://)"),
124 ConfigKeySpec::optional("mode", "Operating mode (client/server)", "client"),
125 ConfigKeySpec::optional("format", "Message format (json/csv/binary)", "json"),
126 ConfigKeySpec::optional(
127 "subscribe.message",
128 "JSON subscription message to send after handshake",
129 "",
130 ),
131 ConfigKeySpec::optional("reconnect.enabled", "Enable automatic reconnection", "true"),
132 ConfigKeySpec::optional(
133 "reconnect.initial.delay.ms",
134 "Initial reconnect delay in ms",
135 "100",
136 ),
137 ConfigKeySpec::optional(
138 "reconnect.max.delay.ms",
139 "Maximum reconnect delay in ms",
140 "30000",
141 ),
142 ConfigKeySpec::optional("ping.interval.ms", "WebSocket ping interval in ms", "30000"),
143 ConfigKeySpec::optional("ping.timeout.ms", "Pong reply timeout in ms", "10000"),
144 ConfigKeySpec::optional("bind.address", "Socket address for server mode", ""),
145 ConfigKeySpec::optional(
146 "max.connections",
147 "Max concurrent connections (server mode)",
148 "1024",
149 ),
150 ConfigKeySpec::optional(
151 "on.backpressure",
152 "Backpressure strategy (block/drop)",
153 "block",
154 ),
155 ConfigKeySpec::optional(
156 "max.message.size",
157 "Max WebSocket message size in bytes",
158 "67108864",
159 ),
160 ConfigKeySpec::optional(
161 "event.time.field",
162 "JSON field path for event time extraction",
163 "",
164 ),
165 ConfigKeySpec::optional(
166 "event.time.format",
167 "Event time format (epoch_millis/iso8601)",
168 "",
169 ),
170 ConfigKeySpec::optional("auth.type", "Authentication type (bearer/basic/hmac)", ""),
171 ConfigKeySpec::optional("auth.token", "Bearer token for authentication", ""),
172 ]
173}
174
175fn websocket_sink_config_keys() -> Vec<ConfigKeySpec> {
176 vec![
177 ConfigKeySpec::required(
178 "bind.address",
179 "Socket address to bind (e.g., 0.0.0.0:8080)",
180 ),
181 ConfigKeySpec::optional("mode", "Operating mode (server/client)", "server"),
182 ConfigKeySpec::optional(
183 "format",
184 "Serialization format (json/jsonlines/arrow_ipc)",
185 "json",
186 ),
187 ConfigKeySpec::optional(
188 "max.connections",
189 "Max concurrent client connections",
190 "10000",
191 ),
192 ConfigKeySpec::optional(
193 "per.client.buffer",
194 "Per-client send buffer in bytes",
195 "262144",
196 ),
197 ConfigKeySpec::optional(
198 "slow.client.policy",
199 "Slow client policy (drop_oldest/disconnect)",
200 "drop_oldest",
201 ),
202 ConfigKeySpec::optional("ping.interval.ms", "Ping interval in ms", "30000"),
203 ConfigKeySpec::optional("ping.timeout.ms", "Pong timeout in ms", "10000"),
204 ConfigKeySpec::optional(
205 "replay.buffer.size",
206 "Messages to buffer for late joiners",
207 "",
208 ),
209 ConfigKeySpec::optional("url", "WebSocket URL for client mode", ""),
210 ConfigKeySpec::optional("auth.type", "Authentication type (bearer/basic/hmac)", ""),
211 ConfigKeySpec::optional("auth.token", "Bearer token for authentication", ""),
212 ]
213}