Skip to main content

laminar_connectors/schema/
types.rs

1//! Schema types used across the connector framework.
2//!
3//! Defines the core data structures for schema inference, resolution,
4//! and connector configuration:
5//!
6//! - [`RawRecord`]: A raw record with key, value, timestamp, and headers
7//! - [`SourceMetadata`]: Type-erased metadata from a source connector
8//! - [`FieldMeta`]: Per-field metadata for schema annotations
9//! - [`SourceConfig`]: Configuration for a source connector (schema module)
10//! - [`SinkConfig`]: Configuration for a sink connector (schema module)
11
12#![allow(clippy::disallowed_types)] // cold path: schema management
13
14use std::any::Any;
15use std::collections::HashMap;
16
17use crate::config::ConnectorConfig;
18use crate::serde::Format;
19
20/// A raw record read from a source before schema application.
21///
22/// Carries the key, value, optional timestamp, headers, and arbitrary
23/// source-specific metadata.
24#[derive(Debug, Clone)]
25pub struct RawRecord {
26    /// Optional record key (e.g., Kafka message key).
27    pub key: Option<Vec<u8>>,
28
29    /// Record value (payload bytes).
30    pub value: Vec<u8>,
31
32    /// Optional event-time timestamp in milliseconds since epoch.
33    pub timestamp: Option<i64>,
34
35    /// Optional key-value headers (e.g., Kafka headers).
36    pub headers: HashMap<String, Vec<u8>>,
37
38    /// Source-specific metadata (e.g., partition, offset, topic).
39    pub metadata: SourceMetadata,
40}
41
42impl RawRecord {
43    /// Creates a new raw record with only a value.
44    #[must_use]
45    pub fn new(value: Vec<u8>) -> Self {
46        Self {
47            key: None,
48            value,
49            timestamp: None,
50            headers: HashMap::new(),
51            metadata: SourceMetadata::empty(),
52        }
53    }
54
55    /// Sets the record key.
56    #[must_use]
57    pub fn with_key(mut self, key: Vec<u8>) -> Self {
58        self.key = Some(key);
59        self
60    }
61
62    /// Sets the event-time timestamp.
63    #[must_use]
64    pub fn with_timestamp(mut self, ts: i64) -> Self {
65        self.timestamp = Some(ts);
66        self
67    }
68
69    /// Adds a header.
70    #[must_use]
71    pub fn with_header(mut self, key: impl Into<String>, value: Vec<u8>) -> Self {
72        self.headers.insert(key.into(), value);
73        self
74    }
75
76    /// Sets the source metadata.
77    #[must_use]
78    pub fn with_metadata(mut self, metadata: SourceMetadata) -> Self {
79        self.metadata = metadata;
80        self
81    }
82}
83
/// Type-erased metadata attached to a record by a source connector.
///
/// Connectors may store any `Send + Sync + 'static` value here (for example,
/// a Kafka offset or a CDC LSN). Consumers recover it with
/// [`SourceMetadata::downcast_ref`].
///
/// The value is held behind a reference-counted pointer and is only ever
/// exposed immutably. Cloning a `SourceMetadata`, and therefore a
/// [`RawRecord`], is a cheap refcount bump that keeps the metadata attached
/// rather than silently discarding it.
#[derive(Clone)]
pub struct SourceMetadata {
    inner: Option<std::sync::Arc<dyn Any + Send + Sync>>,
}

impl SourceMetadata {
    /// Creates metadata that carries no value.
    #[must_use]
    pub fn empty() -> Self {
        Self { inner: None }
    }

    /// Wraps `value` as type-erased metadata.
    #[must_use]
    pub fn new<T: Any + Send + Sync>(value: T) -> Self {
        Self {
            inner: Some(std::sync::Arc::new(value)),
        }
    }

    /// Returns `true` if no metadata value is stored.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.inner.is_none()
    }

    /// Returns a reference to the stored value if it has concrete type `T`.
    ///
    /// Returns `None` when the metadata is empty or holds a value of a
    /// different type.
    #[must_use]
    pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
        // `as_deref` borrows through the `Arc` to the `dyn Any` trait object,
        // whose inherent `downcast_ref` performs the checked type test.
        self.inner.as_deref()?.downcast_ref::<T>()
    }
}

impl std::fmt::Debug for SourceMetadata {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The concrete type is erased, so only presence can be reported.
        if self.inner.is_some() {
            write!(f, "SourceMetadata(<opaque>)")
        } else {
            write!(f, "SourceMetadata(empty)")
        }
    }
}
135
/// Extra per-field annotations attached to a schema.
///
/// Holds information that Arrow's `Field` does not carry on its own, such as
/// a human-readable description, the source system's type name, or a default
/// expression.
#[derive(Debug, Clone, Default)]
pub struct FieldMeta {
    /// Optional stable identifier for the field, used to track it across
    /// schema evolution.
    pub field_id: Option<u32>,

