laminar_connectors/schema/traits.rs

//! Capability traits for the connector schema framework.
#![allow(clippy::disallowed_types)] // cold path: schema management

use std::collections::HashMap;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;

use super::error::SchemaResult;
use super::types::RawRecord;

// ── FormatDecoder ──────────────────────────────────────────────────

/// Decodes raw bytes into Arrow `RecordBatch`es.
///
/// Unlike [`RecordDeserializer`](crate::serde::RecordDeserializer), which
/// takes `&[u8]` slices, `FormatDecoder` works with [`RawRecord`]s that
/// carry metadata, headers, and timestamps alongside the payload.
pub trait FormatDecoder: Send + Sync {
    /// Returns the Arrow schema produced by this decoder.
    fn output_schema(&self) -> SchemaRef;

    /// Decodes a batch of raw records into an Arrow `RecordBatch`.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::DecodeError`](super::error::SchemaError::DecodeError)
    /// if the input cannot be parsed.
    fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch>;

    /// Decodes a single raw record into an Arrow `RecordBatch` with one row.
    ///
    /// The default implementation delegates to [`decode_batch`](Self::decode_batch)
    /// with a single-element slice.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::DecodeError`](super::error::SchemaError::DecodeError)
    /// if the input cannot be parsed.
    fn decode_one(&self, record: &RawRecord) -> SchemaResult<RecordBatch> {
        self.decode_batch(std::slice::from_ref(record))
    }

    /// Returns the name of the format this decoder handles (e.g., `"json"`).
    fn format_name(&self) -> &str;
}
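
// A minimal, purely illustrative `FormatDecoder`: it ignores the record
// payloads and emits one null row per input record, which is enough to show
// the trait contract (including the `decode_one` default). `NullDecoder` is
// hypothetical and kept under `#[cfg(test)]`; a real decoder would parse each
// `RawRecord` payload and map failures to `SchemaError::DecodeError`.
#[cfg(test)]
#[allow(dead_code)]
struct NullDecoder {
    schema: SchemaRef,
}

#[cfg(test)]
impl FormatDecoder for NullDecoder {
    fn output_schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
        // One all-null column per field, one row per input record.
        let columns: Vec<arrow_array::ArrayRef> = self
            .schema
            .fields()
            .iter()
            .map(|f| arrow_array::new_null_array(f.data_type(), records.len()))
            .collect();
        // This sketch assumes every field in `schema` is nullable, so the
        // all-null columns satisfy it and construction cannot fail.
        Ok(RecordBatch::try_new(self.schema.clone(), columns)
            .expect("all-null columns assumed to satisfy a fully nullable schema"))
    }

    fn format_name(&self) -> &str {
        "null"
    }
}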

// ── FormatEncoder ──────────────────────────────────────────────────

/// Encodes Arrow `RecordBatch`es into raw bytes.
pub trait FormatEncoder: Send + Sync {
    /// Returns the expected input schema.
    fn input_schema(&self) -> SchemaRef;

    /// Encodes a `RecordBatch` into a vector of byte records.
    ///
    /// Each element in the returned vector represents one serialized record.
    ///
    /// # Errors
    ///
    /// Returns a schema error if encoding fails.
    fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>>;

    /// Returns the name of the format this encoder produces (e.g., `"json"`).
    fn format_name(&self) -> &str;
}
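
// Illustrative counterpart for `FormatEncoder` (hypothetical, `#[cfg(test)]`
// only): it emits one empty payload per row, making the row-to-record
// correspondence of `encode_batch` explicit. A real encoder would serialize
// each row of the batch.
#[cfg(test)]
#[allow(dead_code)]
struct NoopEncoder {
    schema: SchemaRef,
}

#[cfg(test)]
impl FormatEncoder for NoopEncoder {
    fn input_schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>> {
        // One (empty) byte record per input row.
        Ok(vec![Vec::new(); batch.num_rows()])
    }

    fn format_name(&self) -> &str {
        "noop"
    }
}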

// ── SchemaProvider ─────────────────────────────────────────────────

/// A connector that can provide its schema from the source system.
///
/// Examples: `PostgreSQL` CDC reading `information_schema`, a Kafka topic
/// with an embedded Avro schema, or a file source with a header row.
#[async_trait]
pub trait SchemaProvider: Send + Sync {
    /// Fetches the schema from the source system.
    ///
    /// # Errors
    ///
    /// Returns a schema error if the schema cannot be retrieved.
    async fn provide_schema(&self) -> SchemaResult<SchemaRef>;

    /// Returns `true` if this provider's schema is authoritative
    /// (i.e., should take precedence over inference).
    fn is_authoritative(&self) -> bool {
        false
    }

    /// Returns per-field metadata for the provided schema.
    ///
    /// The default implementation returns an empty map.
    async fn field_metadata(&self) -> SchemaResult<HashMap<String, super::types::FieldMeta>> {
        Ok(HashMap::new())
    }
}
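
// Illustrative `SchemaProvider` (hypothetical, `#[cfg(test)]` only): serves a
// fixed, already-known schema and marks it authoritative so it takes
// precedence over inference.
#[cfg(test)]
#[allow(dead_code)]
struct StaticSchemaProvider {
    schema: SchemaRef,
}

#[cfg(test)]
#[async_trait]
impl SchemaProvider for StaticSchemaProvider {
    async fn provide_schema(&self) -> SchemaResult<SchemaRef> {
        Ok(self.schema.clone())
    }

    fn is_authoritative(&self) -> bool {
        // This provider knows its schema exactly; never fall back to inference.
        true
    }
}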

// ── Inference types ────────────────────────────────────────────────

/// Configuration for schema inference.
#[derive(Debug, Clone)]
pub struct InferenceConfig {
    /// Data format to use for inference.
    pub format: String,

    /// How to handle number type inference.
    pub number_inference: NumberInference,

    /// How to handle array type inference.
    pub array_inference: ArrayInference,

    /// Maximum number of samples to use.
    pub max_samples: usize,

    /// Minimum confidence threshold (0.0–1.0) for accepting an inferred type.
    pub min_confidence: f64,

    /// Type hints for specific fields.
    pub type_hints: HashMap<String, DataType>,

    /// Whether to treat empty strings as nulls.
    pub empty_as_null: bool,
}

impl Default for InferenceConfig {
    fn default() -> Self {
        Self {
            format: "json".to_string(),
            number_inference: NumberInference::PreferLarger,
            array_inference: ArrayInference::Utf8,
            max_samples: 1000,
            min_confidence: 0.8,
            type_hints: HashMap::new(),
            empty_as_null: false,
        }
    }
}

impl InferenceConfig {
    /// Creates a new inference config for the given format.
    #[must_use]
    pub fn new(format: impl Into<String>) -> Self {
        Self {
            format: format.into(),
            ..Self::default()
        }
    }

    /// Sets the minimum confidence threshold.
    #[must_use]
    pub fn with_min_confidence(mut self, confidence: f64) -> Self {
        self.min_confidence = confidence;
        self
    }

    /// Sets the maximum number of samples.
    #[must_use]
    pub fn with_max_samples(mut self, n: usize) -> Self {
        self.max_samples = n;
        self
    }

    /// Adds a type hint for a specific field.
    #[must_use]
    pub fn with_type_hint(mut self, field: impl Into<String>, data_type: DataType) -> Self {
        self.type_hints.insert(field.into(), data_type);
        self
    }

    /// Enables treating empty strings as nulls.
    #[must_use]
    pub fn with_empty_as_null(mut self) -> Self {
        self.empty_as_null = true;
        self
    }
}

/// How to infer numeric types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NumberInference {
    /// Prefer the smallest type that fits (i32 before i64).
    PreferSmallest,
    /// Prefer larger types (always i64, always f64).
    PreferLarger,
}

/// How to infer array/object types in JSON.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArrayInference {
    /// Store arrays/objects as JSON-encoded Utf8 strings.
    Utf8,
    /// Attempt to infer Arrow List / Struct types.
    NativeArrow,
}

/// The result of schema inference.
#[derive(Debug, Clone)]
pub struct InferredSchema {
    /// The inferred Arrow schema.
    pub schema: SchemaRef,

    /// Overall confidence score (0.0–1.0).
    pub confidence: f64,

    /// Number of samples that were analyzed.
    pub sample_count: usize,

    /// Per-field inference details.
    pub field_details: Vec<FieldInferenceDetail>,

    /// Warnings generated during inference.
    pub warnings: Vec<InferenceWarning>,
}

/// Per-field detail from inference.
#[derive(Debug, Clone)]
pub struct FieldInferenceDetail {
    /// The field name.
    pub field_name: String,

    /// The inferred Arrow data type.
    pub inferred_type: DataType,

    /// Confidence for this specific field (0.0–1.0).
    pub confidence: f64,

    /// Number of non-null samples seen for this field.
    pub non_null_count: usize,

    /// Total number of samples that included this field.
    pub total_count: usize,

    /// Whether a type hint was applied.
    pub hint_applied: bool,
}

/// A warning generated during inference.
#[derive(Debug, Clone)]
pub struct InferenceWarning {
    /// The field this warning relates to, if any.
    pub field: Option<String>,

    /// Warning message.
    pub message: String,

    /// Severity level.
    pub severity: WarningSeverity,
}

pub use laminar_core::error_codes::WarningSeverity;

// ── SchemaRegistryAware ────────────────────────────────────────────

/// A connector that integrates with a schema registry.
///
/// Supports fetching, registering, and checking compatibility of schemas
/// against an external registry (e.g., Confluent Schema Registry).
#[async_trait]
pub trait SchemaRegistryAware: Send + Sync {
    /// Fetches the latest schema for a subject.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::RegistryError`](super::error::SchemaError::RegistryError)
    /// if the registry is unreachable or the subject is not found.
    async fn fetch_schema(&self, subject: &str) -> SchemaResult<RegisteredSchema>;

    /// Fetches a schema by its numeric ID.
    ///
    /// # Errors
    ///
    /// Returns a schema error if the ID is not found.
    async fn fetch_schema_by_id(&self, schema_id: i32) -> SchemaResult<RegisteredSchema>;

    /// Checks whether `proposed` is compatible with the existing schema
    /// for `subject` under the configured compatibility mode.
    ///
    /// # Errors
    ///
    /// Returns a schema error if the compatibility check itself fails.
    async fn check_compatibility(&self, subject: &str, proposed: &SchemaRef) -> SchemaResult<bool>;

    /// Registers a new schema version for a subject.
    ///
    /// # Errors
    ///
    /// Returns a schema error if registration fails.
    async fn register_schema(
        &self,
        subject: &str,
        schema: &SchemaRef,
    ) -> SchemaResult<RegisteredSchema>;

    /// Builds a [`FormatDecoder`] configured for the registry schema.
    ///
    /// This is a sync method because the decoder construction itself
    /// is not async — the registry lookup should happen beforehand.
    ///
    /// # Errors
    ///
    /// Returns a schema error if the decoder cannot be constructed
    /// (e.g., unsupported schema type).
    fn build_registry_decoder(
        &self,
        schema: &RegisteredSchema,
    ) -> SchemaResult<Box<dyn FormatDecoder>>;
}
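
// Illustrative only (hypothetical helper, `#[cfg(test)]`): how a connector
// might combine the async registry lookup with the sync decoder construction
// described on `build_registry_decoder`.
#[cfg(test)]
#[allow(dead_code)]
async fn decoder_for_subject(
    registry: &dyn SchemaRegistryAware,
    subject: &str,
) -> SchemaResult<Box<dyn FormatDecoder>> {
    // 1. Async lookup against the registry.
    let registered = registry.fetch_schema(subject).await?;
    // 2. Sync construction of a decoder for the fetched schema.
    registry.build_registry_decoder(&registered)
}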

/// Configuration for connecting to a schema registry.
#[derive(Debug, Clone)]
pub struct RegistryConfig {
    /// Registry URL.
    pub url: String,

    /// Schema type/format used by the registry.
    pub schema_type: RegistrySchemaType,

    /// Compatibility mode.
    pub compatibility: CompatibilityMode,

    /// Optional credentials.
    pub credentials: Option<RegistryCredentials>,
}

/// Schema type stored in the registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegistrySchemaType {
    /// Apache Avro.
    Avro,
    /// JSON Schema.
    JsonSchema,
    /// Protocol Buffers.
    Protobuf,
}

/// Compatibility mode for schema evolution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityMode {
    /// No compatibility checks.
    None,
    /// New schema can read old data.
    Backward,
    /// Old schema can read new data.
    Forward,
    /// Both backward and forward compatible.
    Full,
    /// Backward compatible with all previous versions.
    BackwardTransitive,
    /// Forward compatible with all previous versions.
    ForwardTransitive,
    /// Fully compatible with all previous versions.
    FullTransitive,
}

/// Credentials for schema registry authentication.
#[derive(Debug, Clone)]
pub struct RegistryCredentials {
    /// Username or API key.
    pub username: String,
    /// Password or API secret.
    pub password: String,
}

/// A schema registered in a schema registry.
#[derive(Debug, Clone)]
pub struct RegisteredSchema {
    /// Numeric schema ID assigned by the registry.
    pub id: i32,

    /// Schema version within its subject.
    pub version: i32,

    /// The subject this schema belongs to.
    pub subject: String,

    /// The Arrow schema.
    pub schema: SchemaRef,

    /// The schema type in the registry.
    pub schema_type: RegistrySchemaType,
}

/// A single schema change detected by schema diffing.
#[derive(Debug, Clone, PartialEq)]
pub enum SchemaChange {
    /// A new column was added.
    ColumnAdded {
        /// Column name.
        name: String,
        /// The new column's data type.
        data_type: DataType,
        /// Whether the column is nullable.
        nullable: bool,
    },

    /// An existing column was removed.
    ColumnRemoved {
        /// Column name.
        name: String,
    },

    /// A column's data type changed.
    TypeChanged {
        /// Column name.
        name: String,
        /// Previous data type.
        old_type: DataType,
        /// New data type.
        new_type: DataType,
    },

    /// A column's nullability changed.
    NullabilityChanged {
        /// Column name.
        name: String,
        /// Previous nullable flag.
        was_nullable: bool,
        /// New nullable flag.
        now_nullable: bool,
    },

    /// A column was renamed.
    ColumnRenamed {
        /// Previous name.
        old_name: String,
        /// New name.
        new_name: String,
    },
}

/// The result of evaluating a set of schema changes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EvolutionVerdict {
    /// All changes are compatible — evolution can proceed.
    Compatible,

    /// Changes require data migration but are feasible.
    RequiresMigration,

    /// Changes are incompatible — evolution is rejected.
    Incompatible(String),
}
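
// Purely illustrative (hypothetical, `#[cfg(test)]` only): one possible way to
// fold a list of `SchemaChange`s into an `EvolutionVerdict`. The actual
// evolution policy lives elsewhere; the rules below are assumptions chosen
// only to show how the two types relate.
#[cfg(test)]
#[allow(dead_code)]
fn evaluate_changes(changes: &[SchemaChange]) -> EvolutionVerdict {
    let mut needs_migration = false;
    for change in changes {
        match change {
            // Assumed compatible: adding a nullable column, renaming, or
            // relaxing a column to nullable.
            SchemaChange::ColumnAdded { nullable: true, .. }
            | SchemaChange::ColumnRenamed { .. }
            | SchemaChange::NullabilityChanged { now_nullable: true, .. } => {}
            // Assumed to require rewriting existing data.
            SchemaChange::TypeChanged { .. } => needs_migration = true,
            // Everything else is rejected in this sketch.
            other => {
                return EvolutionVerdict::Incompatible(format!(
                    "unsupported change in this sketch: {other:?}"
                ));
            }
        }
    }
    if needs_migration {
        EvolutionVerdict::RequiresMigration
    } else {
        EvolutionVerdict::Compatible
    }
}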

/// Describes how to project columns from the old schema to the new schema.
#[derive(Debug, Clone)]
pub struct ColumnProjection {
    /// For each column in the new schema, the index in the old schema
    /// (or `None` if the column is newly added and should be filled
    /// with the default/null).
    pub mappings: Vec<Option<usize>>,

    /// The resulting schema after projection.
    pub target_schema: SchemaRef,
}
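
// Illustrative reading of `mappings` (hypothetical helper, `#[cfg(test)]`
// only): copy each mapped source column into the target position and fill
// newly added columns with nulls. A real projection would also cast columns
// whose type changed; this sketch only copies, and assumes newly added
// columns are nullable.
#[cfg(test)]
#[allow(dead_code)]
fn apply_projection(
    proj: &ColumnProjection,
    old: &RecordBatch,
) -> Result<RecordBatch, arrow_schema::ArrowError> {
    let columns: Vec<arrow_array::ArrayRef> = proj
        .target_schema
        .fields()
        .iter()
        .zip(&proj.mappings)
        .map(|(field, mapping)| match mapping {
            // Existing column: copy it from the old batch.
            Some(old_idx) => old.column(*old_idx).clone(),
            // Newly added column: fill with nulls.
            None => arrow_array::new_null_array(field.data_type(), old.num_rows()),
        })
        .collect();
    RecordBatch::try_new(proj.target_schema.clone(), columns)
}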

// ── Connector config schema (self-describing connectors) ───────────

/// Self-describing connector configuration schema.
///
/// Allows connectors to declare the configuration options they accept,
/// enabling UI generation and validation.
#[derive(Debug, Clone)]
pub struct ConnectorConfigSchema {
    /// The connector type name.
    pub connector_type: String,

    /// Configuration options.
    pub options: Vec<ConfigOption>,
}

/// A single configuration option in a connector's config schema.
#[derive(Debug, Clone)]
pub struct ConfigOption {
    /// The option key.
    pub key: String,

    /// Human-readable description.
    pub description: String,

    /// Whether this option is required.
    pub required: bool,

    /// The expected value type.
    pub value_type: ConfigValueType,

    /// Default value, if any.
    pub default: Option<String>,
}

/// The expected type of a configuration value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfigValueType {
    /// A string value.
    String,
    /// An integer value.
    Integer,
    /// A floating-point value.
    Float,
    /// A boolean value.
    Boolean,
    /// A duration (e.g., `"30s"`, `"5m"`).
    Duration,
    /// A URL.
    Url,
    /// A file path.
    Path,
}

// ── Object-safety assertions ───────────────────────────────────────

// Compile-time checks that the four traits in this module are object-safe.
const _: () = {
    fn _assert_format_decoder_object_safe(_: &dyn FormatDecoder) {}
    fn _assert_format_encoder_object_safe(_: &dyn FormatEncoder) {}
    fn _assert_schema_provider_object_safe(_: &dyn SchemaProvider) {}
    fn _assert_schema_registry_aware_object_safe(_: &dyn SchemaRegistryAware) {}
};

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow_schema::{Field, Schema};

    #[test]
    fn test_inference_config_defaults() {
        let cfg = InferenceConfig::default();
        assert_eq!(cfg.format, "json");
        assert_eq!(cfg.max_samples, 1000);
        assert!((cfg.min_confidence - 0.8).abs() < f64::EPSILON);
        assert!(cfg.type_hints.is_empty());
    }

    #[test]
    fn test_inference_config_builder() {
        let cfg = InferenceConfig::new("csv")
            .with_min_confidence(0.9)
            .with_max_samples(500)
            .with_type_hint("id", DataType::Int32)
            .with_empty_as_null();

        assert_eq!(cfg.format, "csv");
        assert!((cfg.min_confidence - 0.9).abs() < f64::EPSILON);
        assert_eq!(cfg.max_samples, 500);
        assert_eq!(cfg.type_hints.get("id"), Some(&DataType::Int32));
        assert!(cfg.empty_as_null);
    }

    #[test]
    fn test_inferred_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let inferred = InferredSchema {
            schema: schema.clone(),
            confidence: 0.95,
            sample_count: 100,
            field_details: vec![
                FieldInferenceDetail {
                    field_name: "id".into(),
                    inferred_type: DataType::Int64,
                    confidence: 1.0,
                    non_null_count: 100,
                    total_count: 100,
                    hint_applied: false,
                },
                FieldInferenceDetail {
                    field_name: "name".into(),
                    inferred_type: DataType::Utf8,
                    confidence: 0.9,
                    non_null_count: 90,
                    total_count: 100,
                    hint_applied: false,
                },
            ],
            warnings: vec![],
        };

        assert_eq!(inferred.schema.fields().len(), 2);
        assert!((inferred.confidence - 0.95).abs() < f64::EPSILON);
        assert_eq!(inferred.field_details.len(), 2);
    }

    #[test]
    fn test_schema_change_variants() {
        let changes = [
            SchemaChange::ColumnAdded {
                name: "email".into(),
                data_type: DataType::Utf8,
                nullable: true,
            },
            SchemaChange::ColumnRemoved {
                name: "legacy".into(),
            },
            SchemaChange::TypeChanged {
                name: "age".into(),
                old_type: DataType::Int32,
                new_type: DataType::Int64,
            },
            SchemaChange::NullabilityChanged {
                name: "name".into(),
                was_nullable: false,
                now_nullable: true,
            },
            SchemaChange::ColumnRenamed {
                old_name: "fname".into(),
                new_name: "first_name".into(),
            },
        ];
        assert_eq!(changes.len(), 5);
    }

    #[test]
    fn test_evolution_verdict() {
        assert_eq!(EvolutionVerdict::Compatible, EvolutionVerdict::Compatible);
        assert_ne!(
            EvolutionVerdict::Compatible,
            EvolutionVerdict::RequiresMigration
        );
    }

    #[test]
    fn test_column_projection() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, false),
        ]));

        let proj = ColumnProjection {
            mappings: vec![Some(0), None, Some(1)],
            target_schema: schema,
        };

        assert_eq!(proj.mappings.len(), 3);
        assert_eq!(proj.mappings[0], Some(0));
        assert_eq!(proj.mappings[1], None); // new column
        assert_eq!(proj.mappings[2], Some(1));
    }

    #[test]
    fn test_warning_severity() {
        let w = InferenceWarning {
            field: Some("price".into()),
            message: "mixed int/float".into(),
            severity: WarningSeverity::Warning,
        };
        assert_eq!(w.severity, WarningSeverity::Warning);
        assert_eq!(w.field.as_deref(), Some("price"));
    }

    #[test]
    fn test_registry_config() {
        let cfg = RegistryConfig {
            url: "http://localhost:8081".into(),
            schema_type: RegistrySchemaType::Avro,
            compatibility: CompatibilityMode::Backward,
            credentials: Some(RegistryCredentials {
                username: "user".into(),
                password: "pass".into(),
            }),
        };
        assert_eq!(cfg.schema_type, RegistrySchemaType::Avro);
        assert_eq!(cfg.compatibility, CompatibilityMode::Backward);
        assert!(cfg.credentials.is_some());
    }

    #[test]
    fn test_connector_config_schema() {
        let schema = ConnectorConfigSchema {
            connector_type: "kafka".into(),
            options: vec![
                ConfigOption {
                    key: "bootstrap.servers".into(),
                    description: "Kafka broker addresses".into(),
                    required: true,
                    value_type: ConfigValueType::String,
                    default: None,
                },
                ConfigOption {
                    key: "batch.size".into(),
                    description: "Batch size".into(),
                    required: false,
                    value_type: ConfigValueType::Integer,
                    default: Some("1000".into()),
                },
            ],
        };
        assert_eq!(schema.options.len(), 2);
        assert!(schema.options[0].required);
        assert!(!schema.options[1].required);
    }
}