Skip to main content

laminar_connectors/schema/
resolver.rs

//! Schema resolver and merge engine.
//!
//! The [`SchemaResolver`] implements a five-level priority chain for
//! determining the Arrow schema of a source connector:
//!
//! 1. **Full DDL** — user-declared schema without wildcards
//! 2. **Schema registry** — via [`SchemaRegistryAware`](super::traits::SchemaRegistryAware)
//! 3. **Source provider** — via [`SchemaProvider`](super::traits::SchemaProvider)
//! 4. **Sample inference** — via [`SchemaInferable`](super::traits::SchemaInferable)
//! 5. **Error** — no schema could be determined
//!
//! When partial DDL is provided (with a wildcard `*`), the resolver merges
//! user-declared columns with the resolved columns, placing the
//! user-declared columns first.
15
16use std::sync::Arc;
17
18use arrow_schema::{DataType, Field, Schema, SchemaRef};
19
20use super::error::{SchemaError, SchemaResult};
21use super::traits::InferenceConfig;
22use crate::connector::SourceConnector;
23
24/// A user-declared schema from DDL (e.g., `CREATE SOURCE ... (col1 INT, ...)`).
25#[derive(Debug, Clone)]
26pub struct DeclaredSchema {
27    /// Explicitly declared columns.
28    pub columns: Vec<DeclaredColumn>,
29
30    /// Whether the DDL includes a wildcard (`*`) that should be expanded
31    /// with columns from the source.
32    pub has_wildcard: bool,
33
34    /// Optional prefix applied to wildcard-expanded columns
35    /// (e.g., `src_` → `src_field_name`).
36    pub wildcard_prefix: Option<String>,
37}
38
39impl DeclaredSchema {
40    /// Creates a fully-declared schema (no wildcard).
41    #[must_use]
42    pub fn full(columns: Vec<DeclaredColumn>) -> Self {
43        Self {
44            columns,
45            has_wildcard: false,
46            wildcard_prefix: None,
47        }
48    }
49
50    /// Creates a schema with a wildcard for additional columns.
51    #[must_use]
52    pub fn with_wildcard(columns: Vec<DeclaredColumn>) -> Self {
53        Self {
54            columns,
55            has_wildcard: true,
56            wildcard_prefix: None,
57        }
58    }
59
60    /// Sets the wildcard prefix.
61    #[must_use]
62    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
63        self.wildcard_prefix = Some(prefix.into());
64        self
65    }
66
67    /// Returns `true` if no columns are declared and no wildcard is set.
68    #[must_use]
69    pub fn is_empty(&self) -> bool {
70        self.columns.is_empty() && !self.has_wildcard
71    }
72}
73
74/// A single column declared in DDL.
75#[derive(Debug, Clone)]
76pub struct DeclaredColumn {
77    /// Column name.
78    pub name: String,
79
80    /// Arrow data type.
81    pub data_type: DataType,
82
83    /// Whether the column is nullable.
84    pub nullable: bool,
85
86    /// Optional default expression (e.g., `"0"`, `"CURRENT_TIMESTAMP"`).
87    pub default: Option<String>,
88}
89
90impl DeclaredColumn {
91    /// Creates a new declared column.
92    #[must_use]
93    pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
94        Self {
95            name: name.into(),
96            data_type,
97            nullable,
98            default: None,
99        }
100    }
101
102    /// Sets the default expression.
103    #[must_use]
104    pub fn with_default(mut self, expr: impl Into<String>) -> Self {
105        self.default = Some(expr.into());
106        self
107    }
108}
109
110/// The result of schema resolution.
111#[derive(Debug, Clone)]
112pub struct ResolvedSchema {
113    /// The final Arrow schema.
114    pub schema: SchemaRef,
115
116    /// How the schema was resolved.
117    pub kind: ResolutionKind,
118
119    /// Per-field origin information.
120    pub field_origins: Vec<FieldOrigin>,
121
122    /// Any warnings generated during resolution.
123    pub warnings: Vec<String>,
124}
125
/// Records which level of the priority chain produced a resolved schema.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolutionKind {
    /// The DDL declared every column and contained no wildcard.
    Declared,

    /// The connector supplied the schema through its
    /// [`SchemaProvider`](super::traits::SchemaProvider) implementation.
    SourceProvided,

    /// The schema was fetched from a schema registry.
    Registry {
        /// Identifier the registry assigned to the schema.
        schema_id: i32,
    },

    /// The schema was derived by inspecting sample records.
    Inferred {
        /// How many sample records the inference used.
        sample_count: usize,

        /// Warning messages emitted by the inference step.
        warnings: Vec<String>,
    },
}
150
/// Where one field of a resolved schema came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FieldOrigin {
    /// Listed explicitly in the user's DDL.
    UserDeclared,

    /// Taken as-is from the source or registry schema.
    AutoResolved,

    /// Contributed by expanding a DDL wildcard.
    WildcardInferred,

    /// Contributed by a default expression.
    DefaultAdded,
}
166
167impl std::fmt::Display for FieldOrigin {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        match self {
170            FieldOrigin::UserDeclared => write!(f, "DECLARED"),
171            FieldOrigin::AutoResolved => write!(f, "AUTO"),
172            FieldOrigin::WildcardInferred => write!(f, "WILDCARD"),
173            FieldOrigin::DefaultAdded => write!(f, "DEFAULT"),
174        }
175    }
176}
177
/// Stateless schema resolver.
///
/// Carries no data; its associated functions implement the five-level
/// priority chain and the wildcard merge logic.
#[derive(Debug, Clone, Copy, Default)]
pub struct SchemaResolver;
182
183impl SchemaResolver {
184    /// Resolves the schema for a source connector.
185    ///
186    /// Applies the five-level priority chain:
187    /// 1. Full DDL without wildcard → `Declared`
188    /// 2. Schema registry → `Registry`
189    /// 3. Schema provider → `SourceProvided`
190    /// 4. Sample inference → `Inferred`
191    /// 5. Error
192    ///
193    /// When the DDL contains a wildcard, the resolver merges declared
194    /// columns with the resolved schema via [`merge_with_declared`](Self::merge_with_declared).
195    ///
196    /// # Errors
197    ///
198    /// Returns [`SchemaError`] if no resolution strategy succeeds.
199    pub async fn resolve(
200        declared: &DeclaredSchema,
201        connector: &dyn SourceConnector,
202        inference_config: &InferenceConfig,
203    ) -> SchemaResult<ResolvedSchema> {
204        // Level 1: Full DDL without wildcard.
205        if !declared.columns.is_empty() && !declared.has_wildcard {
206            let schema = Self::declared_to_arrow(declared)?;
207            let origins = vec![FieldOrigin::UserDeclared; schema.fields().len()];
208            return Ok(ResolvedSchema {
209                schema,
210                kind: ResolutionKind::Declared,
211                field_origins: origins,
212                warnings: vec![],
213            });
214        }
215
216        // Level 2: Schema registry.
217        if let Some(registry) = connector.as_schema_registry_aware() {
218            // Use connector type as subject (convention).
219            let subject = format!("{}-value", inference_config.format);
220            if let Ok(registered) = registry.fetch_schema(&subject).await {
221                let resolved_schema = registered.schema.clone();
222                let kind = ResolutionKind::Registry {
223                    schema_id: registered.id,
224                };
225
226                if declared.has_wildcard {
227                    return Self::merge_with_declared(declared, &resolved_schema, kind);
228                }
229
230                let origins = vec![FieldOrigin::AutoResolved; resolved_schema.fields().len()];
231                return Ok(ResolvedSchema {
232                    schema: resolved_schema,
233                    kind,
234                    field_origins: origins,
235                    warnings: vec![],
236                });
237            }
238        }
239
240        // Level 3: Source-provided schema.
241        if let Some(provider) = connector.as_schema_provider() {
242            if let Ok(source_schema) = provider.provide_schema().await {
243                if declared.has_wildcard {
244                    return Self::merge_with_declared(
245                        declared,
246                        &source_schema,
247                        ResolutionKind::SourceProvided,
248                    );
249                }
250
251                let origins = vec![FieldOrigin::AutoResolved; source_schema.fields().len()];
252                return Ok(ResolvedSchema {
253                    schema: source_schema,
254                    kind: ResolutionKind::SourceProvided,
255                    field_origins: origins,
256                    warnings: vec![],
257                });
258            }
259        }
260
261        // Level 4: Sample-based inference.
262        if let Some(inferable) = connector.as_schema_inferable() {
263            let samples = inferable
264                .sample_records(inference_config.max_samples)
265                .await?;
266
267            if !samples.is_empty() {
268                let inferred = inferable
269                    .infer_from_samples(&samples, inference_config)
270                    .await?;
271
272                let inf_warnings: Vec<String> = inferred
273                    .warnings
274                    .iter()
275                    .map(|w| w.message.clone())
276                    .collect();
277
278                let kind = ResolutionKind::Inferred {
279                    sample_count: inferred.sample_count,
280                    warnings: inf_warnings.clone(),
281                };
282
283                if declared.has_wildcard {
284                    let mut resolved = Self::merge_with_declared(declared, &inferred.schema, kind)?;
285                    resolved.warnings.extend(inf_warnings);
286                    return Ok(resolved);
287                }
288
289                let origins = vec![FieldOrigin::AutoResolved; inferred.schema.fields().len()];
290                return Ok(ResolvedSchema {
291                    schema: inferred.schema,
292                    kind,
293                    field_origins: origins,
294                    warnings: inf_warnings,
295                });
296            }
297        }
298
299        // Level 5: Error.
300        Err(SchemaError::InferenceFailed(
301            "no schema could be resolved: declare a schema, configure a registry, \
302             or ensure the connector supports schema inference"
303                .into(),
304        ))
305    }
306
307    /// Merges user-declared columns with a resolved schema.
308    ///
309    /// User-declared columns come first and take precedence. Resolved
310    /// columns that don't conflict are appended, optionally with a
311    /// wildcard prefix.
312    ///
313    /// # Errors
314    ///
315    /// Currently infallible, but returns `SchemaResult` for future
316    /// extensibility (e.g., type coercion failures).
317    ///
318    /// # Panics
319    ///
320    /// Panics if a declared column name exists in `declared_names` but
321    /// cannot be found in `declared.columns` (this is logically impossible).
322    pub fn merge_with_declared(
323        declared: &DeclaredSchema,
324        resolved: &SchemaRef,
325        kind: ResolutionKind,
326    ) -> SchemaResult<ResolvedSchema> {
327        let mut fields: Vec<Field> = Vec::new();
328        let mut origins: Vec<FieldOrigin> = Vec::new();
329        let mut warnings: Vec<String> = Vec::new();
330
331        // Declared columns go first.
332        let declared_names: Vec<&str> = declared.columns.iter().map(|c| c.name.as_str()).collect();
333
334        for col in &declared.columns {
335            fields.push(Field::new(&col.name, col.data_type.clone(), col.nullable));
336            origins.push(FieldOrigin::UserDeclared);
337        }
338
339        // Append non-conflicting resolved columns.
340        for field in resolved.fields() {
341            let name = field.name();
342            if declared_names.contains(&name.as_str()) {
343                // Declared column takes precedence — warn if types differ.
344                let declared_col = declared.columns.iter().find(|c| c.name == *name).unwrap();
345                if declared_col.data_type != *field.data_type() {
346                    warnings.push(format!(
347                        "field '{}': declared type {} overrides resolved type {}",
348                        name,
349                        declared_col.data_type,
350                        field.data_type()
351                    ));
352                }
353                continue;
354            }
355
356            // Apply wildcard prefix if set.
357            let field_name = if let Some(ref prefix) = declared.wildcard_prefix {
358                format!("{prefix}{name}")
359            } else {
360                name.clone()
361            };
362
363            fields.push(Field::new(
364                &field_name,
365                field.data_type().clone(),
366                field.is_nullable(),
367            ));
368            origins.push(FieldOrigin::WildcardInferred);
369        }
370
371        Ok(ResolvedSchema {
372            schema: Arc::new(Schema::new(fields)),
373            kind,
374            field_origins: origins,
375            warnings,
376        })
377    }
378
379    /// Validates that a wildcard declaration is consistent and usable.
380    ///
381    /// Checks:
382    /// - The connector supports at least one schema resolution path
383    ///   (provider, registry, or inference).
384    /// - If a prefix is set, no prefixed column name collides with a
385    ///   declared column name.
386    ///
387    /// # Errors
388    ///
389    /// Returns [`SchemaError::WildcardWithoutInference`] if no resolution
390    /// path is available, or [`SchemaError::WildcardPrefixCollision`] if
391    /// a prefixed name collides with a declared column.
392    pub fn validate_wildcard(
393        declared: &DeclaredSchema,
394        connector: &dyn SourceConnector,
395    ) -> SchemaResult<()> {
396        if !declared.has_wildcard {
397            return Ok(());
398        }
399
400        // At least one resolution path must be available.
401        let has_provider = connector.as_schema_provider().is_some();
402        let has_registry = connector.as_schema_registry_aware().is_some();
403        let has_inference = connector.as_schema_inferable().is_some();
404
405        if !has_provider && !has_registry && !has_inference {
406            return Err(SchemaError::WildcardWithoutInference);
407        }
408
409        Ok(())
410    }
411
412    /// Checks that wildcard-prefixed column names don't collide with
413    /// declared columns.
414    ///
415    /// Call this after schema resolution when the resolved field names
416    /// are known.
417    ///
418    /// # Errors
419    ///
420    /// Returns [`SchemaError::WildcardPrefixCollision`] on collision.
421    pub fn check_prefix_collision(
422        declared: &DeclaredSchema,
423        resolved: &SchemaRef,
424    ) -> SchemaResult<()> {
425        let Some(prefix) = &declared.wildcard_prefix else {
426            return Ok(());
427        };
428
429        let declared_names: Vec<&str> = declared.columns.iter().map(|c| c.name.as_str()).collect();
430
431        for field in resolved.fields() {
432            let name = field.name();
433            if declared_names.contains(&name.as_str()) {
434                continue; // Will be skipped during merge anyway.
435            }
436            let prefixed = format!("{prefix}{name}");
437            if declared_names.contains(&prefixed.as_str()) {
438                return Err(SchemaError::WildcardPrefixCollision(prefixed));
439            }
440        }
441
442        Ok(())
443    }
444
445    /// Converts a [`DeclaredSchema`] into an Arrow [`SchemaRef`].
446    ///
447    /// # Errors
448    ///
449    /// Returns [`SchemaError::InferenceFailed`] if the declared schema
450    /// has no columns.
451    pub fn declared_to_arrow(declared: &DeclaredSchema) -> SchemaResult<SchemaRef> {
452        if declared.columns.is_empty() {
453            return Err(SchemaError::InferenceFailed(
454                "declared schema has no columns".into(),
455            ));
456        }
457
458        let fields: Vec<Field> = declared
459            .columns
460            .iter()
461            .map(|c| Field::new(&c.name, c.data_type.clone(), c.nullable))
462            .collect();
463
464        Ok(Arc::new(Schema::new(fields)))
465    }
466}
467
#[cfg(test)]
mod tests {
    use super::*;

