Skip to main content

laminar_connectors/schema/
evolution.rs

//! Schema evolution engine (F-SCHEMA-009).
//!
//! Provides:
//!
//! - [`DefaultSchemaEvolver`] — a general-purpose implementation of
//!   [`SchemaEvolvable`] that performs name-based schema diffing,
//!   compatibility evaluation, and schema merging
//! - [`SchemaEvolutionEngine`] — orchestrates the full evolution flow:
//!   detect → diff → evaluate → apply → record
//! - [`SchemaHistory`] — tracks per-source schema version history
//! - [`is_safe_widening`] — determines whether a type can be safely widened
12#![allow(clippy::disallowed_types)] // cold path: schema management
13
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::SystemTime;
17
18use arrow_schema::{DataType, Field, Schema, SchemaRef};
19
20use super::error::{SchemaError, SchemaResult};
21use super::traits::{
22    ColumnProjection, CompatibilityMode, EvolutionVerdict, SchemaChange, SchemaEvolvable,
23};
24
25// ── Safe widening rules ────────────────────────────────────────────
26
27/// Returns `true` if `from` can be safely widened to `to` without data loss.
28#[must_use]
29pub fn is_safe_widening(from: &DataType, to: &DataType) -> bool {
30    matches!(
31        (from, to),
32        (
33            DataType::Int8,
34            DataType::Int16 | DataType::Int32 | DataType::Int64
35        ) | (DataType::Int16, DataType::Int32 | DataType::Int64)
36            | (DataType::Int32, DataType::Int64 | DataType::Float64)
37            | (
38                DataType::UInt8,
39                DataType::UInt16 | DataType::UInt32 | DataType::UInt64
40            )
41            | (DataType::UInt16, DataType::UInt32 | DataType::UInt64)
42            | (DataType::UInt32, DataType::UInt64)
43            | (
44                DataType::Float16 | DataType::Int8 | DataType::Int16,
45                DataType::Float32 | DataType::Float64
46            )
47            | (DataType::Float32, DataType::Float64)
48            | (DataType::Utf8, DataType::LargeUtf8)
49            | (DataType::Binary, DataType::LargeBinary)
50    )
51}
52
53// ── DefaultSchemaEvolver ───────────────────────────────────────────
54
55/// A general-purpose implementation of [`SchemaEvolvable`] that uses
56/// name-based schema matching (suitable for JSON, CSV, Avro, etc.).
57///
58/// For field-ID-based sources (Iceberg, Parquet), a specialised
59/// implementation should be used instead.
60#[derive(Debug, Clone)]
61pub struct DefaultSchemaEvolver {
62    /// The compatibility mode for evaluating changes.
63    pub compatibility: CompatibilityMode,
64}
65
66impl DefaultSchemaEvolver {
67    /// Creates a new evolver with the given compatibility mode.
68    #[must_use]
69    pub fn new(compatibility: CompatibilityMode) -> Self {
70        Self { compatibility }
71    }
72}
73
74impl Default for DefaultSchemaEvolver {
75    fn default() -> Self {
76        Self {
77            compatibility: CompatibilityMode::None,
78        }
79    }
80}
81
82impl SchemaEvolvable for DefaultSchemaEvolver {
83    fn diff_schemas(&self, old: &SchemaRef, new: &SchemaRef) -> Vec<SchemaChange> {
84        diff_schemas_by_name(old, new)
85    }
86
87    fn evaluate_evolution(&self, changes: &[SchemaChange]) -> EvolutionVerdict {
88        evaluate_changes(changes, self.compatibility)
89    }
90
91    fn apply_evolution(
92        &self,
93        old: &SchemaRef,
94        changes: &[SchemaChange],
95    ) -> SchemaResult<ColumnProjection> {
96        apply_changes(old, changes)
97    }
98}
99
100// ── Core diffing algorithm ─────────────────────────────────────────
101
102/// Computes the diff between two Arrow schemas using name-based matching.
103///
104/// Returns a list of [`SchemaChange`] entries describing all differences.
105/// Order: column removals, column additions, type changes, nullability changes.
106#[must_use]
107pub fn diff_schemas_by_name(old: &SchemaRef, new: &SchemaRef) -> Vec<SchemaChange> {
108    let mut changes = Vec::new();
109
110    let old_fields: HashMap<&str, &Field> = old
111        .fields()
112        .iter()
113        .map(|f| (f.name().as_str(), f.as_ref()))
114        .collect();
115
116    let new_fields: HashMap<&str, &Field> = new
117        .fields()
118        .iter()
119        .map(|f| (f.name().as_str(), f.as_ref()))
120        .collect();
121
122    // Detect removed columns (in old but not in new).
123    for field in old.fields() {
124        if !new_fields.contains_key(field.name().as_str()) {
125            changes.push(SchemaChange::ColumnRemoved {
126                name: field.name().clone(),
127            });
128        }
129    }
130
131    // Detect added columns (in new but not in old).
132    for field in new.fields() {
133        if !old_fields.contains_key(field.name().as_str()) {
134            changes.push(SchemaChange::ColumnAdded {
135                name: field.name().clone(),
136                data_type: field.data_type().clone(),
137                nullable: field.is_nullable(),
138            });
139        }
140    }
141
142    // Detect type and nullability changes.
143    for field in new.fields() {
144        if let Some(old_field) = old_fields.get(field.name().as_str()) {
145            if old_field.data_type() != field.data_type() {
146                changes.push(SchemaChange::TypeChanged {
147                    name: field.name().clone(),
148                    old_type: old_field.data_type().clone(),
149                    new_type: field.data_type().clone(),
150                });
151            }
152            if old_field.is_nullable() != field.is_nullable() {
153                changes.push(SchemaChange::NullabilityChanged {
154                    name: field.name().clone(),
155                    was_nullable: old_field.is_nullable(),
156                    now_nullable: field.is_nullable(),
157                });
158            }
159        }
160    }
161
162    changes
163}
164
165// ── Compatibility evaluation ───────────────────────────────────────
166
167/// Evaluates a set of schema changes against a compatibility mode.
168#[must_use]
169pub fn evaluate_changes(changes: &[SchemaChange], mode: CompatibilityMode) -> EvolutionVerdict {
170    if changes.is_empty() {
171        return EvolutionVerdict::Compatible;
172    }
173
174    // Mode::None — everything is allowed.
175    if mode == CompatibilityMode::None {
176        return EvolutionVerdict::Compatible;
177    }
178
179    let requires_backward = matches!(
180        mode,
181        CompatibilityMode::Backward
182            | CompatibilityMode::Full
183            | CompatibilityMode::BackwardTransitive
184            | CompatibilityMode::FullTransitive
185    );
186    let requires_forward = matches!(
187        mode,
188        CompatibilityMode::Forward
189            | CompatibilityMode::Full
190            | CompatibilityMode::ForwardTransitive
191            | CompatibilityMode::FullTransitive
192    );
193
194    let mut needs_migration = false;
195
196    for change in changes {
197        match change {
198            SchemaChange::ColumnAdded { nullable, .. } => {
199                // Backward: new schema must read old data.
200                // Adding a nullable column is backward-compatible (old data has null).
201                // Adding a non-nullable column breaks backward compatibility
202                // (old data has no value for this column).
203                if requires_backward && !nullable {
204                    return EvolutionVerdict::Incompatible(format!(
205                        "adding non-nullable column '{}' is not backward-compatible",
206                        change_name(change)
207                    ));
208                }
209            }
210            SchemaChange::ColumnRemoved { name } => {
211                // Backward: removing a column breaks backward compatibility
212                // (old data has the column, new reader doesn't expect it).
213                // Actually, backward means new reader reads old data — if the new
214                // schema drops a column, the new reader just ignores it. That's fine.
215                // Forward: old reader reads new data — new data is missing a column
216                // that old reader expects. That breaks forward compatibility.
217                if requires_forward {
218                    return EvolutionVerdict::Incompatible(format!(
219                        "removing column '{name}' is not forward-compatible"
220                    ));
221                }
222                needs_migration = true;
223            }
224            SchemaChange::TypeChanged {
225                name,
226                old_type,
227                new_type,
228            } => {
229                if is_safe_widening(old_type, new_type) {
230                    // Widening is backward-compatible (new reader can handle old
231                    // narrower data). But it's NOT forward-compatible (old reader
232                    // can't handle wider data).
233                    if requires_forward {
234                        return EvolutionVerdict::Incompatible(format!(
235                            "widening column '{name}' from {old_type:?} to {new_type:?} \
236                             is not forward-compatible"
237                        ));
238                    }
239                } else if is_safe_widening(new_type, old_type) {
240                    // Narrowing — forward-compatible but not backward.
241                    if requires_backward {
242                        return EvolutionVerdict::Incompatible(format!(
243                            "narrowing column '{name}' from {old_type:?} to {new_type:?} \
244                             is not backward-compatible"
245                        ));
246                    }
247                    needs_migration = true;
248                } else {
249                    // Unrelated types — always incompatible.
250                    return EvolutionVerdict::Incompatible(format!(
251                        "changing column '{name}' from {old_type:?} to {new_type:?} \
252                         is incompatible"
253                    ));
254                }
255            }
256            SchemaChange::NullabilityChanged {
257                name,
258                was_nullable,
259                now_nullable,
260            } => {
261                if *was_nullable && !now_nullable {
262                    // Making a column non-nullable: new reader rejects old nulls.
263                    if requires_backward {
264                        return EvolutionVerdict::Incompatible(format!(
265                            "making column '{name}' non-nullable is not backward-compatible"
266                        ));
267                    }
268                    needs_migration = true;
269                }
270                // Making a column nullable is always safe.
271            }
272            SchemaChange::ColumnRenamed { old_name, .. } => {
273                // Renames without field-IDs are always incompatible
274                // (name-based matching treats it as drop + add).
275                return EvolutionVerdict::Incompatible(format!(
276                    "renaming column '{old_name}' is not supported without field IDs"
277                ));
278            }
279        }
280    }
281
282    if needs_migration {
283        EvolutionVerdict::RequiresMigration
284    } else {
285        EvolutionVerdict::Compatible
286    }
287}
288
289fn change_name(change: &SchemaChange) -> &str {
290    match change {
291        SchemaChange::ColumnAdded { name, .. }
292        | SchemaChange::ColumnRemoved { name }
293        | SchemaChange::TypeChanged { name, .. }
294        | SchemaChange::NullabilityChanged { name, .. } => name,
295        SchemaChange::ColumnRenamed { old_name, .. } => old_name,
296    }
297}
298
299// ── Schema application ─────────────────────────────────────────────
300
301/// Applies a set of schema changes to produce a new schema and a column
302/// projection mapping old columns to new positions.
303///
304/// # Errors
305///
306/// Returns [`SchemaError`] if the changes reference columns that don't exist.
307pub fn apply_changes(old: &SchemaRef, changes: &[SchemaChange]) -> SchemaResult<ColumnProjection> {
308    let mut fields: Vec<Field> = old.fields().iter().map(|f| f.as_ref().clone()).collect();
309    let mut old_index_map: Vec<Option<usize>> = (0..fields.len()).map(Some).collect();
310
311    // Collect indices of columns to remove.
312    let mut remove_indices = Vec::new();
313
314    for change in changes {
315        match change {
316            SchemaChange::ColumnAdded {
317                name,
318                data_type,
319                nullable,
320            } => {
321                fields.push(Field::new(name, data_type.clone(), *nullable));
322                old_index_map.push(None); // New column — no old source.
323            }
324            SchemaChange::ColumnRemoved { name } => {
325                if let Some(idx) = fields.iter().position(|f| f.name() == name) {
326                    remove_indices.push(idx);
327                }
328            }
329            SchemaChange::TypeChanged { name, new_type, .. } => {
330                if let Some(field) = fields.iter_mut().find(|f| f.name() == name) {
331                    *field = Field::new(field.name(), new_type.clone(), field.is_nullable());
332                }
333            }
334            SchemaChange::NullabilityChanged {
335                name, now_nullable, ..
336            } => {
337                if let Some(field) = fields.iter_mut().find(|f| f.name() == name) {
338                    *field = Field::new(field.name(), field.data_type().clone(), *now_nullable);
339                }
340            }
341            SchemaChange::ColumnRenamed {
342                old_name, new_name, ..
343            } => {
344                if let Some(field) = fields.iter_mut().find(|f| f.name() == old_name) {
345                    *field = Field::new(new_name, field.data_type().clone(), field.is_nullable());
346                }
347            }
348        }
349    }
350
351    // Remove columns in reverse order to preserve indices.
352    remove_indices.sort_unstable();
353    remove_indices.dedup();
354    for &idx in remove_indices.iter().rev() {
355        fields.remove(idx);
356        old_index_map.remove(idx);
357    }
358
359    let new_schema = Arc::new(Schema::new(fields));
360
361    Ok(ColumnProjection {
362        mappings: old_index_map,
363        target_schema: new_schema,
364    })
365}
366
367// ── Schema history ─────────────────────────────────────────────────
368
/// The event that caused a schema evolution to be attempted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EvolutionTrigger {
    /// An `ALTER SOURCE` DDL statement issued by the user.
    Ddl,
    /// A new schema version detected by the schema registry.
    Registry {
        /// ID of the schema that triggered the change.
        schema_id: u32,
    },
    /// A `REFRESH SCHEMA` request issued by the user.
    ManualRefresh,
}
382
383/// A record in the schema history.
384#[derive(Debug, Clone)]
385pub struct SchemaHistoryEntry {
386    /// Source name.
387    pub source_name: String,
388    /// Schema version (monotonically increasing per source).
389    pub version: u64,
390    /// The Arrow schema at this version.
391    pub schema: SchemaRef,
392    /// Changes from the previous version.
393    pub changes: Vec<SchemaChange>,
394    /// When this version was applied.
395    pub applied_at: SystemTime,
396    /// How the evolution was triggered.
397    pub trigger: EvolutionTrigger,
398}
399
400/// Tracks schema version history for all sources.
401#[derive(Debug, Default)]
402pub struct SchemaHistory {
403    entries: HashMap<String, Vec<SchemaHistoryEntry>>,
404}
405
406impl SchemaHistory {
407    /// Creates a new empty history.
408    #[must_use]
409    pub fn new() -> Self {
410        Self::default()
411    }
412
413    /// Records a new schema version for a source. Returns the version number.
414    pub fn record(
415        &mut self,
416        source_name: &str,
417        schema: SchemaRef,
418        changes: Vec<SchemaChange>,
419        trigger: EvolutionTrigger,
420    ) -> u64 {
421        let entries = self.entries.entry(source_name.to_string()).or_default();
422        let version = entries.last().map_or(1, |e| e.version + 1);
423        entries.push(SchemaHistoryEntry {
424            source_name: source_name.to_string(),
425            version,
426            schema,
427            changes,
428            applied_at: SystemTime::now(),
429            trigger,
430        });
431        version
432    }
433
434    /// Returns all versions for a source.
435    #[must_use]
436    pub fn versions(&self, source_name: &str) -> &[SchemaHistoryEntry] {
437        self.entries.get(source_name).map_or(&[], |v| v.as_slice())
438    }
439
440    /// Returns the latest version number for a source.
441    #[must_use]
442    pub fn latest_version(&self, source_name: &str) -> Option<u64> {
443        self.entries.get(source_name)?.last().map(|e| e.version)
444    }
445}
446
447// ── Schema Evolution Engine ────────────────────────────────────────
448
449/// Result of an evolution attempt.
450#[derive(Debug)]
451pub enum EvolutionResult {
452    /// No schema change detected.
453    NoChange,
454    /// Evolution applied successfully.
455    Applied {
456        /// The new schema.
457        new_schema: SchemaRef,
458        /// Column projection from old → new.
459        projection: ColumnProjection,
460        /// The new version number.
461        version: u64,
462        /// The individual changes.
463        changes: Vec<SchemaChange>,
464    },
465}
466
467/// Orchestrates the full schema evolution flow.
468///
469/// 1. Diff the current and proposed schemas
470/// 2. Evaluate compatibility
471/// 3. Apply changes and produce a new schema + projection
472/// 4. Record in schema history
473#[derive(Debug)]
474pub struct SchemaEvolutionEngine {
475    /// Per-source version history.
476    pub history: SchemaHistory,
477    /// Default compatibility mode.
478    pub default_compatibility: CompatibilityMode,
479}
480
481impl SchemaEvolutionEngine {
482    /// Creates a new engine with the given default compatibility mode.
483    #[must_use]
484    pub fn new(default_compatibility: CompatibilityMode) -> Self {
485        Self {
486            history: SchemaHistory::new(),
487            default_compatibility,
488        }
489    }
490
491    /// Executes the full evolution flow for a source.
492    ///
493    /// # Errors
494    ///
495    /// Returns [`SchemaError::EvolutionRejected`] if the changes are incompatible.
496    pub fn evolve(
497        &mut self,
498        source_name: &str,
499        evolvable: &dyn SchemaEvolvable,
500        current: &SchemaRef,
501        proposed: &SchemaRef,
502        trigger: EvolutionTrigger,
503    ) -> SchemaResult<EvolutionResult> {
504        let changes = evolvable.diff_schemas(current, proposed);
505
506        if changes.is_empty() {
507            return Ok(EvolutionResult::NoChange);
508        }
509
510        let verdict = evolvable.evaluate_evolution(&changes);
511
512        match verdict {
513            EvolutionVerdict::Compatible | EvolutionVerdict::RequiresMigration => {
514                let projection = evolvable.apply_evolution(current, &changes)?;
515                let version = self.history.record(
516                    source_name,
517                    projection.target_schema.clone(),
518                    changes.clone(),
519                    trigger,
520                );
521                Ok(EvolutionResult::Applied {
522                    new_schema: projection.target_schema.clone(),
523                    projection,
524                    version,
525                    changes,
526                })
527            }
528            EvolutionVerdict::Incompatible(reason) => Err(SchemaError::EvolutionRejected(reason)),
529        }
530    }
531}
532
533#[cfg(test)]
534mod tests {
535    use super::*;
536
537    fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
538        Arc::new(Schema::new(
539            fields
540                .iter()
541                .map(|(name, dt, nullable)| Field::new(*name, dt.clone(), *nullable))
542                .collect::<Vec<_>>(),
543        ))
544    }
545
546    // ── is_safe_widening tests ────────────────────────────────
547
548    #[test]
549    fn test_int_widening() {
550        assert!(is_safe_widening(&DataType::Int8, &DataType::Int16));
551        assert!(is_safe_widening(&DataType::Int8, &DataType::Int32));
552        assert!(is_safe_widening(&DataType::Int8, &DataType::Int64));
553        assert!(is_safe_widening(&DataType::Int16, &DataType::Int32));
554        assert!(is_safe_widening(&DataType::Int16, &DataType::Int64));
555        assert!(is_safe_widening(&DataType::Int32, &DataType::Int64));
556    }
557
558    #[test]
559    fn test_uint_widening() {
560        assert!(is_safe_widening(&DataType::UInt8, &DataType::UInt16));
561        assert!(is_safe_widening(&DataType::UInt8, &DataType::UInt32));
562        assert!(is_safe_widening(&DataType::UInt32, &DataType::UInt64));
563    }
564
565    #[test]
566    fn test_float_widening() {
567        assert!(is_safe_widening(&DataType::Float16, &DataType::Float32));
568        assert!(is_safe_widening(&DataType::Float16, &DataType::Float64));
569        assert!(is_safe_widening(&DataType::Float32, &DataType::Float64));
570    }
571
572    #[test]
573    fn test_int_to_float_widening() {
574        assert!(is_safe_widening(&DataType::Int8, &DataType::Float32));
575        assert!(is_safe_widening(&DataType::Int16, &DataType::Float64));
576        assert!(is_safe_widening(&DataType::Int32, &DataType::Float64));
577    }
578
579    #[test]
580    fn test_string_binary_widening() {
581        assert!(is_safe_widening(&DataType::Utf8, &DataType::LargeUtf8));
582        assert!(is_safe_widening(&DataType::Binary, &DataType::LargeBinary));
583    }
584
585    #[test]
586    fn test_narrowing_not_safe() {
587        assert!(!is_safe_widening(&DataType::Int64, &DataType::Int32));
588        assert!(!is_safe_widening(&DataType::Float64, &DataType::Float32));
589        assert!(!is_safe_widening(&DataType::LargeUtf8, &DataType::Utf8));
590    }
591
592    #[test]
593    fn test_unrelated_types() {
594        assert!(!is_safe_widening(&DataType::Int64, &DataType::Utf8));
595        assert!(!is_safe_widening(&DataType::Boolean, &DataType::Int32));
596    }
597
598    // ── diff_schemas tests ────────────────────────────────────
599
600    #[test]
601    fn test_diff_identical() {
602        let s = schema(&[("a", DataType::Int64, false)]);
603        let changes = diff_schemas_by_name(&s, &s);
604        assert!(changes.is_empty());
605    }
606
607    #[test]
608    fn test_diff_column_added() {
609        let old = schema(&[("a", DataType::Int64, false)]);
610        let new = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
611        let changes = diff_schemas_by_name(&old, &new);
612        assert_eq!(changes.len(), 1);
613        assert!(matches!(&changes[0], SchemaChange::ColumnAdded { name, .. } if name == "b"));
614    }
615
616    #[test]
617    fn test_diff_column_removed() {
618        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
619        let new = schema(&[("a", DataType::Int64, false)]);
620        let changes = diff_schemas_by_name(&old, &new);
621        assert_eq!(changes.len(), 1);
622        assert!(matches!(&changes[0], SchemaChange::ColumnRemoved { name } if name == "b"));
623    }
624
625    #[test]
626    fn test_diff_type_changed() {
627        let old = schema(&[("a", DataType::Int32, false)]);
628        let new = schema(&[("a", DataType::Int64, false)]);
629        let changes = diff_schemas_by_name(&old, &new);
630        assert_eq!(changes.len(), 1);
631        assert!(matches!(
632            &changes[0],
633            SchemaChange::TypeChanged {
634                name,
635                old_type: DataType::Int32,
636                new_type: DataType::Int64,
637            } if name == "a"
638        ));
639    }
640
641    #[test]
642    fn test_diff_nullability_changed() {
643        let old = schema(&[("a", DataType::Int64, false)]);
644        let new = schema(&[("a", DataType::Int64, true)]);
645        let changes = diff_schemas_by_name(&old, &new);
646        assert_eq!(changes.len(), 1);
647        assert!(matches!(
648            &changes[0],
649            SchemaChange::NullabilityChanged {
650                name,
651                was_nullable: false,
652                now_nullable: true,
653            } if name == "a"
654        ));
655    }
656
657    #[test]
658    fn test_diff_multiple_changes() {
659        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
660        let new = schema(&[
661            ("a", DataType::Int64, true),    // nullability changed
662            ("c", DataType::Float64, false), // b removed, c added
663        ]);
664        let changes = diff_schemas_by_name(&old, &new);
665        // Should have: ColumnRemoved(b), ColumnAdded(c), NullabilityChanged(a)
666        assert_eq!(changes.len(), 3);
667    }
668
669    // ── evaluate_changes tests ────────────────────────────────
670
671    #[test]
672    fn test_evaluate_no_changes() {
673        let verdict = evaluate_changes(&[], CompatibilityMode::Full);
674        assert_eq!(verdict, EvolutionVerdict::Compatible);
675    }
676
677    #[test]
678    fn test_evaluate_none_mode_allows_all() {
679        let changes = vec![
680            SchemaChange::ColumnRemoved { name: "x".into() },
681            SchemaChange::TypeChanged {
682                name: "y".into(),
683                old_type: DataType::Int64,
684                new_type: DataType::Utf8, // unrelated type change
685            },
686        ];
687        let verdict = evaluate_changes(&changes, CompatibilityMode::None);
688        assert_eq!(verdict, EvolutionVerdict::Compatible);
689    }
690
691    #[test]
692    fn test_evaluate_backward_add_nullable_ok() {
693        let changes = vec![SchemaChange::ColumnAdded {
694            name: "email".into(),
695            data_type: DataType::Utf8,
696            nullable: true,
697        }];
698        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
699        assert_eq!(verdict, EvolutionVerdict::Compatible);
700    }
701
702    #[test]
703    fn test_evaluate_backward_add_non_nullable_rejected() {
704        let changes = vec![SchemaChange::ColumnAdded {
705            name: "email".into(),
706            data_type: DataType::Utf8,
707            nullable: false,
708        }];
709        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
710        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
711    }
712
713    #[test]
714    fn test_evaluate_forward_drop_rejected() {
715        let changes = vec![SchemaChange::ColumnRemoved {
716            name: "legacy".into(),
717        }];
718        let verdict = evaluate_changes(&changes, CompatibilityMode::Forward);
719        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
720    }
721
722    #[test]
723    fn test_evaluate_backward_drop_ok() {
724        let changes = vec![SchemaChange::ColumnRemoved {
725            name: "legacy".into(),
726        }];
727        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
728        assert_eq!(verdict, EvolutionVerdict::RequiresMigration);
729    }
730
731    #[test]
732    fn test_evaluate_backward_widening_ok() {
733        let changes = vec![SchemaChange::TypeChanged {
734            name: "count".into(),
735            old_type: DataType::Int32,
736            new_type: DataType::Int64,
737        }];
738        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
739        assert_eq!(verdict, EvolutionVerdict::Compatible);
740    }
741
742    #[test]
743    fn test_evaluate_full_widening_rejected() {
744        let changes = vec![SchemaChange::TypeChanged {
745            name: "count".into(),
746            old_type: DataType::Int32,
747            new_type: DataType::Int64,
748        }];
749        let verdict = evaluate_changes(&changes, CompatibilityMode::Full);
750        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
751    }
752
753    #[test]
754    fn test_evaluate_unrelated_type_change_rejected() {
755        let changes = vec![SchemaChange::TypeChanged {
756            name: "val".into(),
757            old_type: DataType::Int64,
758            new_type: DataType::Utf8,
759        }];
760        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
761        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
762    }
763
764    #[test]
765    fn test_evaluate_make_nullable_ok() {
766        let changes = vec![SchemaChange::NullabilityChanged {
767            name: "field".into(),
768            was_nullable: false,
769            now_nullable: true,
770        }];
771        let verdict = evaluate_changes(&changes, CompatibilityMode::Full);
772        assert_eq!(verdict, EvolutionVerdict::Compatible);
773    }
774
775    #[test]
776    fn test_evaluate_make_non_nullable_backward_rejected() {
777        let changes = vec![SchemaChange::NullabilityChanged {
778            name: "field".into(),
779            was_nullable: true,
780            now_nullable: false,
781        }];
782        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
783        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
784    }
785
786    #[test]
787    fn test_evaluate_rename_rejected() {
788        let changes = vec![SchemaChange::ColumnRenamed {
789            old_name: "fname".into(),
790            new_name: "first_name".into(),
791        }];
792        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
793        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
794    }
795
796    // ── apply_changes tests ───────────────────────────────────
797
798    #[test]
799    fn test_apply_add_column() {
800        let old = schema(&[("a", DataType::Int64, false)]);
801        let changes = vec![SchemaChange::ColumnAdded {
802            name: "b".into(),
803            data_type: DataType::Utf8,
804            nullable: true,
805        }];
806        let proj = apply_changes(&old, &changes).unwrap();
807        assert_eq!(proj.target_schema.fields().len(), 2);
808        assert_eq!(proj.mappings, vec![Some(0), None]);
809    }
810
811    #[test]
812    fn test_apply_remove_column() {
813        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
814        let changes = vec![SchemaChange::ColumnRemoved { name: "b".into() }];
815        let proj = apply_changes(&old, &changes).unwrap();
816        assert_eq!(proj.target_schema.fields().len(), 1);
817        assert_eq!(proj.target_schema.field(0).name(), "a");
818        assert_eq!(proj.mappings, vec![Some(0)]);
819    }
820
821    #[test]
822    fn test_apply_widen_type() {
823        let old = schema(&[("val", DataType::Int32, false)]);
824        let changes = vec![SchemaChange::TypeChanged {
825            name: "val".into(),
826            old_type: DataType::Int32,
827            new_type: DataType::Int64,
828        }];
829        let proj = apply_changes(&old, &changes).unwrap();
830        assert_eq!(proj.target_schema.field(0).data_type(), &DataType::Int64);
831        assert_eq!(proj.mappings, vec![Some(0)]);
832    }
833
834    #[test]
835    fn test_apply_change_nullability() {
836        let old = schema(&[("val", DataType::Int64, false)]);
837        let changes = vec![SchemaChange::NullabilityChanged {
838            name: "val".into(),
839            was_nullable: false,
840            now_nullable: true,
841        }];
842        let proj = apply_changes(&old, &changes).unwrap();
843        assert!(proj.target_schema.field(0).is_nullable());
844    }
845
846    #[test]
847    fn test_apply_rename() {
848        let old = schema(&[("fname", DataType::Utf8, false)]);
849        let changes = vec![SchemaChange::ColumnRenamed {
850            old_name: "fname".into(),
851            new_name: "first_name".into(),
852        }];
853        let proj = apply_changes(&old, &changes).unwrap();
854        assert_eq!(proj.target_schema.field(0).name(), "first_name");
855        assert_eq!(proj.mappings, vec![Some(0)]);
856    }
857
858    #[test]
859    fn test_apply_multi_change() {
860        let old = schema(&[
861            ("a", DataType::Int64, false),
862            ("b", DataType::Int32, true),
863            ("c", DataType::Utf8, false),
864        ]);
865        let changes = vec![
866            SchemaChange::ColumnRemoved { name: "c".into() },
867            SchemaChange::ColumnAdded {
868                name: "d".into(),
869                data_type: DataType::Float64,
870                nullable: true,
871            },
872            SchemaChange::TypeChanged {
873                name: "b".into(),
874                old_type: DataType::Int32,
875                new_type: DataType::Int64,
876            },
877        ];
878        let proj = apply_changes(&old, &changes).unwrap();
879        assert_eq!(proj.target_schema.fields().len(), 3); // a, b (widened), d
880        assert_eq!(proj.target_schema.field(0).name(), "a");
881        assert_eq!(proj.target_schema.field(1).name(), "b");
882        assert_eq!(proj.target_schema.field(1).data_type(), &DataType::Int64);
883        assert_eq!(proj.target_schema.field(2).name(), "d");
884        // a→Some(0), b→Some(1), d→None
885        assert_eq!(proj.mappings, vec![Some(0), Some(1), None]);
886    }
887
888    // ── SchemaHistory tests ───────────────────────────────────
889
890    #[test]
891    fn test_history_record_and_query() {
892        let mut history = SchemaHistory::new();
893        let s1 = schema(&[("a", DataType::Int64, false)]);
894        let s2 = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
895
896        let v1 = history.record("test_source", s1, vec![], EvolutionTrigger::Ddl);
897        assert_eq!(v1, 1);
898
899        let v2 = history.record(
900            "test_source",
901            s2,
902            vec![SchemaChange::ColumnAdded {
903                name: "b".into(),
904                data_type: DataType::Utf8,
905                nullable: true,
906            }],
907            EvolutionTrigger::Ddl,
908        );
909        assert_eq!(v2, 2);
910
911        assert_eq!(history.versions("test_source").len(), 2);
912        assert_eq!(history.latest_version("test_source"), Some(2));
913        assert_eq!(history.latest_version("unknown"), None);
914    }
915
916    // ── SchemaEvolutionEngine tests ───────────────────────────
917
918    #[test]
919    fn test_engine_no_change() {
920        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Backward);
921        let evolver = DefaultSchemaEvolver::new(CompatibilityMode::Backward);
922        let s = schema(&[("a", DataType::Int64, false)]);
923
924        let result = engine
925            .evolve("src", &evolver, &s, &s, EvolutionTrigger::Ddl)
926            .unwrap();
927
928        assert!(matches!(result, EvolutionResult::NoChange));
929    }
930
931    #[test]
932    fn test_engine_add_nullable_column() {
933        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Backward);
934        let evolver = DefaultSchemaEvolver::new(CompatibilityMode::Backward);
935        let old = schema(&[("a", DataType::Int64, false)]);
936        let new = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
937
938        let result = engine
939            .evolve("src", &evolver, &old, &new, EvolutionTrigger::Ddl)
940            .unwrap();
941
942        match result {
943            EvolutionResult::Applied {
944                new_schema,
945                version,
946                changes,
947                ..
948            } => {
949                assert_eq!(new_schema.fields().len(), 2);
950                assert_eq!(version, 1);
951                assert_eq!(changes.len(), 1);
952            }
953            EvolutionResult::NoChange => panic!("expected Applied"),
954        }
955    }
956
957    #[test]
958    fn test_engine_incompatible_rejected() {
959        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Full);
960        let evolver = DefaultSchemaEvolver::new(CompatibilityMode::Full);
961        let old = schema(&[("a", DataType::Int32, false)]);
962        let new = schema(&[("a", DataType::Int64, false)]); // widening under Full = incompatible
963
964        let result = engine.evolve("src", &evolver, &old, &new, EvolutionTrigger::Ddl);
965        assert!(result.is_err());
966    }
967
968    // ── DefaultSchemaEvolver integration ──────────────────────
969
970    #[test]
971    fn test_default_evolver_diff_and_apply() {
972        let evolver = DefaultSchemaEvolver::default();
973        let old = schema(&[("id", DataType::Int64, false)]);
974        let new = schema(&[
975            ("id", DataType::Int64, false),
976            ("email", DataType::Utf8, true),
977        ]);
978
979        let changes = evolver.diff_schemas(&old, &new);
980        assert_eq!(changes.len(), 1);
981
982        let verdict = evolver.evaluate_evolution(&changes);
983        assert_eq!(verdict, EvolutionVerdict::Compatible);
984
985        let proj = evolver.apply_evolution(&old, &changes).unwrap();
986        assert_eq!(proj.target_schema.fields().len(), 2);
987        assert_eq!(proj.mappings, vec![Some(0), None]);
988    }
989}