laminar_connectors/schema/traits.rs
1//! Capability traits for the connector schema framework.
2#![allow(clippy::disallowed_types)] // cold path: schema management
3
4use std::collections::HashMap;
5
6use arrow_array::RecordBatch;
7use arrow_schema::{DataType, SchemaRef};
8use async_trait::async_trait;
9
10use super::error::SchemaResult;
11use super::types::RawRecord;
12
13// ── FormatDecoder ──────────────────────────────────────────────────
14
15/// Decodes raw bytes into Arrow `RecordBatch`es.
16///
17/// Unlike [`RecordDeserializer`](crate::serde::RecordDeserializer) which
18/// takes `&[u8]` slices, `FormatDecoder` works with [`RawRecord`]s that
19/// carry metadata, headers, and timestamps alongside the payload.
20pub trait FormatDecoder: Send + Sync {
21 /// Returns the Arrow schema produced by this decoder.
22 fn output_schema(&self) -> SchemaRef;
23
24 /// Decodes a batch of raw records into an Arrow `RecordBatch`.
25 ///
26 /// # Errors
27 ///
28 /// Returns [`SchemaError::DecodeError`](super::error::SchemaError::DecodeError)
29 /// if the input cannot be parsed.
30 fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch>;
31
32 /// Decodes a single raw record into an Arrow `RecordBatch` with one row.
33 ///
34 /// Default implementation delegates to [`decode_batch`](Self::decode_batch)
35 /// with a single-element slice.
36 ///
37 /// # Errors
38 ///
39 /// Returns [`SchemaError::DecodeError`](super::error::SchemaError::DecodeError)
40 /// if the input cannot be parsed.
41 fn decode_one(&self, record: &RawRecord) -> SchemaResult<RecordBatch> {
42 self.decode_batch(std::slice::from_ref(record))
43 }
44
45 /// Returns the name of the format this decoder handles (e.g., `"json"`).
46 fn format_name(&self) -> &str;
47}
48
49// ── FormatEncoder ──────────────────────────────────────────────────
50
51/// Encodes Arrow `RecordBatch`es into raw bytes.
52pub trait FormatEncoder: Send + Sync {
53 /// Returns the expected input schema.
54 fn input_schema(&self) -> SchemaRef;
55
56 /// Encodes a `RecordBatch` into a vector of byte records.
57 ///
58 /// Each element in the returned vector represents one serialized record.
59 ///
60 /// # Errors
61 ///
62 /// Returns a schema error if encoding fails.
63 fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>>;
64
65 /// Returns the name of the format this encoder produces (e.g., `"json"`).
66 fn format_name(&self) -> &str;
67}
68
69// ── SchemaProvider ─────────────────────────────────────────────────
70
71/// A connector that can provide its schema from the source system.
72///
73/// Examples: `PostgreSQL` CDC reading `information_schema`, a Kafka topic
74/// with an embedded Avro schema, or a file source with a header row.
75#[async_trait]
76pub trait SchemaProvider: Send + Sync {
77 /// Fetches the schema from the source system.
78 ///
79 /// # Errors
80 ///
81 /// Returns a schema error if the schema cannot be retrieved.
82 async fn provide_schema(&self) -> SchemaResult<SchemaRef>;
83
84 /// Returns `true` if this provider's schema is authoritative
85 /// (i.e., should take precedence over inference).
86 fn is_authoritative(&self) -> bool {
87 false
88 }
89
90 /// Returns per-field metadata for the provided schema.
91 ///
92 /// Default returns an empty map.
93 async fn field_metadata(&self) -> SchemaResult<HashMap<String, super::types::FieldMeta>> {
94 Ok(HashMap::new())
95 }
96}
97
98// ── Inference types ────────────────────────────────────────────────
99
100/// Configuration for schema inference.
101#[derive(Debug, Clone)]
102pub struct InferenceConfig {
103 /// Data format to use for inference.
104 pub format: String,
105
106 /// How to handle number type inference.
107 pub number_inference: NumberInference,
108
109 /// How to handle array type inference.
110 pub array_inference: ArrayInference,
111
112 /// Maximum number of samples to use.
113 pub max_samples: usize,
114
115 /// Minimum confidence threshold (0.0–1.0) for accepting an inferred type.
116 pub min_confidence: f64,
117
118 /// Type hints for specific fields.
119 pub type_hints: HashMap<String, DataType>,
120
121 /// Whether to treat empty strings as nulls.
122 pub empty_as_null: bool,
123}
124
125impl Default for InferenceConfig {
126 fn default() -> Self {
127 Self {
128 format: "json".to_string(),
129 number_inference: NumberInference::PreferLarger,
130 array_inference: ArrayInference::Utf8,
131 max_samples: 1000,
132 min_confidence: 0.8,
133 type_hints: HashMap::new(),
134 empty_as_null: false,
135 }
136 }
137}
138
139impl InferenceConfig {
140 /// Creates a new inference config for the given format.
141 #[must_use]
142 pub fn new(format: impl Into<String>) -> Self {
143 Self {
144 format: format.into(),
145 ..Self::default()
146 }
147 }
148
149 /// Sets the minimum confidence threshold.
150 #[must_use]
151 pub fn with_min_confidence(mut self, confidence: f64) -> Self {
152 self.min_confidence = confidence;
153 self
154 }
155
156 /// Sets the maximum number of samples.
157 #[must_use]
158 pub fn with_max_samples(mut self, n: usize) -> Self {
159 self.max_samples = n;
160 self
161 }
162
163 /// Adds a type hint for a specific field.
164 #[must_use]
165 pub fn with_type_hint(mut self, field: impl Into<String>, data_type: DataType) -> Self {
166 self.type_hints.insert(field.into(), data_type);
167 self
168 }
169
170 /// Enables treating empty strings as nulls.
171 #[must_use]
172 pub fn with_empty_as_null(mut self) -> Self {
173 self.empty_as_null = true;
174 self
175 }
176}
177
178/// How to infer numeric types.
179#[derive(Debug, Clone, Copy, PartialEq, Eq)]
180pub enum NumberInference {
181 /// Prefer the smallest type that fits (i32 before i64).
182 PreferSmallest,
183 /// Prefer larger types (always i64, always f64).
184 PreferLarger,
185}
186
187/// How to infer array/object types in JSON.
188#[derive(Debug, Clone, Copy, PartialEq, Eq)]
189pub enum ArrayInference {
190 /// Store arrays/objects as JSON-encoded Utf8 strings.
191 Utf8,
192 /// Attempt to infer Arrow List / Struct types.
193 NativeArrow,
194}
195
196/// The result of schema inference.
197#[derive(Debug, Clone)]
198pub struct InferredSchema {
199 /// The inferred Arrow schema.
200 pub schema: SchemaRef,
201
202 /// Overall confidence score (0.0–1.0).
203 pub confidence: f64,
204
205 /// Number of samples that were analyzed.
206 pub sample_count: usize,
207
208 /// Per-field inference details.
209 pub field_details: Vec<FieldInferenceDetail>,
210
211 /// Warnings generated during inference.
212 pub warnings: Vec<InferenceWarning>,
213}
214
215/// Per-field detail from inference.
216#[derive(Debug, Clone)]
217pub struct FieldInferenceDetail {
218 /// The field name.
219 pub field_name: String,
220
221 /// The inferred Arrow data type.
222 pub inferred_type: DataType,
223
224 /// Confidence for this specific field (0.0–1.0).
225 pub confidence: f64,
226
227 /// Number of non-null samples seen for this field.
228 pub non_null_count: usize,
229
230 /// Total number of samples that included this field.
231 pub total_count: usize,
232
233 /// Whether a type hint was applied.
234 pub hint_applied: bool,
235}
236
237/// A warning generated during inference.
238#[derive(Debug, Clone)]
239pub struct InferenceWarning {
240 /// The field this warning relates to, if any.
241 pub field: Option<String>,
242
243 /// Warning message.
244 pub message: String,
245
246 /// Severity level.
247 pub severity: WarningSeverity,
248}
249
250pub use laminar_core::error_codes::WarningSeverity;
251
252// ── SchemaRegistryAware ────────────────────────────────────────────
253
254/// A connector that integrates with a schema registry.
255///
256/// Supports fetching, registering, and checking compatibility of schemas
257/// against an external registry (e.g., Confluent Schema Registry).
258#[async_trait]
259pub trait SchemaRegistryAware: Send + Sync {
260 /// Fetches the latest schema for a subject.
261 ///
262 /// # Errors
263 ///
264 /// Returns [`SchemaError::RegistryError`](super::error::SchemaError::RegistryError)
265 /// if the registry is unreachable or the subject is not found.
266 async fn fetch_schema(&self, subject: &str) -> SchemaResult<RegisteredSchema>;
267
268 /// Fetches a schema by its numeric ID.
269 ///
270 /// # Errors
271 ///
272 /// Returns a schema error if the ID is not found.
273 async fn fetch_schema_by_id(&self, schema_id: i32) -> SchemaResult<RegisteredSchema>;
274
275 /// Checks whether `proposed` is compatible with the existing schema
276 /// for `subject` under the configured compatibility mode.
277 ///
278 /// # Errors
279 ///
280 /// Returns a schema error if the compatibility check itself fails.
281 async fn check_compatibility(&self, subject: &str, proposed: &SchemaRef) -> SchemaResult<bool>;
282
283 /// Registers a new schema version for a subject.
284 ///
285 /// # Errors
286 ///
287 /// Returns a schema error if registration fails.
288 async fn register_schema(
289 &self,
290 subject: &str,
291 schema: &SchemaRef,
292 ) -> SchemaResult<RegisteredSchema>;
293
294 /// Builds a [`FormatDecoder`] configured for the registry schema.
295 ///
296 /// This is a sync method because the decoder construction itself
297 /// is not async — the registry lookup should happen beforehand.
298 ///
299 /// # Errors
300 ///
301 /// Returns a schema error if the decoder cannot be constructed
302 /// (e.g., unsupported schema type).
303 fn build_registry_decoder(
304 &self,
305 schema: &RegisteredSchema,
306 ) -> SchemaResult<Box<dyn FormatDecoder>>;
307}
308
309/// Configuration for connecting to a schema registry.
310#[derive(Debug, Clone)]
311pub struct RegistryConfig {
312 /// Registry URL.
313 pub url: String,
314
315 /// Schema type/format used by the registry.
316 pub schema_type: RegistrySchemaType,
317
318 /// Compatibility mode.
319 pub compatibility: CompatibilityMode,
320
321 /// Optional credentials.
322 pub credentials: Option<RegistryCredentials>,
323}
324
325/// Schema type stored in the registry.
326#[derive(Debug, Clone, Copy, PartialEq, Eq)]
327pub enum RegistrySchemaType {
328 /// Apache Avro.
329 Avro,
330 /// JSON Schema.
331 JsonSchema,
332 /// Protocol Buffers.
333 Protobuf,
334}
335
336/// Compatibility mode for schema evolution.
337#[derive(Debug, Clone, Copy, PartialEq, Eq)]
338pub enum CompatibilityMode {
339 /// No compatibility checks.
340 None,
341 /// New schema can read old data.
342 Backward,
343 /// Old schema can read new data.
344 Forward,
345 /// Both backward and forward compatible.
346 Full,
347 /// Backward compatible with all previous versions.
348 BackwardTransitive,
349 /// Forward compatible with all previous versions.
350 ForwardTransitive,
351 /// Fully compatible with all previous versions.
352 FullTransitive,
353}
354
355/// Credentials for schema registry authentication.
356#[derive(Debug, Clone)]
357pub struct RegistryCredentials {
358 /// Username or API key.
359 pub username: String,
360 /// Password or API secret.
361 pub password: String,
362}
363
364/// A schema registered in a schema registry.
365#[derive(Debug, Clone)]
366pub struct RegisteredSchema {
367 /// Numeric schema ID assigned by the registry.
368 pub id: i32,
369
370 /// Schema version within its subject.
371 pub version: i32,
372
373 /// The subject this schema belongs to.
374 pub subject: String,
375
376 /// The Arrow schema.
377 pub schema: SchemaRef,
378
379 /// The schema type in the registry.
380 pub schema_type: RegistrySchemaType,
381}
382
383/// A single schema change detected by schema diffing.
384#[derive(Debug, Clone, PartialEq)]
385pub enum SchemaChange {
386 /// A new column was added.
387 ColumnAdded {
388 /// Column name.
389 name: String,
390 /// The new column's data type.
391 data_type: DataType,
392 /// Whether the column is nullable.
393 nullable: bool,
394 },
395
396 /// An existing column was removed.
397 ColumnRemoved {
398 /// Column name.
399 name: String,
400 },
401
402 /// A column's data type changed.
403 TypeChanged {
404 /// Column name.
405 name: String,
406 /// Previous data type.
407 old_type: DataType,
408 /// New data type.
409 new_type: DataType,
410 },
411
412 /// A column's nullability changed.
413 NullabilityChanged {
414 /// Column name.
415 name: String,
416 /// Previous nullable flag.
417 was_nullable: bool,
418 /// New nullable flag.
419 now_nullable: bool,
420 },
421
422 /// A column was renamed.
423 ColumnRenamed {
424 /// Previous name.
425 old_name: String,
426 /// New name.
427 new_name: String,
428 },
429}
430
431/// The result of evaluating a set of schema changes.
432#[derive(Debug, Clone, PartialEq, Eq)]
433pub enum EvolutionVerdict {
434 /// All changes are compatible — evolution can proceed.
435 Compatible,
436
437 /// Changes require data migration but are feasible.
438 RequiresMigration,
439
440 /// Changes are incompatible — evolution is rejected.
441 Incompatible(String),
442}
443
444/// Describes how to project columns from the old schema to the new schema.
445#[derive(Debug, Clone)]
446pub struct ColumnProjection {
447 /// For each column in the new schema, the index in the old schema
448 /// (or `None` if the column is newly added and should be filled
449 /// with the default/null).
450 pub mappings: Vec<Option<usize>>,
451
452 /// The resulting schema after projection.
453 pub target_schema: SchemaRef,
454}
455
456// ── Connector config schema (self-describing connectors) ───────────
457
458/// Self-describing connector configuration schema.
459///
460/// Allows connectors to declare the configuration options they accept,
461/// enabling UI generation and validation.
462#[derive(Debug, Clone)]
463pub struct ConnectorConfigSchema {
464 /// The connector type name.
465 pub connector_type: String,
466
467 /// Configuration options.
468 pub options: Vec<ConfigOption>,
469}
470
471/// A single configuration option in a connector's config schema.
472#[derive(Debug, Clone)]
473pub struct ConfigOption {
474 /// The option key.
475 pub key: String,
476
477 /// Human-readable description.
478 pub description: String,
479
480 /// Whether this option is required.
481 pub required: bool,
482
483 /// The expected value type.
484 pub value_type: ConfigValueType,
485
486 /// Default value, if any.
487 pub default: Option<String>,
488}
489
490/// The expected type of a configuration value.
491#[derive(Debug, Clone, Copy, PartialEq, Eq)]
492pub enum ConfigValueType {
493 /// A string value.
494 String,
495 /// An integer value.
496 Integer,
497 /// A floating-point value.
498 Float,
499 /// A boolean value.
500 Boolean,
501 /// A duration (e.g., `"30s"`, `"5m"`).
502 Duration,
503 /// A URL.
504 Url,
505 /// A file path.
506 Path,
507}
508
509// ── Object-safety assertions ───────────────────────────────────────
510
511// Compile-time checks that all six traits are object-safe.
512const _: () = {
513 fn _assert_format_decoder_object_safe(_: &dyn FormatDecoder) {}
514 fn _assert_format_encoder_object_safe(_: &dyn FormatEncoder) {}
515 fn _assert_schema_provider_object_safe(_: &dyn SchemaProvider) {}
516
517 fn _assert_schema_registry_aware_object_safe(_: &dyn SchemaRegistryAware) {}
518};
519
520#[cfg(test)]
521mod tests {
522 use super::*;
523 use std::sync::Arc;
524
525 use arrow_schema::{Field, Schema};
526
527 #[test]
528 fn test_inference_config_defaults() {
529 let cfg = InferenceConfig::default();
530 assert_eq!(cfg.format, "json");
531 assert_eq!(cfg.max_samples, 1000);
532 assert!((cfg.min_confidence - 0.8).abs() < f64::EPSILON);
533 assert!(cfg.type_hints.is_empty());
534 }
535
536 #[test]
537 fn test_inference_config_builder() {
538 let cfg = InferenceConfig::new("csv")
539 .with_min_confidence(0.9)
540 .with_max_samples(500)
541 .with_type_hint("id", DataType::Int32)
542 .with_empty_as_null();
543
544 assert_eq!(cfg.format, "csv");
545 assert!((cfg.min_confidence - 0.9).abs() < f64::EPSILON);
546 assert_eq!(cfg.max_samples, 500);
547 assert_eq!(cfg.type_hints.get("id"), Some(&DataType::Int32));
548 assert!(cfg.empty_as_null);
549 }
550
551 #[test]
552 fn test_inferred_schema() {
553 let schema = Arc::new(Schema::new(vec![
554 Field::new("id", DataType::Int64, false),
555 Field::new("name", DataType::Utf8, true),
556 ]));
557
558 let inferred = InferredSchema {
559 schema: schema.clone(),
560 confidence: 0.95,
561 sample_count: 100,
562 field_details: vec![
563 FieldInferenceDetail {
564 field_name: "id".into(),
565 inferred_type: DataType::Int64,
566 confidence: 1.0,
567 non_null_count: 100,
568 total_count: 100,
569 hint_applied: false,
570 },
571 FieldInferenceDetail {
572 field_name: "name".into(),
573 inferred_type: DataType::Utf8,
574 confidence: 0.9,
575 non_null_count: 90,
576 total_count: 100,
577 hint_applied: false,
578 },
579 ],
580 warnings: vec![],
581 };
582
583 assert_eq!(inferred.schema.fields().len(), 2);
584 assert!((inferred.confidence - 0.95).abs() < f64::EPSILON);
585 assert_eq!(inferred.field_details.len(), 2);
586 }
587
588 #[test]
589 fn test_schema_change_variants() {
590 let changes = [
591 SchemaChange::ColumnAdded {
592 name: "email".into(),
593 data_type: DataType::Utf8,
594 nullable: true,
595 },
596 SchemaChange::ColumnRemoved {
597 name: "legacy".into(),
598 },
599 SchemaChange::TypeChanged {
600 name: "age".into(),
601 old_type: DataType::Int32,
602 new_type: DataType::Int64,
603 },
604 SchemaChange::NullabilityChanged {
605 name: "name".into(),
606 was_nullable: false,
607 now_nullable: true,
608 },
609 SchemaChange::ColumnRenamed {
610 old_name: "fname".into(),
611 new_name: "first_name".into(),
612 },
613 ];
614 assert_eq!(changes.len(), 5);
615 }
616
617 #[test]
618 fn test_evolution_verdict() {
619 assert_eq!(EvolutionVerdict::Compatible, EvolutionVerdict::Compatible);
620 assert_ne!(
621 EvolutionVerdict::Compatible,
622 EvolutionVerdict::RequiresMigration
623 );
624 }
625
626 #[test]
627 fn test_column_projection() {
628 let schema = Arc::new(Schema::new(vec![
629 Field::new("a", DataType::Int64, false),
630 Field::new("b", DataType::Utf8, true),
631 Field::new("c", DataType::Float64, false),
632 ]));
633
634 let proj = ColumnProjection {
635 mappings: vec![Some(0), None, Some(1)],
636 target_schema: schema,
637 };
638
639 assert_eq!(proj.mappings.len(), 3);
640 assert_eq!(proj.mappings[0], Some(0));
641 assert_eq!(proj.mappings[1], None); // new column
642 assert_eq!(proj.mappings[2], Some(1));
643 }
644
645 #[test]
646 fn test_warning_severity() {
647 let w = InferenceWarning {
648 field: Some("price".into()),
649 message: "mixed int/float".into(),
650 severity: WarningSeverity::Warning,
651 };
652 assert_eq!(w.severity, WarningSeverity::Warning);
653 assert_eq!(w.field.as_deref(), Some("price"));
654 }
655
656 #[test]
657 fn test_registry_config() {
658 let cfg = RegistryConfig {
659 url: "http://localhost:8081".into(),
660 schema_type: RegistrySchemaType::Avro,
661 compatibility: CompatibilityMode::Backward,
662 credentials: Some(RegistryCredentials {
663 username: "user".into(),
664 password: "pass".into(),
665 }),
666 };
667 assert_eq!(cfg.schema_type, RegistrySchemaType::Avro);
668 assert_eq!(cfg.compatibility, CompatibilityMode::Backward);
669 assert!(cfg.credentials.is_some());
670 }
671
672 #[test]
673 fn test_connector_config_schema() {
674 let schema = ConnectorConfigSchema {
675 connector_type: "kafka".into(),
676 options: vec![
677 ConfigOption {
678 key: "bootstrap.servers".into(),
679 description: "Kafka broker addresses".into(),
680 required: true,
681 value_type: ConfigValueType::String,
682 default: None,
683 },
684 ConfigOption {
685 key: "batch.size".into(),
686 description: "Batch size".into(),
687 required: false,
688 value_type: ConfigValueType::Integer,
689 default: Some("1000".into()),
690 },
691 ],
692 };
693 assert_eq!(schema.options.len(), 2);
694 assert!(schema.options[0].required);
695 assert!(!schema.options[1].required);
696 }
697}