// laminar_connectors/lakehouse/delta_io.rs
//! Delta Lake I/O integration module.
//!
//! This module provides the actual I/O operations for Delta Lake tables via the
//! `deltalake` crate. All functions are feature-gated behind `delta-lake`.
//!
//! # Architecture
//!
//! The I/O module is separate from the business logic in [`delta.rs`](super::delta)
//! to allow:
//! - Testing business logic without the `deltalake` dependency
//! - Clean separation of concerns (buffering/epoch management vs. actual writes)
//! - Easy mocking for unit tests
//!
//! # Exactly-Once Semantics
//!
//! Delta Lake's transaction log supports application-level transaction metadata
//! via the `txn` action. We use this to store `(writer_id, epoch)` pairs, enabling
//! exactly-once semantics:
//!
//! 1. On recovery, read `txn` metadata to find the last committed epoch for this writer
//! 2. Skip epochs <= last committed (idempotent replay)
//! 3. Each write includes the epoch in `txn` metadata
#[cfg(feature = "delta-lake")]
use std::collections::HashMap;

#[cfg(feature = "delta-lake")]
use std::sync::Arc;

#[cfg(feature = "delta-lake")]
use arrow_array::RecordBatch;

#[cfg(feature = "delta-lake")]
use arrow_schema::SchemaRef;

// delta_kernel's TryIntoKernel trait is re-exported via deltalake.
#[cfg(feature = "delta-lake")]
use deltalake::kernel::engine::arrow_conversion::TryIntoKernel as _;

#[cfg(feature = "delta-lake")]
use deltalake::kernel::transaction::CommitProperties;

#[cfg(feature = "delta-lake")]
use deltalake::kernel::Transaction;

#[cfg(feature = "delta-lake")]
use deltalake::operations::write::SchemaMode;

#[cfg(feature = "delta-lake")]
use deltalake::protocol::SaveMode;

#[cfg(feature = "delta-lake")]
use deltalake::DeltaTable;

#[cfg(feature = "delta-lake")]
use tracing::{debug, info, warn};

#[cfg(feature = "delta-lake")]
use url::Url;

