Skip to main content

laminar_connectors/schema/
evolution.rs

1//! Schema evolution engine (F-SCHEMA-009).
2//!
3//! Provides:
4//!
5//! - [`SchemaEvolution`] — a general-purpose schema evolver that performs
6//!   name-based schema diffing, compatibility evaluation, and schema merging
7//! - [`SchemaEvolutionEngine`] — orchestrates the full evolution flow:
8//!   detect → diff → evaluate → apply → record
9//! - [`SchemaHistory`] — tracks per-source schema version history
10//! - [`is_safe_widening`] — determines whether a type can be safely widened
11#![allow(clippy::disallowed_types)] // cold path: schema management
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::SystemTime;
16
17use arrow_schema::{DataType, Field, Schema, SchemaRef};
18
19use super::error::{SchemaError, SchemaResult};
20use super::traits::{ColumnProjection, CompatibilityMode, EvolutionVerdict, SchemaChange};
21
22// ── Safe widening rules ────────────────────────────────────────────
23
24/// Returns `true` if `from` can be safely widened to `to` without data loss.
25#[must_use]
26pub fn is_safe_widening(from: &DataType, to: &DataType) -> bool {
27    matches!(
28        (from, to),
29        (
30            DataType::Int8,
31            DataType::Int16 | DataType::Int32 | DataType::Int64
32        ) | (DataType::Int16, DataType::Int32 | DataType::Int64)
33            | (DataType::Int32, DataType::Int64 | DataType::Float64)
34            | (
35                DataType::UInt8,
36                DataType::UInt16 | DataType::UInt32 | DataType::UInt64
37            )
38            | (DataType::UInt16, DataType::UInt32 | DataType::UInt64)
39            | (DataType::UInt32, DataType::UInt64)
40            | (
41                DataType::Float16 | DataType::Int8 | DataType::Int16,
42                DataType::Float32 | DataType::Float64
43            )
44            | (DataType::Float32, DataType::Float64)
45            | (DataType::Utf8, DataType::LargeUtf8)
46            | (DataType::Binary, DataType::LargeBinary)
47    )
48}
49
50// ── SchemaEvolution ───────────────────────────────────────────
51
52/// A general-purpose schema evolver that uses name-based schema matching
53/// (suitable for JSON, CSV, Avro, etc.).
54///
55/// For field-ID-based sources (Iceberg, Parquet), a specialised
56/// implementation should be used instead.
57#[derive(Debug, Clone)]
58pub struct SchemaEvolution {
59    /// The compatibility mode for evaluating changes.
60    pub compatibility: CompatibilityMode,
61}
62
63impl SchemaEvolution {
64    /// Creates a new evolver with the given compatibility mode.
65    #[must_use]
66    pub fn new(compatibility: CompatibilityMode) -> Self {
67        Self { compatibility }
68    }
69
70    /// Computes the differences between two schemas.
71    #[must_use]
72    pub fn diff_schemas(&self, old: &SchemaRef, new: &SchemaRef) -> Vec<SchemaChange> {
73        diff_schemas_by_name(old, new)
74    }
75
76    /// Evaluates whether a set of schema changes is acceptable.
77    #[must_use]
78    pub fn evaluate_evolution(&self, changes: &[SchemaChange]) -> EvolutionVerdict {
79        evaluate_changes(changes, self.compatibility)
80    }
81
82    /// Applies schema changes, returning a column projection.
83    ///
84    /// # Errors
85    ///
86    /// Returns a schema error if the evolution cannot be applied.
87    pub fn apply_evolution(
88        &self,
89        old: &SchemaRef,
90        changes: &[SchemaChange],
91    ) -> SchemaResult<ColumnProjection> {
92        apply_changes(old, changes)
93    }
94}
95
96impl Default for SchemaEvolution {
97    fn default() -> Self {
98        Self {
99            compatibility: CompatibilityMode::None,
100        }
101    }
102}
103
104// ── Core diffing algorithm ─────────────────────────────────────────
105
106/// Computes the diff between two Arrow schemas using name-based matching.
107///
108/// Returns a list of [`SchemaChange`] entries describing all differences.
109/// Order: column removals, column additions, type changes, nullability changes.
110#[must_use]
111pub fn diff_schemas_by_name(old: &SchemaRef, new: &SchemaRef) -> Vec<SchemaChange> {
112    let mut changes = Vec::new();
113
114    let old_fields: HashMap<&str, &Field> = old
115        .fields()
116        .iter()
117        .map(|f| (f.name().as_str(), f.as_ref()))
118        .collect();
119
120    let new_fields: HashMap<&str, &Field> = new
121        .fields()
122        .iter()
123        .map(|f| (f.name().as_str(), f.as_ref()))
124        .collect();
125
126    // Detect removed columns (in old but not in new).
127    for field in old.fields() {
128        if !new_fields.contains_key(field.name().as_str()) {
129            changes.push(SchemaChange::ColumnRemoved {
130                name: field.name().clone(),
131            });
132        }
133    }
134
135    // Detect added columns (in new but not in old).
136    for field in new.fields() {
137        if !old_fields.contains_key(field.name().as_str()) {
138            changes.push(SchemaChange::ColumnAdded {
139                name: field.name().clone(),
140                data_type: field.data_type().clone(),
141                nullable: field.is_nullable(),
142            });
143        }
144    }
145
146    // Detect type and nullability changes.
147    for field in new.fields() {
148        if let Some(old_field) = old_fields.get(field.name().as_str()) {
149            if old_field.data_type() != field.data_type() {
150                changes.push(SchemaChange::TypeChanged {
151                    name: field.name().clone(),
152                    old_type: old_field.data_type().clone(),
153                    new_type: field.data_type().clone(),
154                });
155            }
156            if old_field.is_nullable() != field.is_nullable() {
157                changes.push(SchemaChange::NullabilityChanged {
158                    name: field.name().clone(),
159                    was_nullable: old_field.is_nullable(),
160                    now_nullable: field.is_nullable(),
161                });
162            }
163        }
164    }
165
166    changes
167}
168
169// ── Compatibility evaluation ───────────────────────────────────────
170
171/// Evaluates a set of schema changes against a compatibility mode.
172#[must_use]
173pub fn evaluate_changes(changes: &[SchemaChange], mode: CompatibilityMode) -> EvolutionVerdict {
174    if changes.is_empty() {
175        return EvolutionVerdict::Compatible;
176    }
177
178    // Mode::None — everything is allowed.
179    if mode == CompatibilityMode::None {
180        return EvolutionVerdict::Compatible;
181    }
182
183    let requires_backward = matches!(
184        mode,
185        CompatibilityMode::Backward
186            | CompatibilityMode::Full
187            | CompatibilityMode::BackwardTransitive
188            | CompatibilityMode::FullTransitive
189    );
190    let requires_forward = matches!(
191        mode,
192        CompatibilityMode::Forward
193            | CompatibilityMode::Full
194            | CompatibilityMode::ForwardTransitive
195            | CompatibilityMode::FullTransitive
196    );
197
198    let mut needs_migration = false;
199
200    for change in changes {
201        match change {
202            SchemaChange::ColumnAdded { nullable, .. } => {
203                // Backward: new schema must read old data.
204                // Adding a nullable column is backward-compatible (old data has null).
205                // Adding a non-nullable column breaks backward compatibility
206                // (old data has no value for this column).
207                if requires_backward && !nullable {
208                    return EvolutionVerdict::Incompatible(format!(
209                        "adding non-nullable column '{}' is not backward-compatible",
210                        change_name(change)
211                    ));
212                }
213            }
214            SchemaChange::ColumnRemoved { name } => {
215                // Backward: removing a column breaks backward compatibility
216                // (old data has the column, new reader doesn't expect it).
217                // Actually, backward means new reader reads old data — if the new
218                // schema drops a column, the new reader just ignores it. That's fine.
219                // Forward: old reader reads new data — new data is missing a column
220                // that old reader expects. That breaks forward compatibility.
221                if requires_forward {
222                    return EvolutionVerdict::Incompatible(format!(
223                        "removing column '{name}' is not forward-compatible"
224                    ));
225                }
226                needs_migration = true;
227            }
228            SchemaChange::TypeChanged {
229                name,
230                old_type,
231                new_type,
232            } => {
233                if is_safe_widening(old_type, new_type) {
234                    // Widening is backward-compatible (new reader can handle old
235                    // narrower data). But it's NOT forward-compatible (old reader
236                    // can't handle wider data).
237                    if requires_forward {
238                        return EvolutionVerdict::Incompatible(format!(
239                            "widening column '{name}' from {old_type:?} to {new_type:?} \
240                             is not forward-compatible"
241                        ));
242                    }
243                } else if is_safe_widening(new_type, old_type) {
244                    // Narrowing — forward-compatible but not backward.
245                    if requires_backward {
246                        return EvolutionVerdict::Incompatible(format!(
247                            "narrowing column '{name}' from {old_type:?} to {new_type:?} \
248                             is not backward-compatible"
249                        ));
250                    }
251                    needs_migration = true;
252                } else {
253                    // Unrelated types — always incompatible.
254                    return EvolutionVerdict::Incompatible(format!(
255                        "changing column '{name}' from {old_type:?} to {new_type:?} \
256                         is incompatible"
257                    ));
258                }
259            }
260            SchemaChange::NullabilityChanged {
261                name,
262                was_nullable,
263                now_nullable,
264            } => {
265                if *was_nullable && !now_nullable {
266                    // Making a column non-nullable: new reader rejects old nulls.
267                    if requires_backward {
268                        return EvolutionVerdict::Incompatible(format!(
269                            "making column '{name}' non-nullable is not backward-compatible"
270                        ));
271                    }
272                    needs_migration = true;
273                }
274                // Making a column nullable is always safe.
275            }
276            SchemaChange::ColumnRenamed { old_name, .. } => {
277                // Renames without field-IDs are always incompatible
278                // (name-based matching treats it as drop + add).
279                return EvolutionVerdict::Incompatible(format!(
280                    "renaming column '{old_name}' is not supported without field IDs"
281                ));
282            }
283        }
284    }
285
286    if needs_migration {
287        EvolutionVerdict::RequiresMigration
288    } else {
289        EvolutionVerdict::Compatible
290    }
291}
292
293fn change_name(change: &SchemaChange) -> &str {
294    match change {
295        SchemaChange::ColumnAdded { name, .. }
296        | SchemaChange::ColumnRemoved { name }
297        | SchemaChange::TypeChanged { name, .. }
298        | SchemaChange::NullabilityChanged { name, .. } => name,
299        SchemaChange::ColumnRenamed { old_name, .. } => old_name,
300    }
301}
302
303// ── Schema application ─────────────────────────────────────────────
304
305/// Applies a set of schema changes to produce a new schema and a column
306/// projection mapping old columns to new positions.
307///
308/// # Errors
309///
310/// Returns [`SchemaError`] if the changes reference columns that don't exist.
311pub fn apply_changes(old: &SchemaRef, changes: &[SchemaChange]) -> SchemaResult<ColumnProjection> {
312    let mut fields: Vec<Field> = old.fields().iter().map(|f| f.as_ref().clone()).collect();
313    let mut old_index_map: Vec<Option<usize>> = (0..fields.len()).map(Some).collect();
314
315    // Collect indices of columns to remove.
316    let mut remove_indices = Vec::new();
317
318    for change in changes {
319        match change {
320            SchemaChange::ColumnAdded {
321                name,
322                data_type,
323                nullable,
324            } => {
325                fields.push(Field::new(name, data_type.clone(), *nullable));
326                old_index_map.push(None); // New column — no old source.
327            }
328            SchemaChange::ColumnRemoved { name } => {
329                if let Some(idx) = fields.iter().position(|f| f.name() == name) {
330                    remove_indices.push(idx);
331                }
332            }
333            SchemaChange::TypeChanged { name, new_type, .. } => {
334                if let Some(field) = fields.iter_mut().find(|f| f.name() == name) {
335                    *field = Field::new(field.name(), new_type.clone(), field.is_nullable());
336                }
337            }
338            SchemaChange::NullabilityChanged {
339                name, now_nullable, ..
340            } => {
341                if let Some(field) = fields.iter_mut().find(|f| f.name() == name) {
342                    *field = Field::new(field.name(), field.data_type().clone(), *now_nullable);
343                }
344            }
345            SchemaChange::ColumnRenamed {
346                old_name, new_name, ..
347            } => {
348                if let Some(field) = fields.iter_mut().find(|f| f.name() == old_name) {
349                    *field = Field::new(new_name, field.data_type().clone(), field.is_nullable());
350                }
351            }
352        }
353    }
354
355    // Remove columns in reverse order to preserve indices.
356    remove_indices.sort_unstable();
357    remove_indices.dedup();
358    for &idx in remove_indices.iter().rev() {
359        fields.remove(idx);
360        old_index_map.remove(idx);
361    }
362
363    let new_schema = Arc::new(Schema::new(fields));
364
365    Ok(ColumnProjection {
366        mappings: old_index_map,
367        target_schema: new_schema,
368    })
369}
370
371// ── Schema history ─────────────────────────────────────────────────
372
/// The event that caused a schema evolution.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EvolutionTrigger {
    /// An `ALTER SOURCE` DDL statement issued by the user.
    Ddl,
    /// A new schema version detected by the schema registry.
    Registry {
        /// ID of the schema that triggered the change.
        schema_id: u32,
    },
    /// A `REFRESH SCHEMA` command issued by the user.
    ManualRefresh,
}
386
387/// A record in the schema history.
388#[derive(Debug, Clone)]
389pub struct SchemaHistoryEntry {
390    /// Source name.
391    pub source_name: String,
392    /// Schema version (monotonically increasing per source).
393    pub version: u64,
394    /// The Arrow schema at this version.
395    pub schema: SchemaRef,
396    /// Changes from the previous version.
397    pub changes: Vec<SchemaChange>,
398    /// When this version was applied.
399    pub applied_at: SystemTime,
400    /// How the evolution was triggered.
401    pub trigger: EvolutionTrigger,
402}
403
404/// Tracks schema version history for all sources.
405#[derive(Debug, Default)]
406pub struct SchemaHistory {
407    entries: HashMap<String, Vec<SchemaHistoryEntry>>,
408}
409
410impl SchemaHistory {
411    /// Creates a new empty history.
412    #[must_use]
413    pub fn new() -> Self {
414        Self::default()
415    }
416
417    /// Records a new schema version for a source. Returns the version number.
418    pub fn record(
419        &mut self,
420        source_name: &str,
421        schema: SchemaRef,
422        changes: Vec<SchemaChange>,
423        trigger: EvolutionTrigger,
424    ) -> u64 {
425        let entries = self.entries.entry(source_name.to_string()).or_default();
426        let version = entries.last().map_or(1, |e| e.version + 1);
427        entries.push(SchemaHistoryEntry {
428            source_name: source_name.to_string(),
429            version,
430            schema,
431            changes,
432            applied_at: SystemTime::now(),
433            trigger,
434        });
435        version
436    }
437
438    /// Returns all versions for a source.
439    #[must_use]
440    pub fn versions(&self, source_name: &str) -> &[SchemaHistoryEntry] {
441        self.entries.get(source_name).map_or(&[], |v| v.as_slice())
442    }
443
444    /// Returns the latest version number for a source.
445    #[must_use]
446    pub fn latest_version(&self, source_name: &str) -> Option<u64> {
447        self.entries.get(source_name)?.last().map(|e| e.version)
448    }
449}
450
451// ── Schema Evolution Engine ────────────────────────────────────────
452
453/// Result of an evolution attempt.
454#[derive(Debug)]
455pub enum EvolutionResult {
456    /// No schema change detected.
457    NoChange,
458    /// Evolution applied successfully.
459    Applied {
460        /// The new schema.
461        new_schema: SchemaRef,
462        /// Column projection from old → new.
463        projection: ColumnProjection,
464        /// The new version number.
465        version: u64,
466        /// The individual changes.
467        changes: Vec<SchemaChange>,
468    },
469}
470
471/// Orchestrates the full schema evolution flow.
472///
473/// 1. Diff the current and proposed schemas
474/// 2. Evaluate compatibility
475/// 3. Apply changes and produce a new schema + projection
476/// 4. Record in schema history
477#[derive(Debug)]
478pub struct SchemaEvolutionEngine {
479    /// Per-source version history.
480    pub history: SchemaHistory,
481    /// Default compatibility mode.
482    pub default_compatibility: CompatibilityMode,
483}
484
485impl SchemaEvolutionEngine {
486    /// Creates a new engine with the given default compatibility mode.
487    #[must_use]
488    pub fn new(default_compatibility: CompatibilityMode) -> Self {
489        Self {
490            history: SchemaHistory::new(),
491            default_compatibility,
492        }
493    }
494
495    /// Executes the full evolution flow for a source.
496    ///
497    /// # Errors
498    ///
499    /// Returns [`SchemaError::EvolutionRejected`] if the changes are incompatible.
500    pub fn evolve(
501        &mut self,
502        source_name: &str,
503        evolvable: &SchemaEvolution,
504        current: &SchemaRef,
505        proposed: &SchemaRef,
506        trigger: EvolutionTrigger,
507    ) -> SchemaResult<EvolutionResult> {
508        let changes = evolvable.diff_schemas(current, proposed);
509
510        if changes.is_empty() {
511            return Ok(EvolutionResult::NoChange);
512        }
513
514        let verdict = evolvable.evaluate_evolution(&changes);
515
516        match verdict {
517            EvolutionVerdict::Compatible | EvolutionVerdict::RequiresMigration => {
518                let projection = evolvable.apply_evolution(current, &changes)?;
519                let version = self.history.record(
520                    source_name,
521                    projection.target_schema.clone(),
522                    changes.clone(),
523                    trigger,
524                );
525                Ok(EvolutionResult::Applied {
526                    new_schema: projection.target_schema.clone(),
527                    projection,
528                    version,
529                    changes,
530                })
531            }
532            EvolutionVerdict::Incompatible(reason) => Err(SchemaError::EvolutionRejected(reason)),
533        }
534    }
535}
536
537#[cfg(test)]
538mod tests {
539    use super::*;
540
541    fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
542        Arc::new(Schema::new(
543            fields
544                .iter()
545                .map(|(name, dt, nullable)| Field::new(*name, dt.clone(), *nullable))
546                .collect::<Vec<_>>(),
547        ))
548    }
549
550    // ── is_safe_widening tests ────────────────────────────────
551
552    #[test]
553    fn test_int_widening() {
554        assert!(is_safe_widening(&DataType::Int8, &DataType::Int16));
555        assert!(is_safe_widening(&DataType::Int8, &DataType::Int32));
556        assert!(is_safe_widening(&DataType::Int8, &DataType::Int64));
557        assert!(is_safe_widening(&DataType::Int16, &DataType::Int32));
558        assert!(is_safe_widening(&DataType::Int16, &DataType::Int64));
559        assert!(is_safe_widening(&DataType::Int32, &DataType::Int64));
560    }
561
562    #[test]
563    fn test_uint_widening() {
564        assert!(is_safe_widening(&DataType::UInt8, &DataType::UInt16));
565        assert!(is_safe_widening(&DataType::UInt8, &DataType::UInt32));
566        assert!(is_safe_widening(&DataType::UInt32, &DataType::UInt64));
567    }
568
569    #[test]
570    fn test_float_widening() {
571        assert!(is_safe_widening(&DataType::Float16, &DataType::Float32));
572        assert!(is_safe_widening(&DataType::Float16, &DataType::Float64));
573        assert!(is_safe_widening(&DataType::Float32, &DataType::Float64));
574    }
575
576    #[test]
577    fn test_int_to_float_widening() {
578        assert!(is_safe_widening(&DataType::Int8, &DataType::Float32));
579        assert!(is_safe_widening(&DataType::Int16, &DataType::Float64));
580        assert!(is_safe_widening(&DataType::Int32, &DataType::Float64));
581    }
582
583    #[test]
584    fn test_string_binary_widening() {
585        assert!(is_safe_widening(&DataType::Utf8, &DataType::LargeUtf8));
586        assert!(is_safe_widening(&DataType::Binary, &DataType::LargeBinary));
587    }
588
589    #[test]
590    fn test_narrowing_not_safe() {
591        assert!(!is_safe_widening(&DataType::Int64, &DataType::Int32));
592        assert!(!is_safe_widening(&DataType::Float64, &DataType::Float32));
593        assert!(!is_safe_widening(&DataType::LargeUtf8, &DataType::Utf8));
594    }
595
596    #[test]
597    fn test_unrelated_types() {
598        assert!(!is_safe_widening(&DataType::Int64, &DataType::Utf8));
599        assert!(!is_safe_widening(&DataType::Boolean, &DataType::Int32));
600    }
601
602    // ── diff_schemas tests ────────────────────────────────────
603
604    #[test]
605    fn test_diff_identical() {
606        let s = schema(&[("a", DataType::Int64, false)]);
607        let changes = diff_schemas_by_name(&s, &s);
608        assert!(changes.is_empty());
609    }
610
611    #[test]
612    fn test_diff_column_added() {
613        let old = schema(&[("a", DataType::Int64, false)]);
614        let new = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
615        let changes = diff_schemas_by_name(&old, &new);
616        assert_eq!(changes.len(), 1);
617        assert!(matches!(&changes[0], SchemaChange::ColumnAdded { name, .. } if name == "b"));
618    }
619
620    #[test]
621    fn test_diff_column_removed() {
622        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
623        let new = schema(&[("a", DataType::Int64, false)]);
624        let changes = diff_schemas_by_name(&old, &new);
625        assert_eq!(changes.len(), 1);
626        assert!(matches!(&changes[0], SchemaChange::ColumnRemoved { name } if name == "b"));
627    }
628
629    #[test]
630    fn test_diff_type_changed() {
631        let old = schema(&[("a", DataType::Int32, false)]);
632        let new = schema(&[("a", DataType::Int64, false)]);
633        let changes = diff_schemas_by_name(&old, &new);
634        assert_eq!(changes.len(), 1);
635        assert!(matches!(
636            &changes[0],
637            SchemaChange::TypeChanged {
638                name,
639                old_type: DataType::Int32,
640                new_type: DataType::Int64,
641            } if name == "a"
642        ));
643    }
644
645    #[test]
646    fn test_diff_nullability_changed() {
647        let old = schema(&[("a", DataType::Int64, false)]);
648        let new = schema(&[("a", DataType::Int64, true)]);
649        let changes = diff_schemas_by_name(&old, &new);
650        assert_eq!(changes.len(), 1);
651        assert!(matches!(
652            &changes[0],
653            SchemaChange::NullabilityChanged {
654                name,
655                was_nullable: false,
656                now_nullable: true,
657            } if name == "a"
658        ));
659    }
660
661    #[test]
662    fn test_diff_multiple_changes() {
663        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
664        let new = schema(&[
665            ("a", DataType::Int64, true),    // nullability changed
666            ("c", DataType::Float64, false), // b removed, c added
667        ]);
668        let changes = diff_schemas_by_name(&old, &new);
669        // Should have: ColumnRemoved(b), ColumnAdded(c), NullabilityChanged(a)
670        assert_eq!(changes.len(), 3);
671    }
672
673    // ── evaluate_changes tests ────────────────────────────────
674
675    #[test]
676    fn test_evaluate_no_changes() {
677        let verdict = evaluate_changes(&[], CompatibilityMode::Full);
678        assert_eq!(verdict, EvolutionVerdict::Compatible);
679    }
680
681    #[test]
682    fn test_evaluate_none_mode_allows_all() {
683        let changes = vec![
684            SchemaChange::ColumnRemoved { name: "x".into() },
685            SchemaChange::TypeChanged {
686                name: "y".into(),
687                old_type: DataType::Int64,
688                new_type: DataType::Utf8, // unrelated type change
689            },
690        ];
691        let verdict = evaluate_changes(&changes, CompatibilityMode::None);
692        assert_eq!(verdict, EvolutionVerdict::Compatible);
693    }
694
695    #[test]
696    fn test_evaluate_backward_add_nullable_ok() {
697        let changes = vec![SchemaChange::ColumnAdded {
698            name: "email".into(),
699            data_type: DataType::Utf8,
700            nullable: true,
701        }];
702        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
703        assert_eq!(verdict, EvolutionVerdict::Compatible);
704    }
705
706    #[test]
707    fn test_evaluate_backward_add_non_nullable_rejected() {
708        let changes = vec![SchemaChange::ColumnAdded {
709            name: "email".into(),
710            data_type: DataType::Utf8,
711            nullable: false,
712        }];
713        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
714        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
715    }
716
717    #[test]
718    fn test_evaluate_forward_drop_rejected() {
719        let changes = vec![SchemaChange::ColumnRemoved {
720            name: "legacy".into(),
721        }];
722        let verdict = evaluate_changes(&changes, CompatibilityMode::Forward);
723        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
724    }
725
726    #[test]
727    fn test_evaluate_backward_drop_ok() {
728        let changes = vec![SchemaChange::ColumnRemoved {
729            name: "legacy".into(),
730        }];
731        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
732        assert_eq!(verdict, EvolutionVerdict::RequiresMigration);
733    }
734
735    #[test]
736    fn test_evaluate_backward_widening_ok() {
737        let changes = vec![SchemaChange::TypeChanged {
738            name: "count".into(),
739            old_type: DataType::Int32,
740            new_type: DataType::Int64,
741        }];
742        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
743        assert_eq!(verdict, EvolutionVerdict::Compatible);
744    }
745
746    #[test]
747    fn test_evaluate_full_widening_rejected() {
748        let changes = vec![SchemaChange::TypeChanged {
749            name: "count".into(),
750            old_type: DataType::Int32,
751            new_type: DataType::Int64,
752        }];
753        let verdict = evaluate_changes(&changes, CompatibilityMode::Full);
754        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
755    }
756
757    #[test]
758    fn test_evaluate_unrelated_type_change_rejected() {
759        let changes = vec![SchemaChange::TypeChanged {
760            name: "val".into(),
761            old_type: DataType::Int64,
762            new_type: DataType::Utf8,
763        }];
764        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
765        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
766    }
767
768    #[test]
769    fn test_evaluate_make_nullable_ok() {
770        let changes = vec![SchemaChange::NullabilityChanged {
771            name: "field".into(),
772            was_nullable: false,
773            now_nullable: true,
774        }];
775        let verdict = evaluate_changes(&changes, CompatibilityMode::Full);
776        assert_eq!(verdict, EvolutionVerdict::Compatible);
777    }
778
779    #[test]
780    fn test_evaluate_make_non_nullable_backward_rejected() {
781        let changes = vec![SchemaChange::NullabilityChanged {
782            name: "field".into(),
783            was_nullable: true,
784            now_nullable: false,
785        }];
786        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
787        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
788    }
789
790    #[test]
791    fn test_evaluate_rename_rejected() {
792        let changes = vec![SchemaChange::ColumnRenamed {
793            old_name: "fname".into(),
794            new_name: "first_name".into(),
795        }];
796        let verdict = evaluate_changes(&changes, CompatibilityMode::Backward);
797        assert!(matches!(verdict, EvolutionVerdict::Incompatible(_)));
798    }
799
800    // ── apply_changes tests ───────────────────────────────────
801
802    #[test]
803    fn test_apply_add_column() {
804        let old = schema(&[("a", DataType::Int64, false)]);
805        let changes = vec![SchemaChange::ColumnAdded {
806            name: "b".into(),
807            data_type: DataType::Utf8,
808            nullable: true,
809        }];
810        let proj = apply_changes(&old, &changes).unwrap();
811        assert_eq!(proj.target_schema.fields().len(), 2);
812        assert_eq!(proj.mappings, vec![Some(0), None]);
813    }
814
815    #[test]
816    fn test_apply_remove_column() {
817        let old = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
818        let changes = vec![SchemaChange::ColumnRemoved { name: "b".into() }];
819        let proj = apply_changes(&old, &changes).unwrap();
820        assert_eq!(proj.target_schema.fields().len(), 1);
821        assert_eq!(proj.target_schema.field(0).name(), "a");
822        assert_eq!(proj.mappings, vec![Some(0)]);
823    }
824
825    #[test]
826    fn test_apply_widen_type() {
827        let old = schema(&[("val", DataType::Int32, false)]);
828        let changes = vec![SchemaChange::TypeChanged {
829            name: "val".into(),
830            old_type: DataType::Int32,
831            new_type: DataType::Int64,
832        }];
833        let proj = apply_changes(&old, &changes).unwrap();
834        assert_eq!(proj.target_schema.field(0).data_type(), &DataType::Int64);
835        assert_eq!(proj.mappings, vec![Some(0)]);
836    }
837
838    #[test]
839    fn test_apply_change_nullability() {
840        let old = schema(&[("val", DataType::Int64, false)]);
841        let changes = vec![SchemaChange::NullabilityChanged {
842            name: "val".into(),
843            was_nullable: false,
844            now_nullable: true,
845        }];
846        let proj = apply_changes(&old, &changes).unwrap();
847        assert!(proj.target_schema.field(0).is_nullable());
848    }
849
850    #[test]
851    fn test_apply_rename() {
852        let old = schema(&[("fname", DataType::Utf8, false)]);
853        let changes = vec![SchemaChange::ColumnRenamed {
854            old_name: "fname".into(),
855            new_name: "first_name".into(),
856        }];
857        let proj = apply_changes(&old, &changes).unwrap();
858        assert_eq!(proj.target_schema.field(0).name(), "first_name");
859        assert_eq!(proj.mappings, vec![Some(0)]);
860    }
861
862    #[test]
863    fn test_apply_multi_change() {
864        let old = schema(&[
865            ("a", DataType::Int64, false),
866            ("b", DataType::Int32, true),
867            ("c", DataType::Utf8, false),
868        ]);
869        let changes = vec![
870            SchemaChange::ColumnRemoved { name: "c".into() },
871            SchemaChange::ColumnAdded {
872                name: "d".into(),
873                data_type: DataType::Float64,
874                nullable: true,
875            },
876            SchemaChange::TypeChanged {
877                name: "b".into(),
878                old_type: DataType::Int32,
879                new_type: DataType::Int64,
880            },
881        ];
882        let proj = apply_changes(&old, &changes).unwrap();
883        assert_eq!(proj.target_schema.fields().len(), 3); // a, b (widened), d
884        assert_eq!(proj.target_schema.field(0).name(), "a");
885        assert_eq!(proj.target_schema.field(1).name(), "b");
886        assert_eq!(proj.target_schema.field(1).data_type(), &DataType::Int64);
887        assert_eq!(proj.target_schema.field(2).name(), "d");
888        // a→Some(0), b→Some(1), d→None
889        assert_eq!(proj.mappings, vec![Some(0), Some(1), None]);
890    }
891
892    // ── SchemaHistory tests ───────────────────────────────────
893
894    #[test]
895    fn test_history_record_and_query() {
896        let mut history = SchemaHistory::new();
897        let s1 = schema(&[("a", DataType::Int64, false)]);
898        let s2 = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
899
900        let v1 = history.record("test_source", s1, vec![], EvolutionTrigger::Ddl);
901        assert_eq!(v1, 1);
902
903        let v2 = history.record(
904            "test_source",
905            s2,
906            vec![SchemaChange::ColumnAdded {
907                name: "b".into(),
908                data_type: DataType::Utf8,
909                nullable: true,
910            }],
911            EvolutionTrigger::Ddl,
912        );
913        assert_eq!(v2, 2);
914
915        assert_eq!(history.versions("test_source").len(), 2);
916        assert_eq!(history.latest_version("test_source"), Some(2));
917        assert_eq!(history.latest_version("unknown"), None);
918    }
919
920    // ── SchemaEvolutionEngine tests ───────────────────────────
921
922    #[test]
923    fn test_engine_no_change() {
924        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Backward);
925        let evolver = SchemaEvolution::new(CompatibilityMode::Backward);
926        let s = schema(&[("a", DataType::Int64, false)]);
927
928        let result = engine
929            .evolve("src", &evolver, &s, &s, EvolutionTrigger::Ddl)
930            .unwrap();
931
932        assert!(matches!(result, EvolutionResult::NoChange));
933    }
934
935    #[test]
936    fn test_engine_add_nullable_column() {
937        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Backward);
938        let evolver = SchemaEvolution::new(CompatibilityMode::Backward);
939        let old = schema(&[("a", DataType::Int64, false)]);
940        let new = schema(&[("a", DataType::Int64, false), ("b", DataType::Utf8, true)]);
941
942        let result = engine
943            .evolve("src", &evolver, &old, &new, EvolutionTrigger::Ddl)
944            .unwrap();
945
946        match result {
947            EvolutionResult::Applied {
948                new_schema,
949                version,
950                changes,
951                ..
952            } => {
953                assert_eq!(new_schema.fields().len(), 2);
954                assert_eq!(version, 1);
955                assert_eq!(changes.len(), 1);
956            }
957            EvolutionResult::NoChange => panic!("expected Applied"),
958        }
959    }
960
961    #[test]
962    fn test_engine_incompatible_rejected() {
963        let mut engine = SchemaEvolutionEngine::new(CompatibilityMode::Full);
964        let evolver = SchemaEvolution::new(CompatibilityMode::Full);
965        let old = schema(&[("a", DataType::Int32, false)]);
966        let new = schema(&[("a", DataType::Int64, false)]); // widening under Full = incompatible
967
968        let result = engine.evolve("src", &evolver, &old, &new, EvolutionTrigger::Ddl);
969        assert!(result.is_err());
970    }
971
972    // ── SchemaEvolution integration ──────────────────────
973
974    #[test]
975    fn test_default_evolver_diff_and_apply() {
976        let evolver = SchemaEvolution::default();
977        let old = schema(&[("id", DataType::Int64, false)]);
978        let new = schema(&[
979            ("id", DataType::Int64, false),
980            ("email", DataType::Utf8, true),
981        ]);
982
983        let changes = evolver.diff_schemas(&old, &new);
984        assert_eq!(changes.len(), 1);
985
986        let verdict = evolver.evaluate_evolution(&changes);
987        assert_eq!(verdict, EvolutionVerdict::Compatible);
988
989        let proj = evolver.apply_evolution(&old, &changes).unwrap();
990        assert_eq!(proj.target_schema.fields().len(), 2);
991        assert_eq!(proj.mappings, vec![Some(0), None]);
992    }
993}