1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::SystemTime;
17
18use arrow_schema::{DataType, Field, Schema, SchemaRef};
19
20use super::error::{SchemaError, SchemaResult};
21use super::traits::{
22 ColumnProjection, CompatibilityMode, EvolutionVerdict, SchemaChange, SchemaEvolvable,
23};
24
25#[must_use]
29pub fn is_safe_widening(from: &DataType, to: &DataType) -> bool {
30 matches!(
31 (from, to),
32 (
33 DataType::Int8,
34 DataType::Int16 | DataType::Int32 | DataType::Int64
35 ) | (DataType::Int16, DataType::Int32 | DataType::Int64)
36 | (DataType::Int32, DataType::Int64 | DataType::Float64)
37 | (
38 DataType::UInt8,
39 DataType::UInt16 | DataType::UInt32 | DataType::UInt64
40 )
41 | (DataType::UInt16, DataType::UInt32 | DataType::UInt64)
42 | (DataType::UInt32, DataType::UInt64)
43 | (
44 DataType::Float16 | DataType::Int8 | DataType::Int16,
45 DataType::Float32 | DataType::Float64
46 )
47 | (DataType::Float32, DataType::Float64)
48 | (DataType::Utf8, DataType::LargeUtf8)
49 | (DataType::Binary, DataType::LargeBinary)
50 )
51}
52
53#[derive(Debug, Clone)]
61pub struct DefaultSchemaEvolver {
62 pub compatibility: CompatibilityMode,
64}
65
66impl DefaultSchemaEvolver {
67 #[must_use]
69 pub fn new(compatibility: CompatibilityMode) -> Self {
70 Self { compatibility }
71 }
72}
73
74impl Default for DefaultSchemaEvolver {
75 fn default() -> Self {
76 Self {
77 compatibility: CompatibilityMode::None,
78 }
79 }
80}
81
82impl SchemaEvolvable for DefaultSchemaEvolver {
83 fn diff_schemas(&self, old: &SchemaRef, new: &SchemaRef) -> Vec<SchemaChange> {
84 diff_schemas_by_name(old, new)
85 }
86
87 fn evaluate_evolution(&self, changes: &[SchemaChange]) -> EvolutionVerdict {
88 evaluate_changes(changes, self.compatibility)
89 }
90
91 fn apply_evolution(
92 &self,
93 old: &SchemaRef,
94 changes: &[SchemaChange],
95 ) -> SchemaResult<ColumnProjection> {
96 apply_changes(old, changes)
97 }
98}
99
100#[must_use]
107pub fn diff_schemas_by_name(old: &SchemaRef, new: &SchemaRef) -> Vec<SchemaChange> {
108 let mut changes = Vec::new();
109
110 let old_fields: HashMap<&str, &Field> = old
111 .fields()
112 .iter()
113 .map(|f| (f.name().as_str(), f.as_ref()))
114 .collect();
115
116 let new_fields: HashMap<&str, &Field> = new
117 .fields()
118 .iter()
119 .map(|f| (f.name().as_str(), f.as_ref()))
120 .collect();
121
122 for field in old.fields() {
124 if !new_fields.contains_key(field.name().as_str()) {
125 changes.push(SchemaChange::ColumnRemoved {
126 name: field.name().clone(),
127 });
128 }
129 }
130
131 for field in new.fields() {
133 if !old_fields.contains_key(field.name().as_str()) {
134 changes.push(SchemaChange::ColumnAdded {
135 name: field.name().clone(),
136 data_type: field.data_type().clone(),
137 nullable: field.is_nullable(),
138 });
139 }
140 }
141
142 for field in new.fields() {
144 if let Some(old_field) = old_fields.get(field.name().as_str()) {
145 if old_field.data_type() != field.data_type() {
146 changes.push(SchemaChange::TypeChanged {
147 name: field.name().clone(),
148 old_type: old_field.data_type().clone(),
149 new_type: field.data_type().clone(),
150 });
151 }
152 if old_field.is_nullable() != field.is_nullable() {
153 changes.push(SchemaChange::NullabilityChanged {
154 name: field.name().clone(),
155 was_nullable: old_field.is_nullable(),
156 now_nullable: field.is_nullable(),
157 });
158 }
159 }
160 }
161
162 changes
163}
164
165#[must_use]
169pub fn evaluate_changes(changes: &[SchemaChange], mode: CompatibilityMode) -> EvolutionVerdict {
170 if changes.is_empty() {
171 return EvolutionVerdict::Compatible;
172 }
173
174 if mode == CompatibilityMode::None {
176 return EvolutionVerdict::Compatible;
177 }
178
179 let requires_backward = matches!(
180 mode,
181 CompatibilityMode::Backward
182 | CompatibilityMode::Full
183 | CompatibilityMode::BackwardTransitive
184 | CompatibilityMode::FullTransitive
185 );
186 let requires_forward = matches!(
187 mode,
188 CompatibilityMode::Forward
189 | CompatibilityMode::Full
190 | CompatibilityMode::ForwardTransitive
191 | CompatibilityMode::FullTransitive
192 );
193
194 let mut needs_migration = false;
195
196 for change in changes {
197 match change {
198 SchemaChange::ColumnAdded { nullable, .. } => {
199 if requires_backward && !nullable {
204 return EvolutionVerdict::Incompatible(format!(
205 "adding non-nullable column '{}' is not backward-compatible",
206 change_name(change)
207 ));
208 }
209 }
210 SchemaChange::ColumnRemoved { name } => {
211 if requires_forward {
218 return EvolutionVerdict::Incompatible(format!(
219 "removing column '{name}' is not forward-compatible"
220 ));
221 }
222 needs_migration = true;
223 }
224 SchemaChange::TypeChanged {
225 name,
226 old_type,
227 new_type,
228 } => {
229 if is_safe_widening(old_type, new_type) {
230 if requires_forward {
234 return EvolutionVerdict::Incompatible(format!(
235 "widening column '{name}' from {old_type:?} to {new_type:?} \
236 is not forward-compatible"
237 ));
238 }
239 } else if is_safe_widening(new_type, old_type) {
240 if requires_backward {
242 return EvolutionVerdict::Incompatible(format!(
243 "narrowing column '{name}' from {old_type:?} to {new_type:?} \
244 is not backward-compatible"
245 ));
246 }
247 needs_migration = true;
248 } else {
249 return EvolutionVerdict::Incompatible(format!(
251 "changing column '{name}' from {old_type:?} to {new_type:?} \
252 is incompatible"
253 ));
254 }
255 }
256 SchemaChange::NullabilityChanged {
257 name,
258 was_nullable,
259 now_nullable,
260 } => {
261 if *was_nullable && !now_nullable {
262 if requires_backward {
264 return EvolutionVerdict::Incompatible(format!(
265 "making column '{name}' non-nullable is not backward-compatible"
266 ));
267 }
268 needs_migration = true;
269 }
270 }
272 SchemaChange::ColumnRenamed { old_name, .. } => {
273 return EvolutionVerdict::Incompatible(format!(
276 "renaming column '{old_name}' is not supported without field IDs"
277 ));
278 }
279 }
280 }
281
282 if needs_migration {
283 EvolutionVerdict::RequiresMigration
284 } else {
285 EvolutionVerdict::Compatible
286 }
287}
288
289fn change_name(change: &SchemaChange) -> &str {
290 match change {
291 SchemaChange::ColumnAdded { name, .. }
292 | SchemaChange::ColumnRemoved { name }
293 | SchemaChange::TypeChanged { name, .. }
294 | SchemaChange::NullabilityChanged { name, .. } => name,
295 SchemaChange::ColumnRenamed { old_name, .. } => old_name,
296 }
297}
298
299pub fn apply_changes(old: &SchemaRef, changes: &[SchemaChange]) -> SchemaResult<ColumnProjection> {
308 let mut fields: Vec<Field> = old.fields().iter().map(|f| f.as_ref().clone()).collect();
309 let mut old_index_map: Vec<Option<usize>> = (0..fields.len()).map(Some).collect();
310
311 let mut remove_indices = Vec::new();
313
314 for change in changes {
315 match change {
316 SchemaChange::ColumnAdded {
317 name,
318 data_type,
319 nullable,
320 } => {
321 fields.push(Field::new(name, data_type.clone(), *nullable));
322 old_index_map.push(None); }
324 SchemaChange::ColumnRemoved { name } => {
325 if let Some(idx) = fields.iter().position(|f| f.name() == name) {
326 remove_indices.push(idx);
327 }
328 }
329 SchemaChange::TypeChanged { name, new_type, .. } => {
330 if let Some(field) = fields.iter_mut().find(|f| f.name() == name) {
331 *field = Field::new(field.name(), new_type.clone(), field.is_nullable());
332 }
333 }
334 SchemaChange::NullabilityChanged {
335 name, now_nullable, ..
336 } => {
337 if let Some(field) = fields.iter_mut().find(|f| f.name() == name) {
338 *field = Field::new(field.name(), field.data_type().clone(), *now_nullable);
339 }
340 }
341 SchemaChange::ColumnRenamed {
342 old_name, new_name, ..
343 } => {
344 if let Some(field) = fields.iter_mut().find(|f| f.name() == old_name) {
345 *field = Field::new(new_name, field.data_type().clone(), field.is_nullable());
346 }
347 }
348 }
349 }
350
351 remove_indices.sort_unstable();
353 remove_indices.dedup();
354 for &idx in remove_indices.iter().rev() {
355 fields.remove(idx);
356 old_index_map.remove(idx);
357 }
358
359 let new_schema = Arc::new(Schema::new(fields));
360
361 Ok(ColumnProjection {
362 mappings: old_index_map,
363 target_schema: new_schema,
364 })
365}
366
/// Origin of a recorded schema evolution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EvolutionTrigger {
    /// Evolution initiated through DDL.
    Ddl,
    /// Evolution initiated by a schema registry.
    Registry {
        /// Identifier of the schema in the registry.
        schema_id: u32,
    },
    /// Evolution initiated by an explicit refresh request.
    ManualRefresh,
}
382
383#[derive(Debug, Clone)]
385pub struct SchemaHistoryEntry {
386 pub source_name: String,
388 pub version: u64,
390 pub schema: SchemaRef,
392 pub changes: Vec<SchemaChange>,
394 pub applied_at: SystemTime,
396 pub trigger: EvolutionTrigger,
398}
399
400#[derive(Debug, Default)]
402pub struct SchemaHistory {
403 entries: HashMap<String, Vec<SchemaHistoryEntry>>,
404}
405
406impl SchemaHistory {
407 #[must_use]
409 pub fn new() -> Self {
410 Self::default()
411 }
412
413 pub fn record(
415 &mut self,
416 source_name: &str,
417 schema: SchemaRef,
418 changes: Vec<SchemaChange>,
419 trigger: EvolutionTrigger,
420 ) -> u64 {
421 let entries = self.entries.entry(source_name.to_string()).or_default();
422 let version = entries.last().map_or(1, |e| e.version + 1);
423 entries.push(SchemaHistoryEntry {
424 source_name: source_name.to_string(),
425 version,
426 schema,
427 changes,
428 applied_at: SystemTime::now(),
429 trigger,
430 });
431 version
432 }
433
434 #[must_use]
436 pub fn versions(&self, source_name: &str) -> &[SchemaHistoryEntry] {
437 self.entries.get(source_name).map_or(&[], |v| v.as_slice())
438 }
439
440 #[must_use]
442 pub fn latest_version(&self, source_name: &str) -> Option<u64> {
443 self.entries.get(source_name)?.last().map(|e| e.version)
444 }
445}
446
447#[derive(Debug)]
451pub enum EvolutionResult {
452 NoChange,
454 Applied {
456 new_schema: SchemaRef,
458 projection: ColumnProjection,
460 version: u64,
462 changes: Vec<SchemaChange>,
464 },
465}
466
467#[derive(Debug)]
474pub struct SchemaEvolutionEngine {
475 pub history: SchemaHistory,
477 pub default_compatibility: CompatibilityMode,
479}
480
481impl SchemaEvolutionEngine {
482 #[must_use]
484 pub fn new(default_compatibility: CompatibilityMode) -> Self {
485 Self {
486 history: SchemaHistory::new(),
487 default_compatibility,
488 }
489 }
490
491 pub fn evolve(
497 &mut self,
498 source_name: &str,
499 evolvable: &dyn SchemaEvolvable,
500 current: &SchemaRef,
501 proposed: &SchemaRef,
502 trigger: EvolutionTrigger,
503 ) -> SchemaResult<EvolutionResult> {
504 let changes = evolvable.diff_schemas(current, proposed);
505
506 if changes.is_empty() {
507 return Ok(EvolutionResult::NoChange);
508 }
509
510 let verdict = evolvable.evaluate_evolution(&changes);
511
512 match verdict {
513 EvolutionVerdict::Compatible | EvolutionVerdict::RequiresMigration => {
514 let projection = evolvable.apply_evolution(current, &changes)?;
515 let version = self.history.record(
516 source_name,
517 projection.target_schema.clone(),
518 changes.clone(),
519 trigger,
520 );
521 Ok(EvolutionResult::Applied {
522 new_schema: projection.target_schema.clone(),
523 projection,
524 version,
525 changes,
526 })
527 }
528 EvolutionVerdict::Incompatible(reason) => Err(SchemaError::EvolutionRejected(reason)),
529 }
530 }
531}
532
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a schema from `(name, type, nullable)` triples.
    fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
        let built: Vec<Field> = fields
            .iter()
            .map(|(name, data_type, nullable)| Field::new(*name, data_type.clone(), *nullable))
            .collect();
        Arc::new(Schema::new(built))
    }

    /// Asserts that `is_safe_widening` yields `expected` for every pair.
    fn assert_widening(pairs: &[(DataType, DataType)], expected: bool) {
        for (from, to) in pairs {
            assert_eq!(is_safe_widening(from, to), expected, "{from:?} -> {to:?}");
        }
    }

    /// Evaluates a single change under `mode`.
    fn judge(change: SchemaChange, mode: CompatibilityMode) -> EvolutionVerdict {
        evaluate_changes(&[change], mode)
    }

    fn added(name: &str, data_type: DataType, nullable: bool) -> SchemaChange {
        SchemaChange::ColumnAdded {
            name: name.into(),
            data_type,
            nullable,
        }
    }

    fn removed(name: &str) -> SchemaChange {
        SchemaChange::ColumnRemoved { name: name.into() }
    }

    fn retyped(name: &str, old_type: DataType, new_type: DataType) -> SchemaChange {
        SchemaChange::TypeChanged {
            name: name.into(),
            old_type,
            new_type,
        }
    }

    fn nullability(name: &str, was_nullable: bool, now_nullable: bool) -> SchemaChange {
        SchemaChange::NullabilityChanged {
            name: name.into(),
            was_nullable,
            now_nullable,
        }
    }

    fn renamed(old_name: &str, new_name: &str) -> SchemaChange {
        SchemaChange::ColumnRenamed {
            old_name: old_name.into(),
            new_name: new_name.into(),
        }
    }

    // ---- is_safe_widening ------------------------------------------------

    #[test]
    fn test_int_widening() {
        assert_widening(
            &[
                (DataType::Int8, DataType::Int16),
                (DataType::Int8, DataType::Int32),
                (DataType::Int8, DataType::Int64),
                (DataType::Int16, DataType::Int32),
                (DataType::Int16, DataType::Int64),
                (DataType::Int32, DataType::Int64),
            ],
            true,
        );
    }

    #[test]
    fn test_uint_widening() {
        assert_widening(
            &[
                (DataType::UInt8, DataType::UInt16),
                (DataType::UInt8, DataType::UInt32),
                (DataType::UInt32, DataType::UInt64),
            ],
            true,
        );
    }

    #[test]
    fn test_float_widening() {
        assert_widening(
            &[
                (DataType::Float16, DataType::Float32),
                (DataType::Float16, DataType::Float64),
                (DataType::Float32, DataType::Float64),
            ],
            true,
        );
    }

    #[test]
    fn test_int_to_float_widening() {
        assert_widening(
            &[
                (DataType::Int8, DataType::Float32),
                (DataType::Int16, DataType::Float64),
                (DataType::Int32, DataType::Float64),
            ],
            true,
        );
    }

    #[test]
    fn test_string_binary_widening() {
        assert_widening(
            &[
                (DataType::Utf8, DataType::LargeUtf8),
                (DataType::Binary, DataType::LargeBinary),
            ],
            true,
        );
    }

    #[test]
    fn test_narrowing_not_safe() {
        assert_widening(
            &[
                (DataType::Int64, DataType::Int32),
                (DataType::Float64, DataType::Float32),
                (DataType::LargeUtf8, DataType::Utf8),
            ],
            false,
        );
    }

    #[test]
    fn test_unrelated_types() {
        assert_widening(
            &[
                (DataType::Int64, DataType::Utf8),
                (DataType::Boolean, DataType::Int32),
            ],
            false,
        );
    }

    // ---- diff_schemas_by_name --------------------------------------------

    #[test]
    fn test_diff_identical() {
        let s = schema(&[("a", DataType::Int64, false)]);
        assert!(diff_schemas_by_name(&s, &s).is_empty());
    }

    #[test]
    fn test_diff_column_added() {
        let old = schema(&[("a", DataType::Int64, false)]);
        let new = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);

        let changes = diff_schemas_by_name(&old, &new);

        assert_eq!(changes.len(), 1);
        assert!(matches!(&changes[0], SchemaChange::ColumnAdded { name, .. } if name == "b"));
    }

    #[test]
    fn test_diff_column_removed() {
        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
        let new = schema(&[("a", DataType::Int64, false)]);

        let changes = diff_schemas_by_name(&old, &new);

        assert_eq!(changes.len(), 1);
        assert!(matches!(&changes[0], SchemaChange::ColumnRemoved { name } if name == "b"));
    }

    #[test]
    fn test_diff_type_changed() {
        let old = schema(&[("a", DataType::Int32, false)]);
        let new = schema(&[("a", DataType::Int64, false)]);

        let changes = diff_schemas_by_name(&old, &new);

        assert_eq!(changes.len(), 1);
        assert!(matches!(
            &changes[0],
            SchemaChange::TypeChanged {
                name,
                old_type: DataType::Int32,
                new_type: DataType::Int64,
            } if name == "a"
        ));
    }

    #[test]
    fn test_diff_nullability_changed() {
        let old = schema(&[("a", DataType::Int64, false)]);
        let new = schema(&[("a", DataType::Int64, true)]);

        let changes = diff_schemas_by_name(&old, &new);

        assert_eq!(changes.len(), 1);
        assert!(matches!(
            &changes[0],
            SchemaChange::NullabilityChanged {
                name,
                was_nullable: false,
                now_nullable: true,
            } if name == "a"
        ));
    }

    #[test]
    fn test_diff_multiple_changes() {
        // `a` becomes nullable, `b` is dropped, `c` is introduced.
        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
        let new = schema(&[("a", DataType::Int64, true), ("c", DataType::Float64, false)]);

        assert_eq!(diff_schemas_by_name(&old, &new).len(), 3);
    }

    // ---- evaluate_changes ------------------------------------------------

    #[test]
    fn test_evaluate_no_changes() {
        assert_eq!(
            evaluate_changes(&[], CompatibilityMode::Full),
            EvolutionVerdict::Compatible
        );
    }

    #[test]
    fn test_evaluate_none_mode_allows_all() {
        let changes = [
            removed("x"),
            retyped("y", DataType::Int64, DataType::Utf8),
        ];
        assert_eq!(
            evaluate_changes(&changes, CompatibilityMode::None),
            EvolutionVerdict::Compatible
        );
    }

    #[test]
    fn test_evaluate_backward_add_nullable_ok() {
        assert_eq!(
            judge(added("email", DataType::Utf8, true), CompatibilityMode::Backward),
            EvolutionVerdict::Compatible
        );
    }

    #[test]
    fn test_evaluate_backward_add_non_nullable_rejected() {
        assert!(matches!(
            judge(added("email", DataType::Utf8, false), CompatibilityMode::Backward),
            EvolutionVerdict::Incompatible(_)
        ));
    }

    #[test]
    fn test_evaluate_forward_drop_rejected() {
        assert!(matches!(
            judge(removed("legacy"), CompatibilityMode::Forward),
            EvolutionVerdict::Incompatible(_)
        ));
    }

    #[test]
    fn test_evaluate_backward_drop_ok() {
        assert_eq!(
            judge(removed("legacy"), CompatibilityMode::Backward),
            EvolutionVerdict::RequiresMigration
        );
    }

    #[test]
    fn test_evaluate_backward_widening_ok() {
        assert_eq!(
            judge(
                retyped("count", DataType::Int32, DataType::Int64),
                CompatibilityMode::Backward
            ),
            EvolutionVerdict::Compatible
        );
    }

    #[test]
    fn test_evaluate_full_widening_rejected() {
        assert!(matches!(
            judge(
                retyped("count", DataType::Int32, DataType::Int64),
                CompatibilityMode::Full
            ),
            EvolutionVerdict::Incompatible(_)
        ));
    }

    #[test]
    fn test_evaluate_unrelated_type_change_rejected() {
        assert!(matches!(
            judge(
                retyped("val", DataType::Int64, DataType::Utf8),
                CompatibilityMode::Backward
            ),
            EvolutionVerdict::Incompatible(_)
        ));
    }

    #[test]
    fn test_evaluate_make_nullable_ok() {
        assert_eq!(
            judge(nullability("field", false, true), CompatibilityMode::Full),
            EvolutionVerdict::Compatible
        );
    }

    #[test]
    fn test_evaluate_make_non_nullable_backward_rejected() {
        assert!(matches!(
            judge(nullability("field", true, false), CompatibilityMode::Backward),
            EvolutionVerdict::Incompatible(_)
        ));
    }

    #[test]
    fn test_evaluate_rename_rejected() {
        assert!(matches!(
            judge(renamed("fname", "first_name"), CompatibilityMode::Backward),
            EvolutionVerdict::Incompatible(_)
        ));
    }

    // ---- apply_changes ---------------------------------------------------

    #[test]
    fn test_apply_add_column() {
        let old = schema(&[("a", DataType::Int64, false)]);

        let proj = apply_changes(&old, &[added("b", DataType::Utf8, true)]).unwrap();

        assert_eq!(proj.target_schema.fields().len(), 2);
        assert_eq!(proj.mappings, vec![Some(0), None]);
    }

    #[test]
    fn test_apply_remove_column() {
        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);

        let proj = apply_changes(&old, &[removed("b")]).unwrap();

        assert_eq!(proj.target_schema.fields().len(), 1);
        assert_eq!(proj.target_schema.field(0).name(), "a");
        assert_eq!(proj.mappings, vec![Some(0)]);
    }

    #[test]
    fn test_apply_widen_type() {
        let old = schema(&[("val", DataType::Int32, false)]);

        let proj =
            apply_changes(&old, &[retyped("val", DataType::Int32, DataType::Int64)]).unwrap();

        assert_eq!(proj.target_schema.field(0).data_type(), &DataType::Int64);
        assert_eq!(proj.mappings, vec![Some(0)]);
    }

    #[test]
    fn test_apply_change_nullability() {
        let old = schema(&[("val", DataType::Int64, false)]);

        let proj = apply_changes(&old, &[nullability("val", false, true)]).unwrap();

        assert!(proj.target_schema.field(0).is_nullable());
    }

    #[test]
    fn test_apply_rename() {
        let old = schema(&[("fname", DataType::Utf8, false)]);

        let proj = apply_changes(&old, &[renamed("fname", "first_name")]).unwrap();

        assert_eq!(proj.target_schema.field(0).name(), "first_name");
        assert_eq!(proj.mappings, vec![Some(0)]);
    }

    #[test]
    fn test_apply_multi_change() {
        let old = schema(&[
            ("a", DataType::Int64, false),
            ("b", DataType::Int32, true),
            ("c", DataType::Utf8, false),
        ]);
        let changes = [
            removed("c"),
            added("d", DataType::Float64, true),
            retyped("b", DataType::Int32, DataType::Int64),
        ];

        let proj = apply_changes(&old, &changes).unwrap();
        let target = &proj.target_schema;

        assert_eq!(target.fields().len(), 3);
        assert_eq!(target.field(0).name(), "a");
        assert_eq!(target.field(1).name(), "b");
        assert_eq!(target.field(1).data_type(), &DataType::Int64);
        assert_eq!(target.field(2).name(), "d");
        assert_eq!(proj.mappings, vec![Some(0), Some(1), None]);
    }

    // ---- SchemaHistory ---------------------------------------------------

    #[test]
    fn test_history_record_and_query() {
        let mut history = SchemaHistory::new();
        let s1 = schema(&[("a", DataType::Int64, false)]);
        let s2 = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);

        let v1 = history.record("test_source", s1, vec![], EvolutionTrigger::Ddl);
        let v2 = history.record(
            "test_source",
            s2,
            vec![added("b", DataType::Utf8, true)],
            EvolutionTrigger::Ddl,
        );

        assert_eq!(v1, 1);
        assert_eq!(v2, 2);
        assert_eq!(history.versions("test_source").len(), 2);
        assert_eq!(history.latest_version("test_source"), Some(2));
        assert_eq!(history.latest_version("unknown"), None);
    }

    // ---- SchemaEvolutionEngine -------------------------------------------

    #[test]
    fn test_engine_no_change() {
        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Backward);
        let evolver = DefaultSchemaEvolver::new(CompatibilityMode::Backward);
        let s = schema(&[("a", DataType::Int64, false)]);

        let result = engine
            .evolve("src", &evolver, &s, &s, EvolutionTrigger::Ddl)
            .unwrap();

        assert!(matches!(result, EvolutionResult::NoChange));
    }

    #[test]
    fn test_engine_add_nullable_column() {
        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Backward);
        let evolver = DefaultSchemaEvolver::new(CompatibilityMode::Backward);
        let old = schema(&[("a", DataType::Int64, false)]);
        let new = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);

        let result = engine
            .evolve("src", &evolver, &old, &new, EvolutionTrigger::Ddl)
            .unwrap();

        if let EvolutionResult::Applied {
            new_schema,
            version,
            changes,
            ..
        } = result
        {
            assert_eq!(new_schema.fields().len(), 2);
            assert_eq!(version, 1);
            assert_eq!(changes.len(), 1);
        } else {
            panic!("expected Applied");
        }
    }

    #[test]
    fn test_engine_incompatible_rejected() {
        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Full);
        let evolver = DefaultSchemaEvolver::new(CompatibilityMode::Full);
        let old = schema(&[("a", DataType::Int32, false)]);
        let new = schema(&[("a", DataType::Int64, false)]);

        let result = engine.evolve("src", &evolver, &old, &new, EvolutionTrigger::Ddl);

        assert!(result.is_err());
    }

    // ---- DefaultSchemaEvolver --------------------------------------------

    #[test]
    fn test_default_evolver_diff_and_apply() {
        let evolver = DefaultSchemaEvolver::default();
        let old = schema(&[("id", DataType::Int64, false)]);
        let new = schema(&[
            ("id", DataType::Int64, false),
            ("email", DataType::Utf8, true),
        ]);

        let changes = evolver.diff_schemas(&old, &new);
        assert_eq!(changes.len(), 1);

        assert_eq!(
            evolver.evaluate_evolution(&changes),
            EvolutionVerdict::Compatible
        );

        let proj = evolver.apply_evolution(&old, &changes).unwrap();
        assert_eq!(proj.target_schema.fields().len(), 2);
        assert_eq!(proj.mappings, vec![Some(0), None]);
    }
}