Skip to main content

laminar_connectors/lakehouse/
delta_io.rs

1//! Delta Lake I/O integration module.
2//!
3//! This module provides the actual I/O operations for Delta Lake tables via the
4//! `deltalake` crate. All functions are feature-gated behind `delta-lake`.
5//!
6//! # Architecture
7//!
8//! The I/O module is separate from the business logic in [`delta.rs`](super::delta)
9//! to allow:
10//! - Testing business logic without the `deltalake` dependency
11//! - Clean separation of concerns (buffering/epoch management vs. actual writes)
12//! - Easy mocking for unit tests
13//!
14//! # Exactly-Once Semantics
15//!
16//! Delta Lake's transaction log supports application-level transaction metadata
17//! via the `txn` action. We use this to store `(writer_id, epoch)` pairs, enabling
18//! exactly-once semantics:
19//!
20//! 1. On recovery, read `txn` metadata to find the last committed epoch for this writer
21//! 2. Skip epochs <= last committed (idempotent replay)
22//! 3. Each write includes the epoch in `txn` metadata
23
24#[cfg(feature = "delta-lake")]
25use std::collections::HashMap;
26
27#[cfg(feature = "delta-lake")]
28use std::sync::Arc;
29
30#[cfg(feature = "delta-lake")]
31use arrow_array::RecordBatch;
32
33#[cfg(feature = "delta-lake")]
34use arrow_schema::SchemaRef;
35
36// delta_kernel's TryIntoKernel trait is re-exported via deltalake.
37#[cfg(feature = "delta-lake")]
38use deltalake::kernel::engine::arrow_conversion::TryIntoKernel as _;
39
40#[cfg(feature = "delta-lake")]
41use deltalake::kernel::transaction::CommitProperties;
42
43#[cfg(feature = "delta-lake")]
44use deltalake::kernel::Transaction;
45
46#[cfg(feature = "delta-lake")]
47use deltalake::operations::write::SchemaMode;
48
49#[cfg(feature = "delta-lake")]
50use deltalake::protocol::SaveMode;
51
52#[cfg(feature = "delta-lake")]
53use deltalake::DeltaTable;
54
55#[cfg(feature = "delta-lake")]
56use tracing::{debug, info, warn};
57
58#[cfg(feature = "delta-lake")]
59use url::Url;
60
61#[cfg(feature = "delta-lake")]
62use crate::error::ConnectorError;
63
/// Converts a table location string into a [`Url`].
///
/// Strings containing `://` are parsed verbatim as URLs. Any other string is
/// treated as a local filesystem path and turned into a `file://` directory
/// URL. An existing path is canonicalized first. A missing path (e.g. a table
/// not created yet) is used as-is when absolute, or joined onto the current
/// working directory when relative.
///
/// # Errors
///
/// Returns `ConnectorError::ConfigurationError` in these cases:
/// - the URL does not parse;
/// - an existing path cannot be canonicalized;
/// - the current directory cannot be determined;
/// - the final path cannot be represented as a directory URL.
#[cfg(feature = "delta-lake")]
fn path_to_url(path: &str) -> Result<Url, ConnectorError> {
    if !path.contains("://") {
        let local = std::path::Path::new(path);

        let absolute = if local.exists() {
            std::fs::canonicalize(local).map_err(|e| {
                ConnectorError::ConfigurationError(format!("invalid path '{path}': {e}"))
            })?
        } else if local.is_absolute() {
            local.to_path_buf()
        } else {
            let cwd = std::env::current_dir().map_err(|e| {
                ConnectorError::ConfigurationError(format!("cannot get current dir: {e}"))
            })?;
            cwd.join(local)
        };

        return Url::from_directory_path(&absolute).map_err(|()| {
            ConnectorError::ConfigurationError(format!(
                "cannot convert path to URL: {}",
                absolute.display()
            ))
        });
    }

    Url::parse(path)
        .map_err(|e| ConnectorError::ConfigurationError(format!("invalid URL '{path}': {e}")))
}
101
/// Opens an existing Delta Lake table, creating it when a schema is supplied.
///
/// # Arguments
///
/// * `table_path` - Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`)
/// * `storage_options` - Storage credentials and configuration (moved into delta-rs)
/// * `schema` - Optional Arrow schema used to create the table if it has no version yet
///
/// # Returns
///
/// The `DeltaTable` handle. If the table has no version and `schema` is
/// `None`, the uninitialized handle is returned as-is (creation is deferred to
/// the first write).
///
/// # Errors
///
/// - `ConnectorError::ConfigurationError` if `table_path` cannot be turned into a URL.
/// - `ConnectorError::ConnectionFailed` if the table cannot be opened or created.
/// - `ConnectorError::SchemaMismatch` if `schema` cannot be converted to a Delta schema.
#[cfg(feature = "delta-lake")]
#[allow(clippy::implicit_hasher)]
pub async fn open_or_create_table(
    table_path: &str,
    storage_options: HashMap<String, String>,
    schema: Option<&SchemaRef>,
) -> Result<DeltaTable, ConnectorError> {
    info!(table_path, "opening Delta Lake table");

    let table_url = path_to_url(table_path)?;

    // Both the URL and the options are moved: nothing below needs them again,
    // so cloning the options map on every reopen would be wasted work.
    let table = DeltaTable::try_from_url_with_storage_options(table_url, storage_options)
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to open table: {e}")))?;

    let already_initialized = table.version().is_some();
    if already_initialized {
        info!(
            table_path,
            version = table.version(),
            "opened existing Delta Lake table"
        );
        return Ok(table);
    }

    match schema {
        None => {
            info!(
                table_path,
                "table does not exist yet; will create on first write"
            );
            Ok(table)
        }
        Some(source_schema) => {
            info!(table_path, "creating new Delta Lake table");

            // Arrow -> Delta kernel schema via the TryIntoKernel trait.
            let kernel_schema: deltalake::kernel::StructType = source_schema
                .as_ref()
                .try_into_kernel()
                .map_err(|e| {
                    ConnectorError::SchemaMismatch(format!("schema conversion failed: {e}"))
                })?;

            let created = table
                .create()
                .with_columns(kernel_schema.fields().cloned())
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to create table: {e}"))
                })?;

            info!(
                table_path,
                version = created.version(),
                "created new Delta Lake table"
            );
            Ok(created)
        }
    }
}
177
/// Writes `batches` to a Delta Lake table as one commit tagged with an
/// application transaction (`txn`) of `(writer_id, epoch)`.
///
/// The `txn` entry is what [`get_last_committed_epoch`] reads back during
/// recovery to provide exactly-once semantics.
///
/// # Arguments
///
/// * `table` - The Delta Lake table handle (consumed and returned)
/// * `batches` - Record batches to write; an empty vector commits nothing
/// * `writer_id` - Application id recorded in the `txn` action
/// * `epoch` - Recorded as the `txn` version (reinterpreted as `i64`)
/// * `save_mode` - Delta Lake save mode (Append, Overwrite, etc.)
/// * `partition_columns` - Partition columns; `None` or an empty slice leaves them unset
/// * `schema_evolution` - If true, uses `SchemaMode::Merge` (additive column merge)
/// * `target_file_size` - If set, forwarded to delta-rs as the target Parquet file size
/// * `create_checkpoint` - Forwarded to `CommitProperties::with_create_checkpoint`
/// * `writer_properties` - If set, Parquet writer properties for the data files
///
/// # Returns
///
/// `(table, version)`: the updated handle and its version after the commit.
/// For empty input, the current version (or `0` if the table has none).
///
/// # Errors
///
/// Returns `ConnectorError::WriteError` if the write fails.
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_arguments)]
pub async fn write_batches(
    table: DeltaTable,
    batches: Vec<RecordBatch>,
    writer_id: &str,
    epoch: u64,
    save_mode: SaveMode,
    partition_columns: Option<&[String]>,
    schema_evolution: bool,
    target_file_size: Option<usize>,
    create_checkpoint: bool,
    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
) -> Result<(DeltaTable, i64), ConnectorError> {
    if batches.is_empty() {
        debug!("no batches to write, skipping");
        let current = table.version().unwrap_or(0);
        return Ok((table, current));
    }

    let num_batches = batches.len();
    let total_rows = batches
        .iter()
        .fold(0usize, |rows, batch| rows + batch.num_rows());

    debug!(
        writer_id,
        epoch,
        total_rows,
        num_batches,
        "writing batches to Delta Lake"
    );

    // delta-rs stores txn versions as i64. The u64 epoch is reinterpreted
    // (wrapping), the mirror image of the cast in `get_last_committed_epoch`.
    #[allow(clippy::cast_possible_wrap)]
    let txn_version = epoch as i64;

    let commit_properties = CommitProperties::default()
        .with_application_transaction(Transaction::new(writer_id, txn_version))
        .with_create_checkpoint(create_checkpoint);

    let mut builder = table
        .write(batches)
        .with_save_mode(save_mode)
        .with_commit_properties(commit_properties);

    // Honour the user's configured Parquet file size, not just delta-rs's default.
    builder = match target_file_size {
        Some(size) => builder.with_target_file_size(size),
        None => builder,
    };

    if schema_evolution {
        builder = builder.with_schema_mode(SchemaMode::Merge);
    }

    if let Some(cols) = partition_columns.filter(|cols| !cols.is_empty()) {
        builder = builder.with_partition_columns(cols.to_vec());
    }

    if let Some(props) = writer_properties {
        builder = builder.with_writer_properties(props);
    }

    let table = builder
        .await
        .map_err(|e| ConnectorError::WriteError(format!("Delta Lake write failed: {e}")))?;

    let version = table.version().unwrap_or(0);

    info!(
        writer_id,
        epoch, version, total_rows, "committed Delta Lake transaction"
    );

    Ok((table, version))
}
278
/// Returns the last epoch committed by `writer_id`, read from the table's
/// application-transaction (`txn`) metadata.
///
/// Used for exactly-once recovery: on startup, callers skip every epoch
/// `<=` the returned value.
///
/// # Arguments
///
/// * `table` - The Delta Lake table handle
/// * `writer_id` - The writer (application) identifier to look up
///
/// # Returns
///
/// The stored `txn` version reinterpreted as `u64` (the inverse of the cast in
/// [`write_batches`]). Returns `0` in three cases:
/// - the table has no snapshot;
/// - no `txn` entry exists for `writer_id`;
/// - reading the metadata fails (a warning is logged).
///
/// NOTE(review): `0` cannot be distinguished from "epoch 0 was committed", and
/// a read failure also yields `0`. Confirm that callers never commit epoch 0 and
/// can tolerate a replay after a metadata read error.
#[cfg(feature = "delta-lake")]
pub async fn get_last_committed_epoch(table: &DeltaTable, writer_id: &str) -> u64 {
    let snapshot = match table.snapshot() {
        Ok(snapshot) => snapshot,
        Err(_) => {
            debug!(writer_id, "no snapshot available, assuming epoch 0");
            return 0;
        }
    };

    let log_store = table.log_store();
    let lookup = snapshot.transaction_version(&log_store, writer_id).await;

    match lookup {
        Err(e) => {
            warn!(writer_id, error = %e, "failed to read txn metadata, assuming epoch 0");
            0
        }
        Ok(None) => {
            debug!(
                writer_id,
                "no txn metadata found for writer, assuming epoch 0"
            );
            0
        }
        Ok(Some(txn_version)) => {
            // Bit-for-bit inverse of the `epoch as i64` performed on write.
            #[allow(clippy::cast_sign_loss)]
            let epoch = txn_version as u64;
            debug!(
                writer_id,
                epoch, "found last committed epoch from txn metadata"
            );
            epoch
        }
    }
}
328
/// Returns the table's partition column names.
///
/// Best-effort: used for clustering diagnostics, never for correctness.
/// When the table has no snapshot, an empty list is returned instead of an error.
#[cfg(feature = "delta-lake")]
#[must_use]
pub fn get_partition_columns(table: &DeltaTable) -> Vec<String> {
    table
        .snapshot()
        .map(|state| state.snapshot().metadata().partition_columns().clone())
        .unwrap_or_default()
}
340
/// Returns the Arrow schema of the table's currently loaded snapshot.
///
/// # Arguments
///
/// * `table` - The Delta Lake table handle
///
/// # Returns
///
/// The snapshot's pre-computed Arrow schema.
///
/// # Errors
///
/// Returns `ConnectorError::SchemaMismatch` if the table has no snapshot.
#[cfg(feature = "delta-lake")]
pub fn get_table_schema(table: &DeltaTable) -> Result<SchemaRef, ConnectorError> {
    table
        .snapshot()
        .map(|state| state.snapshot().arrow_schema())
        .map_err(|e| ConnectorError::SchemaMismatch(format!("table has no snapshot: {e}")))
}
363
/// Returns the latest committed table version as reported by the log store.
///
/// The version currently loaded in `table` (or `0` if none is loaded) is
/// passed to `LogStore::get_latest_version` as its starting version.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the log store query fails.
#[cfg(feature = "delta-lake")]
pub async fn get_latest_version(table: &mut DeltaTable) -> Result<i64, ConnectorError> {
    let starting_version = table.version().unwrap_or(0);
    let store = table.log_store();

    store
        .get_latest_version(starting_version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to get latest version: {e}")))
}
378
/// Reads record batches from a specific Delta Lake table version.
///
/// Loads `version` into `table` and scans the whole snapshot through
/// DataFusion. Results are streamed (`execute_stream`) rather than collected,
/// and the scan is limited to `max_records + 1` rows. The one extra row is a
/// probe that reveals whether the version holds more than `max_records` rows.
///
/// # Arguments
///
/// * `table` - Mutable reference to the Delta Lake table handle; it is left
///   loaded at `version`
/// * `version` - The table version to read
/// * `max_records` - Maximum number of records to return. Pass `usize::MAX`
///   to read all records (unbounded).
///
/// # Returns
///
/// `(batches, fully_consumed)`. `batches` holds at most `max_records` rows.
/// `fully_consumed` is `false` when `max_records` truncated the result and
/// more rows remain.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the version cannot be loaded or scanned.
#[cfg(feature = "delta-lake")]
pub async fn read_batches_at_version(
    table: &mut DeltaTable,
    version: i64,
    max_records: usize,
) -> Result<(Vec<RecordBatch>, bool), ConnectorError> {
    use datafusion::prelude::SessionContext;
    use tokio_stream::StreamExt;

    table
        .load_version(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;

    debug!(version, "Delta Lake: loaded version for reading");

    // Build a DeltaTableProvider via the builder and register it with DataFusion.
    let provider =
        table.table_provider().build().await.map_err(|e| {
            ConnectorError::ReadError(format!("failed to build table provider: {e}"))
        })?;

    let ctx = SessionContext::new();
    ctx.register_table("delta_source_scan", Arc::new(provider))
        .map_err(|e| ConnectorError::ReadError(format!("failed to register scan table: {e}")))?;

    let df = ctx
        .sql("SELECT * FROM delta_source_scan")
        .await
        .map_err(|e| ConnectorError::ReadError(format!("scan query failed: {e}")))?;

    // Fetch one row beyond `max_records`: that probe row is what reveals
    // truncation. A LIMIT of exactly `max_records` ends the stream at the
    // limit, so a truncated version would always look fully consumed.
    let df = match max_records.checked_add(1) {
        Some(fetch) => df
            .limit(0, Some(fetch))
            .map_err(|e| ConnectorError::ReadError(format!("limit failed: {e}")))?,
        None => df,
    };

    // Stream results instead of collect() to avoid materializing everything.
    let mut stream = df
        .execute_stream()
        .await
        .map_err(|e| ConnectorError::ReadError(format!("stream execution failed: {e}")))?;

    let mut batches = Vec::new();
    let mut total_rows: usize = 0;

    while let Some(result) = stream.next().await {
        let batch =
            result.map_err(|e| ConnectorError::ReadError(format!("stream batch failed: {e}")))?;
        if batch.num_rows() == 0 {
            continue;
        }
        total_rows += batch.num_rows();
        batches.push(batch);

        // Stop as soon as the probe row has been seen.
        if total_rows > max_records {
            break;
        }
    }

    let fully_consumed = total_rows <= max_records;

    // Drop the probe row(s) from the tail so at most `max_records` rows are returned.
    let mut excess = total_rows.saturating_sub(max_records);
    while excess > 0 {
        let Some(last) = batches.pop() else { break };
        let rows = last.num_rows();
        if rows > excess {
            batches.push(last.slice(0, rows - excess));
            excess = 0;
        } else {
            excess -= rows;
        }
    }

    debug!(
        version,
        num_batches = batches.len(),
        total_rows = total_rows.min(max_records),
        fully_consumed,
        "Delta Lake: scanned version"
    );

    Ok((batches, fully_consumed))
}
481
/// Reads only the rows added in a specific Delta Lake version.
///
/// Reads the commit entry for `version` through the table's log store and
/// keeps the `add` actions whose path is not also `remove`d in the same commit.
/// Only those Parquet files are read, via the table's object store. This is
/// `O(new_files)` per version, not `O(table_size)`.
///
/// Delta data files do not contain partition columns. Those columns are rebuilt
/// from each `add` action's `partitionValues`: string values are cast to the
/// table schema's type, and a null or empty value becomes null. Columns still
/// missing afterwards (e.g. files written before a schema evolution) are filled
/// with nulls.
///
/// For version 0 (or a negative version), delegates to
/// [`read_batches_at_version`] (full snapshot).
///
/// # Arguments
///
/// * `table` - Delta Lake table handle; reloaded at `version` when files are read
/// * `version` - The commit version whose added rows are returned
/// * `max_records` - Maximum number of records to return (`usize::MAX` = unbounded)
/// * `partition_filter` - Optional Hive-style equality predicate (e.g.
///   `region = 'us'`) used to skip added files by path
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` in these cases:
/// - the commit entry is missing or unreadable;
/// - a data file cannot be read;
/// - a partition value cannot be converted to its column type.
///
/// Returns `(batches, fully_consumed)` — see [`read_batches_at_version`].
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_lines)]
pub async fn read_version_diff(
    table: &mut DeltaTable,
    version: i64,
    max_records: usize,
    partition_filter: Option<&str>,
) -> Result<(Vec<RecordBatch>, bool), ConnectorError> {
    // Maximum file size (256 MB) for direct in-memory Parquet reads.
    const MAX_DIRECT_READ_BYTES: u64 = 256 * 1024 * 1024;

    // For version 0, read the full snapshot (no previous version to diff).
    if version <= 0 {
        return read_batches_at_version(table, version, max_records).await;
    }

    // Read the commit JSON via delta-rs's LogStore API (handles path
    // resolution, checkpoints, and retries correctly).
    let log_store = table.log_store();
    let store = log_store.object_store(None);

    let commit_data = log_store
        .read_commit_entry(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("read commit {version}: {e}")))?
        .ok_or_else(|| {
            ConnectorError::ReadError(format!(
                "version {version} not available (cleaned up or never existed)"
            ))
        })?;
    let commit_str = std::str::from_utf8(&commit_data)
        .map_err(|e| ConnectorError::ReadError(format!("commit log is not valid UTF-8: {e}")))?;

    // Each non-empty line is one JSON action object; lines that fail to parse
    // are skipped. Added files keep commit order plus their `partitionValues`.
    let mut added_files: Vec<(String, HashMap<String, Option<String>>)> = Vec::new();
    let mut removed_paths = std::collections::HashSet::new();
    for line in commit_str.lines() {
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else {
            continue;
        };
        if let Some(add) = obj.get("add") {
            if let Some(path) = add.get("path").and_then(|p| p.as_str()) {
                // JSON null (or a non-string) maps to `None`.
                let partition_values: HashMap<String, Option<String>> = add
                    .get("partitionValues")
                    .and_then(serde_json::Value::as_object)
                    .map(|values| {
                        values
                            .iter()
                            .map(|(column, value)| {
                                (column.clone(), value.as_str().map(str::to_owned))
                            })
                            .collect()
                    })
                    .unwrap_or_default();
                added_files.push((decode_delta_path(path), partition_values));
            }
        }
        if let Some(remove) = obj.get("remove") {
            if let Some(path) = remove.get("path").and_then(|p| p.as_str()) {
                removed_paths.insert(decode_delta_path(path));
            }
        }
    }

    // Exclude any added file whose path also appears in a remove action.
    added_files.retain(|(path, _)| !removed_paths.contains(path));

    if added_files.is_empty() {
        debug!(
            version,
            num_removed = removed_paths.len(),
            "Delta Lake: no net-new add actions in version"
        );
        return Ok((Vec::new(), true));
    }

    debug!(
        version,
        num_added_files = added_files.len(),
        num_removed_files = removed_paths.len(),
        "Delta Lake: reading added files"
    );

    // Load the version so we have the correct schema.
    table
        .load_version(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;

    let table_schema = table
        .snapshot()
        .map(|s| s.snapshot().arrow_schema())
        .map_err(|e| ConnectorError::ReadError(format!("no snapshot at version {version}: {e}")))?;

    // Filter added files by the partition predicate (matched against paths).
    let added_files = match partition_filter {
        Some(filter) => {
            let candidates: Vec<String> =
                added_files.iter().map(|(path, _)| path.clone()).collect();
            let kept: std::collections::HashSet<String> =
                filter_paths_by_partition(&candidates, filter)
                    .into_iter()
                    .collect();
            added_files
                .into_iter()
                .filter(|(path, _)| kept.contains(path))
                .collect::<Vec<_>>()
        }
        None => added_files,
    };

    // Read each added Parquet file as raw bytes via delta-rs's object_store,
    // then parse with parquet's in-memory ArrowReaderBuilder (avoids the
    // object_store 0.12 vs 0.13 version mismatch).
    let mut batches = Vec::new();
    let mut total_rows: usize = 0;
    // Set when unread rows are known to remain without reading them.
    let mut rows_remain = false;

    for (file_path, partition_values) in &added_files {
        // Keep going until the probe row (row max_records + 1) has been read.
        // Stopping at exactly max_records would skip the remaining files and
        // misreport a truncated version as fully consumed.
        if total_rows > max_records {
            break;
        }

        let obj_path = deltalake::Path::from(file_path.as_str());

        // Check file size before downloading to avoid OOM on multi-GB files.
        let file_meta = store
            .head(&obj_path)
            .await
            .map_err(|e| ConnectorError::ReadError(format!("failed to stat '{file_path}': {e}")))?;
        if file_meta.size > MAX_DIRECT_READ_BYTES {
            if total_rows >= max_records {
                // Only the probe row is still needed. A Parquet file this large
                // is assumed to hold rows, so report truncation without reading it.
                rows_remain = true;
                break;
            }
            warn!(
                file_path,
                file_size = file_meta.size,
                "file too large for direct read, falling back to DataFusion scan"
            );
            // NOTE(review): this fallback scans the entire snapshot at
            // `version`, not just the added files. Rows from earlier versions
            // are therefore returned again and `partition_filter` is not
            // applied. Confirm that callers tolerate the duplicates.
            return read_batches_at_version(table, version, max_records).await;
        }

        let file_bytes = get_with_retry(&store, &obj_path, file_path).await?;

        let parquet_reader =
            deltalake::parquet::arrow::arrow_reader::ArrowReaderBuilder::try_new(file_bytes)
                .map_err(|e| {
                    ConnectorError::ReadError(format!(
                        "failed to open Parquet file '{file_path}': {e}"
                    ))
                })?;

        // Read up to one row past max_records to probe for remaining data.
        let remaining = max_records.saturating_sub(total_rows).saturating_add(1);
        let reader = parquet_reader.with_limit(remaining).build().map_err(|e| {
            ConnectorError::ReadError(format!("failed to build reader for '{file_path}': {e}"))
        })?;

        for result in reader {
            let batch: RecordBatch = result.map_err(|e| {
                ConnectorError::ReadError(format!("Parquet read error in '{file_path}': {e}"))
            })?;
            if batch.num_rows() == 0 {
                continue;
            }

            // Restore partition columns (absent from data files), then align to
            // the table schema (older files may predate schema evolution).
            let batch = add_partition_columns(batch, partition_values, &table_schema, file_path)?;
            let batch = if batch.schema() == table_schema {
                batch
            } else {
                align_batch_to_schema(&batch, &table_schema)?
            };

            total_rows += batch.num_rows();
            batches.push(batch);

            if total_rows > max_records {
                break;
            }
        }
    }

    let fully_consumed = total_rows <= max_records && !rows_remain;

    // Drop the probe row(s) from the tail so at most `max_records` rows are returned.
    let mut excess = total_rows.saturating_sub(max_records);
    while excess > 0 {
        let Some(last) = batches.pop() else { break };
        let rows = last.num_rows();
        if rows > excess {
            batches.push(last.slice(0, rows - excess));
            excess = 0;
        } else {
            excess -= rows;
        }
    }

    debug!(
        version,
        num_batches = batches.len(),
        fully_consumed,
        num_added_files = added_files.len(),
        "Delta Lake: read version diff"
    );

    Ok((batches, fully_consumed))
}

/// Appends the partition columns that `batch` lacks, built from an `add`
/// action's `partitionValues`.
///
/// Only fields of `table_schema` that are absent from `batch` and present in
/// `partition_values` are added. A `None` or empty-string value becomes a null
/// column (Delta protocol). Any other value is cast from its string form to
/// the field's type.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if a value cannot be cast or the
/// widened batch cannot be assembled.
#[cfg(feature = "delta-lake")]
fn add_partition_columns(
    batch: RecordBatch,
    partition_values: &HashMap<String, Option<String>>,
    table_schema: &SchemaRef,
    file_path: &str,
) -> Result<RecordBatch, ConnectorError> {
    use arrow_array::{new_null_array, ArrayRef, StringArray};

    if partition_values.is_empty() {
        return Ok(batch);
    }

    let batch_schema = batch.schema();
    let num_rows = batch.num_rows();
    let mut fields: Vec<arrow_schema::FieldRef> =
        batch_schema.fields().iter().cloned().collect();
    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();

    for field in table_schema.fields() {
        if batch_schema.index_of(field.name()).is_ok() {
            continue;
        }
        let Some(value) = partition_values.get(field.name()) else {
            continue;
        };
        let column = match value.as_deref() {
            None | Some("") => new_null_array(field.data_type(), num_rows),
            Some(raw) => {
                let strings = StringArray::from(vec![raw; num_rows]);
                deltalake::arrow::compute::cast(&strings, field.data_type()).map_err(|e| {
                    ConnectorError::ReadError(format!(
                        "cannot convert partition value '{raw}' of column '{}' in '{file_path}': {e}",
                        field.name()
                    ))
                })?
            }
        };
        fields.push(Arc::clone(field));
        columns.push(column);
    }

    if columns.len() == batch.num_columns() {
        return Ok(batch);
    }

    let widened = Arc::new(arrow_schema::Schema::new_with_metadata(
        fields,
        batch_schema.metadata().clone(),
    ));
    RecordBatch::try_new(widened, columns).map_err(|e| {
        ConnectorError::ReadError(format!(
            "failed to add partition columns for '{file_path}': {e}"
        ))
    })
}
686
/// Reads a whole object from `store`, retrying transient failures.
///
/// Makes up to four attempts: the initial one plus three retries after
/// 200 ms, 1 s and 4 s. Both the request and the body download are retried,
/// since a connection can also fail mid-transfer. A `NotFound` error is
/// returned immediately, because retrying cannot make a missing file appear.
///
/// # Arguments
///
/// * `store` - Object store to read from
/// * `path` - Object path within `store`
/// * `display_path` - Human-readable path used in logs and error messages
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the object does not exist or every
/// attempt fails.
#[cfg(feature = "delta-lake")]
async fn get_with_retry(
    store: &Arc<dyn deltalake::ObjectStore>,
    path: &deltalake::Path,
    display_path: &str,
) -> Result<bytes::Bytes, ConnectorError> {
    const BACKOFF_MS: [u64; 3] = [200, 1000, 4000];
    let mut last_err = None;

    for attempt in 0..=BACKOFF_MS.len() {
        let outcome = match store.get(path).await {
            Ok(result) => result.bytes().await,
            Err(e) => Err(e),
        };
        let e = match outcome {
            Ok(bytes) => return Ok(bytes),
            Err(e) => e,
        };

        // Match the error variant instead of searching the message for
        // "404": messages include the path, and file names/UUIDs containing
        // "404" would otherwise disable retries.
        if matches!(e, deltalake::ObjectStoreError::NotFound { .. }) {
            return Err(ConnectorError::ReadError(format!(
                "file not found '{display_path}': {e}"
            )));
        }

        // No sleep after the final attempt.
        if let Some(&delay) = BACKOFF_MS.get(attempt) {
            warn!(
                attempt = attempt + 1,
                delay_ms = delay,
                error = %e,
                path = display_path,
                "object_store read failed, retrying"
            );
            tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
        }
        last_err = Some(e);
    }

    Err(ConnectorError::ReadError(format!(
        "failed to read '{display_path}' after {} attempts: {}",
        BACKOFF_MS.len() + 1,
        last_err.map_or_else(|| "unknown".to_string(), |e| e.to_string())
    )))
}
735
736/// Filters file paths by a Hive-style partition predicate.
737///
738/// Supports simple equality predicates: `col = 'val'` matches paths
739/// containing `col=val/`. Multiple predicates joined by `AND` are all
740/// required to match. Predicates that can't be parsed are ignored
741/// (all paths pass through).
742#[cfg(feature = "delta-lake")]
743fn filter_paths_by_partition(paths: &[String], filter: &str) -> Vec<String> {
744    // Parse simple "col = 'val'" or "col = val" predicates from AND-joined expressions.
745    let mut required_segments: Vec<String> = Vec::new();
746    for clause in filter
747        .split_whitespace()
748        .collect::<Vec<_>>()
749        .join(" ")
750        .split(" AND ")
751    {
752        let clause = clause.trim();
753        if let Some((col, val)) = clause.split_once('=') {
754            let col = col.trim();
755            let val = val.trim().trim_matches('\'').trim_matches('"');
756            if !col.is_empty() && !val.is_empty() {
757                required_segments.push(format!("{col}={val}"));
758            }
759        }
760    }
761
762    if required_segments.is_empty() {
763        return paths.to_vec();
764    }
765
766    paths
767        .iter()
768        .filter(|path| required_segments.iter().all(|seg| path.contains(seg)))
769        .cloned()
770        .collect()
771}
772
773/// Percent-decodes a file path from a Delta Lake commit JSON.
774///
775/// Delta Lake spec requires paths in `add`/`remove` actions to be
776/// percent-encoded (e.g., `part%3D1/file.parquet` for `part=1/file.parquet`).
777#[cfg(feature = "delta-lake")]
778fn decode_delta_path(encoded: &str) -> String {
779    url::Url::parse(&format!("file:///{encoded}")).map_or_else(
780        |_| encoded.to_string(),
781        |u| {
782            let p = u.path();
783            p.strip_prefix('/').unwrap_or(p).to_string()
784        },
785    )
786}
787
788/// Aligns a `RecordBatch` to a target schema by adding null columns for
789/// missing fields. Used when reading Parquet files that predate schema
790/// evolution (fewer columns than the current table schema).
791#[cfg(feature = "delta-lake")]
792fn align_batch_to_schema(
793    batch: &RecordBatch,
794    target_schema: &SchemaRef,
795) -> Result<RecordBatch, ConnectorError> {
796    use arrow_array::new_null_array;
797
798    let mut columns = Vec::with_capacity(target_schema.fields().len());
799    for field in target_schema.fields() {
800        if let Ok(col_idx) = batch.schema().index_of(field.name()) {
801            columns.push(batch.column(col_idx).clone());
802        } else {
803            columns.push(new_null_array(field.data_type(), batch.num_rows()));
804        }
805    }
806
807    RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| {
808        ConnectorError::ReadError(format!("failed to align batch to table schema: {e}"))
809    })
810}
811
/// Reads Change Data Feed batches for `start_version..=end_version` via `scan_cdf()`.
///
/// `scan_cdf(self)` consumes the `DeltaTable`, so the caller must re-open it
/// afterwards. Output rows carry `_change_type`, `_commit_version` and
/// `_commit_timestamp`. Every non-empty batch is collected into memory; no row
/// limit is applied.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if building or executing the scan fails.
#[cfg(feature = "delta-lake")]
pub async fn read_cdf_batches(
    table: DeltaTable,
    start_version: i64,
    end_version: i64,
) -> Result<Vec<RecordBatch>, ConnectorError> {
    use datafusion::prelude::SessionContext;
    use tokio_stream::StreamExt;

    debug!(start_version, end_version, "reading CDF batches");

    let ctx = SessionContext::new();

    // `state()` hands back an owned clone of the session state, so no lock
    // guard is held across the `.await` below.
    let session_state = ctx.state();

    let plan = table
        .scan_cdf()
        .with_starting_version(start_version)
        .with_ending_version(end_version)
        .build(&session_state, None)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("CDF scan build failed: {e}")))?;

    let mut stream = datafusion::physical_plan::execute_stream(plan, ctx.task_ctx())
        .map_err(|e| ConnectorError::ReadError(format!("CDF stream execution failed: {e}")))?;

    let mut batches = Vec::new();
    while let Some(next) = stream.next().await {
        let batch: RecordBatch = next
            .map_err(|e| ConnectorError::ReadError(format!("CDF stream batch failed: {e}")))?;
        if batch.num_rows() == 0 {
            continue;
        }
        batches.push(batch);
    }

    debug!(
        start_version,
        end_version,
        num_batches = batches.len(),
        "CDF scan complete"
    );

    Ok(batches)
}
869
/// Maps CDF `_change_type` → `_op` (`I`/`U`/`D`), drops `update_preimage`
/// rows and CDF metadata columns (`_change_type`, `_commit_version`,
/// `_commit_timestamp`).
///
/// Mapping:
/// - `update_postimage` → `U`
/// - `delete` → `D`
/// - `update_preimage` → row dropped
/// - anything else (`insert`, unrecognised values, and null) → `I`
///
/// Returns `Ok(None)` if no rows survive, i.e. the batch is empty or every row
/// was a preimage. A batch without a `_change_type` column is returned
/// unchanged: no `_op` column is added and no columns are dropped.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if `_change_type` is not a Utf8
/// (`StringArray`) column or if an Arrow filter/rebuild operation fails.
#[cfg(feature = "delta-lake")]
pub fn map_cdf_to_changelog(batch: &RecordBatch) -> Result<Option<RecordBatch>, ConnectorError> {
    use arrow_array::{Array, ArrayRef, BooleanArray, StringArray};

    const CDF_META_COLUMNS: [&str; 3] = ["_change_type", "_commit_version", "_commit_timestamp"];

    let schema = batch.schema();
    let Ok(ct_idx) = schema.index_of("_change_type") else {
        return Ok(Some(batch.clone()));
    };

    let change_type = batch
        .column(ct_idx)
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| ConnectorError::ReadError("_change_type is not Utf8".into()))?;

    // One pass: the row filter (false = drop preimage) plus the `_op` code for
    // each kept row only, so `ops` already lines up with the filtered batch.
    let num_rows = change_type.len();
    let mut keep = Vec::with_capacity(num_rows);
    let mut ops: Vec<&str> = Vec::with_capacity(num_rows);
    for i in 0..num_rows {
        // Arrow leaves the bytes behind a null slot unspecified, so check
        // validity before reading. Null change types fall through to insert.
        let kind = change_type.is_valid(i).then(|| change_type.value(i));
        let op = match kind {
            Some("update_preimage") => None,
            Some("update_postimage") => Some("U"),
            Some("delete") => Some("D"),
            _ => Some("I"),
        };
        keep.push(op.is_some());
        ops.extend(op);
    }

    if ops.is_empty() {
        return Ok(None);
    }

    let filter = BooleanArray::from(keep);
    let filtered = arrow_select::filter::filter_record_batch(batch, &filter)
        .map_err(|e| ConnectorError::ReadError(format!("CDF filter failed: {e}")))?;

    // Rebuild batch: keep user columns, drop CDF metadata, append _op.
    let filtered_schema = filtered.schema();
    let (mut fields, mut columns): (Vec<_>, Vec<ArrayRef>) = filtered_schema
        .fields()
        .iter()
        .zip(filtered.columns())
        .filter(|(field, _)| !CDF_META_COLUMNS.contains(&field.name().as_str()))
        .map(|(field, column)| (Arc::clone(field), Arc::clone(column)))
        .unzip();
    fields.push(Arc::new(arrow_schema::Field::new(
        "_op",
        arrow_schema::DataType::Utf8,
        false,
    )));
    let op_column: ArrayRef = Arc::new(StringArray::from(ops));
    columns.push(op_column);

    RecordBatch::try_new(Arc::new(arrow_schema::Schema::new(fields)), columns)
        .map(Some)
        .map_err(|e| ConnectorError::ReadError(format!("CDF batch rebuild: {e}")))
}
935
/// Row-level outcome of a MERGE (upsert) operation, as reported by the Delta
/// merge metrics.
#[cfg(feature = "delta-lake")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MergeResult {
    /// Number of rows inserted.
    pub rows_inserted: usize,
    /// Number of rows updated.
    pub rows_updated: usize,
    /// Number of rows deleted.
    pub rows_deleted: usize,
}
947
/// Atomic changelog MERGE: inserts, updates, and deletes in one Delta commit.
///
/// The source batch must contain an `_op` column (Utf8) with values:
/// - `"I"`, `"U"`, `"r"` → upsert (update if matched, insert if not)
/// - `"D"` → delete matched rows
///
/// Rows whose `_op` is any other value (or null) satisfy none of the clause
/// predicates and are ignored.
///
/// Only the CDC metadata columns `_op` and `_ts_ms` are excluded from the
/// UPDATE/INSERT assignments. They remain in the source `DataFrame` for
/// predicate filtering. Other `_`-prefixed user columns (e.g. `_id`) are
/// written normally. Column names are referenced exactly as they appear in the
/// schema, so they are not lowercased or split on dots.
///
/// The commit carries `(writer_id, epoch)` as a Delta application transaction.
/// An empty `source_batch` returns immediately without committing anything.
///
/// NOTE(review): duplicate keys inside `source_batch` are not collapsed here.
/// Presumably the caller deduplicates per key; confirm.
///
/// # Errors
///
/// - `ConnectorError::ConfigurationError` if `key_columns` is empty (for a
///   non-empty batch).
/// - `ConnectorError::WriteError` if `epoch` exceeds `i64::MAX`, or if building
///   or executing the merge fails.
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_lines)]
#[allow(clippy::too_many_arguments)]
pub async fn merge_changelog(
    table: DeltaTable,
    source_batch: RecordBatch,
    key_columns: &[String],
    writer_id: &str,
    epoch: u64,
    schema_evolution: bool,
    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
    ctx: &datafusion::prelude::SessionContext,
) -> Result<(DeltaTable, MergeResult), ConnectorError> {
    use datafusion::prelude::*;

    const CDC_COLUMNS: &[&str] = &["_op", "_ts_ms"];

    /// Alias-qualified column reference built directly, bypassing `col()`'s SQL
    /// identifier parsing (which lowercases unquoted names and splits on dots).
    fn qualified(alias: &str, name: &str) -> datafusion::prelude::Expr {
        datafusion::prelude::Expr::Column(datafusion::common::Column::new(Some(alias), name))
    }

    if source_batch.num_rows() == 0 {
        return Ok((
            table,
            MergeResult {
                rows_inserted: 0,
                rows_updated: 0,
                rows_deleted: 0,
            },
        ));
    }

    debug!(
        key_columns = ?key_columns,
        source_rows = source_batch.num_rows(),
        "performing atomic changelog MERGE"
    );

    let source_df = ctx.read_batch(source_batch).map_err(|e| {
        ConnectorError::WriteError(format!("failed to create source DataFrame: {e}"))
    })?;

    // Join predicate: target.k1 = source.k1 AND ...
    let predicate = key_columns
        .iter()
        .map(|k| qualified("target", k).eq(qualified("source", k)))
        .reduce(Expr::and)
        .ok_or_else(|| {
            ConnectorError::ConfigurationError("merge requires at least one key column".into())
        })?;

    // The Delta txn version is i64. A plain `as` cast would silently wrap huge
    // epochs to negative versions and break exactly-once recovery.
    let epoch_i64 = i64::try_from(epoch).map_err(|_| {
        ConnectorError::WriteError(format!(
            "epoch {epoch} exceeds i64::MAX and cannot be recorded as a Delta txn version"
        ))
    })?;

    let key_set: std::collections::HashSet<&str> = key_columns.iter().map(String::as_str).collect();

    // Exclude CDC metadata columns from assignments (preserve user columns like _id).
    let insert_columns: Vec<String> = source_df
        .schema()
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .filter(|name| !CDC_COLUMNS.contains(&name.as_str()))
        .collect();

    // Keys are the join condition, so matched rows only need non-key columns updated.
    let update_columns: Vec<String> = insert_columns
        .iter()
        .filter(|c| !key_set.contains(c.as_str()))
        .cloned()
        .collect();

    // Predicates for conditional clause execution.
    let upsert_pred =
        qualified("source", "_op").in_list(vec![lit("I"), lit("U"), lit("r")], false);
    let delete_pred = qualified("source", "_op").eq(lit("D"));

    let mut merge_builder = table
        .merge(source_df, predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, epoch_i64)),
        )
        .when_matched_update(|update| {
            let mut u = update.predicate(upsert_pred.clone());
            for col_name in &update_columns {
                u = u.update(col_name.as_str(), qualified("source", col_name));
            }
            u
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-update failed: {e}")))?
        .when_matched_delete(|delete| delete.predicate(delete_pred))
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-delete failed: {e}")))?
        .when_not_matched_insert(|insert| {
            let mut ins = insert.predicate(upsert_pred);
            for col_name in &insert_columns {
                ins = ins.set(col_name.as_str(), qualified("source", col_name));
            }
            ins
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge not-matched-insert failed: {e}")))?;

    if schema_evolution {
        merge_builder = merge_builder.with_merge_schema(true);
    }

    if let Some(props) = writer_properties {
        merge_builder = merge_builder.with_writer_properties(props);
    }

    let (table, metrics) = merge_builder.await.map_err(|e| {
        ConnectorError::WriteError(format!("Delta Lake changelog MERGE failed: {e}"))
    })?;

    let result = MergeResult {
        rows_inserted: metrics.num_target_rows_inserted,
        rows_updated: metrics.num_target_rows_updated,
        rows_deleted: metrics.num_target_rows_deleted,
    };

    info!(
        writer_id,
        epoch,
        rows_inserted = result.rows_inserted,
        rows_updated = result.rows_updated,
        rows_deleted = result.rows_deleted,
        "Delta Lake changelog MERGE complete"
    );

    Ok((table, result))
}
1092
/// Outcome of a compaction (OPTIMIZE) operation, as reported by the Delta
/// optimize metrics.
#[cfg(feature = "delta-lake")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CompactionResult {
    /// Number of new optimized files written.
    pub files_added: u64,
    /// Number of small files removed.
    pub files_removed: u64,
    /// Number of partitions that were optimized.
    pub partitions_optimized: u64,
}
1104
/// Compacts a Delta Lake table with OPTIMIZE.
///
/// Small Parquet files are rewritten towards `target_file_size`. A non-empty
/// `z_order_columns` switches the operation from plain bin-packing to Z-ORDER
/// clustering on those columns. `writer_properties`, when given, is forwarded
/// to the Parquet writer.
///
/// Returns the updated table handle together with the file/partition counts
/// reported by the optimize metrics.
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if the operation fails.
#[cfg(feature = "delta-lake")]
pub async fn run_compaction(
    table: DeltaTable,
    target_file_size: u64,
    z_order_columns: &[String],
    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
) -> Result<(DeltaTable, CompactionResult), ConnectorError> {
    use deltalake::operations::optimize::OptimizeType;

    info!(target_file_size, "running Delta Lake compaction (OPTIMIZE)");

    let kind = match z_order_columns {
        [] => OptimizeType::Compact,
        columns => OptimizeType::ZOrder(columns.to_vec()),
    };

    let builder = table
        .optimize()
        .with_type(kind)
        .with_target_size(target_file_size);
    let builder = match writer_properties {
        Some(props) => builder.with_writer_properties(props),
        None => builder,
    };

    let (table, metrics) = builder
        .await
        .map_err(|err| ConnectorError::Internal(format!("compaction failed: {err}")))?;

    let summary = CompactionResult {
        files_added: metrics.num_files_added,
        files_removed: metrics.num_files_removed,
        partitions_optimized: metrics.partitions_optimized,
    };

    info!(
        files_added = summary.files_added,
        files_removed = summary.files_removed,
        partitions_optimized = summary.partitions_optimized,
        "Delta Lake compaction complete"
    );

    Ok((table, summary))
}
1158
/// Runs VACUUM on a Delta Lake table, deleting unreferenced files older than
/// `retention`.
///
/// Returns the table handle and the number of files VACUUM reported as deleted.
///
/// If `retention` is too large to convert to a `chrono::Duration`, no file can
/// be old enough to qualify. VACUUM is then skipped and `0` is returned, rather
/// than retrying with a shorter retention.
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if the operation fails.
#[cfg(feature = "delta-lake")]
pub async fn run_vacuum(
    table: DeltaTable,
    retention: std::time::Duration,
) -> Result<(DeltaTable, usize), ConnectorError> {
    let retention_hours = retention.as_secs() / 3600;
    info!(retention_hours, "running Delta Lake VACUUM");

    // `from_std` only fails when `retention` exceeds chrono's range (hundreds of
    // millions of years). Falling back to a shorter period (the old 7-day
    // default) would delete files the caller explicitly asked to keep. Instead,
    // treat it as "nothing is old enough" and do nothing.
    let Ok(retention_period) = chrono::Duration::from_std(retention) else {
        warn!(
            ?retention,
            "VACUUM retention exceeds the representable range; skipping, no files deleted"
        );
        return Ok((table, 0));
    };

    let (table, metrics) = table
        .vacuum()
        .with_retention_period(retention_period)
        .await
        .map_err(|e| ConnectorError::Internal(format!("vacuum failed: {e}")))?;

    let files_deleted = metrics.files_deleted.len();

    info!(files_deleted, "Delta Lake VACUUM complete");

    Ok((table, files_deleted))
}
1187
/// Resolves the catalog-aware table URI and the storage options to open it with.
///
/// - `None`: returns `table_path` and `base_storage_options` as-is.
/// - `Glue` (feature `delta-lake-glue`): initialises a Glue client from the
///   environment and looks up the storage location of table `table_path` in
///   database `catalog_database`, optionally under catalog `catalog_name`.
/// - `Unity` (feature `delta-lake-unity`): strips an optional `uc://` prefix
///   from `table_path`, resolves that table's storage location through the
///   Unity Catalog REST API using `workspace_url` / `access_token`, and returns
///   the direct storage path instead of the `uc://` URI.
///
/// In every successful case the returned storage options are a clone of
/// `base_storage_options`; no catalog credentials are injected into them.
/// `_catalog_schema` is currently unused.
///
/// # Errors
///
/// - `ConnectorError::ConfigurationError` if Glue is selected without
///   `catalog_database`, or if the selected catalog's cargo feature is not
///   enabled in this build.
/// - `ConnectorError::ConnectionFailed` if the Glue client cannot be
///   initialised or the Glue lookup fails.
/// - Any error returned by the Unity Catalog storage-location lookup.
#[cfg(feature = "delta-lake")]
#[allow(clippy::implicit_hasher, clippy::unused_async)]
pub async fn resolve_catalog_options(
    catalog: &super::delta_config::DeltaCatalogType,
    #[allow(unused_variables)] catalog_database: Option<&str>,
    #[allow(unused_variables)] catalog_name: Option<&str>,
    _catalog_schema: Option<&str>,
    table_path: &str,
    base_storage_options: &HashMap<String, String>,
) -> Result<(String, HashMap<String, String>), ConnectorError> {
    use super::delta_config::DeltaCatalogType;

    match catalog {
        DeltaCatalogType::None => Ok((table_path.to_string(), base_storage_options.clone())),
        #[cfg(feature = "delta-lake-glue")]
        DeltaCatalogType::Glue => {
            use deltalake::DataCatalog;
            let database = catalog_database.ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "Glue catalog requires 'catalog.database'".into(),
                )
            })?;
            let glue = deltalake_catalog_glue::GlueDataCatalog::from_env()
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to init Glue catalog: {e}"))
                })?;
            let resolved = glue
                .get_table_storage_location(catalog_name.map(String::from), database, table_path)
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!(
                        "Glue catalog lookup failed for '{database}.{table_path}': {e}"
                    ))
                })?;
            info!(
                glue_database = database,
                table = table_path,
                resolved_path = %resolved,
                "resolved table path via Glue catalog"
            );
            Ok((resolved, base_storage_options.clone()))
        }
        #[cfg(not(feature = "delta-lake-glue"))]
        DeltaCatalogType::Glue => Err(ConnectorError::ConfigurationError(
            "Glue catalog requires the 'delta-lake-glue' feature. \
             Build with: cargo build --features delta-lake-glue"
                .into(),
        )),
        #[cfg(feature = "delta-lake-unity")]
        DeltaCatalogType::Unity {
            workspace_url,
            access_token,
        } => {
            // Resolve the table's actual storage location from Unity Catalog
            // via REST API, then return that direct path (s3://, az://, gs://)
            // instead of the uc:// URI. This bypasses delta-rs's built-in
            // uc:// handling which requires credential vending — a feature
            // that is denied outside Databricks compute environments.
            let full_name = table_path.strip_prefix("uc://").unwrap_or(table_path);

            let storage_location = super::unity_catalog::get_table_storage_location(
                workspace_url,
                access_token,
                full_name,
            )
            .await?;

            // Mirror the Glue branch so catalog resolutions are traceable in logs.
            info!(
                table = full_name,
                resolved_path = %storage_location,
                "resolved table path via Unity catalog"
            );

            Ok((storage_location, base_storage_options.clone()))
        }
        #[cfg(not(feature = "delta-lake-unity"))]
        DeltaCatalogType::Unity { .. } => Err(ConnectorError::ConfigurationError(
            "Unity catalog requires the 'delta-lake-unity' feature. \
             Build with: cargo build --features delta-lake-unity"
                .into(),
        )),
    }
}
1275
1276// ============================================================================
1277// Integration tests (require delta-lake feature)
1278// ============================================================================
1279
1280#[cfg(all(test, feature = "delta-lake"))]
1281mod tests {
1282    use super::*;
1283    use arrow_array::{Float64Array, Int64Array, StringArray};
1284    use arrow_schema::{DataType, Field, Schema};
1285    use std::sync::Arc;
1286    use tempfile::TempDir;
1287
1288    fn test_schema() -> SchemaRef {
1289        Arc::new(Schema::new(vec![
1290            Field::new("id", DataType::Int64, false),
1291            Field::new("name", DataType::Utf8, true),
1292            Field::new("value", DataType::Float64, true),
1293        ]))
1294    }
1295
1296    fn test_batch(n: usize) -> RecordBatch {
1297        let ids: Vec<i64> = (0..n as i64).collect();
1298        let names: Vec<&str> = (0..n).map(|_| "test").collect();
1299        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
1300
1301        RecordBatch::try_new(
1302            test_schema(),
1303            vec![
1304                Arc::new(Int64Array::from(ids)),
1305                Arc::new(StringArray::from(names)),
1306                Arc::new(Float64Array::from(values)),
1307            ],
1308        )
1309        .unwrap()
1310    }
1311
1312    #[tokio::test]
1313    async fn test_open_creates_table() {
1314        let temp_dir = TempDir::new().unwrap();
1315        let table_path = temp_dir.path().to_str().unwrap();
1316
1317        // Open with schema should create the table.
1318        let schema = test_schema();
1319        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1320            .await
1321            .unwrap();
1322
1323        assert_eq!(table.version(), Some(0));
1324
1325        // Verify _delta_log directory was created.
1326        let delta_log = temp_dir.path().join("_delta_log");
1327        assert!(delta_log.exists(), "_delta_log directory should exist");
1328    }
1329
1330    #[tokio::test]
1331    async fn test_open_existing_table() {
1332        let temp_dir = TempDir::new().unwrap();
1333        let table_path = temp_dir.path().to_str().unwrap();
1334
1335        // Create the table.
1336        let schema = test_schema();
1337        let _ = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1338            .await
1339            .unwrap();
1340
1341        // Reopen without schema - should work.
1342        let table = open_or_create_table(table_path, HashMap::new(), None)
1343            .await
1344            .unwrap();
1345
1346        assert_eq!(table.version(), Some(0));
1347    }
1348
1349    #[tokio::test]
1350    async fn test_open_nonexistent_without_schema_defers() {
1351        let temp_dir = TempDir::new().unwrap();
1352        let nonexistent_table = temp_dir.path().join("nonexistent");
1353        std::fs::create_dir_all(&nonexistent_table).unwrap();
1354        let table_path = nonexistent_table.to_str().unwrap();
1355
1356        // Open without schema returns an uninitialized table (deferred creation).
1357        let result = open_or_create_table(table_path, HashMap::new(), None).await;
1358        assert!(result.is_ok());
1359        let table = result.unwrap();
1360        assert!(table.version().is_none(), "table should be uninitialized");
1361    }
1362
1363    #[tokio::test]
1364    async fn test_write_batch_creates_parquet() {
1365        let temp_dir = TempDir::new().unwrap();
1366        let table_path = temp_dir.path().to_str().unwrap();
1367
1368        // Create table.
1369        let schema = test_schema();
1370        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1371            .await
1372            .unwrap();
1373
1374        // Write a batch.
1375        let batch = test_batch(100);
1376        let (table, version) = write_batches(
1377            table,
1378            vec![batch],
1379            "test-writer",
1380            1,
1381            SaveMode::Append,
1382            None,
1383            false,
1384            None,
1385            false,
1386            None,
1387        )
1388        .await
1389        .unwrap();
1390
1391        assert_eq!(version, 1);
1392        assert_eq!(table.version(), Some(1));
1393
1394        // Verify Parquet files were created (in the table directory).
1395        let parquet_files: Vec<_> = std::fs::read_dir(temp_dir.path())
1396            .unwrap()
1397            .filter_map(Result::ok)
1398            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
1399            .collect();
1400
1401        assert!(
1402            !parquet_files.is_empty(),
1403            "should have created Parquet files"
1404        );
1405    }
1406
1407    #[tokio::test]
1408    async fn test_exactly_once_epoch_skip() {
1409        let temp_dir = TempDir::new().unwrap();
1410        let table_path = temp_dir.path().to_str().unwrap();
1411        let writer_id = "exactly-once-writer";
1412
1413        // Create table and write epoch 1.
1414        let schema = test_schema();
1415        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1416            .await
1417            .unwrap();
1418
1419        let batch = test_batch(10);
1420        let (table, _) = write_batches(
1421            table,
1422            vec![batch.clone()],
1423            writer_id,
1424            1,
1425            SaveMode::Append,
1426            None,
1427            false,
1428            None,
1429            false,
1430            None,
1431        )
1432        .await
1433        .unwrap();
1434
1435        // Check last committed epoch.
1436        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1437        assert_eq!(last_epoch, 1);
1438
1439        // Simulate recovery: reopen table.
1440        let reopened_table = open_or_create_table(table_path, HashMap::new(), None)
1441            .await
1442            .unwrap();
1443
1444        // Verify we can read the last committed epoch.
1445        let recovered_epoch = get_last_committed_epoch(&reopened_table, writer_id).await;
1446        assert_eq!(recovered_epoch, 1);
1447    }
1448
1449    #[tokio::test]
1450    async fn test_multiple_epochs_sequential() {
1451        let temp_dir = TempDir::new().unwrap();
1452        let table_path = temp_dir.path().to_str().unwrap();
1453        let writer_id = "sequential-writer";
1454
1455        let schema = test_schema();
1456        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1457            .await
1458            .unwrap();
1459
1460        // Write epochs 1, 2, 3.
1461        for epoch in 1..=3 {
1462            let batch = test_batch(10);
1463            let result = write_batches(
1464                table,
1465                vec![batch],
1466                writer_id,
1467                epoch,
1468                SaveMode::Append,
1469                None,
1470                false,
1471                None,
1472                false,
1473                None,
1474            )
1475            .await
1476            .unwrap();
1477            table = result.0;
1478            assert_eq!(result.1, epoch as i64);
1479        }
1480
1481        // Final version should be 3.
1482        assert_eq!(table.version(), Some(3));
1483
1484        // Last committed epoch should be 3.
1485        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1486        assert_eq!(last_epoch, 3);
1487    }
1488
1489    #[tokio::test]
1490    async fn test_get_table_schema() {
1491        let temp_dir = TempDir::new().unwrap();
1492        let table_path = temp_dir.path().to_str().unwrap();
1493
1494        let expected_schema = test_schema();
1495        let table = open_or_create_table(table_path, HashMap::new(), Some(&expected_schema))
1496            .await
1497            .unwrap();
1498
1499        let actual_schema = get_table_schema(&table).unwrap();
1500
1501        // Verify field count and names match.
1502        assert_eq!(actual_schema.fields().len(), expected_schema.fields().len());
1503        for (expected, actual) in expected_schema.fields().iter().zip(actual_schema.fields()) {
1504            assert_eq!(expected.name(), actual.name());
1505        }
1506    }
1507
1508    #[tokio::test]
1509    async fn test_write_empty_batches() {
1510        let temp_dir = TempDir::new().unwrap();
1511        let table_path = temp_dir.path().to_str().unwrap();
1512
1513        let schema = test_schema();
1514        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1515            .await
1516            .unwrap();
1517
1518        // Write empty batch list - should be no-op.
1519        let (table, version) = write_batches(
1520            table,
1521            vec![],
1522            "test-writer",
1523            1,
1524            SaveMode::Append,
1525            None,
1526            false,
1527            None,
1528            false,
1529            None,
1530        )
1531        .await
1532        .unwrap();
1533
1534        // Version should still be 0 (no write happened).
1535        assert_eq!(version, 0);
1536        assert_eq!(table.version(), Some(0));
1537    }
1538
1539    #[tokio::test]
1540    async fn test_write_multiple_batches() {
1541        // Test writing multiple batches in a single transaction.
1542        let temp_dir = TempDir::new().unwrap();
1543        let table_path = temp_dir.path().to_str().unwrap();
1544
1545        let schema = test_schema();
1546        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1547            .await
1548            .unwrap();
1549
1550        // Write multiple batches.
1551        let batch1 = test_batch(50);
1552        let batch2 = test_batch(50);
1553        let (table, version) = write_batches(
1554            table,
1555            vec![batch1, batch2],
1556            "multi-batch-writer",
1557            1,
1558            SaveMode::Append,
1559            None,
1560            false,
1561            None,
1562            false,
1563            None,
1564        )
1565        .await
1566        .unwrap();
1567
1568        assert_eq!(version, 1);
1569        assert_eq!(table.version(), Some(1));
1570
1571        // Reopen and verify we can read the state.
1572        let reopened = open_or_create_table(table_path, HashMap::new(), None)
1573            .await
1574            .unwrap();
1575        assert_eq!(reopened.version(), Some(1));
1576    }
1577
1578    #[test]
1579    fn test_path_to_url_local() {
1580        let temp_dir = TempDir::new().unwrap();
1581        let path = temp_dir.path().to_str().unwrap();
1582
1583        let url = path_to_url(path).unwrap();
1584        assert!(url.scheme() == "file");
1585    }
1586
1587    #[test]
1588    fn test_path_to_url_s3() {
1589        let url = path_to_url("s3://my-bucket/path/to/table").unwrap();
1590        assert_eq!(url.scheme(), "s3");
1591        assert_eq!(url.host_str(), Some("my-bucket"));
1592    }
1593
1594    #[test]
1595    fn test_path_to_url_azure() {
1596        let url = path_to_url("az://my-container/path/to/table").unwrap();
1597        assert_eq!(url.scheme(), "az");
1598    }
1599
1600    #[test]
1601    fn test_path_to_url_gcs() {
1602        let url = path_to_url("gs://my-bucket/path/to/table").unwrap();
1603        assert_eq!(url.scheme(), "gs");
1604    }
1605
1606    // ── End-to-end tests for new functionality ──
1607
1608    #[tokio::test]
1609    async fn test_get_latest_version() {
1610        let temp_dir = TempDir::new().unwrap();
1611        let table_path = temp_dir.path().to_str().unwrap();
1612
1613        let schema = test_schema();
1614        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1615            .await
1616            .unwrap();
1617
1618        // Initial version is 0.
1619        let v = get_latest_version(&mut table).await.unwrap();
1620        assert_eq!(v, 0);
1621
1622        // Write a batch -> version 1.
1623        let batch = test_batch(10);
1624        let (returned_table, version) = write_batches(
1625            table,
1626            vec![batch],
1627            "writer",
1628            1,
1629            SaveMode::Append,
1630            None,
1631            false,
1632            None,
1633            false,
1634            None,
1635        )
1636        .await
1637        .unwrap();
1638        assert_eq!(version, 1);
1639        table = returned_table;
1640
1641        let v = get_latest_version(&mut table).await.unwrap();
1642        assert_eq!(v, 1);
1643    }
1644
1645    #[tokio::test]
1646    async fn test_read_batches_at_version() {
1647        let temp_dir = TempDir::new().unwrap();
1648        let table_path = temp_dir.path().to_str().unwrap();
1649
1650        let schema = test_schema();
1651        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1652            .await
1653            .unwrap();
1654
1655        // Write 50 rows at version 1.
1656        let batch = test_batch(50);
1657        let (table, _) = write_batches(
1658            table,
1659            vec![batch],
1660            "writer",
1661            1,
1662            SaveMode::Append,
1663            None,
1664            false,
1665            None,
1666            false,
1667            None,
1668        )
1669        .await
1670        .unwrap();
1671
1672        // Write 30 more rows at version 2.
1673        let batch = test_batch(30);
1674        let (_table, _) = write_batches(
1675            table,
1676            vec![batch],
1677            "writer",
1678            2,
1679            SaveMode::Append,
1680            None,
1681            false,
1682            None,
1683            false,
1684            None,
1685        )
1686        .await
1687        .unwrap();
1688
1689        // Read version 1 — should get 50 rows.
1690        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1691            .await
1692            .unwrap();
1693        let (batches, _) = read_batches_at_version(&mut read_table, 1, 10000)
1694            .await
1695            .unwrap();
1696        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1697        assert_eq!(total_rows, 50);
1698
1699        // Read version 2 — should get 80 rows (cumulative).
1700        let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
1701            .await
1702            .unwrap();
1703        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1704        assert_eq!(total_rows, 80);
1705    }
1706
1707    #[tokio::test]
1708    async fn test_sink_source_roundtrip() {
1709        use super::super::delta::DeltaLakeSink;
1710        use super::super::delta_config::DeltaLakeSinkConfig;
1711        use super::super::delta_source::DeltaSource;
1712        use super::super::delta_source_config::DeltaSourceConfig;
1713        use crate::config::ConnectorConfig;
1714        use crate::connector::{SinkConnector, SourceConnector};
1715
1716        let temp_dir = TempDir::new().unwrap();
1717        let table_path = temp_dir.path().to_str().unwrap();
1718
1719        // Write data via sink.
1720        let sink_config = DeltaLakeSinkConfig::new(table_path);
1721        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());
1722        let connector_config = ConnectorConfig::new("delta-lake");
1723        sink.open(&connector_config).await.unwrap();
1724
1725        sink.begin_epoch(1).await.unwrap();
1726        let batch = test_batch(25);
1727        sink.write_batch(&batch).await.unwrap();
1728        sink.pre_commit(1).await.unwrap();
1729        sink.commit_epoch(1).await.unwrap();
1730        sink.close().await.unwrap();
1731
1732        // Read data via source.
1733        let mut source_config = DeltaSourceConfig::new(table_path);
1734        source_config.starting_version = Some(0);
1735        let mut source = DeltaSource::new(source_config, None);
1736        let source_connector_config = ConnectorConfig::new("delta-lake");
1737        source.open(&source_connector_config).await.unwrap();
1738
1739        // Poll — should get version 1 data (25 rows).
1740        let result = source.poll_batch(10000).await.unwrap();
1741        assert!(result.is_some(), "should have received a batch");
1742        let total_rows: usize = {
1743            let mut rows = result.unwrap().records.num_rows();
1744            // Drain any remaining buffered batches.
1745            while let Ok(Some(batch)) = source.poll_batch(10000).await {
1746                rows += batch.records.num_rows();
1747            }
1748            rows
1749        };
1750        assert_eq!(total_rows, 25);
1751
1752        source.close().await.unwrap();
1753    }
1754
1755    #[tokio::test]
1756    async fn test_source_checkpoint_restore() {
1757        use super::super::delta_source::DeltaSource;
1758        use super::super::delta_source_config::DeltaSourceConfig;
1759        use crate::config::ConnectorConfig;
1760        use crate::connector::SourceConnector;
1761
1762        let temp_dir = TempDir::new().unwrap();
1763        let table_path = temp_dir.path().to_str().unwrap();
1764
1765        // Create table and write 2 versions.
1766        let schema = test_schema();
1767        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1768            .await
1769            .unwrap();
1770
1771        let (table, _) = write_batches(
1772            table,
1773            vec![test_batch(10)],
1774            "writer",
1775            1,
1776            SaveMode::Append,
1777            None,
1778            false,
1779            None,
1780            false,
1781            None,
1782        )
1783        .await
1784        .unwrap();
1785        let (_table, _) = write_batches(
1786            table,
1787            vec![test_batch(20)],
1788            "writer",
1789            2,
1790            SaveMode::Append,
1791            None,
1792            false,
1793            None,
1794            false,
1795            None,
1796        )
1797        .await
1798        .unwrap();
1799
1800        // Open source starting from version 0. The source jumps to the
1801        // latest version (2) in a single poll, reading the full snapshot.
1802        let mut source_config = DeltaSourceConfig::new(table_path);
1803        source_config.starting_version = Some(0);
1804        let mut source = DeltaSource::new(source_config.clone(), None);
1805        let connector_config = ConnectorConfig::new("delta-lake");
1806        source.open(&connector_config).await.unwrap();
1807
1808        // Poll to consume latest version (2).
1809        let _ = source.poll_batch(10000).await.unwrap();
1810        // Drain buffered.
1811        while let Ok(Some(_)) = source.poll_batch(10000).await {}
1812
1813        // Checkpoint reflects the fully-consumed latest version.
1814        let cp = source.checkpoint();
1815        assert_eq!(cp.get_offset("delta_version"), Some("2"));
1816        source.close().await.unwrap();
1817
1818        // Restore from checkpoint — should resume at version 2.
1819        let mut source2 = DeltaSource::new(source_config, None);
1820        source2.open(&connector_config).await.unwrap();
1821        source2.restore(&cp).await.unwrap();
1822
1823        assert_eq!(source2.current_version(), 2);
1824
1825        // No new data — already at latest.
1826        let result = source2.poll_batch(10000).await.unwrap();
1827        assert!(result.is_none());
1828
1829        source2.close().await.unwrap();
1830    }
1831
1832    #[tokio::test]
1833    async fn test_auto_flush_writes_data() {
1834        use super::super::delta::DeltaLakeSink;
1835        use super::super::delta_config::DeltaLakeSinkConfig;
1836        use crate::config::ConnectorConfig;
1837        use crate::connector::SinkConnector;
1838
1839        let temp_dir = TempDir::new().unwrap();
1840        let table_path = temp_dir.path().to_str().unwrap();
1841
1842        // Configure a small buffer to trigger auto-flush.
1843        let mut sink_config = DeltaLakeSinkConfig::new(table_path);
1844        sink_config.max_buffer_records = 10;
1845        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());
1846
1847        let connector_config = ConnectorConfig::new("delta-lake");
1848        sink.open(&connector_config).await.unwrap();
1849
1850        sink.begin_epoch(1).await.unwrap();
1851
1852        // Write 25 rows — should trigger auto-flush after 10.
1853        let batch = test_batch(25);
1854        sink.write_batch(&batch).await.unwrap();
1855
1856        // Commit the rest.
1857        sink.pre_commit(1).await.unwrap();
1858        sink.commit_epoch(1).await.unwrap();
1859        sink.close().await.unwrap();
1860
1861        // Verify all 25 rows are in the Delta table.
1862        let mut table = open_or_create_table(table_path, HashMap::new(), None)
1863            .await
1864            .unwrap();
1865        let latest = get_latest_version(&mut table).await.unwrap();
1866        assert!(latest >= 1, "should have at least 1 version");
1867
1868        let (batches, _) = read_batches_at_version(&mut table, latest, 10000)
1869            .await
1870            .unwrap();
1871        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1872        assert_eq!(
1873            total_rows, 25,
1874            "all 25 rows should be written, not dropped by auto-flush"
1875        );
1876    }
1877
1878    #[tokio::test]
1879    async fn test_sink_exactly_once_epoch() {
1880        let temp_dir = TempDir::new().unwrap();
1881        let table_path = temp_dir.path().to_str().unwrap();
1882        let writer_id = "exactly-once-test";
1883
1884        let schema = test_schema();
1885        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1886            .await
1887            .unwrap();
1888
1889        // Write epoch 1 with 10 rows.
1890        let (table, v1) = write_batches(
1891            table,
1892            vec![test_batch(10)],
1893            writer_id,
1894            1,
1895            SaveMode::Append,
1896            None,
1897            false,
1898            None,
1899            false,
1900            None,
1901        )
1902        .await
1903        .unwrap();
1904        assert_eq!(v1, 1);
1905
1906        // Write epoch 2 with 15 rows using the same writer.
1907        let (table, v2) = write_batches(
1908            table,
1909            vec![test_batch(15)],
1910            writer_id,
1911            2,
1912            SaveMode::Append,
1913            None,
1914            false,
1915            None,
1916            false,
1917            None,
1918        )
1919        .await
1920        .unwrap();
1921        assert_eq!(v2, 2);
1922
1923        // Verify the last committed epoch is 2.
1924        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1925        assert_eq!(last_epoch, 2);
1926
1927        // Verify total rows = 25 (10 + 15).
1928        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1929            .await
1930            .unwrap();
1931        let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
1932            .await
1933            .unwrap();
1934        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1935        assert_eq!(total_rows, 25);
1936    }
1937
1938    #[tokio::test]
1939    async fn test_schema_evolution_adds_column() {
1940        let temp_dir = TempDir::new().unwrap();
1941        let table_path = temp_dir.path().to_str().unwrap();
1942
1943        // Create table with 2-column schema.
1944        let schema_v1 = Arc::new(Schema::new(vec![
1945            Field::new("id", DataType::Int64, false),
1946            Field::new("name", DataType::Utf8, true),
1947        ]));
1948        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema_v1))
1949            .await
1950            .unwrap();
1951
1952        // Write batch with 2 columns.
1953        let batch_v1 = RecordBatch::try_new(
1954            schema_v1.clone(),
1955            vec![
1956                Arc::new(Int64Array::from(vec![1, 2])),
1957                Arc::new(StringArray::from(vec!["a", "b"])),
1958            ],
1959        )
1960        .unwrap();
1961        let (table, _) = write_batches(
1962            table,
1963            vec![batch_v1],
1964            "evo-writer",
1965            1,
1966            SaveMode::Append,
1967            None,
1968            true, // schema_evolution enabled
1969            None,
1970            false,
1971            None,
1972        )
1973        .await
1974        .unwrap();
1975
1976        // Write batch with 3 columns (extra "score" column).
1977        let schema_v2 = Arc::new(Schema::new(vec![
1978            Field::new("id", DataType::Int64, false),
1979            Field::new("name", DataType::Utf8, true),
1980            Field::new("score", DataType::Float64, true),
1981        ]));
1982        let batch_v2 = RecordBatch::try_new(
1983            schema_v2,
1984            vec![
1985                Arc::new(Int64Array::from(vec![3])),
1986                Arc::new(StringArray::from(vec!["c"])),
1987                Arc::new(Float64Array::from(vec![99.5])),
1988            ],
1989        )
1990        .unwrap();
1991        let (table, _) = write_batches(
1992            table,
1993            vec![batch_v2],
1994            "evo-writer",
1995            2,
1996            SaveMode::Append,
1997            None,
1998            true,
1999            None,
2000            false,
2001            None,
2002        )
2003        .await
2004        .unwrap();
2005
2006        // Verify table schema now has all 3 columns.
2007        let final_schema = get_table_schema(&table).unwrap();
2008        assert_eq!(final_schema.fields().len(), 3);
2009        assert_eq!(final_schema.field(0).name(), "id");
2010        assert_eq!(final_schema.field(1).name(), "name");
2011        assert_eq!(final_schema.field(2).name(), "score");
2012
2013        // Verify all rows are readable.
2014        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
2015            .await
2016            .unwrap();
2017        let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
2018            .await
2019            .unwrap();
2020        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
2021        assert_eq!(total_rows, 3);
2022    }
2023
2024    #[tokio::test]
2025    async fn test_compaction_reduces_files() {
2026        let temp_dir = TempDir::new().unwrap();
2027        let table_path = temp_dir.path().to_str().unwrap();
2028
2029        let schema = test_schema();
2030        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
2031            .await
2032            .unwrap();
2033
2034        // Write 10 small batches (1 file each = 10 versions).
2035        for epoch in 1..=10u64 {
2036            let batch = test_batch(5);
2037            let (t, _) = write_batches(
2038                table,
2039                vec![batch],
2040                "compaction-writer",
2041                epoch,
2042                SaveMode::Append,
2043                None,
2044                false,
2045                None,
2046                false,
2047                None,
2048            )
2049            .await
2050            .unwrap();
2051            table = t;
2052        }
2053
2054        // Count Parquet files before compaction.
2055        let parquet_before: Vec<_> = std::fs::read_dir(temp_dir.path())
2056            .unwrap()
2057            .filter_map(Result::ok)
2058            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
2059            .collect();
2060        assert!(
2061            parquet_before.len() >= 10,
2062            "should have at least 10 Parquet files before compaction, got {}",
2063            parquet_before.len()
2064        );
2065
2066        // Run compaction.
2067        let (table, result) = run_compaction(table, 128 * 1024 * 1024, &[], None)
2068            .await
2069            .unwrap();
2070        assert!(
2071            result.files_removed > 0,
2072            "compaction should have removed files"
2073        );
2074
2075        // Compaction itself proves file reduction via metrics.
2076        // Vacuum is not tested here — delta-rs enforces a 168h minimum
2077        // retention, and test files are too fresh to be vacuumed.
2078        drop(table);
2079    }
2080}