// laminar_connectors/lakehouse/iceberg_incremental.rs

//! Incremental snapshot diff for Iceberg tables.
//!
//! Reads only data files added since a previous snapshot by walking the
//! manifest tree and filtering on `ManifestFile.added_snapshot_id`.
#![cfg(feature = "iceberg")]

use arrow_array::RecordBatch;
use iceberg::spec::ManifestStatus;
use iceberg::table::Table;
use tracing::{debug, warn};

use crate::error::ConnectorError;
13
14/// Reads only data files added between `old_snapshot_id` and `new_snapshot_id`.
15///
16/// Walks the new snapshot's manifest list, selects manifests where
17/// `added_snapshot_id > old_snapshot_id`, loads those manifests, and reads
18/// the `ADDED` data files via the table's scan API.
19///
20/// If any manifest entry has a `Deleted` status (compaction, overwrite, or
21/// explicit delete), the incremental path is unsafe — falls back to a full
22/// scan of the new snapshot so the caller gets the complete table state.
23///
24/// # Errors
25///
26/// Returns `ConnectorError::ReadError` on I/O or scan failure.
27pub async fn scan_incremental(
28    table: &Table,
29    old_snapshot_id: i64,
30    new_snapshot_id: i64,
31    select_columns: &[String],
32) -> Result<Vec<RecordBatch>, ConnectorError> {
33    let metadata = table.metadata();
34    let file_io = table.file_io();
35
36    let snapshot = metadata.snapshot_by_id(new_snapshot_id).ok_or_else(|| {
37        ConnectorError::ReadError(format!("snapshot {new_snapshot_id} not found"))
38    })?;
39
40    let manifest_list = snapshot
41        .load_manifest_list(file_io, metadata)
42        .await
43        .map_err(|e| ConnectorError::ReadError(format!("load manifest list: {e}")))?;
44
45    let mut new_file_paths = Vec::new();
46
47    for manifest_file in manifest_list.entries() {
48        if manifest_file.added_snapshot_id <= old_snapshot_id {
49            continue;
50        }
51
52        let manifest = manifest_file
53            .load_manifest(file_io)
54            .await
55            .map_err(|e| ConnectorError::ReadError(format!("load manifest: {e}")))?;
56
57        for entry in manifest.entries() {
58            match entry.status {
59                ManifestStatus::Added => {
60                    new_file_paths.push(entry.file_path().to_string());
61                }
62                ManifestStatus::Deleted => {
63                    // Non-append snapshot (compaction, overwrite, or delete).
64                    // Incremental diff cannot represent deletions — fall back
65                    // to a full scan so the caller gets the complete state.
66                    warn!(
67                        old_snapshot_id,
68                        new_snapshot_id,
69                        deleted_file = entry.file_path(),
70                        "incremental scan: detected deleted entry, falling back to full scan"
71                    );
72                    return super::iceberg_io::scan_table(
73                        table,
74                        Some(new_snapshot_id),
75                        select_columns,
76                    )
77                    .await;
78                }
79                ManifestStatus::Existing => {}
80            }
81        }
82    }
83
84    if new_file_paths.is_empty() {
85        debug!(
86            old_snapshot_id,
87            new_snapshot_id, "incremental scan: no new data files"
88        );
89        return Ok(Vec::new());
90    }
91
92    debug!(
93        old_snapshot_id,
94        new_snapshot_id,
95        files = new_file_paths.len(),
96        "incremental scan: reading new data files"
97    );
98
99    read_files_via_scan(table, new_snapshot_id, select_columns, &new_file_paths).await
100}
101
102/// Scans the snapshot but only reads files whose paths are in `file_paths`.
103async fn read_files_via_scan(
104    table: &Table,
105    snapshot_id: i64,
106    select_columns: &[String],
107    file_paths: &[String],
108) -> Result<Vec<RecordBatch>, ConnectorError> {
109    use tokio_stream::StreamExt;
110
111    let mut scan_builder = table.scan().snapshot_id(snapshot_id);
112
113    if select_columns.is_empty() {
114        scan_builder = scan_builder.select_all();
115    } else {
116        scan_builder = scan_builder.select(select_columns.iter().map(String::as_str));
117    }
118
119    let scan = scan_builder
120        .build()
121        .map_err(|e| ConnectorError::ReadError(format!("build scan: {e}")))?;
122
123    // plan_files returns FileScanTasks — filter to only our target paths.
124    let file_tasks = scan
125        .plan_files()
126        .await
127        .map_err(|e| ConnectorError::ReadError(format!("plan files: {e}")))?;
128
129    let mut file_tasks = std::pin::pin!(file_tasks);
130    let mut target_tasks = Vec::new();
131    while let Some(task_result) = file_tasks.next().await {
132        let task = task_result.map_err(|e| ConnectorError::ReadError(format!("file task: {e}")))?;
133        if file_paths.iter().any(|p| task.data_file_path() == p) {
134            target_tasks.push(task);
135        }
136    }
137
138    if target_tasks.is_empty() {
139        return Ok(Vec::new());
140    }
141
142    // Read the filtered tasks via ArrowReader.
143    let reader = table.reader_builder().with_batch_size(8192).build();
144
145    let batch_stream = reader
146        .read(Box::pin(tokio_stream::iter(
147            target_tasks.into_iter().map(Ok),
148        )))
149        .map_err(|e| ConnectorError::ReadError(format!("build reader: {e}")))?;
150
151    let mut batch_stream = std::pin::pin!(batch_stream);
152    let mut batches = Vec::new();
153    while let Some(result) = batch_stream.next().await {
154        let batch = result.map_err(|e| ConnectorError::ReadError(format!("read batch: {e}")))?;
155        batches.push(batch);
156    }
157
158    Ok(batches)
159}