laminar_connectors/lakehouse/delta_io.rs

1//! Delta Lake I/O integration module.
2//!
3//! This module provides the actual I/O operations for Delta Lake tables via the
4//! `deltalake` crate. All functions are feature-gated behind `delta-lake`.
5//!
6//! # Architecture
7//!
8//! The I/O module is separate from the business logic in [`delta.rs`](super::delta)
9//! to allow:
10//! - Testing business logic without the `deltalake` dependency
11//! - Clean separation of concerns (buffering/epoch management vs. actual writes)
12//! - Easy mocking for unit tests
13//!
14//! # Exactly-Once Semantics
15//!
16//! Delta Lake's transaction log supports application-level transaction metadata
17//! via the `txn` action. We use this to store `(writer_id, epoch)` pairs, enabling
18//! exactly-once semantics:
19//!
20//! 1. On recovery, read `txn` metadata to find the last committed epoch for this writer
21//! 2. Skip epochs <= last committed (idempotent replay)
22//! 3. Each write includes the epoch in `txn` metadata
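//!
//! # Recovery Example
//!
//! A minimal sketch of the recovery flow (hypothetical `pending_epochs` buffer,
//! writer id, and table path; requires the `delta-lake` feature):
//!
//! ```ignore
//! // Reopen the table and find the last epoch this writer committed.
//! let mut table = open_or_create_table("/data/events", HashMap::new(), None).await?;
//! let last_epoch = get_last_committed_epoch(&table, "writer-1").await;
//!
//! // Replay buffered epochs, skipping anything already committed (idempotent).
//! for (epoch, batches) in pending_epochs {
//!     if epoch <= last_epoch {
//!         continue;
//!     }
//!     let (next, _version) = write_batches(
//!         table, batches, "writer-1", epoch, SaveMode::Append,
//!         None, false, None, false, None,
//!     )
//!     .await?;
//!     table = next;
//! }
//! ```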
23
24#[cfg(feature = "delta-lake")]
25use std::collections::HashMap;
26
27#[cfg(feature = "delta-lake")]
28use std::sync::Arc;
29
30#[cfg(feature = "delta-lake")]
31use arrow_array::RecordBatch;
32
33#[cfg(feature = "delta-lake")]
34use arrow_schema::SchemaRef;
35
36// delta_kernel's TryIntoKernel trait is re-exported via deltalake.
37#[cfg(feature = "delta-lake")]
38use deltalake::kernel::engine::arrow_conversion::TryIntoKernel as _;
39
40#[cfg(feature = "delta-lake")]
41use deltalake::kernel::transaction::CommitProperties;
42
43#[cfg(feature = "delta-lake")]
44use deltalake::kernel::Transaction;
45
46#[cfg(feature = "delta-lake")]
47use deltalake::operations::write::SchemaMode;
48
49#[cfg(feature = "delta-lake")]
50use deltalake::protocol::SaveMode;
51
52#[cfg(feature = "delta-lake")]
53use deltalake::DeltaTable;
54
55#[cfg(feature = "delta-lake")]
56use tracing::{debug, info, warn};
57
58#[cfg(feature = "delta-lake")]
59use url::Url;
60
61#[cfg(feature = "delta-lake")]
62use crate::error::ConnectorError;
63
64/// Converts a path string to a URL.
65#[cfg(feature = "delta-lake")]
66fn path_to_url(path: &str) -> Result<Url, ConnectorError> {
67    // If it already looks like a URL, parse it directly.
68    if path.contains("://") {
69        Url::parse(path)
70            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid URL '{path}': {e}")))
71    } else {
72        // Local path - convert to file URL.
73        // First canonicalize if it exists, otherwise use as-is.
74        let path_buf = std::path::Path::new(path);
75        let normalized = if path_buf.exists() {
76            std::fs::canonicalize(path_buf).map_err(|e| {
77                ConnectorError::ConfigurationError(format!("invalid path '{path}': {e}"))
78            })?
79        } else {
80            // For new tables, the path might not exist yet.
81            // Use absolute path if possible.
82            if path_buf.is_absolute() {
83                path_buf.to_path_buf()
84            } else {
85                std::env::current_dir()
86                    .map_err(|e| {
87                        ConnectorError::ConfigurationError(format!("cannot get current dir: {e}"))
88                    })?
89                    .join(path_buf)
90            }
91        };
92
93        Url::from_directory_path(&normalized).map_err(|()| {
94            ConnectorError::ConfigurationError(format!(
95                "cannot convert path to URL: {}",
96                normalized.display()
97            ))
98        })
99    }
100}
101
102/// Opens an existing Delta Lake table or creates a new one.
103///
104/// # Arguments
105///
106/// * `table_path` - Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`)
107/// * `storage_options` - Storage credentials and configuration
108/// * `schema` - Optional Arrow schema for table creation (required if table doesn't exist)
109///
110/// # Returns
111///
112/// The opened `DeltaTable` handle.
113///
114/// # Errors
115///
116/// Returns `ConnectorError::ConnectionFailed` if the table cannot be opened or created.
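///
/// # Example
///
/// A minimal sketch with a local path, no storage options, and a caller-supplied
/// Arrow `schema`:
///
/// ```ignore
/// use std::collections::HashMap;
///
/// // Passing a schema lets a missing table be created on open.
/// let table = open_or_create_table("/data/events", HashMap::new(), Some(&schema)).await?;
/// assert!(table.version().is_some());
/// ```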
117#[cfg(feature = "delta-lake")]
118#[allow(clippy::implicit_hasher)]
119pub async fn open_or_create_table(
120    table_path: &str,
121    storage_options: HashMap<String, String>,
122    schema: Option<&SchemaRef>,
123) -> Result<DeltaTable, ConnectorError> {
124    info!(table_path, "opening Delta Lake table");
125
126    let url = path_to_url(table_path)?;
127
128    // Try to open or initialize the table. `url` and `storage_options` are
129    // not referenced after this call, so move rather than clone — a cloned
130    // HashMap per compaction tick / table reopen was pure overhead.
131    let table = DeltaTable::try_from_url_with_storage_options(url, storage_options)
132        .await
133        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to open table: {e}")))?;
134
135    // Check if the table is initialized (has state).
136    if table.version().is_some() {
137        info!(
138            table_path,
139            version = table.version(),
140            "opened existing Delta Lake table"
141        );
142        return Ok(table);
143    }
144
145    // Table doesn't exist — create if we have a schema, otherwise defer to first write_batch().
146    let Some(schema) = schema else {
147        info!(
148            table_path,
149            "table does not exist yet; will create on first write"
150        );
151        return Ok(table);
152    };
153
154    info!(table_path, "creating new Delta Lake table");
155
156    // Convert Arrow schema to Delta Lake schema using TryIntoKernel.
157    let delta_schema: deltalake::kernel::StructType = schema
158        .as_ref()
159        .try_into_kernel()
160        .map_err(|e| ConnectorError::SchemaMismatch(format!("schema conversion failed: {e}")))?;
161
162    // Create the table.
163    let table = table
164        .create()
165        .with_columns(delta_schema.fields().cloned())
166        .await
167        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to create table: {e}")))?;
168
169    info!(
170        table_path,
171        version = table.version(),
172        "created new Delta Lake table"
173    );
174
175    Ok(table)
176}
177
178/// Writes batches to a Delta Lake table with exactly-once semantics.
179///
180/// # Arguments
181///
182/// * `table` - The Delta Lake table handle (consumed and returned)
183/// * `batches` - Record batches to write
184/// * `writer_id` - Unique writer identifier for exactly-once deduplication
185/// * `epoch` - The epoch number for this write (stored in txn metadata)
186/// * `save_mode` - Delta Lake save mode (Append, Overwrite, etc.)
187/// * `partition_columns` - Optional partition column name slice
/// * `schema_evolution` - If true, auto-merge new columns into the table schema
/// * `target_file_size` - Optional target size (bytes) for written Parquet files
/// * `create_checkpoint` - If true, write a Delta checkpoint as part of this commit
/// * `writer_properties` - Optional Parquet writer properties for the data files
189///
190/// # Returns
191///
192/// A tuple of (updated table handle, new Delta version).
193///
194/// # Errors
195///
196/// Returns `ConnectorError::WriteError` if the write fails.
197#[cfg(feature = "delta-lake")]
198#[allow(clippy::too_many_arguments)]
199pub async fn write_batches(
200    table: DeltaTable,
201    batches: Vec<RecordBatch>,
202    writer_id: &str,
203    epoch: u64,
204    save_mode: SaveMode,
205    partition_columns: Option<&[String]>,
206    schema_evolution: bool,
207    target_file_size: Option<usize>,
208    create_checkpoint: bool,
209    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
210) -> Result<(DeltaTable, i64), ConnectorError> {
211    if batches.is_empty() {
212        debug!("no batches to write, skipping");
213        let version = table.version().unwrap_or(0);
214        return Ok((table, version));
215    }
216
217    let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
218
219    debug!(
220        writer_id,
221        epoch,
222        total_rows,
223        num_batches = batches.len(),
224        "writing batches to Delta Lake"
225    );
226
227    // Build the write operation with transaction metadata for exactly-once.
228    // Note: Delta Lake uses i64 for epoch, but our API uses u64. This is safe
229    // as epochs won't exceed i64::MAX in practice.
230    #[allow(clippy::cast_possible_wrap)]
231    let epoch_i64 = epoch as i64;
232
233    let mut write_builder = table
234        .write(batches)
235        .with_save_mode(save_mode)
236        .with_commit_properties(
237            CommitProperties::default()
238                .with_application_transaction(Transaction::new(writer_id, epoch_i64))
239                .with_create_checkpoint(create_checkpoint),
240        );
241
242    // Forward target file size to delta-rs so Parquet files match the
243    // user's configured size, not just the internal default.
244    if let Some(size) = target_file_size {
245        write_builder = write_builder.with_target_file_size(size);
246    }
247
248    // Enable schema evolution (additive column merge) if requested.
249    if schema_evolution {
250        write_builder = write_builder.with_schema_mode(SchemaMode::Merge);
251    }
252
253    // Add partition columns if specified.
254    if let Some(cols) = partition_columns {
255        if !cols.is_empty() {
256            write_builder = write_builder.with_partition_columns(cols.to_vec());
257        }
258    }
259
260    if let Some(props) = writer_properties {
261        write_builder = write_builder.with_writer_properties(props);
262    }
263
264    // Execute the write.
265    let table = write_builder
266        .await
267        .map_err(|e| ConnectorError::WriteError(format!("Delta Lake write failed: {e}")))?;
268
269    let version = table.version().unwrap_or(0);
270
271    info!(
272        writer_id,
273        epoch, version, total_rows, "committed Delta Lake transaction"
274    );
275
276    Ok((table, version))
277}
278
279/// Retrieves the last committed epoch for a writer from Delta Lake's txn metadata.
280///
281/// This is used for exactly-once recovery: on startup, we check what epoch was
282/// last committed and skip any epochs <= that value.
283///
284/// # Arguments
285///
286/// * `table` - The Delta Lake table handle
287/// * `writer_id` - The writer identifier to look up
288///
289/// # Returns
290///
291/// The last committed epoch for this writer, or 0 if no commits found.
292#[cfg(feature = "delta-lake")]
293pub async fn get_last_committed_epoch(table: &DeltaTable, writer_id: &str) -> u64 {
294    // Query the table's application transaction version.
295    let Ok(snapshot) = table.snapshot() else {
296        debug!(writer_id, "no snapshot available, assuming epoch 0");
297        return 0;
298    };
299
300    match snapshot
301        .transaction_version(&table.log_store(), writer_id)
302        .await
303    {
304        Ok(Some(version)) => {
305            // Note: Delta Lake uses i64 for version, but our epoch is u64.
306            // Versions are always non-negative, so this is safe.
307            #[allow(clippy::cast_sign_loss)]
308            let epoch = version as u64;
309            debug!(
310                writer_id,
311                epoch, "found last committed epoch from txn metadata"
312            );
313            epoch
314        }
315        Ok(None) => {
316            debug!(
317                writer_id,
318                "no txn metadata found for writer, assuming epoch 0"
319            );
320            0
321        }
322        Err(e) => {
323            warn!(writer_id, error = %e, "failed to read txn metadata, assuming epoch 0");
324            0
325        }
326    }
327}
328
329/// Extracts the Arrow schema from a Delta Lake table.
330///
331/// # Arguments
332///
333/// * `table` - The Delta Lake table handle
334///
335/// # Returns
336///
337/// The table's Arrow schema.
338///
339/// # Errors
340///
341/// Returns `ConnectorError::SchemaMismatch` if schema extraction fails.
342#[cfg(feature = "delta-lake")]
343pub fn get_table_schema(table: &DeltaTable) -> Result<SchemaRef, ConnectorError> {
344    let state = table
345        .snapshot()
346        .map_err(|e| ConnectorError::SchemaMismatch(format!("table has no snapshot: {e}")))?;
347
348    // Use the pre-computed Arrow schema from the EagerSnapshot.
349    Ok(state.snapshot().arrow_schema())
350}
351
352/// Returns the latest committed version via the log store.
353///
354/// # Errors
355///
356/// Returns `ConnectorError::ReadError` on failure.
357#[cfg(feature = "delta-lake")]
358pub async fn get_latest_version(table: &mut DeltaTable) -> Result<i64, ConnectorError> {
359    let log_store = table.log_store();
360    let current = table.version().unwrap_or(0);
361    log_store
362        .get_latest_version(current)
363        .await
364        .map_err(|e| ConnectorError::ReadError(format!("failed to get latest version: {e}")))
365}
366
367/// Reads record batches from a specific Delta Lake table version.
368///
369/// Loads the requested version, applies a `LIMIT` to bound memory usage,
370/// then streams results via `execute_stream` to avoid materializing the
371/// entire version in memory.
372///
373/// # Arguments
374///
375/// * `table` - Mutable reference to the Delta Lake table handle
376/// * `version` - The table version to read
377/// * `max_records` - Maximum number of records to return. Pass `usize::MAX`
378///   to read all records (unbounded).
379///
/// # Returns
///
/// Returns `(batches, fully_consumed)` — `fully_consumed` is `false` when
/// `max_records` truncated the result and more rows remain.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the version cannot be loaded or scanned.
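///
/// # Example
///
/// A minimal sketch (hypothetical version number and record cap):
///
/// ```ignore
/// let (batches, fully_consumed) = read_batches_at_version(&mut table, 5, 10_000).await?;
/// if !fully_consumed {
///     // version 5 holds more than 10_000 rows; page through on a later poll
/// }
/// ```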
386#[cfg(feature = "delta-lake")]
387pub async fn read_batches_at_version(
388    table: &mut DeltaTable,
389    version: i64,
390    max_records: usize,
391) -> Result<(Vec<RecordBatch>, bool), ConnectorError> {
392    use datafusion::prelude::SessionContext;
393    use tokio_stream::StreamExt;
394
395    // Load the specific version.
396    table
397        .load_version(version)
398        .await
399        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;
400
401    debug!(version, "Delta Lake: loaded version for reading");
402
403    // Build a DeltaTableProvider via the builder and register it with DataFusion.
404    let provider =
405        table.table_provider().build().await.map_err(|e| {
406            ConnectorError::ReadError(format!("failed to build table provider: {e}"))
407        })?;
408
409    let ctx = SessionContext::new();
410    ctx.register_table("delta_source_scan", Arc::new(provider))
411        .map_err(|e| ConnectorError::ReadError(format!("failed to register scan table: {e}")))?;
412
413    // Apply LIMIT to bound memory: prevents OOM on large versions.
414    let df = ctx
415        .sql("SELECT * FROM delta_source_scan")
416        .await
417        .map_err(|e| ConnectorError::ReadError(format!("scan query failed: {e}")))?;
418
419    let df = if max_records < usize::MAX {
420        df.limit(0, Some(max_records))
421            .map_err(|e| ConnectorError::ReadError(format!("limit failed: {e}")))?
422    } else {
423        df
424    };
425
426    // Stream results instead of collect() to avoid materializing everything.
427    let mut stream = df
428        .execute_stream()
429        .await
430        .map_err(|e| ConnectorError::ReadError(format!("stream execution failed: {e}")))?;
431
432    let mut batches = Vec::new();
433    let mut total_rows: usize = 0;
434
435    while let Some(result) = stream.next().await {
436        let batch =
437            result.map_err(|e| ConnectorError::ReadError(format!("stream batch failed: {e}")))?;
438        if batch.num_rows() == 0 {
439            continue;
440        }
441        total_rows += batch.num_rows();
442        batches.push(batch);
443
444        // Respect max_records even between DataFusion batches.
445        if total_rows >= max_records {
446            break;
447        }
448    }
449
450    // If we stopped due to max_records, probe whether the stream has more.
451    // Without this, a version with exactly max_records rows would be
452    // misclassified as truncated and re-read forever.
453    let fully_consumed = if total_rows >= max_records {
454        stream.next().await.is_none()
455    } else {
456        true
457    };
458
459    debug!(
460        version,
461        num_batches = batches.len(),
462        total_rows,
463        fully_consumed,
464        "Delta Lake: scanned version"
465    );
466
467    Ok((batches, fully_consumed))
468}
469
470/// Reads only the rows added in a specific Delta Lake version.
471///
472/// Parses `_delta_log/{version:020}.json` for `add` actions, then reads
473/// only those Parquet files via the table's object store. This is
474/// `O(new_files)` per version, not `O(table_size)`.
475///
476/// For version 0, delegates to [`read_batches_at_version`] (full snapshot).
477///
/// # Returns
///
/// Returns `(batches, fully_consumed)` — see [`read_batches_at_version`].
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the version cannot be loaded or read.
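///
/// # Example
///
/// A minimal sketch (hypothetical version, record cap, and partition filter):
///
/// ```ignore
/// let (batches, fully_consumed) =
///     read_version_diff(&mut table, 7, 10_000, Some("date = '2024-01-01'")).await?;
/// ```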
483#[cfg(feature = "delta-lake")]
484#[allow(clippy::too_many_lines)]
485pub async fn read_version_diff(
486    table: &mut DeltaTable,
487    version: i64,
488    max_records: usize,
489    partition_filter: Option<&str>,
490) -> Result<(Vec<RecordBatch>, bool), ConnectorError> {
491    // Maximum file size (256 MB) for direct in-memory Parquet reads.
492    // Files larger than this fall back to DataFusion's streaming scan.
493    const MAX_DIRECT_READ_BYTES: u64 = 256 * 1024 * 1024;
494
495    // For version 0, read the full snapshot (no previous version to diff).
496    if version <= 0 {
497        return read_batches_at_version(table, version, max_records).await;
498    }
499
500    // Read the commit JSON via delta-rs's LogStore API (handles path
501    // resolution, checkpoints, and retries correctly).
502    let log_store = table.log_store();
503    let store = log_store.object_store(None);
504
505    let commit_data = log_store
506        .read_commit_entry(version)
507        .await
508        .map_err(|e| ConnectorError::ReadError(format!("read commit {version}: {e}")))?
509        .ok_or_else(|| {
510            ConnectorError::ReadError(format!(
511                "version {version} not available (cleaned up or never existed)"
512            ))
513        })?;
514    let commit_str = std::str::from_utf8(&commit_data)
515        .map_err(|e| ConnectorError::ReadError(format!("commit log is not valid UTF-8: {e}")))?;
516
517    // Each line in the commit JSON is a separate action object.
518    // Collect both add and remove actions to compute the net-new files.
519    let mut added_paths = Vec::new();
520    let mut removed_paths = std::collections::HashSet::new();
521    for line in commit_str.lines() {
522        let line = line.trim();
523        if line.is_empty() {
524            continue;
525        }
526        if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
527            if let Some(add) = obj.get("add") {
528                if let Some(path) = add.get("path").and_then(|p| p.as_str()) {
529                    added_paths.push(decode_delta_path(path));
530                }
531            }
532            if let Some(remove) = obj.get("remove") {
533                if let Some(path) = remove.get("path").and_then(|p| p.as_str()) {
534                    removed_paths.insert(decode_delta_path(path));
535                }
536            }
537        }
538    }
539
540    // Exclude any added file whose path also appears in a remove action.
541    added_paths.retain(|p| !removed_paths.contains(p));
542
543    if added_paths.is_empty() {
544        debug!(
545            version,
546            num_removed = removed_paths.len(),
547            "Delta Lake: no net-new add actions in version"
548        );
549        return Ok((Vec::new(), true));
550    }
551
552    debug!(
553        version,
554        num_added_files = added_paths.len(),
555        num_removed_files = removed_paths.len(),
556        "Delta Lake: reading added files"
557    );
558
559    // Load the version so we have the correct schema.
560    table
561        .load_version(version)
562        .await
563        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;
564
565    let table_schema = table
566        .snapshot()
567        .map(|s| s.snapshot().arrow_schema())
568        .map_err(|e| ConnectorError::ReadError(format!("no snapshot at version {version}: {e}")))?;
569
570    // Filter file paths by partition predicate if provided.
571    // Supports simple Hive-style equality: "col = 'val'" matches "col=val/" in path.
572    let added_paths = if let Some(filter) = partition_filter {
573        filter_paths_by_partition(&added_paths, filter)
574    } else {
575        added_paths
576    };
577
578    // Read each added Parquet file as raw bytes via delta-rs's object_store,
579    // then parse with parquet's in-memory ArrowReaderBuilder (avoids the
580    // object_store 0.12 vs 0.13 version mismatch).
581    let mut batches = Vec::new();
582    let mut total_rows: usize = 0;
583
584    for file_path in &added_paths {
585        if total_rows >= max_records {
586            break;
587        }
588
589        let obj_path = deltalake::Path::from(file_path.as_str());
590
591        // Check file size before downloading. Large files fall back to
592        // DataFusion scan to avoid OOM on multi-GB Parquet files.
593        let file_meta = store
594            .head(&obj_path)
595            .await
596            .map_err(|e| ConnectorError::ReadError(format!("failed to stat '{file_path}': {e}")))?;
597        if file_meta.size > MAX_DIRECT_READ_BYTES {
598            warn!(
599                file_path,
600                file_size = file_meta.size,
601                "file too large for direct read, falling back to DataFusion scan"
602            );
603            return read_batches_at_version(table, version, max_records).await;
604        }
605
606        let file_bytes = get_with_retry(&store, &obj_path, file_path).await?;
607
608        let parquet_reader =
609            deltalake::parquet::arrow::arrow_reader::ArrowReaderBuilder::try_new(file_bytes)
610                .map_err(|e| {
611                    ConnectorError::ReadError(format!(
612                        "failed to open Parquet file '{file_path}': {e}"
613                    ))
614                })?;
615
616        // Read one extra row to probe whether the version is fully consumed.
617        let remaining = max_records.saturating_sub(total_rows).saturating_add(1);
618        let reader = parquet_reader.with_limit(remaining).build().map_err(|e| {
619            ConnectorError::ReadError(format!("failed to build reader for '{file_path}': {e}"))
620        })?;
621
622        for result in reader {
623            let batch: RecordBatch = result.map_err(|e| {
624                ConnectorError::ReadError(format!("Parquet read error in '{file_path}': {e}"))
625            })?;
626            if batch.num_rows() == 0 {
627                continue;
628            }
629
630            // Align the batch schema to the table schema (added files may
631            // predate schema evolution and have fewer columns).
632            let batch = if batch.schema() == table_schema {
633                batch
634            } else {
635                align_batch_to_schema(&batch, &table_schema)?
636            };
637
638            total_rows += batch.num_rows();
639            batches.push(batch);
640
641            if total_rows >= max_records {
642                break;
643            }
644        }
645    }
646
647    // We probed one extra row per file. If total_rows > max_records, there's
648    // more data — trim the excess and report not fully consumed.
649    let fully_consumed = total_rows <= max_records;
650    if !fully_consumed {
651        // Trim the last batch to remove the probe row(s).
652        let excess = total_rows - max_records;
653        let len = batches.len();
654        if len > 0 {
655            let last = &batches[len - 1];
656            if last.num_rows() > excess {
657                batches[len - 1] = last.slice(0, last.num_rows() - excess);
658            } else {
659                batches.pop();
660            }
661        }
662    }
663
664    debug!(
665        version,
666        num_batches = batches.len(),
667        fully_consumed,
668        num_added_files = added_paths.len(),
669        "Delta Lake: read version diff"
670    );
671
672    Ok((batches, fully_consumed))
673}
674
675/// Reads a file from `object_store` with retry (3x, exponential backoff).
676/// Does not retry 404s.
677#[cfg(feature = "delta-lake")]
678async fn get_with_retry(
679    store: &Arc<dyn deltalake::ObjectStore>,
680    path: &deltalake::Path,
681    display_path: &str,
682) -> Result<bytes::Bytes, ConnectorError> {
683    let backoff = [200u64, 1000, 4000];
684    let mut last_err = None;
685
686    for attempt in 0..=backoff.len() {
687        match store.get(path).await {
688            Ok(result) => {
689                return result.bytes().await.map_err(|e| {
690                    ConnectorError::ReadError(format!(
691                        "failed to read bytes of '{display_path}': {e}"
692                    ))
693                });
694            }
695            Err(e) => {
696                let msg = e.to_string();
697                if msg.contains("not found") || msg.contains("404") {
698                    return Err(ConnectorError::ReadError(format!(
699                        "file not found '{display_path}': {e}"
700                    )));
701                }
702                if let Some(&delay) = backoff.get(attempt) {
703                    warn!(
704                        attempt = attempt + 1,
705                        delay_ms = delay,
706                        error = %e,
707                        path = display_path,
708                        "object_store read failed, retrying"
709                    );
710                    tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
711                }
712                last_err = Some(e);
713            }
714        }
715    }
716
717    Err(ConnectorError::ReadError(format!(
718        "failed to read '{display_path}' after {} attempts: {}",
719        backoff.len() + 1,
720        last_err.map_or_else(|| "unknown".to_string(), |e| e.to_string())
721    )))
722}
723
724/// Filters file paths by a Hive-style partition predicate.
725///
726/// Supports simple equality predicates: `col = 'val'` matches paths
727/// containing `col=val/`. Multiple predicates joined by `AND` are all
728/// required to match. Predicates that can't be parsed are ignored
729/// (all paths pass through).
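///
/// # Example
///
/// A minimal sketch of the matching behavior:
///
/// ```ignore
/// let paths = vec![
///     "date=2024-01-01/part-0.parquet".to_string(),
///     "date=2024-01-02/part-1.parquet".to_string(),
/// ];
/// let kept = filter_paths_by_partition(&paths, "date = '2024-01-01'");
/// assert_eq!(kept, vec!["date=2024-01-01/part-0.parquet".to_string()]);
/// ```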
730#[cfg(feature = "delta-lake")]
731fn filter_paths_by_partition(paths: &[String], filter: &str) -> Vec<String> {
732    // Parse simple "col = 'val'" or "col = val" predicates from AND-joined expressions.
733    let mut required_segments: Vec<String> = Vec::new();
734    for clause in filter
735        .split_whitespace()
736        .collect::<Vec<_>>()
737        .join(" ")
738        .split(" AND ")
739    {
740        let clause = clause.trim();
741        if let Some((col, val)) = clause.split_once('=') {
742            let col = col.trim();
743            let val = val.trim().trim_matches('\'').trim_matches('"');
744            if !col.is_empty() && !val.is_empty() {
745                required_segments.push(format!("{col}={val}"));
746            }
747        }
748    }
749
750    if required_segments.is_empty() {
751        return paths.to_vec();
752    }
753
754    paths
755        .iter()
756        .filter(|path| required_segments.iter().all(|seg| path.contains(seg)))
757        .cloned()
758        .collect()
759}
760
761/// Percent-decodes a file path from a Delta Lake commit JSON.
762///
763/// Delta Lake spec requires paths in `add`/`remove` actions to be
764/// percent-encoded (e.g., `part%3D1/file.parquet` for `part=1/file.parquet`).
765#[cfg(feature = "delta-lake")]
766fn decode_delta_path(encoded: &str) -> String {
767    url::Url::parse(&format!("file:///{encoded}")).map_or_else(
768        |_| encoded.to_string(),
769        |u| {
770            let p = u.path();
771            p.strip_prefix('/').unwrap_or(p).to_string()
772        },
773    )
774}
775
776/// Aligns a `RecordBatch` to a target schema by adding null columns for
777/// missing fields. Used when reading Parquet files that predate schema
778/// evolution (fewer columns than the current table schema).
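///
/// # Example
///
/// A minimal sketch: `batch` has columns `(id, name)` and `target_schema`
/// additionally has `value`:
///
/// ```ignore
/// let aligned = align_batch_to_schema(&batch, &target_schema)?;
/// assert_eq!(aligned.num_columns(), target_schema.fields().len());
/// // the missing `value` column is filled with nulls
/// ```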
779#[cfg(feature = "delta-lake")]
780fn align_batch_to_schema(
781    batch: &RecordBatch,
782    target_schema: &SchemaRef,
783) -> Result<RecordBatch, ConnectorError> {
784    use arrow_array::new_null_array;
785
786    let mut columns = Vec::with_capacity(target_schema.fields().len());
787    for field in target_schema.fields() {
788        if let Ok(col_idx) = batch.schema().index_of(field.name()) {
789            columns.push(batch.column(col_idx).clone());
790        } else {
791            columns.push(new_null_array(field.data_type(), batch.num_rows()));
792        }
793    }
794
795    RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| {
796        ConnectorError::ReadError(format!("failed to align batch to table schema: {e}"))
797    })
798}
799
800/// Reads CDF batches for a version range via `scan_cdf()`.
801///
802/// `scan_cdf(self)` consumes the `DeltaTable` — caller must re-open afterward.
803/// Output includes `_change_type`, `_commit_version`, `_commit_timestamp`.
804///
805/// # Errors
806///
807/// Returns `ConnectorError::ReadError` on scan failure.
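///
/// # Example
///
/// A minimal sketch (hypothetical version range and re-open arguments):
///
/// ```ignore
/// // `scan_cdf` consumes the handle, so re-open the table afterwards.
/// let cdf_batches = read_cdf_batches(table, 3, 5).await?;
/// let table = open_or_create_table(table_path, storage_options, None).await?;
/// ```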
808#[cfg(feature = "delta-lake")]
809pub async fn read_cdf_batches(
810    table: DeltaTable,
811    start_version: i64,
812    end_version: i64,
813) -> Result<Vec<RecordBatch>, ConnectorError> {
814    use datafusion::prelude::SessionContext;
815    use tokio_stream::StreamExt;
816
817    debug!(start_version, end_version, "reading CDF batches");
818
819    let ctx = SessionContext::new();
820
821    // Clone session state so the RwLockReadGuard is dropped before await.
822    let session_state = ctx.state();
823
824    let cdf_builder = table
825        .scan_cdf()
826        .with_starting_version(start_version)
827        .with_ending_version(end_version);
828
829    let plan = cdf_builder
830        .build(&session_state, None)
831        .await
832        .map_err(|e| ConnectorError::ReadError(format!("CDF scan build failed: {e}")))?;
833
834    // Execute the plan via DataFusion to get record batches.
835    let task_ctx = ctx.task_ctx();
836    let mut stream = datafusion::physical_plan::execute_stream(plan, task_ctx)
837        .map_err(|e| ConnectorError::ReadError(format!("CDF stream execution failed: {e}")))?;
838
839    let mut batches = Vec::new();
840    while let Some(result) = stream.next().await {
841        let batch: RecordBatch = result
842            .map_err(|e| ConnectorError::ReadError(format!("CDF stream batch failed: {e}")))?;
843        if batch.num_rows() > 0 {
844            batches.push(batch);
845        }
846    }
847
848    debug!(
849        start_version,
850        end_version,
851        num_batches = batches.len(),
852        "CDF scan complete"
853    );
854
855    Ok(batches)
856}
857
858/// Maps CDF `_change_type` → `_op` (`I`/`U`/`D`), drops `update_preimage`
859/// rows and CDF metadata columns (`_change_type`, `_commit_version`,
860/// `_commit_timestamp`). Returns `None` if all rows were preimages.
861///
862/// # Errors
863///
864/// Returns `ConnectorError::ReadError` on Arrow operation failure.
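///
/// # Example
///
/// A minimal sketch: a CDF batch whose `_change_type` values are `insert`,
/// `update_preimage`, `update_postimage`, `delete` maps to three rows with
/// `_op` = `I`, `U`, `D`:
///
/// ```ignore
/// if let Some(changelog) = map_cdf_to_changelog(&cdf_batch)? {
///     assert!(changelog.schema().index_of("_op").is_ok());
///     assert!(changelog.schema().index_of("_change_type").is_err());
/// }
/// ```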
865#[cfg(feature = "delta-lake")]
866pub fn map_cdf_to_changelog(batch: &RecordBatch) -> Result<Option<RecordBatch>, ConnectorError> {
867    use arrow_array::StringArray;
868
869    let schema = batch.schema();
870    let Ok(ct_idx) = schema.index_of("_change_type") else {
871        return Ok(Some(batch.clone()));
872    };
873
874    let change_type = batch
875        .column(ct_idx)
876        .as_any()
877        .downcast_ref::<StringArray>()
878        .ok_or_else(|| ConnectorError::ReadError("_change_type is not Utf8".into()))?;
879
880    // Build filter (drop preimage rows) and mapped _op values in one pass.
881    let (keep, ops): (Vec<bool>, Vec<Option<&str>>) = (0..batch.num_rows())
882        .map(|i| match change_type.value(i) {
883            "update_postimage" => (true, Some("U")),
884            "delete" => (true, Some("D")),
885            "update_preimage" => (false, Some("")),
886            _ => (true, Some("I")), // insert + unknown → I
887        })
888        .unzip();
889
890    let filter = arrow_array::BooleanArray::from(keep);
891    let filtered = arrow_select::filter::filter_record_batch(batch, &filter)
892        .map_err(|e| ConnectorError::ReadError(format!("CDF filter failed: {e}")))?;
893    if filtered.num_rows() == 0 {
894        return Ok(None);
895    }
896
897    // Build _op column from filtered ops.
898    let op_arr: StringArray = ops.into_iter().collect();
899    let op_filtered = arrow_select::filter::filter(&op_arr, &filter)
900        .map_err(|e| ConnectorError::ReadError(format!("CDF op filter: {e}")))?;
901
902    // Rebuild batch: keep user columns, drop CDF metadata, append _op.
903    let cdf_meta = ["_change_type", "_commit_version", "_commit_timestamp"];
904    let mut fields = Vec::new();
905    let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::new();
906    for (i, field) in filtered.schema().fields().iter().enumerate() {
907        if !cdf_meta.contains(&field.name().as_str()) {
908            fields.push(field.clone());
909            columns.push(filtered.column(i).clone());
910        }
911    }
912    fields.push(Arc::new(arrow_schema::Field::new(
913        "_op",
914        arrow_schema::DataType::Utf8,
915        false,
916    )));
917    columns.push(op_filtered);
918
919    RecordBatch::try_new(Arc::new(arrow_schema::Schema::new(fields)), columns)
920        .map(Some)
921        .map_err(|e| ConnectorError::ReadError(format!("CDF batch rebuild: {e}")))
922}
923
924/// Result of a MERGE (upsert) operation.
925#[cfg(feature = "delta-lake")]
926#[derive(Debug)]
927pub struct MergeResult {
928    /// Number of rows inserted.
929    pub rows_inserted: usize,
930    /// Number of rows updated.
931    pub rows_updated: usize,
932    /// Number of rows deleted.
933    pub rows_deleted: usize,
934}
935
936/// Atomic changelog MERGE: inserts, updates, and deletes in one Delta commit.
937///
938/// The source batch must contain an `_op` column (Utf8) with values:
939/// - `"I"`, `"U"`, `"r"` → upsert (update if matched, insert if not)
940/// - `"D"` → delete matched rows
941///
/// The CDC metadata columns (`_op`, `_ts_ms`) are excluded from SET clauses but
/// remain in the source `DataFrame` for predicate filtering; other
/// underscore-prefixed user columns (e.g. `_id`) are written like any other column.
944///
945/// # Errors
946///
947/// Returns `ConnectorError::WriteError` if the merge fails.
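///
/// # Example
///
/// A minimal sketch (hypothetical key column, writer id, and source batch):
///
/// ```ignore
/// let ctx = datafusion::prelude::SessionContext::new();
/// let (table, result) = merge_changelog(
///     table,
///     changelog_batch,        // RecordBatch containing an `_op` column
///     &["id".to_string()],    // key columns for the join predicate
///     "writer-1",
///     epoch,
///     false,                  // schema_evolution
///     None,                   // writer_properties
///     &ctx,
/// )
/// .await?;
/// // result.rows_inserted / rows_updated / rows_deleted carry the MERGE metrics
/// ```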
948#[cfg(feature = "delta-lake")]
949#[allow(clippy::too_many_lines)]
950#[allow(clippy::too_many_arguments)]
951pub async fn merge_changelog(
952    table: DeltaTable,
953    source_batch: RecordBatch,
954    key_columns: &[String],
955    writer_id: &str,
956    epoch: u64,
957    schema_evolution: bool,
958    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
959    ctx: &datafusion::prelude::SessionContext,
960) -> Result<(DeltaTable, MergeResult), ConnectorError> {
961    use datafusion::prelude::*;
962    use deltalake::kernel::transaction::CommitProperties;
963    use deltalake::kernel::Transaction;
964
965    const CDC_COLUMNS: &[&str] = &["_op", "_ts_ms"];
966
967    if source_batch.num_rows() == 0 {
968        return Ok((
969            table,
970            MergeResult {
971                rows_inserted: 0,
972                rows_updated: 0,
973                rows_deleted: 0,
974            },
975        ));
976    }
977
978    debug!(
979        key_columns = ?key_columns,
980        source_rows = source_batch.num_rows(),
981        "performing atomic changelog MERGE"
982    );
983
984    let source_df = ctx.read_batch(source_batch).map_err(|e| {
985        ConnectorError::WriteError(format!("failed to create source DataFrame: {e}"))
986    })?;
987
988    // Join predicate: target.k1 = source.k1 AND ...
989    let predicate = key_columns
990        .iter()
991        .map(|k| col(format!("target.{k}")).eq(col(format!("source.{k}"))))
992        .reduce(Expr::and)
993        .ok_or_else(|| {
994            ConnectorError::ConfigurationError("merge requires at least one key column".into())
995        })?;
996
997    let source_schema = source_df.schema().clone();
998    let key_set: std::collections::HashSet<&str> = key_columns.iter().map(String::as_str).collect();
999
1000    // Exclude CDC metadata columns from SET clauses (preserve user columns like _id).
1001    let all_user_columns: Vec<String> = source_schema
1002        .fields()
1003        .iter()
1004        .map(|f| f.name().clone())
1005        .filter(|name| !CDC_COLUMNS.contains(&name.as_str()))
1006        .collect();
1007
1008    let non_key_user_columns: Vec<String> = all_user_columns
1009        .iter()
1010        .filter(|c| !key_set.contains(c.as_str()))
1011        .cloned()
1012        .collect();
1013
1014    // Predicates for conditional clause execution.
1015    let upsert_pred = col("source._op").in_list(vec![lit("I"), lit("U"), lit("r")], false);
1016    let delete_pred = col("source._op").eq(lit("D"));
1017
1018    #[allow(clippy::cast_possible_wrap)]
1019    let epoch_i64 = epoch as i64;
1020
1021    let non_key_for_update = non_key_user_columns;
1022    let all_for_insert = all_user_columns;
1023
1024    let mut merge_builder = table
1025        .merge(source_df, predicate)
1026        .with_source_alias("source")
1027        .with_target_alias("target")
1028        .with_commit_properties(
1029            CommitProperties::default()
1030                .with_application_transaction(Transaction::new(writer_id, epoch_i64)),
1031        )
1032        .when_matched_update(|update| {
1033            let mut u = update.predicate(upsert_pred.clone());
1034            for col_name in &non_key_for_update {
1035                u = u.update(col_name.as_str(), col(format!("source.{col_name}")));
1036            }
1037            u
1038        })
1039        .map_err(|e| ConnectorError::WriteError(format!("merge matched-update failed: {e}")))?
1040        .when_matched_delete(|delete| delete.predicate(delete_pred))
1041        .map_err(|e| ConnectorError::WriteError(format!("merge matched-delete failed: {e}")))?
1042        .when_not_matched_insert(|insert| {
1043            let mut ins = insert.predicate(upsert_pred);
1044            for col_name in &all_for_insert {
1045                ins = ins.set(col_name.as_str(), col(format!("source.{col_name}")));
1046            }
1047            ins
1048        })
1049        .map_err(|e| ConnectorError::WriteError(format!("merge not-matched-insert failed: {e}")))?;
1050
1051    if schema_evolution {
1052        merge_builder = merge_builder.with_merge_schema(true);
1053    }
1054
1055    if let Some(props) = writer_properties {
1056        merge_builder = merge_builder.with_writer_properties(props);
1057    }
1058
1059    let (table, metrics) = merge_builder.await.map_err(|e| {
1060        ConnectorError::WriteError(format!("Delta Lake changelog MERGE failed: {e}"))
1061    })?;
1062
1063    let result = MergeResult {
1064        rows_inserted: metrics.num_target_rows_inserted,
1065        rows_updated: metrics.num_target_rows_updated,
1066        rows_deleted: metrics.num_target_rows_deleted,
1067    };
1068
1069    info!(
1070        writer_id,
1071        epoch,
1072        rows_inserted = result.rows_inserted,
1073        rows_updated = result.rows_updated,
1074        rows_deleted = result.rows_deleted,
1075        "Delta Lake changelog MERGE complete"
1076    );
1077
1078    Ok((table, result))
1079}
1080
1081/// Result of a compaction (OPTIMIZE) operation.
1082#[cfg(feature = "delta-lake")]
1083#[derive(Debug)]
1084pub struct CompactionResult {
1085    /// Number of new optimized files written.
1086    pub files_added: u64,
1087    /// Number of small files removed.
1088    pub files_removed: u64,
1089    /// Number of partitions that were optimized.
1090    pub partitions_optimized: u64,
1091}
1092
1093/// Runs an OPTIMIZE compaction on a Delta Lake table.
1094///
1095/// Compacts small Parquet files into larger ones (target size), optionally
1096/// applying Z-ORDER clustering.
1097///
1098/// # Errors
1099///
1100/// Returns `ConnectorError::Internal` if the operation fails.
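///
/// # Example
///
/// A minimal sketch targeting 128 MiB files with no Z-ORDER columns:
///
/// ```ignore
/// let (table, result) = run_compaction(table, 128 * 1024 * 1024, &[], None).await?;
/// // result.files_removed small files were rewritten into result.files_added larger ones
/// ```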
1101#[cfg(feature = "delta-lake")]
1102pub async fn run_compaction(
1103    table: DeltaTable,
1104    target_file_size: u64,
1105    z_order_columns: &[String],
1106    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
1107) -> Result<(DeltaTable, CompactionResult), ConnectorError> {
1108    use deltalake::operations::optimize::OptimizeType;
1109
1110    info!(target_file_size, "running Delta Lake compaction (OPTIMIZE)");
1111
1112    let optimize_type = if z_order_columns.is_empty() {
1113        OptimizeType::Compact
1114    } else {
1115        OptimizeType::ZOrder(z_order_columns.to_vec())
1116    };
1117
1118    let mut optimize_builder = table
1119        .optimize()
1120        .with_type(optimize_type)
1121        .with_target_size(target_file_size);
1122
1123    if let Some(props) = writer_properties {
1124        optimize_builder = optimize_builder.with_writer_properties(props);
1125    }
1126
1127    let (table, metrics) = optimize_builder
1128        .await
1129        .map_err(|e| ConnectorError::Internal(format!("compaction failed: {e}")))?;
1130
1131    let result = CompactionResult {
1132        files_added: metrics.num_files_added,
1133        files_removed: metrics.num_files_removed,
1134        partitions_optimized: metrics.partitions_optimized,
1135    };
1136
1137    info!(
1138        files_added = result.files_added,
1139        files_removed = result.files_removed,
1140        partitions_optimized = result.partitions_optimized,
1141        "Delta Lake compaction complete"
1142    );
1143
1144    Ok((table, result))
1145}
1146
1147/// Runs VACUUM on a Delta Lake table, deleting old unreferenced files.
1148///
1149/// # Errors
1150///
1151/// Returns `ConnectorError::Internal` if the operation fails.
1152#[cfg(feature = "delta-lake")]
1153pub async fn run_vacuum(
1154    table: DeltaTable,
1155    retention: std::time::Duration,
1156) -> Result<(DeltaTable, usize), ConnectorError> {
1157    let retention_hours = retention.as_secs() / 3600;
1158    info!(retention_hours, "running Delta Lake VACUUM");
1159
1160    let chrono_duration =
1161        chrono::Duration::from_std(retention).unwrap_or_else(|_| chrono::Duration::hours(168)); // fallback: 7 days
1162
1163    let (table, metrics) = table
1164        .vacuum()
1165        .with_retention_period(chrono_duration)
1166        .await
1167        .map_err(|e| ConnectorError::Internal(format!("vacuum failed: {e}")))?;
1168
1169    let files_deleted = metrics.files_deleted.len();
1170
1171    info!(files_deleted, "Delta Lake VACUUM complete");
1172
1173    Ok((table, files_deleted))
1174}
1175
1176/// Resolves catalog-aware table URI and merges catalog-specific storage options.
1177///
1178/// - `None`: returns table path and storage options as-is.
1179/// - `Glue`: calls AWS Glue API to resolve the table's S3 location.
1180/// - `Unity`: injects workspace URL and access token into storage options.
1181///
1182/// # Errors
1183///
1184/// Returns `ConnectorError` if catalog resolution fails.
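///
/// # Example
///
/// A minimal sketch with no catalog, where the path and options pass through unchanged:
///
/// ```ignore
/// use super::delta_config::DeltaCatalogType;
///
/// let (path, opts) = resolve_catalog_options(
///     &DeltaCatalogType::None,
///     None,
///     None,
///     None,
///     "s3://bucket/events",
///     &HashMap::new(),
/// )
/// .await?;
/// assert_eq!(path, "s3://bucket/events");
/// ```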
1185#[cfg(feature = "delta-lake")]
1186#[allow(clippy::implicit_hasher, clippy::unused_async)]
1187pub async fn resolve_catalog_options(
1188    catalog: &super::delta_config::DeltaCatalogType,
1189    #[allow(unused_variables)] catalog_database: Option<&str>,
1190    #[allow(unused_variables)] catalog_name: Option<&str>,
1191    _catalog_schema: Option<&str>,
1192    table_path: &str,
1193    base_storage_options: &HashMap<String, String>,
1194) -> Result<(String, HashMap<String, String>), ConnectorError> {
1195    use super::delta_config::DeltaCatalogType;
1196
1197    match catalog {
1198        DeltaCatalogType::None => Ok((table_path.to_string(), base_storage_options.clone())),
1199        #[cfg(feature = "delta-lake-glue")]
1200        DeltaCatalogType::Glue => {
1201            use deltalake::DataCatalog;
1202            let database = catalog_database.ok_or_else(|| {
1203                ConnectorError::ConfigurationError(
1204                    "Glue catalog requires 'catalog.database'".into(),
1205                )
1206            })?;
1207            let glue = deltalake_catalog_glue::GlueDataCatalog::from_env()
1208                .await
1209                .map_err(|e| {
1210                    ConnectorError::ConnectionFailed(format!("failed to init Glue catalog: {e}"))
1211                })?;
1212            let resolved = glue
1213                .get_table_storage_location(catalog_name.map(String::from), database, table_path)
1214                .await
1215                .map_err(|e| {
1216                    ConnectorError::ConnectionFailed(format!(
1217                        "Glue catalog lookup failed for '{database}.{table_path}': {e}"
1218                    ))
1219                })?;
1220            info!(
1221                glue_database = database,
1222                table = table_path,
1223                resolved_path = %resolved,
1224                "resolved table path via Glue catalog"
1225            );
1226            Ok((resolved, base_storage_options.clone()))
1227        }
1228        #[cfg(not(feature = "delta-lake-glue"))]
1229        DeltaCatalogType::Glue => Err(ConnectorError::ConfigurationError(
1230            "Glue catalog requires the 'delta-lake-glue' feature. \
1231             Build with: cargo build --features delta-lake-glue"
1232                .into(),
1233        )),
1234        #[cfg(feature = "delta-lake-unity")]
1235        DeltaCatalogType::Unity {
1236            workspace_url,
1237            access_token,
1238        } => {
1239            // Resolve the table's actual storage location from Unity Catalog
1240            // via REST API, then return that direct path (s3://, az://, gs://)
1241            // instead of the uc:// URI. This bypasses delta-rs's built-in
1242            // uc:// handling which requires credential vending — a feature
1243            // that is denied outside Databricks compute environments.
1244            let full_name = table_path.strip_prefix("uc://").unwrap_or(table_path);
1245
1246            let storage_location = super::unity_catalog::get_table_storage_location(
1247                workspace_url,
1248                access_token,
1249                full_name,
1250            )
1251            .await?;
1252
1253            Ok((storage_location, base_storage_options.clone()))
1254        }
1255        #[cfg(not(feature = "delta-lake-unity"))]
1256        DeltaCatalogType::Unity { .. } => Err(ConnectorError::ConfigurationError(
1257            "Unity catalog requires the 'delta-lake-unity' feature. \
1258             Build with: cargo build --features delta-lake-unity"
1259                .into(),
1260        )),
1261    }
1262}
1263
1264// ============================================================================
1265// Integration tests (require delta-lake feature)
1266// ============================================================================
1267
1268#[cfg(all(test, feature = "delta-lake"))]
1269mod tests {
1270    use super::*;
1271    use arrow_array::{Float64Array, Int64Array, StringArray};
1272    use arrow_schema::{DataType, Field, Schema};
1273    use std::sync::Arc;
1274    use tempfile::TempDir;
1275
1276    fn test_schema() -> SchemaRef {
1277        Arc::new(Schema::new(vec![
1278            Field::new("id", DataType::Int64, false),
1279            Field::new("name", DataType::Utf8, true),
1280            Field::new("value", DataType::Float64, true),
1281        ]))
1282    }
1283
1284    fn test_batch(n: usize) -> RecordBatch {
1285        let ids: Vec<i64> = (0..n as i64).collect();
1286        let names: Vec<&str> = (0..n).map(|_| "test").collect();
1287        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
1288
1289        RecordBatch::try_new(
1290            test_schema(),
1291            vec![
1292                Arc::new(Int64Array::from(ids)),
1293                Arc::new(StringArray::from(names)),
1294                Arc::new(Float64Array::from(values)),
1295            ],
1296        )
1297        .unwrap()
1298    }
1299
1300    #[tokio::test]
1301    async fn test_open_creates_table() {
1302        let temp_dir = TempDir::new().unwrap();
1303        let table_path = temp_dir.path().to_str().unwrap();
1304
1305        // Open with schema should create the table.
1306        let schema = test_schema();
1307        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1308            .await
1309            .unwrap();
1310
1311        assert_eq!(table.version(), Some(0));
1312
1313        // Verify _delta_log directory was created.
1314        let delta_log = temp_dir.path().join("_delta_log");
1315        assert!(delta_log.exists(), "_delta_log directory should exist");
1316    }
1317
1318    #[tokio::test]
1319    async fn test_open_existing_table() {
1320        let temp_dir = TempDir::new().unwrap();
1321        let table_path = temp_dir.path().to_str().unwrap();
1322
1323        // Create the table.
1324        let schema = test_schema();
1325        let _ = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1326            .await
1327            .unwrap();
1328
1329        // Reopen without schema - should work.
1330        let table = open_or_create_table(table_path, HashMap::new(), None)
1331            .await
1332            .unwrap();
1333
1334        assert_eq!(table.version(), Some(0));
1335    }
1336
1337    #[tokio::test]
1338    async fn test_open_nonexistent_without_schema_defers() {
1339        let temp_dir = TempDir::new().unwrap();
1340        let nonexistent_table = temp_dir.path().join("nonexistent");
1341        std::fs::create_dir_all(&nonexistent_table).unwrap();
1342        let table_path = nonexistent_table.to_str().unwrap();
1343
1344        // Open without schema returns an uninitialized table (deferred creation).
1345        let result = open_or_create_table(table_path, HashMap::new(), None).await;
1346        assert!(result.is_ok());
1347        let table = result.unwrap();
1348        assert!(table.version().is_none(), "table should be uninitialized");
1349    }
1350
1351    #[tokio::test]
1352    async fn test_write_batch_creates_parquet() {
1353        let temp_dir = TempDir::new().unwrap();
1354        let table_path = temp_dir.path().to_str().unwrap();
1355
1356        // Create table.
1357        let schema = test_schema();
1358        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1359            .await
1360            .unwrap();
1361
1362        // Write a batch.
1363        let batch = test_batch(100);
1364        let (table, version) = write_batches(
1365            table,
1366            vec![batch],
1367            "test-writer",
1368            1,
1369            SaveMode::Append,
1370            None,
1371            false,
1372            None,
1373            false,
1374            None,
1375        )
1376        .await
1377        .unwrap();
1378
1379        assert_eq!(version, 1);
1380        assert_eq!(table.version(), Some(1));
1381
1382        // Verify Parquet files were created (in the table directory).
1383        let parquet_files: Vec<_> = std::fs::read_dir(temp_dir.path())
1384            .unwrap()
1385            .filter_map(Result::ok)
1386            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
1387            .collect();
1388
1389        assert!(
1390            !parquet_files.is_empty(),
1391            "should have created Parquet files"
1392        );
1393    }
1394
1395    #[tokio::test]
1396    async fn test_exactly_once_epoch_skip() {
1397        let temp_dir = TempDir::new().unwrap();
1398        let table_path = temp_dir.path().to_str().unwrap();
1399        let writer_id = "exactly-once-writer";
1400
1401        // Create table and write epoch 1.
1402        let schema = test_schema();
1403        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1404            .await
1405            .unwrap();
1406
1407        let batch = test_batch(10);
1408        let (table, _) = write_batches(
1409            table,
1410            vec![batch.clone()],
1411            writer_id,
1412            1,
1413            SaveMode::Append,
1414            None,
1415            false,
1416            None,
1417            false,
1418            None,
1419        )
1420        .await
1421        .unwrap();
1422
1423        // Check last committed epoch.
1424        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1425        assert_eq!(last_epoch, 1);
1426
1427        // Simulate recovery: reopen table.
1428        let reopened_table = open_or_create_table(table_path, HashMap::new(), None)
1429            .await
1430            .unwrap();
1431
1432        // Verify we can read the last committed epoch.
1433        let recovered_epoch = get_last_committed_epoch(&reopened_table, writer_id).await;
1434        assert_eq!(recovered_epoch, 1);
1435    }
1436
1437    #[tokio::test]
1438    async fn test_multiple_epochs_sequential() {
1439        let temp_dir = TempDir::new().unwrap();
1440        let table_path = temp_dir.path().to_str().unwrap();
1441        let writer_id = "sequential-writer";
1442
1443        let schema = test_schema();
1444        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1445            .await
1446            .unwrap();
1447
1448        // Write epochs 1, 2, 3.
1449        for epoch in 1..=3 {
1450            let batch = test_batch(10);
1451            let result = write_batches(
1452                table,
1453                vec![batch],
1454                writer_id,
1455                epoch,
1456                SaveMode::Append,
1457                None,
1458                false,
1459                None,
1460                false,
1461                None,
1462            )
1463            .await
1464            .unwrap();
1465            table = result.0;
1466            assert_eq!(result.1, epoch as i64);
1467        }
1468
1469        // Final version should be 3.
1470        assert_eq!(table.version(), Some(3));
1471
1472        // Last committed epoch should be 3.
1473        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1474        assert_eq!(last_epoch, 3);
1475    }
1476
1477    #[tokio::test]
1478    async fn test_get_table_schema() {
1479        let temp_dir = TempDir::new().unwrap();
1480        let table_path = temp_dir.path().to_str().unwrap();
1481
1482        let expected_schema = test_schema();
1483        let table = open_or_create_table(table_path, HashMap::new(), Some(&expected_schema))
1484            .await
1485            .unwrap();
1486
1487        let actual_schema = get_table_schema(&table).unwrap();
1488
1489        // Verify field count and names match.
1490        assert_eq!(actual_schema.fields().len(), expected_schema.fields().len());
1491        for (expected, actual) in expected_schema.fields().iter().zip(actual_schema.fields()) {
1492            assert_eq!(expected.name(), actual.name());
1493        }
1494    }
1495
    #[tokio::test]
    async fn test_write_empty_batches() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        // Write empty batch list - should be no-op.
        let (table, version) = write_batches(
            table,
            vec![],
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        // Version should still be 0 (no write happened).
        assert_eq!(version, 0);
        assert_eq!(table.version(), Some(0));
    }

    #[tokio::test]
    async fn test_write_multiple_batches() {
        // Test writing multiple batches in a single transaction.
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        // Write multiple batches.
        let batch1 = test_batch(50);
        let batch2 = test_batch(50);
        let (table, version) = write_batches(
            table,
            vec![batch1, batch2],
            "multi-batch-writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        assert_eq!(version, 1);
        assert_eq!(table.version(), Some(1));

        // Reopen and verify we can read the state.
        let reopened = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        assert_eq!(reopened.version(), Some(1));
    }

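    // path_to_url should map local filesystem paths to file:// URLs, while
    // object-store URIs (s3://, az://, gs://) keep their scheme and bucket host.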
    #[test]
    fn test_path_to_url_local() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_str().unwrap();

        let url = path_to_url(path).unwrap();
        assert_eq!(url.scheme(), "file");
    }

    #[test]
    fn test_path_to_url_s3() {
        let url = path_to_url("s3://my-bucket/path/to/table").unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str(), Some("my-bucket"));
    }

    #[test]
    fn test_path_to_url_azure() {
        let url = path_to_url("az://my-container/path/to/table").unwrap();
        assert_eq!(url.scheme(), "az");
    }

    #[test]
    fn test_path_to_url_gcs() {
        let url = path_to_url("gs://my-bucket/path/to/table").unwrap();
        assert_eq!(url.scheme(), "gs");
    }

    // ── End-to-end tests for new functionality ──

    #[tokio::test]
    async fn test_get_latest_version() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        // Initial version is 0.
        let v = get_latest_version(&mut table).await.unwrap();
        assert_eq!(v, 0);

        // Write a batch -> version 1.
        let batch = test_batch(10);
        let (returned_table, version) = write_batches(
            table,
            vec![batch],
            "writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        assert_eq!(version, 1);
        table = returned_table;

        let v = get_latest_version(&mut table).await.unwrap();
        assert_eq!(v, 1);
    }

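    // read_batches_at_version returns the full table snapshot at that version,
    // so later versions report cumulative row counts.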
    #[tokio::test]
    async fn test_read_batches_at_version() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        // Write 50 rows at version 1.
        let batch = test_batch(50);
        let (table, _) = write_batches(
            table,
            vec![batch],
            "writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        // Write 30 more rows at version 2.
        let batch = test_batch(30);
        let (_table, _) = write_batches(
            table,
            vec![batch],
            "writer",
            2,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        // Read version 1 — should get 50 rows.
        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let (batches, _) = read_batches_at_version(&mut read_table, 1, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 50);

        // Read version 2 — should get 80 rows (cumulative).
        let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 80);
    }

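    // End-to-end roundtrip: rows committed through DeltaLakeSink in one epoch
    // should be readable back out through DeltaSource.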
    #[tokio::test]
    async fn test_sink_source_roundtrip() {
        use super::super::delta::DeltaLakeSink;
        use super::super::delta_config::DeltaLakeSinkConfig;
        use super::super::delta_source::DeltaSource;
        use super::super::delta_source_config::DeltaSourceConfig;
        use crate::config::ConnectorConfig;
        use crate::connector::{SinkConnector, SourceConnector};

        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        // Write data via sink.
        let sink_config = DeltaLakeSinkConfig::new(table_path);
        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());
        let connector_config = ConnectorConfig::new("delta-lake");
        sink.open(&connector_config).await.unwrap();

        sink.begin_epoch(1).await.unwrap();
        let batch = test_batch(25);
        sink.write_batch(&batch).await.unwrap();
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        sink.close().await.unwrap();

        // Read data via source.
        let mut source_config = DeltaSourceConfig::new(table_path);
        source_config.starting_version = Some(0);
        let mut source = DeltaSource::new(source_config, None);
        let source_connector_config = ConnectorConfig::new("delta-lake");
        source.open(&source_connector_config).await.unwrap();

        // Poll — should get version 1 data (25 rows).
        let result = source.poll_batch(10000).await.unwrap();
        assert!(result.is_some(), "should have received a batch");
        let total_rows: usize = {
            let mut rows = result.unwrap().records.num_rows();
            // Drain any remaining buffered batches.
            while let Ok(Some(batch)) = source.poll_batch(10000).await {
                rows += batch.records.num_rows();
            }
            rows
        };
        assert_eq!(total_rows, 25);

        source.close().await.unwrap();
    }

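    // Checkpoint/restore: after the latest version has been consumed, the checkpoint
    // records that version, and a restored source resumes there with no new data.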
    #[tokio::test]
    async fn test_source_checkpoint_restore() {
        use super::super::delta_source::DeltaSource;
        use super::super::delta_source_config::DeltaSourceConfig;
        use crate::config::ConnectorConfig;
        use crate::connector::SourceConnector;

        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        // Create table and write 2 versions.
        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let (table, _) = write_batches(
            table,
            vec![test_batch(10)],
            "writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        let (_table, _) = write_batches(
            table,
            vec![test_batch(20)],
            "writer",
            2,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        // Open a source starting from version 0. The source jumps straight to the
        // latest version (2) in a single poll, reading the full snapshot.
        let mut source_config = DeltaSourceConfig::new(table_path);
        source_config.starting_version = Some(0);
        let mut source = DeltaSource::new(source_config.clone(), None);
        let connector_config = ConnectorConfig::new("delta-lake");
        source.open(&connector_config).await.unwrap();

        // Poll to consume latest version (2).
        let _ = source.poll_batch(10000).await.unwrap();
        // Drain buffered.
        while let Ok(Some(_)) = source.poll_batch(10000).await {}

        // Checkpoint reflects the fully-consumed latest version.
        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("delta_version"), Some("2"));
        source.close().await.unwrap();

        // Restore from checkpoint — should resume at version 2.
        let mut source2 = DeltaSource::new(source_config, None);
        source2.open(&connector_config).await.unwrap();
        source2.restore(&cp).await.unwrap();

        assert_eq!(source2.current_version(), 2);

        // No new data — already at latest.
        let result = source2.poll_batch(10000).await.unwrap();
        assert!(result.is_none());

        source2.close().await.unwrap();
    }

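    // Auto-flush: when buffered rows exceed max_buffer_records mid-epoch, the sink
    // flushes early; the remaining rows must still land in the committed table.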
    #[tokio::test]
    async fn test_auto_flush_writes_data() {
        use super::super::delta::DeltaLakeSink;
        use super::super::delta_config::DeltaLakeSinkConfig;
        use crate::config::ConnectorConfig;
        use crate::connector::SinkConnector;

        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        // Configure a small buffer to trigger auto-flush.
        let mut sink_config = DeltaLakeSinkConfig::new(table_path);
        sink_config.max_buffer_records = 10;
        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());

        let connector_config = ConnectorConfig::new("delta-lake");
        sink.open(&connector_config).await.unwrap();

        sink.begin_epoch(1).await.unwrap();

        // Write 25 rows — should trigger auto-flush after 10.
        let batch = test_batch(25);
        sink.write_batch(&batch).await.unwrap();

        // Commit the rest.
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        sink.close().await.unwrap();

        // Verify all 25 rows are in the Delta table.
        let mut table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let latest = get_latest_version(&mut table).await.unwrap();
        assert!(latest >= 1, "should have at least 1 version");

        let (batches, _) = read_batches_at_version(&mut table, latest, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(
            total_rows, 25,
            "all 25 rows should be written, not dropped by auto-flush"
        );
    }

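    // Exactly-once bookkeeping: two consecutive epochs from the same writer should
    // advance the recorded epoch to 2 and append both batches without duplication.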
    #[tokio::test]
    async fn test_sink_exactly_once_epoch() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();
        let writer_id = "exactly-once-test";

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        // Write epoch 1 with 10 rows.
        let (table, v1) = write_batches(
            table,
            vec![test_batch(10)],
            writer_id,
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        assert_eq!(v1, 1);

        // Write epoch 2 with 15 rows using the same writer.
        let (table, v2) = write_batches(
            table,
            vec![test_batch(15)],
            writer_id,
            2,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        assert_eq!(v2, 2);

        // Verify the last committed epoch is 2.
        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
        assert_eq!(last_epoch, 2);

        // Verify total rows = 25 (10 + 15).
        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 25);
    }

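    // Schema evolution: with evolution enabled, a batch carrying an extra column
    // should widen the table schema rather than fail the write.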
    #[tokio::test]
    async fn test_schema_evolution_adds_column() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        // Create table with 2-column schema.
        let schema_v1 = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema_v1))
            .await
            .unwrap();

        // Write batch with 2 columns.
        let batch_v1 = RecordBatch::try_new(
            schema_v1.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();
        let (table, _) = write_batches(
            table,
            vec![batch_v1],
            "evo-writer",
            1,
            SaveMode::Append,
            None,
            true, // schema_evolution enabled
            None,
            false,
            None,
        )
        .await
        .unwrap();

        // Write batch with 3 columns (extra "score" column).
        let schema_v2 = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
        ]));
        let batch_v2 = RecordBatch::try_new(
            schema_v2,
            vec![
                Arc::new(Int64Array::from(vec![3])),
                Arc::new(StringArray::from(vec!["c"])),
                Arc::new(Float64Array::from(vec![99.5])),
            ],
        )
        .unwrap();
        let (table, _) = write_batches(
            table,
            vec![batch_v2],
            "evo-writer",
            2,
            SaveMode::Append,
            None,
            true,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        // Verify table schema now has all 3 columns.
        let final_schema = get_table_schema(&table).unwrap();
        assert_eq!(final_schema.fields().len(), 3);
        assert_eq!(final_schema.field(0).name(), "id");
        assert_eq!(final_schema.field(1).name(), "name");
        assert_eq!(final_schema.field(2).name(), "score");

        // Verify all rows are readable.
        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

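    // Compaction: many small appends leave one small Parquet file per commit;
    // run_compaction should rewrite them into fewer, larger files.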
    #[tokio::test]
    async fn test_compaction_reduces_files() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        // Write 10 small batches (1 file each = 10 versions).
        for epoch in 1..=10u64 {
            let batch = test_batch(5);
            let (t, _) = write_batches(
                table,
                vec![batch],
                "compaction-writer",
                epoch,
                SaveMode::Append,
                None,
                false,
                None,
                false,
                None,
            )
            .await
            .unwrap();
            table = t;
        }

        // Count Parquet files before compaction.
        let parquet_before: Vec<_> = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
            .collect();
        assert!(
            parquet_before.len() >= 10,
            "should have at least 10 Parquet files before compaction, got {}",
            parquet_before.len()
        );

        // Run compaction.
        let (table, result) = run_compaction(table, 128 * 1024 * 1024, &[], None)
            .await
            .unwrap();
        assert!(
            result.files_removed > 0,
            "compaction should have removed files"
        );

        // The files_removed metric above already demonstrates the file-count reduction.
        // Vacuum is not tested here: delta-rs enforces a 168h minimum retention,
        // so the freshly written test files cannot be vacuumed yet.
        drop(table);
    }
}