    fn sample_resolved_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int64, true),
        ]))
    }

    // ── DeclaredSchema tests ───────────────────────────────────

    #[test]
    fn test_declared_schema_full() {
        let declared = DeclaredSchema::full(vec![
            DeclaredColumn::new("id", DataType::Int64, false),
            DeclaredColumn::new("name", DataType::Utf8, true),
        ]);
        assert!(!declared.has_wildcard);
        assert!(!declared.is_empty());
    }

    #[test]
    fn test_declared_schema_with_wildcard() {
        let declared =
            DeclaredSchema::with_wildcard(vec![DeclaredColumn::new("id", DataType::Int64, false)]);
        assert!(declared.has_wildcard);
        assert!(declared.wildcard_prefix.is_none());
    }

    #[test]
    fn test_declared_schema_empty() {
        let declared = DeclaredSchema::full(vec![]);
        assert!(declared.is_empty());
    }

    #[test]
    fn test_declared_schema_wildcard_only_is_not_empty() {
        // `SELECT *`-style declarations have no columns but are not empty.
        let declared = DeclaredSchema::with_wildcard(vec![]);
        assert!(!declared.is_empty());
    }

    #[test]
    fn test_declared_schema_with_prefix() {
        let declared = DeclaredSchema::with_wildcard(vec![]).with_prefix("src_");
        assert_eq!(declared.wildcard_prefix.as_deref(), Some("src_"));
        assert!(declared.has_wildcard);
    }

    #[test]
    fn test_declared_column_with_default() {
        let col = DeclaredColumn::new("status", DataType::Utf8, false).with_default("active");
        assert_eq!(col.default.as_deref(), Some("active"));
    }

    // ── declared_to_arrow tests ────────────────────────────────

    #[test]
    fn test_declared_to_arrow() {
        let declared = DeclaredSchema::full(vec![
            DeclaredColumn::new("id", DataType::Int64, false),
            DeclaredColumn::new("name", DataType::Utf8, true),
        ]);

        let schema = SchemaResolver::declared_to_arrow(&declared).unwrap();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(0).data_type(), &DataType::Int64);
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(schema.field(1).is_nullable());
    }

    #[test]
    fn test_declared_to_arrow_empty_error() {
        let declared = DeclaredSchema::full(vec![]);
        assert!(SchemaResolver::declared_to_arrow(&declared).is_err());
    }

    // ── merge_with_declared tests ──────────────────────────────

    #[test]
    fn test_merge_no_overlap() {
        let declared = DeclaredSchema::with_wildcard(vec![DeclaredColumn::new(
            "extra",
            DataType::Boolean,
            false,
        )]);

        let resolved = sample_resolved_schema();
        let result = SchemaResolver::merge_with_declared(
            &declared,
            &resolved,
            ResolutionKind::SourceProvided,
        )
        .unwrap();

        // "extra" first, then id, name, age
        assert_eq!(result.schema.fields().len(), 4);
        assert_eq!(result.schema.field(0).name(), "extra");
        assert_eq!(result.schema.field(1).name(), "id");
        assert_eq!(result.schema.field(2).name(), "name");
        assert_eq!(result.schema.field(3).name(), "age");

        assert_eq!(
            result.field_origins,
            vec![
                FieldOrigin::UserDeclared,
                FieldOrigin::WildcardInferred,
                FieldOrigin::WildcardInferred,
                FieldOrigin::WildcardInferred,
            ]
        );
        assert!(result.warnings.is_empty());
        assert_eq!(result.kind, ResolutionKind::SourceProvided);
    }

    #[test]
    fn test_merge_with_overlap() {
        let declared = DeclaredSchema::with_wildcard(vec![DeclaredColumn::new(
            "id",
            DataType::Int32, // different type than resolved
            false,
        )]);

        let resolved = sample_resolved_schema();
        let result = SchemaResolver::merge_with_declared(
            &declared,
            &resolved,
            ResolutionKind::SourceProvided,
        )
        .unwrap();

        // "id" from declared (Int32), then name, age from resolved.
        assert_eq!(result.schema.fields().len(), 3);
        assert_eq!(result.schema.field(0).name(), "id");
        assert_eq!(result.schema.field(0).data_type(), &DataType::Int32);
        assert_eq!(result.schema.field(1).name(), "name");
        assert_eq!(result.schema.field(2).name(), "age");
        assert_eq!(
            result.field_origins,
            vec![
                FieldOrigin::UserDeclared,
                FieldOrigin::WildcardInferred,
                FieldOrigin::WildcardInferred,
            ]
        );

        // Should warn about type mismatch.
        assert_eq!(result.warnings.len(), 1);
        assert!(result.warnings[0].contains("Int32"));
    }

    #[test]
    fn test_merge_same_type_overlap_no_warning() {
        let declared = DeclaredSchema::with_wildcard(vec![DeclaredColumn::new(
            "id",
            DataType::Int64, // same type as resolved
            false,
        )]);

        let resolved = sample_resolved_schema();
        let result = SchemaResolver::merge_with_declared(
            &declared,
            &resolved,
            ResolutionKind::SourceProvided,
        )
        .unwrap();

        assert_eq!(result.schema.fields().len(), 3);
        assert!(result.warnings.is_empty());
    }

    #[test]
    fn test_merge_with_prefix() {
        let declared =
            DeclaredSchema::with_wildcard(vec![DeclaredColumn::new("pk", DataType::Int64, false)])
                .with_prefix("src_");

        let resolved = sample_resolved_schema();
        let result = SchemaResolver::merge_with_declared(
            &declared,
            &resolved,
            ResolutionKind::SourceProvided,
        )
        .unwrap();

        // "pk" first, then src_id, src_name, src_age
        assert_eq!(result.schema.fields().len(), 4);
        assert_eq!(result.schema.field(0).name(), "pk");
        assert_eq!(result.schema.field(1).name(), "src_id");
        assert_eq!(result.schema.field(2).name(), "src_name");
        assert_eq!(result.schema.field(3).name(), "src_age");
        assert_eq!(result.field_origins[0], FieldOrigin::UserDeclared);
        assert!(result.field_origins[1..]
            .iter()
            .all(|o| *o == FieldOrigin::WildcardInferred));
    }

    // ── ResolutionKind tests ───────────────────────────────────

    #[test]
    fn test_resolution_kind_eq() {
        assert_eq!(ResolutionKind::Declared, ResolutionKind::Declared);
        assert_ne!(ResolutionKind::Declared, ResolutionKind::SourceProvided);
        assert_eq!(
            ResolutionKind::Registry { schema_id: 1 },
            ResolutionKind::Registry { schema_id: 1 }
        );
        assert_ne!(
            ResolutionKind::Registry { schema_id: 1 },
            ResolutionKind::Registry { schema_id: 2 }
        );
    }

    // ── FieldOrigin tests ──────────────────────────────────────

    #[test]
    fn test_field_origin_variants() {
        let origins = [
            FieldOrigin::UserDeclared,
            FieldOrigin::AutoResolved,
            FieldOrigin::WildcardInferred,
            FieldOrigin::DefaultAdded,
        ];
        assert_eq!(origins.len(), 4);
        assert_ne!(FieldOrigin::UserDeclared, FieldOrigin::AutoResolved);
    }

    #[test]
    fn test_field_origin_display() {
        assert_eq!(FieldOrigin::UserDeclared.to_string(), "DECLARED");
        assert_eq!(FieldOrigin::AutoResolved.to_string(), "AUTO");
        assert_eq!(FieldOrigin::WildcardInferred.to_string(), "WILDCARD");
        assert_eq!(FieldOrigin::DefaultAdded.to_string(), "DEFAULT");
    }

    // ── ResolvedSchema tests ───────────────────────────────────

    #[test]
    fn test_resolved_schema_declared() {
        let declared = DeclaredSchema::full(vec![
            DeclaredColumn::new("x", DataType::Float64, false),
            DeclaredColumn::new("y", DataType::Float64, false),
        ]);

        let schema = SchemaResolver::declared_to_arrow(&declared).unwrap();
        let resolved = ResolvedSchema {
            schema,
            kind: ResolutionKind::Declared,
            field_origins: vec![FieldOrigin::UserDeclared; 2],
            warnings: vec![],
        };

        assert_eq!(resolved.kind, ResolutionKind::Declared);
        assert_eq!(resolved.field_origins.len(), 2);
        assert!(resolved.warnings.is_empty());
    }

    // ── check_prefix_collision tests ────────────────────────

    #[test]
    fn test_prefix_collision_none() {
        let declared =
            DeclaredSchema::with_wildcard(vec![DeclaredColumn::new("pk", DataType::Int64, false)]);
        let resolved = sample_resolved_schema();
        assert!(SchemaResolver::check_prefix_collision(&declared, &resolved).is_ok());
    }

    #[test]
    fn test_prefix_collision_detected() {
        let declared = DeclaredSchema::with_wildcard(vec![DeclaredColumn::new(
            "src_name",
            DataType::Utf8,
            true,
        )])
        .with_prefix("src_");

        let resolved = sample_resolved_schema(); // has "name"
        let result = SchemaResolver::check_prefix_collision(&declared, &resolved);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("src_name"));
    }

    #[test]
    fn test_prefix_collision_no_prefix_ok() {
        let declared =
            DeclaredSchema::with_wildcard(vec![DeclaredColumn::new("name", DataType::Utf8, true)]);
        let resolved = sample_resolved_schema();
        // No prefix set, so no collision check needed.
        assert!(SchemaResolver::check_prefix_collision(&declared, &resolved).is_ok());
    }

    #[test]
    fn test_prefix_collision_skip_declared_overlap() {
        // Resolved "id" has the same (unprefixed) name as declared "id", so the
        // merge drops it instead of expanding it to "src_id" — no collision.
        let declared =
            DeclaredSchema::with_wildcard(vec![DeclaredColumn::new("id", DataType::Int64, false)])
                .with_prefix("src_");

        let resolved = sample_resolved_schema();
        assert!(SchemaResolver::check_prefix_collision(&declared, &resolved).is_ok());
    }

    #[test]
    fn test_prefix_collision_prefixed_name_matches_declared() {
        // Declared "src_id" does not overlap resolved "id" by name, so "id" is
        // expanded to "src_id", which collides with the declared column.
        let declared = DeclaredSchema::with_wildcard(vec![DeclaredColumn::new(
            "src_id",
            DataType::Int64,
            false,
        )])
        .with_prefix("src_");

        let resolved = sample_resolved_schema();
        let result = SchemaResolver::check_prefix_collision(&declared, &resolved);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("src_id"));
    }
}