// laminar_connectors/lakehouse/iceberg_incremental.rs
#![cfg(feature = "iceberg")]

use std::collections::HashSet;

use arrow_array::RecordBatch;
use iceberg::spec::ManifestStatus;
use iceberg::table::Table;
use tracing::{debug, warn};

use crate::error::ConnectorError;

14pub async fn scan_incremental(
28 table: &Table,
29 old_snapshot_id: i64,
30 new_snapshot_id: i64,
31 select_columns: &[String],
32) -> Result<Vec<RecordBatch>, ConnectorError> {
33 let metadata = table.metadata();
34 let file_io = table.file_io();
35
36 let snapshot = metadata.snapshot_by_id(new_snapshot_id).ok_or_else(|| {
37 ConnectorError::ReadError(format!("snapshot {new_snapshot_id} not found"))
38 })?;
39
40 let manifest_list = snapshot
41 .load_manifest_list(file_io, metadata)
42 .await
43 .map_err(|e| ConnectorError::ReadError(format!("load manifest list: {e}")))?;
44
45 let mut new_file_paths = Vec::new();
46
47 for manifest_file in manifest_list.entries() {
48 if manifest_file.added_snapshot_id <= old_snapshot_id {
49 continue;
50 }
51
52 let manifest = manifest_file
53 .load_manifest(file_io)
54 .await
55 .map_err(|e| ConnectorError::ReadError(format!("load manifest: {e}")))?;
56
57 for entry in manifest.entries() {
58 match entry.status {
59 ManifestStatus::Added => {
60 new_file_paths.push(entry.file_path().to_string());
61 }
62 ManifestStatus::Deleted => {
63 warn!(
67 old_snapshot_id,
68 new_snapshot_id,
69 deleted_file = entry.file_path(),
70 "incremental scan: detected deleted entry, falling back to full scan"
71 );
72 return super::iceberg_io::scan_table(
73 table,
74 Some(new_snapshot_id),
75 select_columns,
76 )
77 .await;
78 }
79 ManifestStatus::Existing => {}
80 }
81 }
82 }
83
84 if new_file_paths.is_empty() {
85 debug!(
86 old_snapshot_id,
87 new_snapshot_id, "incremental scan: no new data files"
88 );
89 return Ok(Vec::new());
90 }
91
92 debug!(
93 old_snapshot_id,
94 new_snapshot_id,
95 files = new_file_paths.len(),
96 "incremental scan: reading new data files"
97 );
98
99 read_files_via_scan(table, new_snapshot_id, select_columns, &new_file_paths).await
100}
101
102async fn read_files_via_scan(
104 table: &Table,
105 snapshot_id: i64,
106 select_columns: &[String],
107 file_paths: &[String],
108) -> Result<Vec<RecordBatch>, ConnectorError> {
109 use tokio_stream::StreamExt;
110
111 let mut scan_builder = table.scan().snapshot_id(snapshot_id);
112
113 if select_columns.is_empty() {
114 scan_builder = scan_builder.select_all();
115 } else {
116 scan_builder = scan_builder.select(select_columns.iter().map(String::as_str));
117 }
118
119 let scan = scan_builder
120 .build()
121 .map_err(|e| ConnectorError::ReadError(format!("build scan: {e}")))?;
122
123 let file_tasks = scan
125 .plan_files()
126 .await
127 .map_err(|e| ConnectorError::ReadError(format!("plan files: {e}")))?;
128
129 let mut file_tasks = std::pin::pin!(file_tasks);
130 let mut target_tasks = Vec::new();
131 while let Some(task_result) = file_tasks.next().await {
132 let task = task_result.map_err(|e| ConnectorError::ReadError(format!("file task: {e}")))?;
133 if file_paths.iter().any(|p| task.data_file_path() == p) {
134 target_tasks.push(task);
135 }
136 }
137
138 if target_tasks.is_empty() {
139 return Ok(Vec::new());
140 }
141
142 let reader = table.reader_builder().with_batch_size(8192).build();
144
145 let batch_stream = reader
146 .read(Box::pin(tokio_stream::iter(
147 target_tasks.into_iter().map(Ok),
148 )))
149 .map_err(|e| ConnectorError::ReadError(format!("build reader: {e}")))?;
150
151 let mut batch_stream = std::pin::pin!(batch_stream);
152 let mut batches = Vec::new();
153 while let Some(result) = batch_stream.next().await {
154 let batch = result.map_err(|e| ConnectorError::ReadError(format!("read batch: {e}")))?;
155 batches.push(batch);
156 }
157
158 Ok(batches)
159}