laminar_connectors/websocket/
mod.rs1pub mod backpressure;
9pub mod checkpoint;
10pub mod connection;
11pub mod fanout;
12pub mod metrics;
13pub mod parser;
14pub mod protocol;
15pub mod serializer;
16pub mod sink;
17pub mod sink_client;
18pub mod sink_config;
19pub mod sink_metrics;
20pub mod source;
21pub mod source_config;
22pub mod source_server;
23
24pub use backpressure::WsBackpressure;
25pub use checkpoint::WebSocketSourceCheckpoint;
26pub use metrics::WebSocketSourceMetrics;
27pub use protocol::{ClientMessage, ServerMessage};
28pub use sink::WebSocketSinkServer;
29pub use sink_client::WebSocketSinkClient;
30pub use sink_config::{SinkFormat, SinkMode, SlowClientPolicy, WebSocketSinkConfig};
31pub use sink_metrics::WebSocketSinkMetrics;
32pub use source::WebSocketSource;
33pub use source_config::{
34 EventTimeFormat, MessageFormat, ReconnectConfig, SourceMode, WebSocketSourceConfig,
35 WsAuthConfig,
36};
37pub use source_server::WebSocketSourceServer;
38
39use std::sync::Arc;
40
41use crate::config::{ConfigKeySpec, ConnectorInfo};
42use crate::registry::ConnectorRegistry;
43
44pub fn register_websocket_source(registry: &ConnectorRegistry) {
49 let info = ConnectorInfo {
50 name: "websocket".to_string(),
51 display_name: "WebSocket Source".to_string(),
52 version: env!("CARGO_PKG_VERSION").to_string(),
53 is_source: true,
54 is_sink: false,
55 config_keys: websocket_source_config_keys(),
56 };
57
58 registry.register_source(
59 "websocket",
60 info,
61 Arc::new(|registry: Option<&prometheus::Registry>| {
62 use arrow_schema::{DataType, Field, Schema};
63
64 let default_schema = Arc::new(Schema::new(vec![
65 Field::new("key", DataType::Utf8, true),
66 Field::new("value", DataType::Utf8, false),
67 ]));
68 Box::new(WebSocketSource::new(
69 default_schema,
70 WebSocketSourceConfig::default(),
71 registry,
72 ))
73 }),
74 );
75}
76
77pub fn register_websocket_sink(registry: &ConnectorRegistry) {
82 let info = ConnectorInfo {
83 name: "websocket".to_string(),
84 display_name: "WebSocket Sink".to_string(),
85 version: env!("CARGO_PKG_VERSION").to_string(),
86 is_source: false,
87 is_sink: true,
88 config_keys: websocket_sink_config_keys(),
89 };
90
91 registry.register_sink(
92 "websocket",
93 info,
94 Arc::new(|registry: Option<&prometheus::Registry>| {
95 use arrow_schema::{DataType, Field, Schema};
96
97 let default_schema = Arc::new(Schema::new(vec![
98 Field::new("key", DataType::Utf8, true),
99 Field::new("value", DataType::Utf8, false),
100 ]));
101 Box::new(WebSocketSinkServer::new(
102 default_schema,
103 WebSocketSinkConfig::default(),
104 registry,
105 ))
106 }),
107 );
108}
109
110fn websocket_source_config_keys() -> Vec<ConfigKeySpec> {
111 vec![
112 ConfigKeySpec::required("url", "WebSocket URL to connect to (ws:// or wss://)"),
113 ConfigKeySpec::optional("mode", "Operating mode (client/server)", "client"),
114 ConfigKeySpec::optional("format", "Message format (json/csv/binary)", "json"),
115 ConfigKeySpec::optional(
116 "subscribe.message",
117 "JSON subscription message to send after handshake",
118 "",
119 ),
120 ConfigKeySpec::optional("reconnect.enabled", "Enable automatic reconnection", "true"),
121 ConfigKeySpec::optional(
122 "reconnect.initial.delay.ms",
123 "Initial reconnect delay in ms",
124 "100",
125 ),
126 ConfigKeySpec::optional(
127 "reconnect.max.delay.ms",
128 "Maximum reconnect delay in ms",
129 "30000",
130 ),
131 ConfigKeySpec::optional("ping.interval.ms", "WebSocket ping interval in ms", "30000"),
132 ConfigKeySpec::optional("ping.timeout.ms", "Pong reply timeout in ms", "10000"),
133 ConfigKeySpec::optional("bind.address", "Socket address for server mode", ""),
134 ConfigKeySpec::optional(
135 "max.connections",
136 "Max concurrent connections (server mode)",
137 "1024",
138 ),
139 ConfigKeySpec::optional(
140 "on.backpressure",
141 "Backpressure strategy (block/drop)",
142 "block",
143 ),
144 ConfigKeySpec::optional(
145 "max.message.size",
146 "Max WebSocket message size in bytes",
147 "67108864",
148 ),
149 ConfigKeySpec::optional(
150 "event.time.field",
151 "JSON field path for event time extraction",
152 "",
153 ),
154 ConfigKeySpec::optional(
155 "event.time.format",
156 "Event time format (epoch_millis/iso8601)",
157 "",
158 ),
159 ConfigKeySpec::optional("auth.type", "Authentication type (bearer/basic/hmac)", ""),
160 ConfigKeySpec::optional("auth.token", "Bearer token for authentication", ""),
161 ]
162}
163
164fn websocket_sink_config_keys() -> Vec<ConfigKeySpec> {
165 vec![
166 ConfigKeySpec::required(
167 "bind.address",
168 "Socket address to bind (e.g., 0.0.0.0:8080)",
169 ),
170 ConfigKeySpec::optional("mode", "Operating mode (server/client)", "server"),
171 ConfigKeySpec::optional(
172 "format",
173 "Serialization format (json/jsonlines/arrow_ipc)",
174 "json",
175 ),
176 ConfigKeySpec::optional(
177 "max.connections",
178 "Max concurrent client connections",
179 "10000",
180 ),
181 ConfigKeySpec::optional(
182 "per.client.buffer",
183 "Per-client send buffer in bytes",
184 "262144",
185 ),
186 ConfigKeySpec::optional(
187 "slow.client.policy",
188 "Slow client policy (drop_oldest/disconnect)",
189 "drop_oldest",
190 ),
191 ConfigKeySpec::optional("ping.interval.ms", "Ping interval in ms", "30000"),
192 ConfigKeySpec::optional("ping.timeout.ms", "Pong timeout in ms", "10000"),
193 ConfigKeySpec::optional(
194 "replay.buffer.size",
195 "Messages to buffer for late joiners",
196 "",
197 ),
198 ConfigKeySpec::optional("url", "WebSocket URL for client mode", ""),
199 ConfigKeySpec::optional("auth.type", "Authentication type (bearer/basic/hmac)", ""),
200 ConfigKeySpec::optional("auth.token", "Bearer token for authentication", ""),
201 ]
202}