    /// Human-readable description of the field.
    pub description: Option<String>,

    /// Type name as spelled by the source system (e.g., `"VARCHAR(255)"`).
    pub source_type: Option<String>,

    /// Expression to use when the field is missing (e.g., `"0"`, `"now()"`).
    pub default_expr: Option<String>,

    /// Free-form key-value properties.
    pub properties: HashMap<String, String>,
}

impl FieldMeta {
    /// Returns metadata with every annotation unset and no properties.
    #[must_use]
    pub fn new() -> Self {
        Self {
            field_id: None,
            description: None,
            source_type: None,
            default_expr: None,
            properties: HashMap::new(),
        }
    }

    /// Returns this metadata with its field ID set to `id`.
    #[must_use]
    pub fn with_field_id(self, id: u32) -> Self {
        Self {
            field_id: Some(id),
            ..self
        }
    }

    /// Returns this metadata with its description set to `desc`.
    #[must_use]
    pub fn with_description(self, desc: impl Into<String>) -> Self {
        Self {
            description: Some(desc.into()),
            ..self
        }
    }

    /// Returns this metadata with its source type name set to `src_type`.
    #[must_use]
    pub fn with_source_type(self, src_type: impl Into<String>) -> Self {
        Self {
            source_type: Some(src_type.into()),
            ..self
        }
    }

    /// Returns this metadata with its default expression set to `expr`.
    #[must_use]
    pub fn with_default(self, expr: impl Into<String>) -> Self {
        Self {
            default_expr: Some(expr.into()),
            ..self
        }
    }

    /// Returns this metadata with property `key` set to `value`.
    ///
    /// Setting the same key again replaces the earlier value.
    #[must_use]
    pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, val): (String, String) = (key.into(), value.into());
        self.properties.insert(name, val);
        self
    }
}
200
201/// Configuration for a source connector in the schema module.
202///
203/// This is distinct from [`crate::config::ConnectorConfig`] — it adds
204/// format awareness and typed accessors for schema-related options.
205#[derive(Debug, Clone)]
206pub struct SourceConfig {
207    /// Connector type identifier (e.g., `"kafka"`, `"postgres-cdc"`).
208    pub connector_type: String,
209
210    /// Data format (e.g., JSON, CSV, Avro).
211    pub format: Format,
212
213    /// Arbitrary key-value options.
214    pub options: HashMap<String, String>,
215}
216
217impl SourceConfig {
218    /// Creates a new source config.
219    #[must_use]
220    pub fn new(connector_type: impl Into<String>, format: Format) -> Self {
221        Self {
222            connector_type: connector_type.into(),
223            format,
224            options: HashMap::new(),
225        }
226    }
227
228    /// Sets an option.
229    #[must_use]
230    pub fn with_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
231        self.options.insert(key.into(), value.into());
232        self
233    }
234
235    /// Gets an option value.
236    #[must_use]
237    pub fn get_option(&self, key: &str) -> Option<&str> {
238        self.options.get(key).map(String::as_str)
239    }
240
241    /// Converts to a [`ConnectorConfig`] for use with existing connector APIs.
242    #[must_use]
243    pub fn as_connector_config(&self) -> ConnectorConfig {
244        let mut props = self.options.clone();
245        props.insert("format".to_string(), self.format.to_string());
246        ConnectorConfig::with_properties(&self.connector_type, props)
247    }
248}
249
250/// Configuration for a sink connector in the schema module.
251///
252/// Mirror of [`SourceConfig`] for sink connectors.
253#[derive(Debug, Clone)]
254pub struct SinkConfig {
255    /// Connector type identifier (e.g., `"kafka"`, `"postgres"`).
256    pub connector_type: String,
257
258    /// Output data format.
259    pub format: Format,
260
261    /// Arbitrary key-value options.
262    pub options: HashMap<String, String>,
263}
264
265impl SinkConfig {
266    /// Creates a new sink config.
267    #[must_use]
268    pub fn new(connector_type: impl Into<String>, format: Format) -> Self {
269        Self {
270            connector_type: connector_type.into(),
271            format,
272            options: HashMap::new(),
273        }
274    }
275
276    /// Sets an option.
277    #[must_use]
278    pub fn with_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
279        self.options.insert(key.into(), value.into());
280        self
281    }
282
283    /// Gets an option value.
284    #[must_use]
285    pub fn get_option(&self, key: &str) -> Option<&str> {
286        self.options.get(key).map(String::as_str)
287    }
288
289    /// Converts to a [`ConnectorConfig`] for use with existing connector APIs.
290    #[must_use]
291    pub fn as_connector_config(&self) -> ConnectorConfig {
292        let mut props = self.options.clone();
293        props.insert("format".to_string(), self.format.to_string());
294        ConnectorConfig::with_properties(&self.connector_type, props)
295    }
296}
297
#[cfg(test)]
mod tests {
    // Unit tests for the schema types: record and field-metadata builders,
    // type-erased source metadata, and conversion of source/sink configs
    // into `ConnectorConfig`.
    use super::*;

