laminar_connectors/schema/
types.rs1#![allow(clippy::disallowed_types)] use std::any::Any;
15use std::collections::HashMap;
16
17use crate::config::ConnectorConfig;
18use crate::serde::Format;
19
20#[derive(Debug, Clone)]
25pub struct RawRecord {
26 pub key: Option<Vec<u8>>,
28
29 pub value: Vec<u8>,
31
32 pub timestamp: Option<i64>,
34
35 pub headers: HashMap<String, Vec<u8>>,
37
38 pub metadata: SourceMetadata,
40}
41
42impl RawRecord {
43 #[must_use]
45 pub fn new(value: Vec<u8>) -> Self {
46 Self {
47 key: None,
48 value,
49 timestamp: None,
50 headers: HashMap::new(),
51 metadata: SourceMetadata::empty(),
52 }
53 }
54
55 #[must_use]
57 pub fn with_key(mut self, key: Vec<u8>) -> Self {
58 self.key = Some(key);
59 self
60 }
61
62 #[must_use]
64 pub fn with_timestamp(mut self, ts: i64) -> Self {
65 self.timestamp = Some(ts);
66 self
67 }
68
69 #[must_use]
71 pub fn with_header(mut self, key: impl Into<String>, value: Vec<u8>) -> Self {
72 self.headers.insert(key.into(), value);
73 self
74 }
75
76 #[must_use]
78 pub fn with_metadata(mut self, metadata: SourceMetadata) -> Self {
79 self.metadata = metadata;
80 self
81 }
82}
83
84pub struct SourceMetadata {
89 inner: Option<Box<dyn Any + Send + Sync>>,
90}
91
92impl SourceMetadata {
93 #[must_use]
95 pub fn empty() -> Self {
96 Self { inner: None }
97 }
98
99 pub fn new<T: Any + Send + Sync>(value: T) -> Self {
101 Self {
102 inner: Some(Box::new(value)),
103 }
104 }
105
106 #[must_use]
108 pub fn is_empty(&self) -> bool {
109 self.inner.is_none()
110 }
111
112 #[must_use]
114 pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
115 self.inner.as_ref()?.downcast_ref::<T>()
116 }
117}
118
119impl std::fmt::Debug for SourceMetadata {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 if self.inner.is_some() {
122 write!(f, "SourceMetadata(<opaque>)")
123 } else {
124 write!(f, "SourceMetadata(empty)")
125 }
126 }
127}
128
129impl Clone for SourceMetadata {
130 fn clone(&self) -> Self {
131 Self::empty()
133 }
134}
135
136#[derive(Debug, Clone, Default)]
141pub struct FieldMeta {
142 pub field_id: Option<u32>,
144
145 pub description: Option<String>,
147
148 pub source_type: Option<String>,
150
151 pub default_expr: Option<String>,
153
154 pub properties: HashMap<String, String>,
156}
157
158impl FieldMeta {
159 #[must_use]
161 pub fn new() -> Self {
162 Self::default()
163 }
164
165 #[must_use]
167 pub fn with_field_id(mut self, id: u32) -> Self {
168 self.field_id = Some(id);
169 self
170 }
171
172 #[must_use]
174 pub fn with_description(mut self, desc: impl Into<String>) -> Self {
175 self.description = Some(desc.into());
176 self
177 }
178
179 #[must_use]
181 pub fn with_source_type(mut self, src_type: impl Into<String>) -> Self {
182 self.source_type = Some(src_type.into());
183 self
184 }
185
186 #[must_use]
188 pub fn with_default(mut self, expr: impl Into<String>) -> Self {
189 self.default_expr = Some(expr.into());
190 self
191 }
192
193 #[must_use]
195 pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
196 self.properties.insert(key.into(), value.into());
197 self
198 }
199}
200
201#[derive(Debug, Clone)]
206pub struct SourceConfig {
207 pub connector_type: String,
209
210 pub format: Format,
212
213 pub options: HashMap<String, String>,
215}
216
217impl SourceConfig {
218 #[must_use]
220 pub fn new(connector_type: impl Into<String>, format: Format) -> Self {
221 Self {
222 connector_type: connector_type.into(),
223 format,
224 options: HashMap::new(),
225 }
226 }
227
228 #[must_use]
230 pub fn with_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
231 self.options.insert(key.into(), value.into());
232 self
233 }
234
235 #[must_use]
237 pub fn get_option(&self, key: &str) -> Option<&str> {
238 self.options.get(key).map(String::as_str)
239 }
240
241 #[must_use]
243 pub fn as_connector_config(&self) -> ConnectorConfig {
244 let mut props = self.options.clone();
245 props.insert("format".to_string(), self.format.to_string());
246 ConnectorConfig::with_properties(&self.connector_type, props)
247 }
248}
249
250#[derive(Debug, Clone)]
254pub struct SinkConfig {
255 pub connector_type: String,
257
258 pub format: Format,
260
261 pub options: HashMap<String, String>,
263}
264
265impl SinkConfig {
266 #[must_use]
268 pub fn new(connector_type: impl Into<String>, format: Format) -> Self {
269 Self {
270 connector_type: connector_type.into(),
271 format,
272 options: HashMap::new(),
273 }
274 }
275
276 #[must_use]
278 pub fn with_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
279 self.options.insert(key.into(), value.into());
280 self
281 }
282
283 #[must_use]
285 pub fn get_option(&self, key: &str) -> Option<&str> {
286 self.options.get(key).map(String::as_str)
287 }
288
289 #[must_use]
291 pub fn as_connector_config(&self) -> ConnectorConfig {
292 let mut props = self.options.clone();
293 props.insert("format".to_string(), self.format.to_string());
294 ConnectorConfig::with_properties(&self.connector_type, props)
295 }
296}
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301
302 #[test]
303 fn test_raw_record_builder() {
304 let record = RawRecord::new(b"hello".to_vec())
305 .with_key(b"k1".to_vec())
306 .with_timestamp(1000)
307 .with_header("content-type", b"application/json".to_vec());
308
309 assert_eq!(record.key.as_deref(), Some(b"k1".as_slice()));
310 assert_eq!(record.value, b"hello");
311 assert_eq!(record.timestamp, Some(1000));
312 assert!(record.headers.contains_key("content-type"));
313 }
314
315 #[test]
316 fn test_source_metadata_empty() {
317 let meta = SourceMetadata::empty();
318 assert!(meta.is_empty());
319 assert!(meta.downcast_ref::<String>().is_none());
320 }
321
322 #[test]
323 fn test_source_metadata_typed() {
324 let meta = SourceMetadata::new(42u64);
325 assert!(!meta.is_empty());
326 assert_eq!(meta.downcast_ref::<u64>(), Some(&42u64));
327 assert!(meta.downcast_ref::<String>().is_none());
328 }
329
330 #[test]
331 fn test_source_metadata_debug() {
332 let empty = SourceMetadata::empty();
333 assert!(format!("{empty:?}").contains("empty"));
334
335 let full = SourceMetadata::new("data");
336 assert!(format!("{full:?}").contains("opaque"));
337 }
338
339 #[test]
340 fn test_field_meta_builder() {
341 let meta = FieldMeta::new()
342 .with_field_id(1)
343 .with_description("User ID")
344 .with_source_type("BIGINT")
345 .with_default("0")
346 .with_property("pii", "true");
347
348 assert_eq!(meta.field_id, Some(1));
349 assert_eq!(meta.description.as_deref(), Some("User ID"));
350 assert_eq!(meta.source_type.as_deref(), Some("BIGINT"));
351 assert_eq!(meta.default_expr.as_deref(), Some("0"));
352 assert_eq!(meta.properties.get("pii").map(String::as_str), Some("true"));
353 }
354
355 #[test]
356 fn test_source_config() {
357 let cfg = SourceConfig::new("kafka", Format::Json)
358 .with_option("topic", "events")
359 .with_option("group.id", "my-group");
360
361 assert_eq!(cfg.connector_type, "kafka");
362 assert_eq!(cfg.format, Format::Json);
363 assert_eq!(cfg.get_option("topic"), Some("events"));
364
365 let cc = cfg.as_connector_config();
366 assert_eq!(cc.connector_type(), "kafka");
367 assert_eq!(cc.get("format"), Some("json"));
368 assert_eq!(cc.get("topic"), Some("events"));
369 }
370
371 #[test]
372 fn test_sink_config() {
373 let cfg = SinkConfig::new("postgres", Format::Json).with_option("table", "output");
374
375 assert_eq!(cfg.connector_type, "postgres");
376 assert_eq!(cfg.get_option("table"), Some("output"));
377
378 let cc = cfg.as_connector_config();
379 assert_eq!(cc.get("format"), Some("json"));
380 }
381}