1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::SystemTime;
16
17use arrow_schema::{DataType, Field, Schema, SchemaRef};
18
19use super::error::{SchemaError, SchemaResult};
20use super::traits::{ColumnProjection, CompatibilityMode, EvolutionVerdict, SchemaChange};
21
22#[must_use]
26pub fn is_safe_widening(from: &DataType, to: &DataType) -> bool {
27 matches!(
28 (from, to),
29 (
30 DataType::Int8,
31 DataType::Int16 | DataType::Int32 | DataType::Int64
32 ) | (DataType::Int16, DataType::Int32 | DataType::Int64)
33 | (DataType::Int32, DataType::Int64 | DataType::Float64)
34 | (
35 DataType::UInt8,
36 DataType::UInt16 | DataType::UInt32 | DataType::UInt64
37 )
38 | (DataType::UInt16, DataType::UInt32 | DataType::UInt64)
39 | (DataType::UInt32, DataType::UInt64)
40 | (
41 DataType::Float16 | DataType::Int8 | DataType::Int16,
42 DataType::Float32 | DataType::Float64
43 )
44 | (DataType::Float32, DataType::Float64)
45 | (DataType::Utf8, DataType::LargeUtf8)
46 | (DataType::Binary, DataType::LargeBinary)
47 )
48}
49
/// Name-based schema evolution helper bound to a single compatibility mode.
///
/// Its methods are thin wrappers over the free functions in this module
/// ([`diff_schemas_by_name`], [`evaluate_changes`], [`apply_changes`]).
#[derive(Debug, Clone)]
pub struct SchemaEvolution {
    /// Compatibility mode handed to [`evaluate_changes`] when a change set is
    /// evaluated through [`SchemaEvolution::evaluate_evolution`].
    pub compatibility: CompatibilityMode,
}
62
impl SchemaEvolution {
    /// Creates an evolver that evaluates changes under `compatibility`.
    #[must_use]
    pub fn new(compatibility: CompatibilityMode) -> Self {
        Self { compatibility }
    }

    /// Computes the changes that turn `old` into `new`, matching columns by
    /// name. Delegates to [`diff_schemas_by_name`]; the configured
    /// compatibility mode plays no part in the diff.
    #[must_use]
    pub fn diff_schemas(&self, old: &SchemaRef, new: &SchemaRef) -> Vec<SchemaChange> {
        diff_schemas_by_name(old, new)
    }

    /// Judges `changes` against this evolver's compatibility mode.
    /// Delegates to [`evaluate_changes`].
    #[must_use]
    pub fn evaluate_evolution(&self, changes: &[SchemaChange]) -> EvolutionVerdict {
        evaluate_changes(changes, self.compatibility)
    }