    #[test]
    fn test_raw_record_builder() {
        let record = RawRecord::new(b"hello".to_vec())
            .with_key(b"k1".to_vec())
            .with_timestamp(1000)
            .with_header("content-type", b"application/json".to_vec());

        assert_eq!(record.key.as_deref(), Some(b"k1".as_slice()));
        assert_eq!(record.value, b"hello");
        assert_eq!(record.timestamp, Some(1000));
        assert!(record.headers.contains_key("content-type"));
    }

    #[test]
    fn test_raw_record_defaults() {
        let record = RawRecord::new(b"v".to_vec());
        assert!(record.key.is_none());
        assert!(record.timestamp.is_none());
        assert!(record.headers.is_empty());
        assert!(record.metadata.is_empty());
    }

    #[test]
    fn test_raw_record_with_metadata() {
        let record = RawRecord::new(Vec::new()).with_metadata(SourceMetadata::new(7u32));
        assert!(!record.metadata.is_empty());
        assert_eq!(record.metadata.downcast_ref::<u32>(), Some(&7u32));
    }

    #[test]
    fn test_raw_record_header_overwrite() {
        // Headers are stored in a map, so re-setting a name keeps the last value.
        let record = RawRecord::new(Vec::new())
            .with_header("h", b"1".to_vec())
            .with_header("h", b"2".to_vec());
        assert_eq!(record.headers.len(), 1);
        assert_eq!(
            record.headers.get("h").map(Vec::as_slice),
            Some(b"2".as_slice())
        );
    }

    #[test]
    fn test_source_metadata_empty() {
        let meta = SourceMetadata::empty();
        assert!(meta.is_empty());
        assert!(meta.downcast_ref::<String>().is_none());
    }

    #[test]
    fn test_source_metadata_typed() {
        let meta = SourceMetadata::new(42u64);
        assert!(!meta.is_empty());
        assert_eq!(meta.downcast_ref::<u64>(), Some(&42u64));
        assert!(meta.downcast_ref::<String>().is_none());
    }

    #[test]
    fn test_source_metadata_debug() {
        let empty = SourceMetadata::empty();
        assert!(format!("{empty:?}").contains("empty"));

        let full = SourceMetadata::new("data");
        assert!(format!("{full:?}").contains("opaque"));
    }

    #[test]
    fn test_field_meta_builder() {
        let meta = FieldMeta::new()
            .with_field_id(1)
            .with_description("User ID")
            .with_source_type("BIGINT")
            .with_default("0")
            .with_property("pii", "true");

        assert_eq!(meta.field_id, Some(1));
        assert_eq!(meta.description.as_deref(), Some("User ID"));
        assert_eq!(meta.source_type.as_deref(), Some("BIGINT"));
        assert_eq!(meta.default_expr.as_deref(), Some("0"));
        assert_eq!(meta.properties.get("pii").map(String::as_str), Some("true"));
    }

    #[test]
    fn test_field_meta_default_is_empty() {
        let meta = FieldMeta::new();
        assert!(meta.field_id.is_none());
        assert!(meta.description.is_none());
        assert!(meta.source_type.is_none());
        assert!(meta.default_expr.is_none());
        assert!(meta.properties.is_empty());
    }

    #[test]
    fn test_source_config() {
        let cfg = SourceConfig::new("kafka", Format::Json)
            .with_option("topic", "events")
            .with_option("group.id", "my-group");

        assert_eq!(cfg.connector_type, "kafka");
        assert_eq!(cfg.format, Format::Json);
        assert_eq!(cfg.get_option("topic"), Some("events"));
        assert_eq!(cfg.get_option("missing"), None);

        let cc = cfg.as_connector_config();
        assert_eq!(cc.connector_type(), "kafka");
        assert_eq!(cc.get("format"), Some("json"));
        assert_eq!(cc.get("topic"), Some("events"));
    }

    #[test]
    fn test_typed_format_overrides_format_option() {
        // The typed `format` field wins over a free-form "format" option when
        // converting, while the option itself is left untouched.
        let cfg = SourceConfig::new("kafka", Format::Json).with_option("format", "csv");
        assert_eq!(cfg.get_option("format"), Some("csv"));
        assert_eq!(cfg.as_connector_config().get("format"), Some("json"));
    }

    #[test]
    fn test_sink_config() {
        let cfg = SinkConfig::new("postgres", Format::Json).with_option("table", "output");

        assert_eq!(cfg.connector_type, "postgres");
        assert_eq!(cfg.format, Format::Json);
        assert_eq!(cfg.get_option("table"), Some("output"));

        let cc = cfg.as_connector_config();
        assert_eq!(cc.get("format"), Some("json"));
    }
}