#[cfg(feature = "delta-lake")]
use crate::error::ConnectorError;
/// Converts a path string to a URL.
///
/// Strings that already carry a scheme (`"://"`) are parsed verbatim; anything
/// else is treated as a local directory and converted to a `file://` URL.
///
/// # Errors
///
/// Returns `ConnectorError::ConfigurationError` when the string is neither a
/// valid URL nor convertible to an absolute filesystem path.
#[cfg(feature = "delta-lake")]
fn path_to_url(path: &str) -> Result<Url, ConnectorError> {
    // Scheme present (s3://, az://, gs://, file://, ...): parse directly.
    if path.contains("://") {
        return Url::parse(path)
            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid URL '{path}': {e}")));
    }

    // Local path. Canonicalize when it exists; otherwise build an absolute
    // path by hand, since new tables may point at directories not yet created.
    let raw = std::path::Path::new(path);
    let absolute = if raw.exists() {
        std::fs::canonicalize(raw).map_err(|e| {
            ConnectorError::ConfigurationError(format!("invalid path '{path}': {e}"))
        })?
    } else if raw.is_absolute() {
        raw.to_path_buf()
    } else {
        std::env::current_dir()
            .map_err(|e| {
                ConnectorError::ConfigurationError(format!("cannot get current dir: {e}"))
            })?
            .join(raw)
    };

    // Delta tables are directories, so emit a directory URL (trailing slash).
    Url::from_directory_path(&absolute).map_err(|()| {
        ConnectorError::ConfigurationError(format!(
            "cannot convert path to URL: {}",
            absolute.display()
        ))
    })
}
101
/// Opens an existing Delta Lake table or creates a new one.
///
/// # Arguments
///
/// * `table_path` - Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`)
/// * `storage_options` - Storage credentials and configuration
/// * `schema` - Optional Arrow schema for table creation (required if table doesn't exist)
///
/// # Returns
///
/// The opened `DeltaTable` handle.
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if the table cannot be opened or
/// created, and `ConnectorError::ConfigurationError` if creation is needed but
/// no schema was supplied.
#[cfg(feature = "delta-lake")]
#[allow(clippy::implicit_hasher)]
pub async fn open_or_create_table(
    table_path: &str,
    storage_options: HashMap<String, String>,
    schema: Option<&SchemaRef>,
) -> Result<DeltaTable, ConnectorError> {
    info!(table_path, "opening Delta Lake table");

    let url = path_to_url(table_path)?;

    // Build a handle; this succeeds even when no table exists at the URL yet.
    // Fix: `url` and `storage_options` are not used again, so pass them by
    // value instead of cloning (clippy::redundant_clone).
    let table = DeltaTable::try_from_url_with_storage_options(url, storage_options)
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to open table: {e}")))?;

    // A populated version means the table already exists on storage.
    if table.version().is_some() {
        info!(
            table_path,
            version = table.version(),
            "opened existing Delta Lake table"
        );
        return Ok(table);
    }

    // Table doesn't exist - create it if we have a schema.
    let schema = schema.ok_or_else(|| {
        ConnectorError::ConfigurationError(
            "cannot create Delta Lake table without schema - \
             write at least one batch first"
                .into(),
        )
    })?;

    info!(table_path, "creating new Delta Lake table");

    // Convert Arrow schema to Delta Lake schema using TryIntoKernel.
    let delta_schema: deltalake::kernel::StructType = schema
        .as_ref()
        .try_into_kernel()
        .map_err(|e| ConnectorError::SchemaMismatch(format!("schema conversion failed: {e}")))?;

    // Create the table with the converted column definitions.
    let table = table
        .create()
        .with_columns(delta_schema.fields().cloned())
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to create table: {e}")))?;

    info!(
        table_path,
        version = table.version(),
        "created new Delta Lake table"
    );

    Ok(table)
}
175
/// Writes batches to a Delta Lake table with exactly-once semantics.
///
/// # Arguments
///
/// * `table` - The Delta Lake table handle (consumed and returned)
/// * `batches` - Record batches to write
/// * `writer_id` - Unique writer identifier for exactly-once deduplication
/// * `epoch` - The epoch number for this write (stored in txn metadata)
/// * `save_mode` - Delta Lake save mode (Append, Overwrite, etc.)
/// * `partition_columns` - Optional partition column name slice
/// * `schema_evolution` - If true, auto-merge new columns into the table schema
///
/// # Returns
///
/// A tuple of (updated table handle, new Delta version).
///
/// # Errors
///
/// Returns `ConnectorError::WriteError` if the write fails.
#[cfg(feature = "delta-lake")]
pub async fn write_batches(
    table: DeltaTable,
    batches: Vec<RecordBatch>,
    writer_id: &str,
    epoch: u64,
    save_mode: SaveMode,
    partition_columns: Option<&[String]>,
    schema_evolution: bool,
) -> Result<(DeltaTable, i64), ConnectorError> {
    // Nothing buffered for this epoch: report the current version unchanged.
    if batches.is_empty() {
        debug!("no batches to write, skipping");
        let current = table.version().unwrap_or(0);
        return Ok((table, current));
    }

    let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();

    debug!(
        writer_id,
        epoch,
        total_rows,
        num_batches = batches.len(),
        "writing batches to Delta Lake"
    );

    // Delta stores txn versions as i64; epochs stay far below i64::MAX in
    // practice, so the wrapping cast is safe.
    #[allow(clippy::cast_possible_wrap)]
    let txn_epoch = epoch as i64;

    // Attach (writer_id, epoch) as application transaction metadata so replay
    // after recovery can be deduplicated (exactly-once).
    let commit_props = CommitProperties::default()
        .with_application_transaction(Transaction::new(writer_id, txn_epoch));

    let mut builder = table
        .write(batches)
        .with_save_mode(save_mode)
        .with_commit_properties(commit_props);

    // Additive schema merge, when the caller opted in.
    if schema_evolution {
        builder = builder.with_schema_mode(SchemaMode::Merge);
    }

    // Partitioning is only applied for a non-empty column list.
    if let Some(cols) = partition_columns.filter(|c| !c.is_empty()) {
        builder = builder.with_partition_columns(cols.to_vec());
    }

    // Awaiting the builder runs the write and commits the transaction.
    let table = builder
        .await
        .map_err(|e| ConnectorError::WriteError(format!("Delta Lake write failed: {e}")))?;

    let version = table.version().unwrap_or(0);

    info!(
        writer_id,
        epoch, version, total_rows, "committed Delta Lake transaction"
    );

    Ok((table, version))
}
261
/// Retrieves the last committed epoch for a writer from Delta Lake's txn metadata.
///
/// This is used for exactly-once recovery: on startup, we check what epoch was
/// last committed and skip any epochs <= that value.
///
/// # Arguments
///
/// * `table` - The Delta Lake table handle
/// * `writer_id` - The writer identifier to look up
///
/// # Returns
///
/// The last committed epoch for this writer, or 0 if no commits found.
#[cfg(feature = "delta-lake")]
pub async fn get_last_committed_epoch(table: &DeltaTable, writer_id: &str) -> u64 {
    // Without a snapshot there is no transaction history to consult.
    let snapshot = match table.snapshot() {
        Ok(s) => s,
        Err(_) => {
            debug!(writer_id, "no snapshot available, assuming epoch 0");
            return 0;
        }
    };

    let lookup = snapshot
        .transaction_version(&table.log_store(), writer_id)
        .await;

    match lookup {
        Ok(Some(version)) => {
            // txn versions are non-negative, so dropping the sign is safe.
            #[allow(clippy::cast_sign_loss)]
            let epoch = version as u64;
            debug!(
                writer_id,
                epoch, "found last committed epoch from txn metadata"
            );
            epoch
        }
        Ok(None) => {
            debug!(
                writer_id,
                "no txn metadata found for writer, assuming epoch 0"
            );
            0
        }
        Err(e) => {
            warn!(writer_id, error = %e, "failed to read txn metadata, assuming epoch 0");
            0
        }
    }
}
311
/// Extracts the Arrow schema from a Delta Lake table.
///
/// # Arguments
///
/// * `table` - The Delta Lake table handle
///
/// # Returns
///
/// The table's Arrow schema.
///
/// # Errors
///
/// Returns `ConnectorError::SchemaMismatch` if schema extraction fails.
#[cfg(feature = "delta-lake")]
pub fn get_table_schema(table: &DeltaTable) -> Result<SchemaRef, ConnectorError> {
    // The EagerSnapshot already carries a pre-computed Arrow schema.
    table
        .snapshot()
        .map(|state| state.snapshot().arrow_schema())
        .map_err(|e| ConnectorError::SchemaMismatch(format!("table has no snapshot: {e}")))
}
334
/// Returns the latest committed version of a Delta Lake table.
///
/// This refreshes the table state from storage before checking.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the table state cannot be refreshed.
#[cfg(feature = "delta-lake")]
pub async fn get_latest_version(table: &mut DeltaTable) -> Result<i64, ConnectorError> {
    // `DeltaTable::update()` consumes the table, so refresh a clone and swap
    // the refreshed state back into the caller's handle.
    let (refreshed, _metrics) = table
        .clone()
        .update()
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to refresh Delta table: {e}")))?;

    *table = refreshed;
    Ok(table.version().unwrap_or(0))
}
353
/// Reads record batches from a specific Delta Lake table version.
///
/// Loads the requested version, applies a `LIMIT` to bound memory usage,
/// then streams results via `execute_stream` to avoid materializing the
/// entire version in memory.
///
/// # Arguments
///
/// * `table` - Mutable reference to the Delta Lake table handle
/// * `version` - The table version to read
/// * `max_records` - Maximum number of records to return. Pass `usize::MAX`
///   to read all records (unbounded).
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the version cannot be loaded or scanned.
#[cfg(feature = "delta-lake")]
pub async fn read_batches_at_version(
    table: &mut DeltaTable,
    version: i64,
    max_records: usize,
) -> Result<Vec<RecordBatch>, ConnectorError> {
    use datafusion::prelude::SessionContext;
    use tokio_stream::StreamExt;

    // Time-travel the handle to the requested version.
    table
        .load_version(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;

    debug!(version, "Delta Lake: loaded version for reading");

    // Expose the table to DataFusion via a DeltaTableProvider.
    let provider = table
        .table_provider()
        .build()
        .await
        .map_err(|e| {
            ConnectorError::ReadError(format!("failed to build table provider: {e}"))
        })?;

    let ctx = SessionContext::new();
    ctx.register_table("delta_source_scan", Arc::new(provider))
        .map_err(|e| ConnectorError::ReadError(format!("failed to register scan table: {e}")))?;

    let mut df = ctx
        .sql("SELECT * FROM delta_source_scan")
        .await
        .map_err(|e| ConnectorError::ReadError(format!("scan query failed: {e}")))?;

    // Bound memory with a LIMIT unless the caller asked for everything.
    if max_records < usize::MAX {
        df = df
            .limit(0, Some(max_records))
            .map_err(|e| ConnectorError::ReadError(format!("limit failed: {e}")))?;
    }

    // Stream the plan output instead of collect()ing it all at once.
    let mut stream = df
        .execute_stream()
        .await
        .map_err(|e| ConnectorError::ReadError(format!("stream execution failed: {e}")))?;

    let mut collected = Vec::new();
    let mut rows_seen: usize = 0;

    while let Some(next) = stream.next().await {
        let batch =
            next.map_err(|e| ConnectorError::ReadError(format!("stream batch failed: {e}")))?;
        if batch.num_rows() == 0 {
            continue;
        }
        rows_seen += batch.num_rows();
        collected.push(batch);

        // Secondary guard: stop pulling batches once max_records is reached.
        if rows_seen >= max_records {
            break;
        }
    }

    debug!(
        version,
        num_batches = collected.len(),
        total_rows = rows_seen,
        "Delta Lake: scanned version"
    );

    Ok(collected)
}
443
/// Result of a MERGE (upsert) operation.
///
/// Counts come from delta-rs merge metrics for a single commit.
#[cfg(feature = "delta-lake")]
#[derive(Debug)]
pub struct MergeResult {
    /// Number of rows inserted into the target table.
    pub rows_inserted: usize,
    /// Number of target rows updated in place.
    pub rows_updated: usize,
    /// Number of target rows deleted.
    pub rows_deleted: usize,
}
455
/// Performs a MERGE (upsert) of a source batch into a Delta Lake table.
///
/// Matches source rows to target rows by `key_columns`, then:
/// - **Matched**: update all non-key columns from source
/// - **Not matched**: insert all columns from source
///
/// # Arguments
///
/// * `table` - The Delta Lake table handle (consumed and returned)
/// * `source_batch` - The source `RecordBatch` to merge
/// * `key_columns` - Columns used to match source to target rows
/// * `writer_id` - Unique writer identifier for exactly-once deduplication
/// * `epoch` - The epoch number for this write
/// * `schema_evolution` - If true, auto-merge new columns into the table schema
///
/// # Errors
///
/// Returns `ConnectorError::WriteError` if the merge fails.
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_lines)]
pub async fn merge_batches(
    table: DeltaTable,
    source_batch: RecordBatch,
    key_columns: &[String],
    writer_id: &str,
    epoch: u64,
    schema_evolution: bool,
) -> Result<(DeltaTable, MergeResult), ConnectorError> {
    use datafusion::prelude::*;
    use deltalake::kernel::transaction::CommitProperties;
    use deltalake::kernel::Transaction;

    // Nothing to merge: hand the table back with zeroed metrics.
    if source_batch.num_rows() == 0 {
        let empty = MergeResult {
            rows_inserted: 0,
            rows_updated: 0,
            rows_deleted: 0,
        };
        return Ok((table, empty));
    }

    debug!(
        key_columns = ?key_columns,
        source_rows = source_batch.num_rows(),
        "performing Delta Lake MERGE"
    );

    // Register the source batch as a DataFusion DataFrame.
    let ctx = SessionContext::new();
    let source_df = ctx.read_batch(source_batch).map_err(|e| {
        ConnectorError::WriteError(format!("failed to create source DataFrame: {e}"))
    })?;

    // Join predicate: target.k1 = source.k1 AND target.k2 = source.k2 ...
    let join_predicate = key_columns
        .iter()
        .map(|k| col(format!("target.{k}")).eq(col(format!("source.{k}"))))
        .reduce(|acc, expr| acc.and(expr))
        .ok_or_else(|| {
            ConnectorError::ConfigurationError("merge requires at least one key column".into())
        })?;

    // Delta stores txn versions as i64; epochs won't exceed i64::MAX.
    #[allow(clippy::cast_possible_wrap)]
    let txn_epoch = epoch as i64;

    let source_schema = source_df.schema().clone();

    // HashSet gives O(1) key-membership checks.
    let keys: std::collections::HashSet<&str> = key_columns.iter().map(String::as_str).collect();

    // Insert clause sets every source column (schema order); the update
    // clause sets only the non-key columns.
    let insert_columns: Vec<String> = source_schema
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect();

    let update_columns: Vec<String> = insert_columns
        .iter()
        .filter(|name| !keys.contains(name.as_str()))
        .cloned()
        .collect();

    let mut merge_builder = table
        .merge(source_df, join_predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, txn_epoch)),
        )
        .when_matched_update(|update| {
            // Matched rows: overwrite non-key columns from the source side.
            let mut clause = update;
            for name in &update_columns {
                clause = clause.update(name.as_str(), col(format!("source.{name}")));
            }
            clause
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-update failed: {e}")))?
        .when_not_matched_insert(|insert| {
            // Unmatched rows: insert every column from the source side.
            let mut clause = insert;
            for name in &insert_columns {
                clause = clause.set(name.as_str(), col(format!("source.{name}")));
            }
            clause
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge not-matched-insert failed: {e}")))?;

    if schema_evolution {
        merge_builder = merge_builder.with_merge_schema(true);
    }

    let (table, metrics) = merge_builder
        .await
        .map_err(|e| ConnectorError::WriteError(format!("Delta Lake MERGE failed: {e}")))?;

    let result = MergeResult {
        rows_inserted: metrics.num_target_rows_inserted,
        rows_updated: metrics.num_target_rows_updated,
        rows_deleted: metrics.num_target_rows_deleted,
    };

    info!(
        writer_id,
        epoch,
        rows_inserted = result.rows_inserted,
        rows_updated = result.rows_updated,
        rows_deleted = result.rows_deleted,
        "Delta Lake MERGE complete"
    );

    Ok((table, result))
}
594
/// Performs a DELETE-by-merge for rows to be removed from the target table.
///
/// Matches source (delete) rows by `key_columns` and deletes matching target rows.
///
/// # Errors
///
/// Returns `ConnectorError::WriteError` if the operation fails.
#[cfg(feature = "delta-lake")]
pub async fn delete_by_merge(
    table: DeltaTable,
    delete_batch: RecordBatch,
    key_columns: &[String],
    writer_id: &str,
    epoch: u64,
) -> Result<(DeltaTable, usize), ConnectorError> {
    use datafusion::prelude::*;
    use deltalake::kernel::transaction::CommitProperties;
    use deltalake::kernel::Transaction;

    // No tombstones this epoch: nothing to do.
    if delete_batch.num_rows() == 0 {
        return Ok((table, 0));
    }

    debug!(
        delete_rows = delete_batch.num_rows(),
        "performing Delta Lake delete-by-merge"
    );

    let ctx = SessionContext::new();
    let source_df = ctx.read_batch(delete_batch).map_err(|e| {
        ConnectorError::WriteError(format!("failed to create delete DataFrame: {e}"))
    })?;

    // target.k1 = source.k1 AND target.k2 = source.k2 ...
    let join_predicate = key_columns
        .iter()
        .map(|k| col(format!("target.{k}")).eq(col(format!("source.{k}"))))
        .reduce(|acc, expr| acc.and(expr))
        .ok_or_else(|| {
            ConnectorError::ConfigurationError("delete requires at least one key column".into())
        })?;

    // Epochs never approach i64::MAX, so the wrapping cast is safe.
    #[allow(clippy::cast_possible_wrap)]
    let txn_epoch = epoch as i64;

    let (table, metrics) = table
        .merge(source_df, join_predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, txn_epoch)),
        )
        .when_matched_delete(|delete| delete)
        .map_err(|e| ConnectorError::WriteError(format!("delete-merge setup failed: {e}")))?
        .await
        .map_err(|e| ConnectorError::WriteError(format!("Delta Lake delete-merge failed: {e}")))?;

    let rows_deleted = metrics.num_target_rows_deleted;
    info!(rows_deleted, "Delta Lake delete-by-merge complete");

    Ok((table, rows_deleted))
}
657
/// Atomic changelog MERGE: inserts, updates, and deletes in one Delta commit.
///
/// The source batch must contain an `_op` column (Utf8) with values:
/// - `"I"`, `"U"`, `"r"` → upsert (update if matched, insert if not)
/// - `"D"` → delete matched rows
///
/// The CDC metadata columns `_op` and `_ts_ms` are excluded from SET clauses
/// but remain in the source `DataFrame` for predicate filtering; other
/// underscore-prefixed user columns (e.g. `_id`) are written normally.
///
/// # Errors
///
/// Returns `ConnectorError::WriteError` if the merge fails.
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_lines)]
pub async fn merge_changelog(
    table: DeltaTable,
    source_batch: RecordBatch,
    key_columns: &[String],
    writer_id: &str,
    epoch: u64,
    schema_evolution: bool,
) -> Result<(DeltaTable, MergeResult), ConnectorError> {
    use datafusion::prelude::*;
    use deltalake::kernel::transaction::CommitProperties;
    use deltalake::kernel::Transaction;

    // CDC bookkeeping columns that must never be written to the target table.
    const CDC_COLUMNS: &[&str] = &["_op", "_ts_ms"];

    // Empty batch: return unchanged handle with zeroed metrics.
    if source_batch.num_rows() == 0 {
        return Ok((
            table,
            MergeResult {
                rows_inserted: 0,
                rows_updated: 0,
                rows_deleted: 0,
            },
        ));
    }

    debug!(
        key_columns = ?key_columns,
        source_rows = source_batch.num_rows(),
        "performing atomic changelog MERGE"
    );

    let ctx = SessionContext::new();
    let source_df = ctx.read_batch(source_batch).map_err(|e| {
        ConnectorError::WriteError(format!("failed to create source DataFrame: {e}"))
    })?;

    // Join predicate: target.k1 = source.k1 AND ...
    let predicate = key_columns
        .iter()
        .map(|k| col(format!("target.{k}")).eq(col(format!("source.{k}"))))
        .reduce(Expr::and)
        .ok_or_else(|| {
            ConnectorError::ConfigurationError("merge requires at least one key column".into())
        })?;

    let source_schema = source_df.schema().clone();
    let key_set: std::collections::HashSet<&str> = key_columns.iter().map(String::as_str).collect();

    // Exclude CDC metadata columns from SET clauses (preserve user columns like _id).
    let all_user_columns: Vec<String> = source_schema
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .filter(|name| !CDC_COLUMNS.contains(&name.as_str()))
        .collect();

    // Non-key user columns are the only ones touched by the UPDATE clause.
    let non_key_user_columns: Vec<String> = all_user_columns
        .iter()
        .filter(|c| !key_set.contains(c.as_str()))
        .cloned()
        .collect();

    // Predicates for conditional clause execution, keyed off the _op column.
    let upsert_pred = col("source._op").in_list(vec![lit("I"), lit("U"), lit("r")], false);
    let delete_pred = col("source._op").eq(lit("D"));

    // Delta stores txn versions as i64; epochs won't exceed i64::MAX in practice.
    #[allow(clippy::cast_possible_wrap)]
    let epoch_i64 = epoch as i64;

    let non_key_for_update = non_key_user_columns;
    let all_for_insert = all_user_columns;

    // Clause order matters: matched-update (upsert ops) runs before
    // matched-delete ("D" ops); not-matched-insert handles new keys.
    // `upsert_pred` is cloned for the update clause and moved into the
    // insert clause below.
    let mut merge_builder = table
        .merge(source_df, predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, epoch_i64)),
        )
        .when_matched_update(|update| {
            let mut u = update.predicate(upsert_pred.clone());
            for col_name in &non_key_for_update {
                u = u.update(col_name.as_str(), col(format!("source.{col_name}")));
            }
            u
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-update failed: {e}")))?
        .when_matched_delete(|delete| delete.predicate(delete_pred))
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-delete failed: {e}")))?
        .when_not_matched_insert(|insert| {
            let mut ins = insert.predicate(upsert_pred);
            for col_name in &all_for_insert {
                ins = ins.set(col_name.as_str(), col(format!("source.{col_name}")));
            }
            ins
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge not-matched-insert failed: {e}")))?;

    if schema_evolution {
        merge_builder = merge_builder.with_merge_schema(true);
    }

    let (table, metrics) = merge_builder.await.map_err(|e| {
        ConnectorError::WriteError(format!("Delta Lake changelog MERGE failed: {e}"))
    })?;

    let result = MergeResult {
        rows_inserted: metrics.num_target_rows_inserted,
        rows_updated: metrics.num_target_rows_updated,
        rows_deleted: metrics.num_target_rows_deleted,
    };

    info!(
        writer_id,
        epoch,
        rows_inserted = result.rows_inserted,
        rows_updated = result.rows_updated,
        rows_deleted = result.rows_deleted,
        "Delta Lake changelog MERGE complete"
    );

    Ok((table, result))
}
796
/// Result of a compaction (OPTIMIZE) operation.
///
/// Counts come from delta-rs optimize metrics for a single run.
#[cfg(feature = "delta-lake")]
#[derive(Debug)]
pub struct CompactionResult {
    /// Number of new optimized files written.
    pub files_added: u64,
    /// Number of small files removed.
    pub files_removed: u64,
    /// Number of partitions that were optimized.
    pub partitions_optimized: u64,
}
808
/// Runs an OPTIMIZE compaction on a Delta Lake table.
///
/// Compacts small Parquet files into larger ones (target size), optionally
/// applying Z-ORDER clustering.
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if the operation fails.
#[cfg(feature = "delta-lake")]
pub async fn run_compaction(
    table: DeltaTable,
    target_file_size: u64,
    z_order_columns: &[String],
) -> Result<(DeltaTable, CompactionResult), ConnectorError> {
    use deltalake::operations::optimize::OptimizeType;

    info!(target_file_size, "running Delta Lake compaction (OPTIMIZE)");

    // Plain bin-packing compaction unless Z-ORDER columns were supplied.
    let kind = if z_order_columns.is_empty() {
        OptimizeType::Compact
    } else {
        OptimizeType::ZOrder(z_order_columns.to_vec())
    };

    let (table, metrics) = table
        .optimize()
        .with_type(kind)
        .with_target_size(target_file_size)
        .await
        .map_err(|e| ConnectorError::Internal(format!("compaction failed: {e}")))?;

    let result = CompactionResult {
        files_added: metrics.num_files_added,
        files_removed: metrics.num_files_removed,
        partitions_optimized: metrics.partitions_optimized,
    };

    info!(
        files_added = result.files_added,
        files_removed = result.files_removed,
        partitions_optimized = result.partitions_optimized,
        "Delta Lake compaction complete"
    );

    Ok((table, result))
}
855
/// Runs VACUUM on a Delta Lake table, deleting old unreferenced files.
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if the operation fails.
#[cfg(feature = "delta-lake")]
pub async fn run_vacuum(
    table: DeltaTable,
    retention: std::time::Duration,
) -> Result<(DeltaTable, usize), ConnectorError> {
    info!(
        retention_hours = retention.as_secs() / 3600,
        "running Delta Lake VACUUM"
    );

    // chrono::Duration conversion can fail on out-of-range values; fall back
    // to 168 hours (7 days) in that case.
    let retention_chrono =
        chrono::Duration::from_std(retention).unwrap_or_else(|_| chrono::Duration::hours(168));

    let (table, metrics) = table
        .vacuum()
        .with_retention_period(retention_chrono)
        .with_enforce_retention_duration(false)
        .await
        .map_err(|e| ConnectorError::Internal(format!("vacuum failed: {e}")))?;

    let files_deleted = metrics.files_deleted.len();

    info!(files_deleted, "Delta Lake VACUUM complete");

    Ok((table, files_deleted))
}
885
/// Resolves catalog-aware table URI and merges catalog-specific storage options.
///
/// - `None`: returns table path and storage options as-is.
/// - `Glue`: calls AWS Glue API to resolve the table's S3 location.
/// - `Unity`: injects workspace URL and access token into storage options.
///
/// # Parameters
///
/// - `catalog`: which catalog backend (if any) to consult.
/// - `catalog_database`: Glue database name; required for the `Glue` variant.
/// - `catalog_name`: optional Glue catalog id, forwarded to the lookup.
/// - `_catalog_schema`: currently unused by all variants.
/// - `table_path`: table name (catalog variants) or direct URI (`None` variant).
/// - `base_storage_options`: caller-supplied object-store options; always the
///   starting point for the returned map.
/// - `catalog_properties`: extra options merged on top of the base options for
///   the `Glue` and `Unity` variants (later keys override earlier ones).
///
/// Returns the resolved table URI and the merged storage-options map.
///
/// # Errors
///
/// Returns `ConnectorError` if catalog resolution fails.
#[cfg(feature = "delta-lake")]
#[allow(clippy::implicit_hasher, clippy::unused_async)]
pub async fn resolve_catalog_options(
    catalog: &super::delta_config::DeltaCatalogType,
    #[allow(unused_variables)] catalog_database: Option<&str>,
    #[allow(unused_variables)] catalog_name: Option<&str>,
    _catalog_schema: Option<&str>,
    table_path: &str,
    base_storage_options: &HashMap<String, String>,
    catalog_properties: &HashMap<String, String>,
) -> Result<(String, HashMap<String, String>), ConnectorError> {
    use super::delta_config::DeltaCatalogType;

    match catalog {
        // No catalog: pass the path and options through untouched.
        DeltaCatalogType::None => Ok((table_path.to_string(), base_storage_options.clone())),
        #[cfg(feature = "delta-lake-glue")]
        DeltaCatalogType::Glue => {
            use deltalake::DataCatalog;
            // Glue lookups are addressed by (catalog, database, table); the
            // database component is mandatory.
            let database = catalog_database.ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "Glue catalog requires 'catalog.database'".into(),
                )
            })?;
            // Credentials/region come from the ambient AWS environment.
            let glue = deltalake_catalog_glue::GlueDataCatalog::from_env()
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to init Glue catalog: {e}"))
                })?;
            // Ask Glue for the table's physical storage location (S3 URI).
            let resolved = glue
                .get_table_storage_location(catalog_name.map(String::from), database, table_path)
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!(
                        "Glue catalog lookup failed for '{database}.{table_path}': {e}"
                    ))
                })?;
            info!(
                glue_database = database,
                table = table_path,
                resolved_path = %resolved,
                "resolved table path via Glue catalog"
            );
            // Layer catalog-specific properties over the base options;
            // duplicate keys take the catalog_properties value.
            let mut opts = base_storage_options.clone();
            opts.extend(
                catalog_properties
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone())),
            );
            Ok((resolved, opts))
        }
        #[cfg(not(feature = "delta-lake-glue"))]
        DeltaCatalogType::Glue => Err(ConnectorError::ConfigurationError(
            "Glue catalog requires the 'delta-lake-glue' feature. \
             Build with: cargo build --features delta-lake-glue"
                .into(),
        )),
        #[cfg(feature = "delta-lake-unity")]
        DeltaCatalogType::Unity {
            workspace_url,
            access_token,
        } => {
            // The deltalake-catalog-unity crate auto-registers a factory for
            // uc:// URIs via #[ctor]. It reads `databricks_host` and
            // `databricks_token` from storage options (or DATABRICKS_HOST /
            // DATABRICKS_TOKEN env vars). We inject from config so users can
            // specify credentials in TOML instead of env vars.
            let mut opts = base_storage_options.clone();
            opts.extend(
                catalog_properties
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone())),
            );
            // Only override host/token when explicitly configured, so env
            // vars still apply when the config fields are left empty.
            if !workspace_url.is_empty() {
                opts.insert("databricks_host".to_string(), workspace_url.clone());
            }
            if !access_token.is_empty() {
                opts.insert("databricks_token".to_string(), access_token.clone());
            }
            Ok((table_path.to_string(), opts))
        }
        #[cfg(not(feature = "delta-lake-unity"))]
        DeltaCatalogType::Unity { .. } => Err(ConnectorError::ConfigurationError(
            "Unity catalog requires the 'delta-lake-unity' feature. \
             Build with: cargo build --features delta-lake-unity"
                .into(),
        )),
    }
}
983
984// ============================================================================
985// Integration tests (require delta-lake feature)
986// ============================================================================
987
988#[cfg(all(test, feature = "delta-lake"))]
989mod tests {
990    use super::*;
991    use arrow_array::{Float64Array, Int64Array, StringArray};
992    use arrow_schema::{DataType, Field, Schema};
993    use std::sync::Arc;
994    use tempfile::TempDir;
995
996    fn test_schema() -> SchemaRef {
997        Arc::new(Schema::new(vec![
998            Field::new("id", DataType::Int64, false),
999            Field::new("name", DataType::Utf8, true),
1000            Field::new("value", DataType::Float64, true),
1001        ]))
1002    }
1003
1004    fn test_batch(n: usize) -> RecordBatch {
1005        let ids: Vec<i64> = (0..n as i64).collect();
1006        let names: Vec<&str> = (0..n).map(|_| "test").collect();
1007        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
1008
1009        RecordBatch::try_new(
1010            test_schema(),
1011            vec![
1012                Arc::new(Int64Array::from(ids)),
1013                Arc::new(StringArray::from(names)),
1014                Arc::new(Float64Array::from(values)),
1015            ],
1016        )
1017        .unwrap()
1018    }
1019
1020    #[tokio::test]
1021    async fn test_open_creates_table() {
1022        let temp_dir = TempDir::new().unwrap();
1023        let table_path = temp_dir.path().to_str().unwrap();
1024
1025        // Open with schema should create the table.
1026        let schema = test_schema();
1027        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1028            .await
1029            .unwrap();
1030
1031        assert_eq!(table.version(), Some(0));
1032
1033        // Verify _delta_log directory was created.
1034        let delta_log = temp_dir.path().join("_delta_log");
1035        assert!(delta_log.exists(), "_delta_log directory should exist");
1036    }
1037
1038    #[tokio::test]
1039    async fn test_open_existing_table() {
1040        let temp_dir = TempDir::new().unwrap();
1041        let table_path = temp_dir.path().to_str().unwrap();
1042
1043        // Create the table.
1044        let schema = test_schema();
1045        let _ = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1046            .await
1047            .unwrap();
1048
1049        // Reopen without schema - should work.
1050        let table = open_or_create_table(table_path, HashMap::new(), None)
1051            .await
1052            .unwrap();
1053
1054        assert_eq!(table.version(), Some(0));
1055    }
1056
1057    #[tokio::test]
1058    async fn test_open_nonexistent_without_schema_fails() {
1059        let temp_dir = TempDir::new().unwrap();
1060        // Create the directory so the path exists, but it's not a Delta table.
1061        let nonexistent_table = temp_dir.path().join("nonexistent");
1062        std::fs::create_dir_all(&nonexistent_table).unwrap();
1063        let table_path = nonexistent_table.to_str().unwrap();
1064
1065        // Open without schema when table doesn't exist should fail.
1066        let result = open_or_create_table(table_path, HashMap::new(), None).await;
1067        assert!(result.is_err());
1068        let err = result.unwrap_err().to_string();
1069        assert!(err.contains("schema"), "error should mention schema: {err}");
1070    }
1071
1072    #[tokio::test]
1073    async fn test_write_batch_creates_parquet() {
1074        let temp_dir = TempDir::new().unwrap();
1075        let table_path = temp_dir.path().to_str().unwrap();
1076
1077        // Create table.
1078        let schema = test_schema();
1079        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1080            .await
1081            .unwrap();
1082
1083        // Write a batch.
1084        let batch = test_batch(100);
1085        let (table, version) = write_batches(
1086            table,
1087            vec![batch],
1088            "test-writer",
1089            1,
1090            SaveMode::Append,
1091            None,
1092            false,
1093        )
1094        .await
1095        .unwrap();
1096
1097        assert_eq!(version, 1);
1098        assert_eq!(table.version(), Some(1));
1099
1100        // Verify Parquet files were created (in the table directory).
1101        let parquet_files: Vec<_> = std::fs::read_dir(temp_dir.path())
1102            .unwrap()
1103            .filter_map(Result::ok)
1104            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
1105            .collect();
1106
1107        assert!(
1108            !parquet_files.is_empty(),
1109            "should have created Parquet files"
1110        );
1111    }
1112
1113    #[tokio::test]
1114    async fn test_exactly_once_epoch_skip() {
1115        let temp_dir = TempDir::new().unwrap();
1116        let table_path = temp_dir.path().to_str().unwrap();
1117        let writer_id = "exactly-once-writer";
1118
1119        // Create table and write epoch 1.
1120        let schema = test_schema();
1121        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1122            .await
1123            .unwrap();
1124
1125        let batch = test_batch(10);
1126        let (table, _) = write_batches(
1127            table,
1128            vec![batch.clone()],
1129            writer_id,
1130            1,
1131            SaveMode::Append,
1132            None,
1133            false,
1134        )
1135        .await
1136        .unwrap();
1137
1138        // Check last committed epoch.
1139        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1140        assert_eq!(last_epoch, 1);
1141
1142        // Simulate recovery: reopen table.
1143        let reopened_table = open_or_create_table(table_path, HashMap::new(), None)
1144            .await
1145            .unwrap();
1146
1147        // Verify we can read the last committed epoch.
1148        let recovered_epoch = get_last_committed_epoch(&reopened_table, writer_id).await;
1149        assert_eq!(recovered_epoch, 1);
1150    }
1151
1152    #[tokio::test]
1153    async fn test_multiple_epochs_sequential() {
1154        let temp_dir = TempDir::new().unwrap();
1155        let table_path = temp_dir.path().to_str().unwrap();
1156        let writer_id = "sequential-writer";
1157
1158        let schema = test_schema();
1159        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1160            .await
1161            .unwrap();
1162
1163        // Write epochs 1, 2, 3.
1164        for epoch in 1..=3 {
1165            let batch = test_batch(10);
1166            let result = write_batches(
1167                table,
1168                vec![batch],
1169                writer_id,
1170                epoch,
1171                SaveMode::Append,
1172                None,
1173                false,
1174            )
1175            .await
1176            .unwrap();
1177            table = result.0;
1178            assert_eq!(result.1, epoch as i64);
1179        }
1180
1181        // Final version should be 3.
1182        assert_eq!(table.version(), Some(3));
1183
1184        // Last committed epoch should be 3.
1185        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1186        assert_eq!(last_epoch, 3);
1187    }
1188
1189    #[tokio::test]
1190    async fn test_get_table_schema() {
1191        let temp_dir = TempDir::new().unwrap();
1192        let table_path = temp_dir.path().to_str().unwrap();
1193
1194        let expected_schema = test_schema();
1195        let table = open_or_create_table(table_path, HashMap::new(), Some(&expected_schema))
1196            .await
1197            .unwrap();
1198
1199        let actual_schema = get_table_schema(&table).unwrap();
1200
1201        // Verify field count and names match.
1202        assert_eq!(actual_schema.fields().len(), expected_schema.fields().len());
1203        for (expected, actual) in expected_schema.fields().iter().zip(actual_schema.fields()) {
1204            assert_eq!(expected.name(), actual.name());
1205        }
1206    }
1207
1208    #[tokio::test]
1209    async fn test_write_empty_batches() {
1210        let temp_dir = TempDir::new().unwrap();
1211        let table_path = temp_dir.path().to_str().unwrap();
1212
1213        let schema = test_schema();
1214        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1215            .await
1216            .unwrap();
1217
1218        // Write empty batch list - should be no-op.
1219        let (table, version) = write_batches(
1220            table,
1221            vec![],
1222            "test-writer",
1223            1,
1224            SaveMode::Append,
1225            None,
1226            false,
1227        )
1228        .await
1229        .unwrap();
1230
1231        // Version should still be 0 (no write happened).
1232        assert_eq!(version, 0);
1233        assert_eq!(table.version(), Some(0));
1234    }
1235
1236    #[tokio::test]
1237    async fn test_write_multiple_batches() {
1238        // Test writing multiple batches in a single transaction.
1239        let temp_dir = TempDir::new().unwrap();
1240        let table_path = temp_dir.path().to_str().unwrap();
1241
1242        let schema = test_schema();
1243        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1244            .await
1245            .unwrap();
1246
1247        // Write multiple batches.
1248        let batch1 = test_batch(50);
1249        let batch2 = test_batch(50);
1250        let (table, version) = write_batches(
1251            table,
1252            vec![batch1, batch2],
1253            "multi-batch-writer",
1254            1,
1255            SaveMode::Append,
1256            None,
1257            false,
1258        )
1259        .await
1260        .unwrap();
1261
1262        assert_eq!(version, 1);
1263        assert_eq!(table.version(), Some(1));
1264
1265        // Reopen and verify we can read the state.
1266        let reopened = open_or_create_table(table_path, HashMap::new(), None)
1267            .await
1268            .unwrap();
1269        assert_eq!(reopened.version(), Some(1));
1270    }
1271
1272    #[test]
1273    fn test_path_to_url_local() {
1274        let temp_dir = TempDir::new().unwrap();
1275        let path = temp_dir.path().to_str().unwrap();
1276
1277        let url = path_to_url(path).unwrap();
1278        assert!(url.scheme() == "file");
1279    }
1280
1281    #[test]
1282    fn test_path_to_url_s3() {
1283        let url = path_to_url("s3://my-bucket/path/to/table").unwrap();
1284        assert_eq!(url.scheme(), "s3");
1285        assert_eq!(url.host_str(), Some("my-bucket"));
1286    }
1287
1288    #[test]
1289    fn test_path_to_url_azure() {
1290        let url = path_to_url("az://my-container/path/to/table").unwrap();
1291        assert_eq!(url.scheme(), "az");
1292    }
1293
1294    #[test]
1295    fn test_path_to_url_gcs() {
1296        let url = path_to_url("gs://my-bucket/path/to/table").unwrap();
1297        assert_eq!(url.scheme(), "gs");
1298    }
1299
1300    // ── End-to-end tests for new functionality ──
1301
1302    #[tokio::test]
1303    async fn test_get_latest_version() {
1304        let temp_dir = TempDir::new().unwrap();
1305        let table_path = temp_dir.path().to_str().unwrap();
1306
1307        let schema = test_schema();
1308        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1309            .await
1310            .unwrap();
1311
1312        // Initial version is 0.
1313        let v = get_latest_version(&mut table).await.unwrap();
1314        assert_eq!(v, 0);
1315
1316        // Write a batch -> version 1.
1317        let batch = test_batch(10);
1318        let (returned_table, version) = write_batches(
1319            table,
1320            vec![batch],
1321            "writer",
1322            1,
1323            SaveMode::Append,
1324            None,
1325            false,
1326        )
1327        .await
1328        .unwrap();
1329        assert_eq!(version, 1);
1330        table = returned_table;
1331
1332        let v = get_latest_version(&mut table).await.unwrap();
1333        assert_eq!(v, 1);
1334    }
1335
1336    #[tokio::test]
1337    async fn test_read_batches_at_version() {
1338        let temp_dir = TempDir::new().unwrap();
1339        let table_path = temp_dir.path().to_str().unwrap();
1340
1341        let schema = test_schema();
1342        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1343            .await
1344            .unwrap();
1345
1346        // Write 50 rows at version 1.
1347        let batch = test_batch(50);
1348        let (table, _) = write_batches(
1349            table,
1350            vec![batch],
1351            "writer",
1352            1,
1353            SaveMode::Append,
1354            None,
1355            false,
1356        )
1357        .await
1358        .unwrap();
1359
1360        // Write 30 more rows at version 2.
1361        let batch = test_batch(30);
1362        let (_table, _) = write_batches(
1363            table,
1364            vec![batch],
1365            "writer",
1366            2,
1367            SaveMode::Append,
1368            None,
1369            false,
1370        )
1371        .await
1372        .unwrap();
1373
1374        // Read version 1 — should get 50 rows.
1375        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1376            .await
1377            .unwrap();
1378        let batches = read_batches_at_version(&mut read_table, 1, 10000)
1379            .await
1380            .unwrap();
1381        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1382        assert_eq!(total_rows, 50);
1383
1384        // Read version 2 — should get 80 rows (cumulative).
1385        let batches = read_batches_at_version(&mut read_table, 2, 10000)
1386            .await
1387            .unwrap();
1388        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1389        assert_eq!(total_rows, 80);
1390    }
1391
1392    #[tokio::test]
1393    async fn test_sink_source_roundtrip() {
1394        use super::super::delta::DeltaLakeSink;
1395        use super::super::delta_config::DeltaLakeSinkConfig;
1396        use super::super::delta_source::DeltaSource;
1397        use super::super::delta_source_config::DeltaSourceConfig;
1398        use crate::config::ConnectorConfig;
1399        use crate::connector::{SinkConnector, SourceConnector};
1400
1401        let temp_dir = TempDir::new().unwrap();
1402        let table_path = temp_dir.path().to_str().unwrap();
1403
1404        // Write data via sink.
1405        let sink_config = DeltaLakeSinkConfig::new(table_path);
1406        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());
1407        let connector_config = ConnectorConfig::new("delta-lake");
1408        sink.open(&connector_config).await.unwrap();
1409
1410        sink.begin_epoch(1).await.unwrap();
1411        let batch = test_batch(25);
1412        sink.write_batch(&batch).await.unwrap();
1413        sink.pre_commit(1).await.unwrap();
1414        sink.commit_epoch(1).await.unwrap();
1415        sink.close().await.unwrap();
1416
1417        // Read data via source.
1418        let mut source_config = DeltaSourceConfig::new(table_path);
1419        source_config.starting_version = Some(0);
1420        let mut source = DeltaSource::new(source_config);
1421        let source_connector_config = ConnectorConfig::new("delta-lake");
1422        source.open(&source_connector_config).await.unwrap();
1423
1424        // Poll — should get version 1 data (25 rows).
1425        let result = source.poll_batch(10000).await.unwrap();
1426        assert!(result.is_some(), "should have received a batch");
1427        let total_rows: usize = {
1428            let mut rows = result.unwrap().records.num_rows();
1429            // Drain any remaining buffered batches.
1430            while let Ok(Some(batch)) = source.poll_batch(10000).await {
1431                rows += batch.records.num_rows();
1432            }
1433            rows
1434        };
1435        assert_eq!(total_rows, 25);
1436
1437        source.close().await.unwrap();
1438    }
1439
1440    #[tokio::test]
1441    async fn test_source_checkpoint_restore() {
1442        use super::super::delta_source::DeltaSource;
1443        use super::super::delta_source_config::DeltaSourceConfig;
1444        use crate::config::ConnectorConfig;
1445        use crate::connector::SourceConnector;
1446
1447        let temp_dir = TempDir::new().unwrap();
1448        let table_path = temp_dir.path().to_str().unwrap();
1449
1450        // Create table and write 2 versions.
1451        let schema = test_schema();
1452        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1453            .await
1454            .unwrap();
1455
1456        let (table, _) = write_batches(
1457            table,
1458            vec![test_batch(10)],
1459            "writer",
1460            1,
1461            SaveMode::Append,
1462            None,
1463            false,
1464        )
1465        .await
1466        .unwrap();
1467        let (_table, _) = write_batches(
1468            table,
1469            vec![test_batch(20)],
1470            "writer",
1471            2,
1472            SaveMode::Append,
1473            None,
1474            false,
1475        )
1476        .await
1477        .unwrap();
1478
1479        // Open source starting from version 0. The source jumps to the
1480        // latest version (2) in a single poll, reading the full snapshot.
1481        let mut source_config = DeltaSourceConfig::new(table_path);
1482        source_config.starting_version = Some(0);
1483        let mut source = DeltaSource::new(source_config.clone());
1484        let connector_config = ConnectorConfig::new("delta-lake");
1485        source.open(&connector_config).await.unwrap();
1486
1487        // Poll to consume latest version (2).
1488        let _ = source.poll_batch(10000).await.unwrap();
1489        // Drain buffered.
1490        while let Ok(Some(_)) = source.poll_batch(10000).await {}
1491
1492        // Checkpoint reflects the fully-consumed latest version.
1493        let cp = source.checkpoint();
1494        assert_eq!(cp.get_offset("delta_version"), Some("2"));
1495        source.close().await.unwrap();
1496
1497        // Restore from checkpoint — should resume at version 2.
1498        let mut source2 = DeltaSource::new(source_config);
1499        source2.open(&connector_config).await.unwrap();
1500        source2.restore(&cp).await.unwrap();
1501
1502        assert_eq!(source2.current_version(), 2);
1503
1504        // No new data — already at latest.
1505        let result = source2.poll_batch(10000).await.unwrap();
1506        assert!(result.is_none());
1507
1508        source2.close().await.unwrap();
1509    }
1510
1511    #[tokio::test]
1512    async fn test_auto_flush_writes_data() {
1513        use super::super::delta::DeltaLakeSink;
1514        use super::super::delta_config::DeltaLakeSinkConfig;
1515        use crate::config::ConnectorConfig;
1516        use crate::connector::SinkConnector;
1517
1518        let temp_dir = TempDir::new().unwrap();
1519        let table_path = temp_dir.path().to_str().unwrap();
1520
1521        // Configure a small buffer to trigger auto-flush.
1522        let mut sink_config = DeltaLakeSinkConfig::new(table_path);
1523        sink_config.max_buffer_records = 10;
1524        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());
1525
1526        let connector_config = ConnectorConfig::new("delta-lake");
1527        sink.open(&connector_config).await.unwrap();
1528
1529        sink.begin_epoch(1).await.unwrap();
1530
1531        // Write 25 rows — should trigger auto-flush after 10.
1532        let batch = test_batch(25);
1533        sink.write_batch(&batch).await.unwrap();
1534
1535        // Commit the rest.
1536        sink.pre_commit(1).await.unwrap();
1537        sink.commit_epoch(1).await.unwrap();
1538        sink.close().await.unwrap();
1539
1540        // Verify all 25 rows are in the Delta table.
1541        let mut table = open_or_create_table(table_path, HashMap::new(), None)
1542            .await
1543            .unwrap();
1544        let latest = get_latest_version(&mut table).await.unwrap();
1545        assert!(latest >= 1, "should have at least 1 version");
1546
1547        let batches = read_batches_at_version(&mut table, latest, 10000)
1548            .await
1549            .unwrap();
1550        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1551        assert_eq!(
1552            total_rows, 25,
1553            "all 25 rows should be written, not dropped by auto-flush"
1554        );
1555    }
1556
1557    #[tokio::test]
1558    async fn test_sink_exactly_once_epoch() {
1559        let temp_dir = TempDir::new().unwrap();
1560        let table_path = temp_dir.path().to_str().unwrap();
1561        let writer_id = "exactly-once-test";
1562
1563        let schema = test_schema();
1564        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1565            .await
1566            .unwrap();
1567
1568        // Write epoch 1 with 10 rows.
1569        let (table, v1) = write_batches(
1570            table,
1571            vec![test_batch(10)],
1572            writer_id,
1573            1,
1574            SaveMode::Append,
1575            None,
1576            false,
1577        )
1578        .await
1579        .unwrap();
1580        assert_eq!(v1, 1);
1581
1582        // Write epoch 2 with 15 rows using the same writer.
1583        let (table, v2) = write_batches(
1584            table,
1585            vec![test_batch(15)],
1586            writer_id,
1587            2,
1588            SaveMode::Append,
1589            None,
1590            false,
1591        )
1592        .await
1593        .unwrap();
1594        assert_eq!(v2, 2);
1595
1596        // Verify the last committed epoch is 2.
1597        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1598        assert_eq!(last_epoch, 2);
1599
1600        // Verify total rows = 25 (10 + 15).
1601        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1602            .await
1603            .unwrap();
1604        let batches = read_batches_at_version(&mut read_table, 2, 10000)
1605            .await
1606            .unwrap();
1607        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1608        assert_eq!(total_rows, 25);
1609    }
1610
1611    #[tokio::test]
1612    async fn test_schema_evolution_adds_column() {
1613        let temp_dir = TempDir::new().unwrap();
1614        let table_path = temp_dir.path().to_str().unwrap();
1615
1616        // Create table with 2-column schema.
1617        let schema_v1 = Arc::new(Schema::new(vec![
1618            Field::new("id", DataType::Int64, false),
1619            Field::new("name", DataType::Utf8, true),
1620        ]));
1621        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema_v1))
1622            .await
1623            .unwrap();
1624
1625        // Write batch with 2 columns.
1626        let batch_v1 = RecordBatch::try_new(
1627            schema_v1.clone(),
1628            vec![
1629                Arc::new(Int64Array::from(vec![1, 2])),
1630                Arc::new(StringArray::from(vec!["a", "b"])),
1631            ],
1632        )
1633        .unwrap();
1634        let (table, _) = write_batches(
1635            table,
1636            vec![batch_v1],
1637            "evo-writer",
1638            1,
1639            SaveMode::Append,
1640            None,
1641            true, // schema_evolution enabled
1642        )
1643        .await
1644        .unwrap();
1645
1646        // Write batch with 3 columns (extra "score" column).
1647        let schema_v2 = Arc::new(Schema::new(vec![
1648            Field::new("id", DataType::Int64, false),
1649            Field::new("name", DataType::Utf8, true),
1650            Field::new("score", DataType::Float64, true),
1651        ]));
1652        let batch_v2 = RecordBatch::try_new(
1653            schema_v2,
1654            vec![
1655                Arc::new(Int64Array::from(vec![3])),
1656                Arc::new(StringArray::from(vec!["c"])),
1657                Arc::new(Float64Array::from(vec![99.5])),
1658            ],
1659        )
1660        .unwrap();
1661        let (table, _) = write_batches(
1662            table,
1663            vec![batch_v2],
1664            "evo-writer",
1665            2,
1666            SaveMode::Append,
1667            None,
1668            true,
1669        )
1670        .await
1671        .unwrap();
1672
1673        // Verify table schema now has all 3 columns.
1674        let final_schema = get_table_schema(&table).unwrap();
1675        assert_eq!(final_schema.fields().len(), 3);
1676        assert_eq!(final_schema.field(0).name(), "id");
1677        assert_eq!(final_schema.field(1).name(), "name");
1678        assert_eq!(final_schema.field(2).name(), "score");
1679
1680        // Verify all rows are readable.
1681        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1682            .await
1683            .unwrap();
1684        let batches = read_batches_at_version(&mut read_table, 2, 10000)
1685            .await
1686            .unwrap();
1687        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1688        assert_eq!(total_rows, 3);
1689    }
1690
1691    #[tokio::test]
1692    async fn test_compaction_reduces_files() {
1693        let temp_dir = TempDir::new().unwrap();
1694        let table_path = temp_dir.path().to_str().unwrap();
1695
1696        let schema = test_schema();
1697        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1698            .await
1699            .unwrap();
1700
1701        // Write 10 small batches (1 file each = 10 versions).
1702        for epoch in 1..=10u64 {
1703            let batch = test_batch(5);
1704            let (t, _) = write_batches(
1705                table,
1706                vec![batch],
1707                "compaction-writer",
1708                epoch,
1709                SaveMode::Append,
1710                None,
1711                false,
1712            )
1713            .await
1714            .unwrap();
1715            table = t;
1716        }
1717
1718        // Count Parquet files before compaction.
1719        let parquet_before: Vec<_> = std::fs::read_dir(temp_dir.path())
1720            .unwrap()
1721            .filter_map(Result::ok)
1722            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
1723            .collect();
1724        assert!(
1725            parquet_before.len() >= 10,
1726            "should have at least 10 Parquet files before compaction, got {}",
1727            parquet_before.len()
1728        );
1729
1730        // Run compaction.
1731        let (table, result) = run_compaction(table, 128 * 1024 * 1024, &[]).await.unwrap();
1732        assert!(
1733            result.files_removed > 0,
1734            "compaction should have removed files"
1735        );
1736
1737        // Run vacuum to physically delete old files.
1738        let (_table, files_deleted) = run_vacuum(table, std::time::Duration::from_secs(0))
1739            .await
1740            .unwrap();
1741
1742        // Verify fewer Parquet files remain.
1743        let parquet_after: Vec<_> = std::fs::read_dir(temp_dir.path())
1744            .unwrap()
1745            .filter_map(Result::ok)
1746            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
1747            .collect();
1748        assert!(
1749            parquet_after.len() < parquet_before.len(),
1750            "should have fewer files after compaction+vacuum: before={}, after={}, vacuumed={}",
1751            parquet_before.len(),
1752            parquet_after.len(),
1753            files_deleted
1754        );
1755    }
1756
1757    // ── Merge (upsert) tests ──
1758
1759    #[tokio::test]
1760    async fn test_merge_insert_only() {
1761        let temp_dir = TempDir::new().unwrap();
1762        let table_path = temp_dir.path().to_str().unwrap();
1763
1764        // Create table with initial rows.
1765        let schema = test_schema();
1766        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1767            .await
1768            .unwrap();
1769        let initial = test_batch(3); // ids 0, 1, 2
1770        let (table, _) = write_batches(
1771            table,
1772            vec![initial],
1773            "merge-writer",
1774            1,
1775            SaveMode::Append,
1776            None,
1777            false,
1778        )
1779        .await
1780        .unwrap();
1781
1782        // Merge new rows (ids 10, 11) — all inserts.
1783        let source = RecordBatch::try_new(
1784            test_schema(),
1785            vec![
1786                Arc::new(Int64Array::from(vec![10, 11])),
1787                Arc::new(StringArray::from(vec!["x", "y"])),
1788                Arc::new(Float64Array::from(vec![10.0, 11.0])),
1789            ],
1790        )
1791        .unwrap();
1792
1793        let (table, result) =
1794            merge_batches(table, source, &["id".to_string()], "merge-writer", 2, false)
1795                .await
1796                .unwrap();
1797
1798        assert_eq!(result.rows_inserted, 2);
1799        assert_eq!(result.rows_updated, 0);
1800
1801        // Verify final row count = 5.
1802        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1803            .await
1804            .unwrap();
1805        let latest = get_latest_version(&mut read_table).await.unwrap();
1806        let batches = read_batches_at_version(&mut read_table, latest, 10000)
1807            .await
1808            .unwrap();
1809        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1810        assert_eq!(total_rows, 5);
1811
1812        drop(table);
1813    }
1814
1815    #[tokio::test]
1816    async fn test_merge_update() {
1817        let temp_dir = TempDir::new().unwrap();
1818        let table_path = temp_dir.path().to_str().unwrap();
1819
1820        // Create table with initial rows.
1821        let schema = test_schema();
1822        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1823            .await
1824            .unwrap();
1825        let initial = RecordBatch::try_new(
1826            test_schema(),
1827            vec![
1828                Arc::new(Int64Array::from(vec![1, 2, 3])),
1829                Arc::new(StringArray::from(vec!["a", "b", "c"])),
1830                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
1831            ],
1832        )
1833        .unwrap();
1834        let (table, _) = write_batches(
1835            table,
1836            vec![initial],
1837            "merge-writer",
1838            1,
1839            SaveMode::Append,
1840            None,
1841            false,
1842        )
1843        .await
1844        .unwrap();
1845
1846        // Merge with updated values for existing keys.
1847        let source = RecordBatch::try_new(
1848            test_schema(),
1849            vec![
1850                Arc::new(Int64Array::from(vec![1, 2])),
1851                Arc::new(StringArray::from(vec!["a_updated", "b_updated"])),
1852                Arc::new(Float64Array::from(vec![100.0, 200.0])),
1853            ],
1854        )
1855        .unwrap();
1856
1857        let (table, result) =
1858            merge_batches(table, source, &["id".to_string()], "merge-writer", 2, false)
1859                .await
1860                .unwrap();
1861
1862        assert_eq!(result.rows_updated, 2);
1863        assert_eq!(result.rows_inserted, 0);
1864
1865        // Verify row count is still 3 (no new rows added).
1866        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1867            .await
1868            .unwrap();
1869        let latest = get_latest_version(&mut read_table).await.unwrap();
1870        let batches = read_batches_at_version(&mut read_table, latest, 10000)
1871            .await
1872            .unwrap();
1873        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1874        assert_eq!(total_rows, 3);
1875
1876        drop(table);
1877    }
1878
1879    #[tokio::test]
1880    async fn test_merge_delete() {
1881        let temp_dir = TempDir::new().unwrap();
1882        let table_path = temp_dir.path().to_str().unwrap();
1883
1884        // Create table with initial rows.
1885        let schema = test_schema();
1886        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1887            .await
1888            .unwrap();
1889        let initial = RecordBatch::try_new(
1890            test_schema(),
1891            vec![
1892                Arc::new(Int64Array::from(vec![1, 2, 3])),
1893                Arc::new(StringArray::from(vec!["a", "b", "c"])),
1894                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
1895            ],
1896        )
1897        .unwrap();
1898        let (table, _) = write_batches(
1899            table,
1900            vec![initial],
1901            "merge-writer",
1902            1,
1903            SaveMode::Append,
1904            None,
1905            false,
1906        )
1907        .await
1908        .unwrap();
1909
1910        // Delete rows with id=1 and id=3.
1911        let delete_batch = RecordBatch::try_new(
1912            test_schema(),
1913            vec![
1914                Arc::new(Int64Array::from(vec![1, 3])),
1915                Arc::new(StringArray::from(vec!["a", "c"])),
1916                Arc::new(Float64Array::from(vec![1.0, 3.0])),
1917            ],
1918        )
1919        .unwrap();
1920
1921        let (table, rows_deleted) =
1922            delete_by_merge(table, delete_batch, &["id".to_string()], "merge-writer", 2)
1923                .await
1924                .unwrap();
1925
1926        assert_eq!(rows_deleted, 2);
1927
1928        // Verify only 1 row remains.
1929        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1930            .await
1931            .unwrap();
1932        let latest = get_latest_version(&mut read_table).await.unwrap();
1933        let batches = read_batches_at_version(&mut read_table, latest, 10000)
1934            .await
1935            .unwrap();
1936        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1937        assert_eq!(total_rows, 1);
1938
1939        drop(table);
1940    }
1941
1942    #[tokio::test]
1943    async fn test_merge_empty_batch_noop() {
1944        let temp_dir = TempDir::new().unwrap();
1945        let table_path = temp_dir.path().to_str().unwrap();
1946
1947        let schema = test_schema();
1948        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1949            .await
1950            .unwrap();
1951
1952        // Merge with empty batch should be a no-op.
1953        let empty = RecordBatch::new_empty(test_schema());
1954        let (table, result) =
1955            merge_batches(table, empty, &["id".to_string()], "merge-writer", 1, false)
1956                .await
1957                .unwrap();
1958        assert_eq!(result.rows_inserted, 0);
1959        assert_eq!(result.rows_updated, 0);
1960
1961        // Delete with empty batch should also be a no-op.
1962        let empty_del = RecordBatch::new_empty(test_schema());
1963        let (_table, deleted) =
1964            delete_by_merge(table, empty_del, &["id".to_string()], "merge-writer", 2)
1965                .await
1966                .unwrap();
1967        assert_eq!(deleted, 0);
1968    }
1969}