    /// Applies `changes` to `old` and returns the resulting projection
    /// (target schema plus per-column source mapping).
    /// Delegates to [`apply_changes`].
    ///
    /// # Errors
    ///
    /// Propagates whatever error [`apply_changes`] returns.
    pub fn apply_evolution(
        &self,
        old: &SchemaRef,
        changes: &[SchemaChange],
    ) -> SchemaResult<ColumnProjection> {
        apply_changes(old, changes)
    }
}
95
96impl Default for SchemaEvolution {
97 fn default() -> Self {
98 Self {
99 compatibility: CompatibilityMode::None,
100 }
101 }
102}
103
104#[must_use]
111pub fn diff_schemas_by_name(old: &SchemaRef, new: &SchemaRef) -> Vec<SchemaChange> {
112 let mut changes = Vec::new();
113
114 let old_fields: HashMap<&str, &Field> = old
115 .fields()
116 .iter()
117 .map(|f| (f.name().as_str(), f.as_ref()))
118 .collect();
119
120 let new_fields: HashMap<&str, &Field> = new
121 .fields()
122 .iter()
123 .map(|f| (f.name().as_str(), f.as_ref()))
124 .collect();
125
126 for field in old.fields() {
128 if !new_fields.contains_key(field.name().as_str()) {
129 changes.push(SchemaChange::ColumnRemoved {
130 name: field.name().clone(),
131 });
132 }
133 }
134
135 for field in new.fields() {
137 if !old_fields.contains_key(field.name().as_str()) {
138 changes.push(SchemaChange::ColumnAdded {
139 name: field.name().clone(),
140 data_type: field.data_type().clone(),
141 nullable: field.is_nullable(),
142 });
143 }
144 }
145
146 for field in new.fields() {
148 if let Some(old_field) = old_fields.get(field.name().as_str()) {
149 if old_field.data_type() != field.data_type() {
150 changes.push(SchemaChange::TypeChanged {
151 name: field.name().clone(),
152 old_type: old_field.data_type().clone(),
153 new_type: field.data_type().clone(),
154 });
155 }
156 if old_field.is_nullable() != field.is_nullable() {
157 changes.push(SchemaChange::NullabilityChanged {
158 name: field.name().clone(),
159 was_nullable: old_field.is_nullable(),
160 now_nullable: field.is_nullable(),
161 });
162 }
163 }
164 }
165
166 changes
167}
168
169#[must_use]
173pub fn evaluate_changes(changes: &[SchemaChange], mode: CompatibilityMode) -> EvolutionVerdict {
174 if changes.is_empty() {
175 return EvolutionVerdict::Compatible;
176 }
177
178 if mode == CompatibilityMode::None {
180 return EvolutionVerdict::Compatible;
181 }
182
183 let requires_backward = matches!(
184 mode,
185 CompatibilityMode::Backward
186 | CompatibilityMode::Full
187 | CompatibilityMode::BackwardTransitive
188 | CompatibilityMode::FullTransitive
189 );
190 let requires_forward = matches!(
191 mode,
192 CompatibilityMode::Forward
193 | CompatibilityMode::Full
194 | CompatibilityMode::ForwardTransitive
195 | CompatibilityMode::FullTransitive
196 );
197
198 let mut needs_migration = false;
199
200 for change in changes {
201 match change {
202 SchemaChange::ColumnAdded { nullable, .. } => {
203 if requires_backward && !nullable {
208 return EvolutionVerdict::Incompatible(format!(
209 "adding non-nullable column '{}' is not backward-compatible",
210 change_name(change)
211 ));
212 }
213 }
214 SchemaChange::ColumnRemoved { name } => {
215 if requires_forward {
222 return EvolutionVerdict::Incompatible(format!(
223 "removing column '{name}' is not forward-compatible"
224 ));
225 }
226 needs_migration = true;
227 }
228 SchemaChange::TypeChanged {
229 name,
230 old_type,
231 new_type,
232 } => {
233 if is_safe_widening(old_type, new_type) {
234 if requires_forward {
238 return EvolutionVerdict::Incompatible(format!(
239 "widening column '{name}' from {old_type:?} to {new_type:?} \
240 is not forward-compatible"
241 ));
242 }
243 } else if is_safe_widening(new_type, old_type) {
244 if requires_backward {
246 return EvolutionVerdict::Incompatible(format!(
247 "narrowing column '{name}' from {old_type:?} to {new_type:?} \
248 is not backward-compatible"
249 ));
250 }
251 needs_migration = true;
252 } else {
253 return EvolutionVerdict::Incompatible(format!(
255 "changing column '{name}' from {old_type:?} to {new_type:?} \
256 is incompatible"
257 ));
258 }
259 }
260 SchemaChange::NullabilityChanged {
261 name,
262 was_nullable,
263 now_nullable,
264 } => {
265 if *was_nullable && !now_nullable {
266 if requires_backward {
268 return EvolutionVerdict::Incompatible(format!(
269 "making column '{name}' non-nullable is not backward-compatible"
270 ));
271 }
272 needs_migration = true;
273 }
274 }
276 SchemaChange::ColumnRenamed { old_name, .. } => {
277 return EvolutionVerdict::Incompatible(format!(
280 "renaming column '{old_name}' is not supported without field IDs"
281 ));
282 }
283 }
284 }
285
286 if needs_migration {
287 EvolutionVerdict::RequiresMigration
288 } else {
289 EvolutionVerdict::Compatible
290 }
291}
292
293fn change_name(change: &SchemaChange) -> &str {
294 match change {
295 SchemaChange::ColumnAdded { name, .. }
296 | SchemaChange::ColumnRemoved { name }
297 | SchemaChange::TypeChanged { name, .. }
298 | SchemaChange::NullabilityChanged { name, .. } => name,
299 SchemaChange::ColumnRenamed { old_name, .. } => old_name,
300 }
301}
302
303pub fn apply_changes(old: &SchemaRef, changes: &[SchemaChange]) -> SchemaResult<ColumnProjection> {
312 let mut fields: Vec<Field> = old.fields().iter().map(|f| f.as_ref().clone()).collect();
313 let mut old_index_map: Vec<Option<usize>> = (0..fields.len()).map(Some).collect();
314
315 let mut remove_indices = Vec::new();
317
318 for change in changes {
319 match change {
320 SchemaChange::ColumnAdded {
321 name,
322 data_type,
323 nullable,
324 } => {
325 fields.push(Field::new(name, data_type.clone(), *nullable));
326 old_index_map.push(None); }
328 SchemaChange::ColumnRemoved { name } => {
329 if let Some(idx) = fields.iter().position(|f| f.name() == name) {
330 remove_indices.push(idx);
331 }
332 }
333 SchemaChange::TypeChanged { name, new_type, .. } => {
334 if let Some(field) = fields.iter_mut().find(|f| f.name() == name) {
335 *field = Field::new(field.name(), new_type.clone(), field.is_nullable());
336 }
337 }
338 SchemaChange::NullabilityChanged {
339 name, now_nullable, ..
340 } => {
341 if let Some(field) = fields.iter_mut().find(|f| f.name() == name) {
342 *field = Field::new(field.name(), field.data_type().clone(), *now_nullable);
343 }
344 }
345 SchemaChange::ColumnRenamed {
346 old_name, new_name, ..
347 } => {
348 if let Some(field) = fields.iter_mut().find(|f| f.name() == old_name) {
349 *field = Field::new(new_name, field.data_type().clone(), field.is_nullable());
350 }
351 }
352 }
353 }
354
355 remove_indices.sort_unstable();
357 remove_indices.dedup();
358 for &idx in remove_indices.iter().rev() {
359 fields.remove(idx);
360 old_index_map.remove(idx);
361 }
362
363 let new_schema = Arc::new(Schema::new(fields));
364
365 Ok(ColumnProjection {
366 mappings: old_index_map,
367 target_schema: new_schema,
368 })
369}
370
/// What initiated a schema evolution; stored on every
/// [`SchemaHistoryEntry`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EvolutionTrigger {
    /// Evolution initiated through DDL (presumably an explicit schema
    /// statement — confirm against callers).
    Ddl,
    /// Evolution initiated by a schema registry.
    Registry {
        /// Identifier of the schema in the registry (assumed to be the
        /// registry-assigned id — confirm against callers).
        schema_id: u32,
    },
    /// Evolution initiated by a manual refresh.
    ManualRefresh,
}
386
/// One recorded schema version of a source, as stored by [`SchemaHistory`].
#[derive(Debug, Clone)]
pub struct SchemaHistoryEntry {
    /// Name of the source this entry belongs to.
    pub source_name: String,
    /// Version number; `SchemaHistory::record` assigns 1 to a source's first
    /// entry and the previous entry's version plus one afterwards.
    pub version: u64,
    /// The schema recorded for this version.
    pub schema: SchemaRef,
    /// The changes recorded alongside this version.
    pub changes: Vec<SchemaChange>,
    /// Wall-clock time at which the entry was recorded.
    pub applied_at: SystemTime,
    /// What initiated this evolution.
    pub trigger: EvolutionTrigger,
}
403
/// In-memory, per-source log of recorded schema versions.
#[derive(Debug, Default)]
pub struct SchemaHistory {
    /// Entries keyed by source name, each list in recording order.
    entries: HashMap<String, Vec<SchemaHistoryEntry>>,
}
409
410impl SchemaHistory {
411 #[must_use]
413 pub fn new() -> Self {
414 Self::default()
415 }
416
417 pub fn record(
419 &mut self,
420 source_name: &str,
421 schema: SchemaRef,
422 changes: Vec<SchemaChange>,
423 trigger: EvolutionTrigger,
424 ) -> u64 {
425 let entries = self.entries.entry(source_name.to_string()).or_default();
426 let version = entries.last().map_or(1, |e| e.version + 1);
427 entries.push(SchemaHistoryEntry {
428 source_name: source_name.to_string(),
429 version,
430 schema,
431 changes,
432 applied_at: SystemTime::now(),
433 trigger,
434 });
435 version
436 }
437
438 #[must_use]
440 pub fn versions(&self, source_name: &str) -> &[SchemaHistoryEntry] {
441 self.entries.get(source_name).map_or(&[], |v| v.as_slice())
442 }
443
444 #[must_use]
446 pub fn latest_version(&self, source_name: &str) -> Option<u64> {
447 self.entries.get(source_name)?.last().map(|e| e.version)
448 }
449}
450
/// Outcome of a successful [`SchemaEvolutionEngine::evolve`] call.
#[derive(Debug)]
pub enum EvolutionResult {
    /// The current and proposed schemas produced no differences.
    NoChange,
    /// The proposed changes were accepted, applied and recorded.
    Applied {
        /// The evolved schema (the projection's target schema).
        new_schema: SchemaRef,
        /// Projection produced by applying the changes to the current schema.
        projection: ColumnProjection,
        /// Version number assigned to this evolution in the history.
        version: u64,
        /// The changes that were applied.
        changes: Vec<SchemaChange>,
    },
}
470
/// Drives the diff → evaluate → apply → record pipeline and owns the
/// resulting [`SchemaHistory`].
#[derive(Debug)]
pub struct SchemaEvolutionEngine {
    /// Versions recorded by successful `evolve` calls.
    pub history: SchemaHistory,
    /// Compatibility mode supplied at construction.
    ///
    /// NOTE(review): `evolve` as written does not read this field; the
    /// verdict comes from the `SchemaEvolution` passed to it.
    pub default_compatibility: CompatibilityMode,
}
484
485impl SchemaEvolutionEngine {
486 #[must_use]
488 pub fn new(default_compatibility: CompatibilityMode) -> Self {
489 Self {
490 history: SchemaHistory::new(),
491 default_compatibility,
492 }
493 }
494
495 pub fn evolve(
501 &mut self,
502 source_name: &str,
503 evolvable: &SchemaEvolution,
504 current: &SchemaRef,
505 proposed: &SchemaRef,
506 trigger: EvolutionTrigger,
507 ) -> SchemaResult<EvolutionResult> {
508 let changes = evolvable.diff_schemas(current, proposed);
509
510 if changes.is_empty() {
511 return Ok(EvolutionResult::NoChange);
512 }
513
514 let verdict = evolvable.evaluate_evolution(&changes);
515
516 match verdict {
517 EvolutionVerdict::Compatible | EvolutionVerdict::RequiresMigration => {
518 let projection = evolvable.apply_evolution(current, &changes)?;
519 let version = self.history.record(
520 source_name,
521 projection.target_schema.clone(),
522 changes.clone(),
523 trigger,
524 );
525 Ok(EvolutionResult::Applied {
526 new_schema: projection.target_schema.clone(),
527 projection,
528 version,
529 changes,
530 })
531 }
532 EvolutionVerdict::Incompatible(reason) => Err(SchemaError::EvolutionRejected(reason)),
533 }
534 }
535}
536
/// Unit tests for widening rules, schema diffing, change evaluation, change
/// application, history bookkeeping and the evolution engine.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a schema from `(name, data type, nullable)` triples.
    fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
        Arc::new(Schema::new(
            fields
                .iter()
                .map(|(name, dt, nullable)| Field::new(*name, dt.clone(), *nullable))
                .collect::<Vec<_>>(),
        ))
    }

    // ── is_safe_widening ──

    /// Signed integers widen to every larger signed integer.
    #[test]
    fn test_int_widening() {
        assert!(is_safe_widening(&DataType::Int8, &DataType::Int16));
        assert!(is_safe_widening(&DataType::Int8, &DataType::Int32));
        assert!(is_safe_widening(&DataType::Int8, &DataType::Int64));
        assert!(is_safe_widening(&DataType::Int16, &DataType::Int32));
        assert!(is_safe_widening(&DataType::Int16, &DataType::Int64));
        assert!(is_safe_widening(&DataType::Int32, &DataType::Int64));
    }

    /// Unsigned integers widen to larger unsigned integers.
    #[test]
    fn test_uint_widening() {
        assert!(is_safe_widening(&DataType::UInt8, &DataType::UInt16));
        assert!(is_safe_widening(&DataType::UInt8, &DataType::UInt32));
        assert!(is_safe_widening(&DataType::UInt32, &DataType::UInt64));
    }

    /// Floats widen to larger floats.
    #[test]
    fn test_float_widening() {
        assert!(is_safe_widening(&DataType::Float16, &DataType::Float32));
        assert!(is_safe_widening(&DataType::Float16, &DataType::Float64));
        assert!(is_safe_widening(&DataType::Float32, &DataType::Float64));
    }

    /// Small integers widen to floats.
    #[test]
    fn test_int_to_float_widening() {
        assert!(is_safe_widening(&DataType::Int8, &DataType::Float32));
        assert!(is_safe_widening(&DataType::Int16, &DataType::Float64));
        assert!(is_safe_widening(&DataType::Int32, &DataType::Float64));
    }

    /// String and binary types widen to their `Large*` counterparts.
    #[test]
    fn test_string_binary_widening() {
        assert!(is_safe_widening(&DataType::Utf8, &DataType::LargeUtf8));
        assert!(is_safe_widening(&DataType::Binary, &DataType::LargeBinary));
    }

    /// The reverse of a widening is not itself a widening.
    #[test]
    fn test_narrowing_not_safe() {
        assert!(!is_safe_widening(&DataType::Int64, &DataType::Int32));
        assert!(!is_safe_widening(&DataType::Float64, &DataType::Float32));
        assert!(!is_safe_widening(&DataType::LargeUtf8, &DataType::Utf8));
    }

    /// Unrelated type pairs are never a widening.
    #[test]
    fn test_unrelated_types() {
        assert!(!is_safe_widening(&DataType::Int64, &DataType::Utf8));
        assert!(!is_safe_widening(&DataType::Boolean, &DataType::Int32));
    }

    // ── diff_schemas_by_name ──

    /// Diffing a schema against itself yields no changes.
    #[test]
    fn test_diff_identical() {
        let s = schema(&[("a", DataType::Int64, false)]);
        let changes = diff_schemas_by_name(&s, &s);
        assert!(changes.is_empty());
    }

    /// A column present only in the new schema is reported as added.
    #[test]
    fn test_diff_column_added() {
        let old = schema(&[("a", DataType::Int64, false)]);
        let new = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
        let changes = diff_schemas_by_name(&old, &new);
        assert_eq!(changes.len(), 1);
        assert!(matches!(&changes[0], SchemaChange::ColumnAdded { name, .. } if name == "b"));
    }

    /// A column present only in the old schema is reported as removed.
    #[test]
    fn test_diff_column_removed() {
        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
        let new = schema(&[("a", DataType::Int64, false)]);
        let changes = diff_schemas_by_name(&old, &new);
        assert_eq!(changes.len(), 1);
        assert!(matches!(&changes[0], SchemaChange::ColumnRemoved { name } if name == "b"));
    }

    /// A data type difference on a shared column is reported with both types.
    #[test]
    fn test_diff_type_changed() {
        let old = schema(&[("a", DataType::Int32, false)]);
        let new = schema(&[("a", DataType::Int64, false)]);
        let changes = diff_schemas_by_name(&old, &new);
        assert_eq!(changes.len(), 1);
        assert!(matches!(
            &changes[0],
            SchemaChange::TypeChanged {
                name,
                old_type: DataType::Int32,
                new_type: DataType::Int64,
            } if name == "a"
        ));
    }

    /// A nullability difference on a shared column is reported with both flags.
    #[test]
    fn test_diff_nullability_changed() {
        let old = schema(&[("a", DataType::Int64, false)]);
        let new = schema(&[("a", DataType::Int64, true)]);
        let changes = diff_schemas_by_name(&old, &new);
        assert_eq!(changes.len(), 1);
        assert!(matches!(
            &changes[0],
            SchemaChange::NullabilityChanged {
                name,
                was_nullable: false,
                now_nullable: true,
            } if name == "a"
        ));
    }

    /// Removal of `b`, addition of `c` and relaxed nullability of `a` are all
    /// reported together.
    #[test]
    fn test_diff_multiple_changes() {
        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
        let new = schema(&[
            // `a` becomes nullable.
            ("a", DataType::Int64, true),
            // `c` is new; `b` is gone.
            ("c", DataType::Float64, false),
        ]);
        let changes = diff_schemas_by_name(&old, &new);
        // ColumnRemoved(b) + ColumnAdded(c) + NullabilityChanged(a).
        assert_eq!(changes.len(), 3);
    }

    // ── evaluate_changes ──

    /// An empty change set is compatible in any mode.
    #[test]
    fn test_evaluate_no_changes() {
        let verdict = evaluate_changes(&[], CompatibilityMode::Full);
        assert_eq!(verdict, EvolutionVerdict::Compatible);
    }

    /// `CompatibilityMode::None` accepts even an unrelated type change.
    #[test]
    fn test_evaluate_none_mode_allows_all() {
        let changes = vec![
            SchemaChange::ColumnRemoved { name: "x".into() },
            SchemaChange::TypeChanged {
                name: "y".into(),
                old_type: DataType::Int64,
                // Not a widening in either direction.
                new_type: DataType::Utf8,
            },
        ];
        let verdict = evaluate_changes(&changes, CompatibilityMode::None);
        assert_eq!(verdict, EvolutionVerdict::Compatible);
    }

    /// Adding a nullable column is backward-compatible.
    #[test]
    fn test_evaluate_backward_add_nullable_ok() {
        let changes = vec![SchemaChange::ColumnAdded {
            name: "email".into(),
            data_type: DataType::Utf8,
            nullable: true,
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
        assert_eq!(verdict, EvolutionVerdict::Compatible);
    }

    /// Adding a non-nullable column is rejected under backward compatibility.
    #[test]
    fn test_evaluate_backward_add_non_nullable_rejected() {
        let changes = vec![SchemaChange::ColumnAdded {
            name: "email".into(),
            data_type: DataType::Utf8,
            nullable: false,
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
    }

    /// Removing a column is rejected under forward compatibility.
    #[test]
    fn test_evaluate_forward_drop_rejected() {
        let changes = vec![SchemaChange::ColumnRemoved {
            name: "legacy".into(),
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Forward);
        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
    }

    /// Removing a column under backward compatibility requires migration.
    #[test]
    fn test_evaluate_backward_drop_ok() {
        let changes = vec![SchemaChange::ColumnRemoved {
            name: "legacy".into(),
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
        assert_eq!(verdict, EvolutionVerdict::RequiresMigration);
    }

    /// A safe widening is backward-compatible.
    #[test]
    fn test_evaluate_backward_widening_ok() {
        let changes = vec![SchemaChange::TypeChanged {
            name: "count".into(),
            old_type: DataType::Int32,
            new_type: DataType::Int64,
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
        assert_eq!(verdict, EvolutionVerdict::Compatible);
    }

    /// A widening is rejected under full compatibility (it is not
    /// forward-compatible).
    #[test]
    fn test_evaluate_full_widening_rejected() {
        let changes = vec![SchemaChange::TypeChanged {
            name: "count".into(),
            old_type: DataType::Int32,
            new_type: DataType::Int64,
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Full);
        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
    }

    /// A type change that is neither widening nor narrowing is rejected.
    #[test]
    fn test_evaluate_unrelated_type_change_rejected() {
        let changes = vec![SchemaChange::TypeChanged {
            name: "val".into(),
            old_type: DataType::Int64,
            new_type: DataType::Utf8,
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
    }

    /// Relaxing a column to nullable is compatible even in full mode.
    #[test]
    fn test_evaluate_make_nullable_ok() {
        let changes = vec![SchemaChange::NullabilityChanged {
            name: "field".into(),
            was_nullable: false,
            now_nullable: true,
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Full);
        assert_eq!(verdict, EvolutionVerdict::Compatible);
    }

    /// Tightening a column to non-nullable is rejected under backward
    /// compatibility.
    #[test]
    fn test_evaluate_make_non_nullable_backward_rejected() {
        let changes = vec![SchemaChange::NullabilityChanged {
            name: "field".into(),
            was_nullable: true,
            now_nullable: false,
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
    }

    /// Renames are rejected.
    #[test]
    fn test_evaluate_rename_rejected() {
        let changes = vec![SchemaChange::ColumnRenamed {
            old_name: "fname".into(),
            new_name: "first_name".into(),
        }];
        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
    }

    // ── apply_changes ──

    /// An added column is appended and maps to no source column.
    #[test]
    fn test_apply_add_column() {
        let old = schema(&[("a", DataType::Int64, false)]);
        let changes = vec![SchemaChange::ColumnAdded {
            name: "b".into(),
            data_type: DataType::Utf8,
            nullable: true,
        }];
        let proj = apply_changes(&old, &changes).unwrap();
        assert_eq!(proj.target_schema.fields().len(), 2);
        assert_eq!(proj.mappings, vec![Some(0), None]);
    }

    /// A removed column disappears from both the schema and the mapping.
    #[test]
    fn test_apply_remove_column() {
        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
        let changes = vec![SchemaChange::ColumnRemoved { name: "b".into() }];
        let proj = apply_changes(&old, &changes).unwrap();
        assert_eq!(proj.target_schema.fields().len(), 1);
        assert_eq!(proj.target_schema.field(0).name(), "a");
        assert_eq!(proj.mappings, vec![Some(0)]);
    }

    /// A type change updates the field in place and keeps its mapping.
    #[test]
    fn test_apply_widen_type() {
        let old = schema(&[("val", DataType::Int32, false)]);
        let changes = vec![SchemaChange::TypeChanged {
            name: "val".into(),
            old_type: DataType::Int32,
            new_type: DataType::Int64,
        }];
        let proj = apply_changes(&old, &changes).unwrap();
        assert_eq!(proj.target_schema.field(0).data_type(), &DataType::Int64);
        assert_eq!(proj.mappings, vec![Some(0)]);
    }

    /// A nullability change updates the field's nullable flag.
    #[test]
    fn test_apply_change_nullability() {
        let old = schema(&[("val", DataType::Int64, false)]);
        let changes = vec![SchemaChange::NullabilityChanged {
            name: "val".into(),
            was_nullable: false,
            now_nullable: true,
        }];
        let proj = apply_changes(&old, &changes).unwrap();
        assert!(proj.target_schema.field(0).is_nullable());
    }

    /// A rename changes the field name and keeps its mapping.
    #[test]
    fn test_apply_rename() {
        let old = schema(&[("fname", DataType::Utf8, false)]);
        let changes = vec![SchemaChange::ColumnRenamed {
            old_name: "fname".into(),
            new_name: "first_name".into(),
        }];
        let proj = apply_changes(&old, &changes).unwrap();
        assert_eq!(proj.target_schema.field(0).name(), "first_name");
        assert_eq!(proj.mappings, vec![Some(0)]);
    }

    /// Removal, addition and a type change applied together.
    #[test]
    fn test_apply_multi_change() {
        let old = schema(&[
            ("a", DataType::Int64, false),
            ("b", DataType::Int32, true),
            ("c", DataType::Utf8, false),
        ]);
        let changes = vec![
            SchemaChange::ColumnRemoved { name: "c".into() },
            SchemaChange::ColumnAdded {
                name: "d".into(),
                data_type: DataType::Float64,
                nullable: true,
            },
            SchemaChange::TypeChanged {
                name: "b".into(),
                old_type: DataType::Int32,
                new_type: DataType::Int64,
            },
        ];
        let proj = apply_changes(&old, &changes).unwrap();
        // Result is a, b, d: `c` removed, `d` appended.
        assert_eq!(proj.target_schema.fields().len(), 3);
        assert_eq!(proj.target_schema.field(0).name(), "a");
        assert_eq!(proj.target_schema.field(1).name(), "b");
        assert_eq!(proj.target_schema.field(1).data_type(), &DataType::Int64);
        assert_eq!(proj.target_schema.field(2).name(), "d");
        // `a` and `b` keep their source indices; `d` has no source.
        assert_eq!(proj.mappings, vec![Some(0), Some(1), None]);
    }

    // ── SchemaHistory ──

    /// Versions start at 1 and increase by one per record; unknown sources
    /// have no latest version.
    #[test]
    fn test_history_record_and_query() {
        let mut history = SchemaHistory::new();
        let s1 = schema(&[("a", DataType::Int64, false)]);
        let s2 = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);

        let v1 = history.record("test_source", s1, vec![], EvolutionTrigger::Ddl);
        assert_eq!(v1, 1);

        let v2 = history.record(
            "test_source",
            s2,
            vec![SchemaChange::ColumnAdded {
                name: "b".into(),
                data_type: DataType::Utf8,
                nullable: true,
            }],
            EvolutionTrigger::Ddl,
        );
        assert_eq!(v2, 2);

        assert_eq!(history.versions("test_source").len(), 2);
        assert_eq!(history.latest_version("test_source"), Some(2));
        assert_eq!(history.latest_version("unknown"), None);
    }

    // ── SchemaEvolutionEngine ──

    /// Identical schemas produce `NoChange`.
    #[test]
    fn test_engine_no_change() {
        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Backward);
        let evolver = SchemaEvolution::new(CompatibilityMode::Backward);
        let s = schema(&[("a", DataType::Int64, false)]);

        let result = engine
            .evolve("src", &evolver, &s, &s, EvolutionTrigger::Ddl)
            .unwrap();

        assert!(matches!(result, EvolutionResult::NoChange));
    }

    /// Adding a nullable column is applied and recorded as version 1.
    #[test]
    fn test_engine_add_nullable_column() {
        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Backward);
        let evolver = SchemaEvolution::new(CompatibilityMode::Backward);
        let old = schema(&[("a", DataType::Int64, false)]);
        let new = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);

        let result = engine
            .evolve("src", &evolver, &old, &new, EvolutionTrigger::Ddl)
            .unwrap();

        match result {
            EvolutionResult::Applied {
                new_schema,
                version,
                changes,
                ..
            } => {
                assert_eq!(new_schema.fields().len(), 2);
                assert_eq!(version, 1);
                assert_eq!(changes.len(), 1);
            }
            EvolutionResult::NoChange => panic!("expected Applied"),
        }
    }

    /// An incompatible evolution surfaces as an error.
    #[test]
    fn test_engine_incompatible_rejected() {
        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Full);
        let evolver = SchemaEvolution::new(CompatibilityMode::Full);
        let old = schema(&[("a", DataType::Int32, false)]);
        // Widening is rejected in full mode.
        let new = schema(&[("a", DataType::Int64, false)]);
        let result = engine.evolve("src", &evolver, &old, &new, EvolutionTrigger::Ddl);
        assert!(result.is_err());
    }

    // ── SchemaEvolution (default) ──

    /// The default evolver diffs, evaluates and applies an added column.
    #[test]
    fn test_default_evolver_diff_and_apply() {
        let evolver = SchemaEvolution::default();
        let old = schema(&[("id", DataType::Int64, false)]);
        let new = schema(&[
            ("id", DataType::Int64, false),
            ("email", DataType::Utf8, true),
        ]);

        let changes = evolver.diff_schemas(&old, &new);
        assert_eq!(changes.len(), 1);

        let verdict = evolver.evaluate_evolution(&changes);
        assert_eq!(verdict, EvolutionVerdict::Compatible);

        let proj = evolver.apply_evolution(&old, &changes).unwrap();
        assert_eq!(proj.target_schema.fields().len(), 2);
        assert_eq!(proj.mappings, vec![Some(0), None]);
    }
}