1use std::sync::Arc;
17
18use arrow_schema::{DataType, Field, Schema, SchemaRef};
19
20use super::error::{SchemaError, SchemaResult};
21use super::traits::InferenceConfig;
22use crate::connector::SourceConnector;
23
24#[derive(Debug, Clone)]
26pub struct DeclaredSchema {
27 pub columns: Vec<DeclaredColumn>,
29
30 pub has_wildcard: bool,
33
34 pub wildcard_prefix: Option<String>,
37}
38
39impl DeclaredSchema {
40 #[must_use]
42 pub fn full(columns: Vec<DeclaredColumn>) -> Self {
43 Self {
44 columns,
45 has_wildcard: false,
46 wildcard_prefix: None,
47 }
48 }
49
50 #[must_use]
52 pub fn with_wildcard(columns: Vec<DeclaredColumn>) -> Self {
53 Self {
54 columns,
55 has_wildcard: true,
56 wildcard_prefix: None,
57 }
58 }
59
60 #[must_use]
62 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
63 self.wildcard_prefix = Some(prefix.into());
64 self
65 }
66
67 #[must_use]
69 pub fn is_empty(&self) -> bool {
70 self.columns.is_empty() && !self.has_wildcard
71 }
72}
73
74#[derive(Debug, Clone)]
76pub struct DeclaredColumn {
77 pub name: String,
79
80 pub data_type: DataType,
82
83 pub nullable: bool,
85
86 pub default: Option<String>,
88}
89
90impl DeclaredColumn {
91 #[must_use]
93 pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
94 Self {
95 name: name.into(),
96 data_type,
97 nullable,
98 default: None,
99 }
100 }
101
102 #[must_use]
104 pub fn with_default(mut self, expr: impl Into<String>) -> Self {
105 self.default = Some(expr.into());
106 self
107 }
108}
109
110#[derive(Debug, Clone)]
112pub struct ResolvedSchema {
113 pub schema: SchemaRef,
115
116 pub kind: ResolutionKind,
118
119 pub field_origins: Vec<FieldOrigin>,
121
122 pub warnings: Vec<String>,
124}
125
/// Identifies the strategy that produced a [`ResolvedSchema`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolutionKind {
    /// Every column was declared by the user; nothing was auto-resolved.
    Declared,

    /// The schema was supplied by the connector's schema provider.
    SourceProvided,

    /// The schema was fetched from a schema registry.
    Registry {
        /// Identifier of the registered schema, as reported by the registry.
        schema_id: i32,
    },

    /// The schema was inferred from sampled records.
    Inferred {
        /// Sample count reported by the inference result.
        sample_count: usize,

        /// Messages of the warnings raised during inference.
        warnings: Vec<String>,
    },
}
150
/// Where a single field of a [`ResolvedSchema`] came from.
///
/// The enum is fieldless, so it is `Copy` and `Hash` in addition to the
/// comparison traits; this lets callers pass it by value and use it as a map
/// or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FieldOrigin {
    /// The field was explicitly declared by the user.
    UserDeclared,

    /// The field comes from an auto-resolved schema (registry, provider or
    /// inference) with no wildcard merge involved.
    AutoResolved,

    /// The field was added through a wildcard while merging a resolved schema
    /// into the declared columns.
    WildcardInferred,

    /// The field was added because of a default.
    // NOTE(review): no code in this file produces this variant; confirm the
    // intended producer.
    DefaultAdded,
}

impl std::fmt::Display for FieldOrigin {
    /// Writes the fixed upper-case label for the origin.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            FieldOrigin::UserDeclared => "DECLARED",
            FieldOrigin::AutoResolved => "AUTO",
            FieldOrigin::WildcardInferred => "WILDCARD",
            FieldOrigin::DefaultAdded => "DEFAULT",
        };
        f.write_str(label)
    }
}
177
/// Stateless entry point for schema resolution.
///
/// All functionality is exposed as associated functions; the type carries no
/// data. The common traits are derived so the type can be embedded in
/// `Debug`/`Clone`/`Default` containers.
#[derive(Debug, Clone, Copy, Default)]
pub struct SchemaResolver;
182
183impl SchemaResolver {
184 pub async fn resolve(
200 declared: &DeclaredSchema,
201 connector: &dyn SourceConnector,
202 inference_config: &InferenceConfig,
203 ) -> SchemaResult<ResolvedSchema> {
204 if !declared.columns.is_empty() && !declared.has_wildcard {
206 let schema = Self::declared_to_arrow(declared)?;
207 let origins = vec![FieldOrigin::UserDeclared; schema.fields().len()];
208 return Ok(ResolvedSchema {
209 schema,
210 kind: ResolutionKind::Declared,
211 field_origins: origins,
212 warnings: vec![],
213 });
214 }
215
216 if let Some(registry) = connector.as_schema_registry_aware() {
218 let subject = format!("{}-value", inference_config.format);
220 if let Ok(registered) = registry.fetch_schema(&subject).await {
221 let resolved_schema = registered.schema.clone();
222 let kind = ResolutionKind::Registry {
223 schema_id: registered.id,
224 };
225
226 if declared.has_wildcard {
227 return Self::merge_with_declared(declared, &resolved_schema, kind);
228 }
229
230 let origins = vec![FieldOrigin::AutoResolved; resolved_schema.fields().len()];
231 return Ok(ResolvedSchema {
232 schema: resolved_schema,
233 kind,
234 field_origins: origins,
235 warnings: vec![],
236 });
237 }
238 }
239
240 if let Some(provider) = connector.as_schema_provider() {
242 if let Ok(source_schema) = provider.provide_schema().await {
243 if declared.has_wildcard {
244 return Self::merge_with_declared(
245 declared,
246 &source_schema,
247 ResolutionKind::SourceProvided,
248 );
249 }
250
251 let origins = vec![FieldOrigin::AutoResolved; source_schema.fields().len()];
252 return Ok(ResolvedSchema {
253 schema: source_schema,
254 kind: ResolutionKind::SourceProvided,
255 field_origins: origins,
256 warnings: vec![],
257 });
258 }
259 }
260
261 if let Some(inferable) = connector.as_schema_inferable() {
263 let samples = inferable
264 .sample_records(inference_config.max_samples)
265 .await?;
266
267 if !samples.is_empty() {
268 let inferred = inferable
269 .infer_from_samples(&samples, inference_config)
270 .await?;
271
272 let inf_warnings: Vec<String> = inferred
273 .warnings
274 .iter()
275 .map(|w| w.message.clone())
276 .collect();
277
278 let kind = ResolutionKind::Inferred {
279 sample_count: inferred.sample_count,
280 warnings: inf_warnings.clone(),
281 };
282
283 if declared.has_wildcard {
284 let mut resolved = Self::merge_with_declared(declared, &inferred.schema, kind)?;
285 resolved.warnings.extend(inf_warnings);
286 return Ok(resolved);
287 }
288
289 let origins = vec![FieldOrigin::AutoResolved; inferred.schema.fields().len()];
290 return Ok(ResolvedSchema {
291 schema: inferred.schema,
292 kind,
293 field_origins: origins,
294 warnings: inf_warnings,
295 });
296 }
297 }
298
299 Err(SchemaError::InferenceFailed(
301 "no schema could be resolved: declare a schema, configure a registry, \
302 or ensure the connector supports schema inference"
303 .into(),
304 ))
305 }
306
307 pub fn merge_with_declared(
323 declared: &DeclaredSchema,
324 resolved: &SchemaRef,
325 kind: ResolutionKind,
326 ) -> SchemaResult<ResolvedSchema> {
327 let mut fields: Vec<Field> = Vec::new();
328 let mut origins: Vec<FieldOrigin> = Vec::new();
329 let mut warnings: Vec<String> = Vec::new();
330
331 let declared_names: Vec<&str> = declared.columns.iter().map(|c| c.name.as_str()).collect();
333
334 for col in &declared.columns {
335 fields.push(Field::new(&col.name, col.data_type.clone(), col.nullable));
336 origins.push(FieldOrigin::UserDeclared);
337 }
338
339 for field in resolved.fields() {
341 let name = field.name();
342 if declared_names.contains(&name.as_str()) {
343 let declared_col = declared.columns.iter().find(|c| c.name == *name).unwrap();
345 if declared_col.data_type != *field.data_type() {
346 warnings.push(format!(
347 "field '{}': declared type {} overrides resolved type {}",
348 name,
349 declared_col.data_type,
350 field.data_type()
351 ));
352 }
353 continue;
354 }
355
356 let field_name = if let Some(ref prefix) = declared.wildcard_prefix {
358 format!("{prefix}{name}")
359 } else {
360 name.clone()
361 };
362
363 fields.push(Field::new(
364 &field_name,
365 field.data_type().clone(),
366 field.is_nullable(),
367 ));
368 origins.push(FieldOrigin::WildcardInferred);
369 }
370
371 Ok(ResolvedSchema {
372 schema: Arc::new(Schema::new(fields)),
373 kind,
374 field_origins: origins,
375 warnings,
376 })
377 }
378
379 pub fn validate_wildcard(
393 declared: &DeclaredSchema,
394 connector: &dyn SourceConnector,
395 ) -> SchemaResult<()> {
396 if !declared.has_wildcard {
397 return Ok(());
398 }
399
400 let has_provider = connector.as_schema_provider().is_some();
402 let has_registry = connector.as_schema_registry_aware().is_some();
403 let has_inference = connector.as_schema_inferable().is_some();
404
405 if !has_provider && !has_registry && !has_inference {
406 return Err(SchemaError::WildcardWithoutInference);
407 }
408
409 Ok(())
410 }
411
412 pub fn check_prefix_collision(
422 declared: &DeclaredSchema,
423 resolved: &SchemaRef,
424 ) -> SchemaResult<()> {
425 let Some(prefix) = &declared.wildcard_prefix else {
426 return Ok(());
427 };
428
429 let declared_names: Vec<&str> = declared.columns.iter().map(|c| c.name.as_str()).collect();
430
431 for field in resolved.fields() {
432 let name = field.name();
433 if declared_names.contains(&name.as_str()) {
434 continue; }
436 let prefixed = format!("{prefix}{name}");
437 if declared_names.contains(&prefixed.as_str()) {
438 return Err(SchemaError::WildcardPrefixCollision(prefixed));
439 }
440 }
441
442 Ok(())
443 }
444
445 pub fn declared_to_arrow(declared: &DeclaredSchema) -> SchemaResult<SchemaRef> {
452 if declared.columns.is_empty() {
453 return Err(SchemaError::InferenceFailed(
454 "declared schema has no columns".into(),
455 ));
456 }
457
458 let fields: Vec<Field> = declared
459 .columns
460 .iter()
461 .map(|c| Field::new(&c.name, c.data_type.clone(), c.nullable))
462 .collect();
463
464 Ok(Arc::new(Schema::new(fields)))
465 }
466}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// Three-field schema standing in for what a connector would report.
    fn sample_resolved_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int64, true),
        ]))
    }

    // ── DeclaredSchema / DeclaredColumn ─────────────────────────────

    #[test]
    fn test_declared_schema_full() {
        let declared = DeclaredSchema::full(vec![
            DeclaredColumn::new("id", DataType::Int64, false),
            DeclaredColumn::new("name", DataType::Utf8, true),
        ]);
        assert!(!declared.has_wildcard);
        assert!(!declared.is_empty());
    }

    #[test]
    fn test_declared_schema_with_wildcard() {
        let declared =
            DeclaredSchema::with_wildcard(vec![DeclaredColumn::new("id", DataType::Int64, false)]);
        assert!(declared.has_wildcard);
    }

    #[test]
    fn test_declared_schema_empty() {
        let declared = DeclaredSchema::full(vec![]);
        assert!(declared.is_empty());
    }

    #[test]
    fn test_declared_schema_wildcard_only_is_not_empty() {
        // A bare wildcard is still a declaration, even without columns.
        let declared = DeclaredSchema::with_wildcard(vec![]);
        assert!(!declared.is_empty());
    }

    #[test]
    fn test_declared_column_with_default() {
        let col = DeclaredColumn::new("status", DataType::Utf8, false).with_default("active");
        assert_eq!(col.default.as_deref(), Some("active"));
    }

    // ── declared_to_arrow ───────────────────────────────────────────

    #[test]
    fn test_declared_to_arrow() {
        let declared = DeclaredSchema::full(vec![
            DeclaredColumn::new("id", DataType::Int64, false),
            DeclaredColumn::new("name", DataType::Utf8, true),
        ]);

        let schema = SchemaResolver::declared_to_arrow(&declared).unwrap();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(0).data_type(), &DataType::Int64);
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(schema.field(1).is_nullable());
    }

    #[test]
    fn test_declared_to_arrow_empty_error() {
        let declared = DeclaredSchema::full(vec![]);
        assert!(SchemaResolver::declared_to_arrow(&declared).is_err());
    }

    // ── merge_with_declared ─────────────────────────────────────────

    #[test]
    fn test_merge_no_overlap() {
        let declared = DeclaredSchema::with_wildcard(vec![DeclaredColumn::new(
            "extra",
            DataType::Boolean,
            false,
        )]);

        let resolved = sample_resolved_schema();
        let result = SchemaResolver::merge_with_declared(
            &declared,
            &resolved,
            ResolutionKind::SourceProvided,
        )
        .unwrap();

        // Declared column first, then the resolved fields in source order.
        assert_eq!(result.schema.fields().len(), 4);
        assert_eq!(result.schema.field(0).name(), "extra");
        assert_eq!(result.schema.field(1).name(), "id");
        assert_eq!(result.schema.field(2).name(), "name");
        assert_eq!(result.schema.field(3).name(), "age");

        assert_eq!(result.field_origins[0], FieldOrigin::UserDeclared);
        assert_eq!(result.field_origins[1], FieldOrigin::WildcardInferred);
    }

    #[test]
    fn test_merge_with_overlap() {
        let declared = DeclaredSchema::with_wildcard(vec![DeclaredColumn::new(
            "id",
            DataType::Int32,
            false,
        )]);

        let resolved = sample_resolved_schema();
        let result = SchemaResolver::merge_with_declared(
            &declared,
            &resolved,
            ResolutionKind::SourceProvided,
        )
        .unwrap();

        // The declared `id` replaces the resolved one instead of duplicating it.
        assert_eq!(result.schema.fields().len(), 3);
        assert_eq!(result.schema.field(0).name(), "id");
        assert_eq!(result.schema.field(0).data_type(), &DataType::Int32);
        assert_eq!(result.schema.field(1).name(), "name");
        assert_eq!(result.schema.field(2).name(), "age");

        // The type disagreement (Int32 vs Int64) is reported as a warning.
        assert!(!result.warnings.is_empty());
        assert!(result.warnings[0].contains("Int32"));
    }

    #[test]
    fn test_merge_with_prefix() {
        let declared =
            DeclaredSchema::with_wildcard(vec![DeclaredColumn::new("pk", DataType::Int64, false)])
                .with_prefix("src_");

        let resolved = sample_resolved_schema();
        let result = SchemaResolver::merge_with_declared(
            &declared,
            &resolved,
            ResolutionKind::SourceProvided,
        )
        .unwrap();

        // Only wildcard fields receive the prefix.
        assert_eq!(result.schema.fields().len(), 4);
        assert_eq!(result.schema.field(0).name(), "pk");
        assert_eq!(result.schema.field(1).name(), "src_id");
        assert_eq!(result.schema.field(2).name(), "src_name");
        assert_eq!(result.schema.field(3).name(), "src_age");
    }

    // ── ResolutionKind / FieldOrigin ────────────────────────────────

    #[test]
    fn test_resolution_kind_eq() {
        assert_eq!(ResolutionKind::Declared, ResolutionKind::Declared);
        assert_ne!(ResolutionKind::Declared, ResolutionKind::SourceProvided);
        assert_eq!(
            ResolutionKind::Registry { schema_id: 1 },
            ResolutionKind::Registry { schema_id: 1 }
        );
    }

    #[test]
    fn test_field_origin_variants() {
        let origins = [
            FieldOrigin::UserDeclared,
            FieldOrigin::AutoResolved,
            FieldOrigin::WildcardInferred,
            FieldOrigin::DefaultAdded,
        ];
        assert_eq!(origins.len(), 4);
        assert_ne!(FieldOrigin::UserDeclared, FieldOrigin::AutoResolved);
    }

    #[test]
    fn test_field_origin_display() {
        assert_eq!(FieldOrigin::UserDeclared.to_string(), "DECLARED");
        assert_eq!(FieldOrigin::AutoResolved.to_string(), "AUTO");
        assert_eq!(FieldOrigin::WildcardInferred.to_string(), "WILDCARD");
        assert_eq!(FieldOrigin::DefaultAdded.to_string(), "DEFAULT");
    }

    // ── ResolvedSchema ──────────────────────────────────────────────

    #[test]
    fn test_resolved_schema_declared() {
        let declared = DeclaredSchema::full(vec![
            DeclaredColumn::new("x", DataType::Float64, false),
            DeclaredColumn::new("y", DataType::Float64, false),
        ]);

        let schema = SchemaResolver::declared_to_arrow(&declared).unwrap();
        let resolved = ResolvedSchema {
            schema,
            kind: ResolutionKind::Declared,
            field_origins: vec![FieldOrigin::UserDeclared; 2],
            warnings: vec![],
        };

        assert_eq!(resolved.kind, ResolutionKind::Declared);
        assert_eq!(resolved.field_origins.len(), 2);
        assert!(resolved.warnings.is_empty());
    }

    // ── check_prefix_collision ──────────────────────────────────────

    #[test]
    fn test_prefix_collision_none() {
        let declared =
            DeclaredSchema::with_wildcard(vec![DeclaredColumn::new("pk", DataType::Int64, false)]);
        let resolved = sample_resolved_schema();
        assert!(SchemaResolver::check_prefix_collision(&declared, &resolved).is_ok());
    }

    #[test]
    fn test_prefix_collision_detected() {
        let declared = DeclaredSchema::with_wildcard(vec![DeclaredColumn::new(
            "src_name",
            DataType::Utf8,
            true,
        )])
        .with_prefix("src_");

        // Resolved `name` would become `src_name`, which is already declared.
        let resolved = sample_resolved_schema();
        let result = SchemaResolver::check_prefix_collision(&declared, &resolved);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("src_name"));
    }

    #[test]
    fn test_prefix_collision_no_prefix_ok() {
        let declared =
            DeclaredSchema::with_wildcard(vec![DeclaredColumn::new("name", DataType::Utf8, true)]);
        let resolved = sample_resolved_schema();
        assert!(SchemaResolver::check_prefix_collision(&declared, &resolved).is_ok());
    }

    #[test]
    fn test_prefix_collision_skip_declared_overlap() {
        // Only `src_id` is declared, so the resolved `id` is NOT overridden:
        // it gets prefixed to `src_id` and collides with the declared column.
        let declared = DeclaredSchema::with_wildcard(vec![DeclaredColumn::new(
            "src_id",
            DataType::Int64,
            false,
        )])
        .with_prefix("src_");

        let resolved = sample_resolved_schema();
        let result = SchemaResolver::check_prefix_collision(&declared, &resolved);
        assert!(result.is_err());
    }

    #[test]
    fn test_prefix_collision_declared_overlap_is_not_prefixed() {
        // Both `id` and `src_id` are declared. The resolved `id` is overridden
        // by the declared `id`, so it is never prefixed and cannot collide
        // with `src_id`.
        let declared = DeclaredSchema::with_wildcard(vec![
            DeclaredColumn::new("id", DataType::Int64, false),
            DeclaredColumn::new("src_id", DataType::Int64, false),
        ])
        .with_prefix("src_");

        let resolved = sample_resolved_schema();
        assert!(SchemaResolver::check_prefix_collision(&declared, &resolved).is_ok());
    }
}