1#[cfg(feature = "delta-lake")]
25use std::collections::HashMap;
26
27#[cfg(feature = "delta-lake")]
28use std::sync::Arc;
29
30#[cfg(feature = "delta-lake")]
31use arrow_array::RecordBatch;
32
33#[cfg(feature = "delta-lake")]
34use arrow_schema::SchemaRef;
35
36#[cfg(feature = "delta-lake")]
38use deltalake::kernel::engine::arrow_conversion::TryIntoKernel as _;
39
40#[cfg(feature = "delta-lake")]
41use deltalake::kernel::transaction::CommitProperties;
42
43#[cfg(feature = "delta-lake")]
44use deltalake::kernel::Transaction;
45
46#[cfg(feature = "delta-lake")]
47use deltalake::operations::write::SchemaMode;
48
49#[cfg(feature = "delta-lake")]
50use deltalake::protocol::SaveMode;
51
52#[cfg(feature = "delta-lake")]
53use deltalake::DeltaTable;
54
55#[cfg(feature = "delta-lake")]
56use tracing::{debug, info, warn};
57
58#[cfg(feature = "delta-lake")]
59use url::Url;
60
61#[cfg(feature = "delta-lake")]
62use crate::error::ConnectorError;
63
#[cfg(feature = "delta-lake")]
/// Turns a table location string into a [`Url`].
///
/// * Strings containing `"://"` are parsed directly as URLs.
/// * Anything else is treated as a local filesystem path: an existing path is
///   canonicalized, a non-existent absolute path is used as-is, and a
///   non-existent relative path is joined onto the current working directory.
///   The resulting path is converted with `Url::from_directory_path`.
///
/// # Errors
///
/// Returns `ConnectorError::ConfigurationError` when the URL cannot be parsed,
/// the path cannot be canonicalized, the current directory cannot be read, or
/// the path cannot be expressed as a URL.
fn path_to_url(path: &str) -> Result<Url, ConnectorError> {
    // Guard clause: anything with a scheme separator goes straight to the URL parser.
    if path.contains("://") {
        return Url::parse(path)
            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid URL '{path}': {e}")));
    }

    let local = std::path::Path::new(path);
    let resolved = match (local.exists(), local.is_absolute()) {
        // Existing paths are canonicalized.
        (true, _) => std::fs::canonicalize(local).map_err(|e| {
            ConnectorError::ConfigurationError(format!("invalid path '{path}': {e}"))
        })?,
        // The table may not have been created yet: keep absolute paths untouched.
        (false, true) => local.to_path_buf(),
        // Relative and missing: anchor it at the current working directory.
        (false, false) => std::env::current_dir()
            .map_err(|e| {
                ConnectorError::ConfigurationError(format!("cannot get current dir: {e}"))
            })?
            .join(local),
    };

    match Url::from_directory_path(&resolved) {
        Ok(url) => Ok(url),
        Err(()) => Err(ConnectorError::ConfigurationError(format!(
            "cannot convert path to URL: {}",
            resolved.display()
        ))),
    }
}
101
#[cfg(feature = "delta-lake")]
/// Opens the Delta table at `table_path`, creating it when a schema is supplied
/// and the table has no version yet.
///
/// Outcomes:
/// * the table already has a version: it is returned as opened;
/// * no version and `schema` is `None`: the uninitialized handle is returned so
///   the table can be created later;
/// * no version and `schema` is `Some`: the Arrow schema is converted to a Delta
///   schema and a new table is created with those columns.
///
/// # Errors
///
/// * `ConnectorError::ConfigurationError` if `table_path` cannot be turned into a URL.
/// * `ConnectorError::ConnectionFailed` if opening or creating the table fails.
/// * `ConnectorError::SchemaMismatch` if the Arrow schema cannot be converted.
#[allow(clippy::implicit_hasher)]
pub async fn open_or_create_table(
    table_path: &str,
    storage_options: HashMap<String, String>,
    schema: Option<&SchemaRef>,
) -> Result<DeltaTable, ConnectorError> {
    info!(table_path, "opening Delta Lake table");

    let location = path_to_url(table_path)?;
    let opened = DeltaTable::try_from_url_with_storage_options(location, storage_options).await;
    let table = match opened {
        Ok(table) => table,
        Err(e) => {
            return Err(ConnectorError::ConnectionFailed(format!(
                "failed to open table: {e}"
            )))
        }
    };

    // A known version means the table already exists on storage.
    let already_exists = table.version().is_some();
    if already_exists {
        info!(
            table_path,
            version = table.version(),
            "opened existing Delta Lake table"
        );
        return Ok(table);
    }

    // Without a schema there is nothing to create the table from yet.
    let arrow_schema = match schema {
        Some(arrow_schema) => arrow_schema,
        None => {
            info!(
                table_path,
                "table does not exist yet; will create on first write"
            );
            return Ok(table);
        }
    };

    info!(table_path, "creating new Delta Lake table");

    let delta_schema: deltalake::kernel::StructType = arrow_schema
        .as_ref()
        .try_into_kernel()
        .map_err(|e| ConnectorError::SchemaMismatch(format!("schema conversion failed: {e}")))?;

    let create_builder = table
        .create()
        .with_columns(delta_schema.fields().cloned());
    let created = create_builder
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to create table: {e}")))?;

    info!(
        table_path,
        version = created.version(),
        "created new Delta Lake table"
    );

    Ok(created)
}
177
#[cfg(feature = "delta-lake")]
/// Writes `batches` to `table` in a single Delta commit tagged with an
/// application transaction `(writer_id, epoch)`.
///
/// Optional settings are applied only when provided: `target_file_size`,
/// schema merging (`schema_evolution`), non-empty `partition_columns`, and
/// `writer_properties`. `create_checkpoint` is forwarded to the commit
/// properties.
///
/// Returns the updated table and its version. When `batches` is empty nothing
/// is written and the current version (or `0` if none) is returned.
///
/// # Errors
///
/// Returns `ConnectorError::WriteError` if the Delta write fails.
#[allow(clippy::too_many_arguments)]
pub async fn write_batches(
    table: DeltaTable,
    batches: Vec<RecordBatch>,
    writer_id: &str,
    epoch: u64,
    save_mode: SaveMode,
    partition_columns: Option<&[String]>,
    schema_evolution: bool,
    target_file_size: Option<usize>,
    create_checkpoint: bool,
    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
) -> Result<(DeltaTable, i64), ConnectorError> {
    if batches.is_empty() {
        debug!("no batches to write, skipping");
        let current_version = table.version().unwrap_or(0);
        return Ok((table, current_version));
    }

    let num_batches = batches.len();
    let total_rows = batches
        .iter()
        .fold(0usize, |rows, batch| rows + batch.num_rows());

    debug!(
        writer_id,
        epoch,
        total_rows,
        num_batches,
        "writing batches to Delta Lake"
    );

    // The epoch is stored as the transaction version, which is an i64; the
    // cast keeps the bit pattern.
    #[allow(clippy::cast_possible_wrap)]
    let txn_version = epoch as i64;

    let commit_properties = CommitProperties::default()
        .with_application_transaction(Transaction::new(writer_id, txn_version))
        .with_create_checkpoint(create_checkpoint);

    let mut builder = table
        .write(batches)
        .with_save_mode(save_mode)
        .with_commit_properties(commit_properties);

    if let Some(size) = target_file_size {
        builder = builder.with_target_file_size(size);
    }

    if schema_evolution {
        builder = builder.with_schema_mode(SchemaMode::Merge);
    }

    // An empty partition list is treated the same as no list at all.
    match partition_columns {
        Some(cols) if !cols.is_empty() => builder = builder.with_partition_columns(cols.to_vec()),
        _ => {}
    }

    if let Some(props) = writer_properties {
        builder = builder.with_writer_properties(props);
    }

    let table = match builder.await {
        Ok(table) => table,
        Err(e) => {
            return Err(ConnectorError::WriteError(format!(
                "Delta Lake write failed: {e}"
            )))
        }
    };

    let version = table.version().unwrap_or(0);

    info!(
        writer_id,
        epoch, version, total_rows, "committed Delta Lake transaction"
    );

    Ok((table, version))
}
278
#[cfg(feature = "delta-lake")]
/// Looks up the last epoch committed by `writer_id`, read from the table's
/// application-transaction metadata.
///
/// Returns `0` when the table has no snapshot, when no transaction entry exists
/// for the writer, or when the lookup fails (the failure is logged as a
/// warning rather than propagated).
pub async fn get_last_committed_epoch(table: &DeltaTable, writer_id: &str) -> u64 {
    let snapshot = match table.snapshot() {
        Ok(state) => state,
        Err(_) => {
            debug!(writer_id, "no snapshot available, assuming epoch 0");
            return 0;
        }
    };

    let log_store = table.log_store();
    let lookup = snapshot.transaction_version(&log_store, writer_id).await;

    let version = match lookup {
        Ok(Some(version)) => version,
        Ok(None) => {
            debug!(
                writer_id,
                "no txn metadata found for writer, assuming epoch 0"
            );
            return 0;
        }
        Err(e) => {
            warn!(writer_id, error = %e, "failed to read txn metadata, assuming epoch 0");
            return 0;
        }
    };

    // Inverse of the u64 -> i64 cast applied when the transaction was written.
    #[allow(clippy::cast_sign_loss)]
    let epoch = version as u64;
    debug!(
        writer_id,
        epoch, "found last committed epoch from txn metadata"
    );
    epoch
}
328
#[cfg(feature = "delta-lake")]
/// Returns the partition column names recorded in the table metadata, or an
/// empty list when the table has no snapshot.
#[must_use]
pub fn get_partition_columns(table: &DeltaTable) -> Vec<String> {
    table.snapshot().map_or_else(
        |_| Vec::new(),
        |state| state.snapshot().metadata().partition_columns().clone(),
    )
}
340
#[cfg(feature = "delta-lake")]
/// Returns the Arrow schema of the table's current snapshot.
///
/// # Errors
///
/// Returns `ConnectorError::SchemaMismatch` when the table has no snapshot.
pub fn get_table_schema(table: &DeltaTable) -> Result<SchemaRef, ConnectorError> {
    match table.snapshot() {
        Ok(state) => Ok(state.snapshot().arrow_schema()),
        Err(e) => Err(ConnectorError::SchemaMismatch(format!(
            "table has no snapshot: {e}"
        ))),
    }
}
363
#[cfg(feature = "delta-lake")]
/// Asks the table's log store for the latest version, starting the search from
/// the version currently loaded in `table` (or `0` if none is loaded).
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the log store lookup fails.
pub async fn get_latest_version(table: &mut DeltaTable) -> Result<i64, ConnectorError> {
    let known_version = table.version().unwrap_or(0);
    match table.log_store().get_latest_version(known_version).await {
        Ok(latest) => Ok(latest),
        Err(e) => Err(ConnectorError::ReadError(format!(
            "failed to get latest version: {e}"
        ))),
    }
}
378
#[cfg(feature = "delta-lake")]
/// Loads `version` of the table and scans it through DataFusion, returning at
/// most `max_records` rows.
///
/// Returns `(batches, fully_consumed)`. `fully_consumed` is `true` only when
/// the scan produced no rows beyond `max_records`.
///
/// To make that flag meaningful the query fetches **one row more** than
/// `max_records` and trims it afterwards. Capping the query at exactly
/// `max_records` would make the stream end right at the cap, so a look-ahead
/// on the stream could never observe the truncated rows and the flag would
/// always be `true`.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if loading the version, building the
/// table provider, registering or querying the scan table, or reading the
/// result stream fails.
pub async fn read_batches_at_version(
    table: &mut DeltaTable,
    version: i64,
    max_records: usize,
) -> Result<(Vec<RecordBatch>, bool), ConnectorError> {
    use datafusion::prelude::SessionContext;
    use tokio_stream::StreamExt;

    table
        .load_version(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;

    debug!(version, "Delta Lake: loaded version for reading");

    let provider =
        table.table_provider().build().await.map_err(|e| {
            ConnectorError::ReadError(format!("failed to build table provider: {e}"))
        })?;

    let ctx = SessionContext::new();
    ctx.register_table("delta_source_scan", Arc::new(provider))
        .map_err(|e| ConnectorError::ReadError(format!("failed to register scan table: {e}")))?;

    let df = ctx
        .sql("SELECT * FROM delta_source_scan")
        .await
        .map_err(|e| ConnectorError::ReadError(format!("scan query failed: {e}")))?;

    let df = if max_records < usize::MAX {
        // One look-ahead row: its presence proves the version holds more than
        // `max_records` rows. `+ 1` cannot overflow because of the guard above.
        df.limit(0, Some(max_records + 1))
            .map_err(|e| ConnectorError::ReadError(format!("limit failed: {e}")))?
    } else {
        df
    };

    let mut stream = df
        .execute_stream()
        .await
        .map_err(|e| ConnectorError::ReadError(format!("stream execution failed: {e}")))?;

    let mut batches = Vec::new();
    let mut total_rows: usize = 0;

    while let Some(result) = stream.next().await {
        let batch =
            result.map_err(|e| ConnectorError::ReadError(format!("stream batch failed: {e}")))?;
        if batch.num_rows() == 0 {
            continue;
        }
        total_rows += batch.num_rows();
        batches.push(batch);

        // Stop only once the cap is exceeded: hitting it exactly says nothing
        // about whether more rows follow.
        if total_rows > max_records {
            break;
        }
    }

    let fully_consumed = total_rows <= max_records;
    if !fully_consumed {
        // Drop the look-ahead rows so callers never receive more than requested.
        let mut excess = total_rows - max_records;
        while excess > 0 {
            let Some(last) = batches.pop() else { break };
            let rows = last.num_rows();
            if rows > excess {
                batches.push(last.slice(0, rows - excess));
                excess = 0;
            } else {
                excess -= rows;
            }
        }
        total_rows = max_records;
    }

    debug!(
        version,
        num_batches = batches.len(),
        total_rows,
        fully_consumed,
        "Delta Lake: scanned version"
    );

    Ok((batches, fully_consumed))
}
481
#[cfg(feature = "delta-lake")]
/// Reads only the rows introduced by commit `version`, by parsing that commit's
/// log entry and reading the Parquet files named in its `add` actions.
///
/// Steps:
/// 1. `version <= 0` delegates to [`read_batches_at_version`].
/// 2. The commit entry is read and parsed line by line as JSON; `add.path` and
///    `remove.path` values are collected. Lines that are not valid JSON are
///    skipped. Added paths that are also removed in the same commit are dropped.
/// 3. The table is loaded at `version` to obtain the target Arrow schema.
/// 4. `partition_filter`, if given, narrows the added paths.
/// 5. Each remaining file is read directly from the object store. A file larger
///    than 256 MiB aborts the direct read and falls back to
///    [`read_batches_at_version`] for the whole version.
/// 6. Batches whose schema differs from the table schema are re-aligned to it.
///
/// Returns `(batches, fully_consumed)` with at most `max_records` rows.
/// `fully_consumed` is `false` when at least one row beyond `max_records`
/// exists in the selected files. Reading continues until the cap is
/// *exceeded* by the single look-ahead row, which is then trimmed. Stopping
/// when the cap is merely reached would leave later batches and files
/// unexamined while still reporting the version as fully consumed.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the commit entry is missing or
/// unreadable, is not UTF-8, the version cannot be loaded, or any file cannot
/// be stat'ed, fetched, opened or decoded.
#[allow(clippy::too_many_lines)]
pub async fn read_version_diff(
    table: &mut DeltaTable,
    version: i64,
    max_records: usize,
    partition_filter: Option<&str>,
) -> Result<(Vec<RecordBatch>, bool), ConnectorError> {
    const MAX_DIRECT_READ_BYTES: u64 = 256 * 1024 * 1024;

    if version <= 0 {
        return read_batches_at_version(table, version, max_records).await;
    }

    let log_store = table.log_store();
    let store = log_store.object_store(None);

    let commit_data = log_store
        .read_commit_entry(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("read commit {version}: {e}")))?
        .ok_or_else(|| {
            ConnectorError::ReadError(format!(
                "version {version} not available (cleaned up or never existed)"
            ))
        })?;
    let commit_str = std::str::from_utf8(&commit_data)
        .map_err(|e| ConnectorError::ReadError(format!("commit log is not valid UTF-8: {e}")))?;

    // Collect the file paths this commit added and removed.
    let mut added_paths = Vec::new();
    let mut removed_paths = std::collections::HashSet::new();
    for line in commit_str.lines() {
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
            if let Some(add) = obj.get("add") {
                if let Some(path) = add.get("path").and_then(|p| p.as_str()) {
                    added_paths.push(decode_delta_path(path));
                }
            }
            if let Some(remove) = obj.get("remove") {
                if let Some(path) = remove.get("path").and_then(|p| p.as_str()) {
                    removed_paths.insert(decode_delta_path(path));
                }
            }
        }
    }

    // A path both added and removed in the same commit contributes nothing.
    added_paths.retain(|p| !removed_paths.contains(p));

    if added_paths.is_empty() {
        debug!(
            version,
            num_removed = removed_paths.len(),
            "Delta Lake: no net-new add actions in version"
        );
        return Ok((Vec::new(), true));
    }

    debug!(
        version,
        num_added_files = added_paths.len(),
        num_removed_files = removed_paths.len(),
        "Delta Lake: reading added files"
    );

    table
        .load_version(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;

    let table_schema = table
        .snapshot()
        .map(|s| s.snapshot().arrow_schema())
        .map_err(|e| ConnectorError::ReadError(format!("no snapshot at version {version}: {e}")))?;

    let added_paths = if let Some(filter) = partition_filter {
        filter_paths_by_partition(&added_paths, filter)
    } else {
        added_paths
    };

    let mut batches = Vec::new();
    let mut total_rows: usize = 0;

    for file_path in &added_paths {
        // Keep going while the cap is only *reached*: the next file may hold
        // the look-ahead row that proves the version is not fully consumed.
        if total_rows > max_records {
            break;
        }

        let obj_path = deltalake::Path::from(file_path.as_str());

        let file_meta = store
            .head(&obj_path)
            .await
            .map_err(|e| ConnectorError::ReadError(format!("failed to stat '{file_path}': {e}")))?;
        if file_meta.size > MAX_DIRECT_READ_BYTES {
            warn!(
                file_path,
                file_size = file_meta.size,
                "file too large for direct read, falling back to DataFusion scan"
            );
            return read_batches_at_version(table, version, max_records).await;
        }

        let file_bytes = get_with_retry(&store, &obj_path, file_path).await?;

        let parquet_reader =
            deltalake::parquet::arrow::arrow_reader::ArrowReaderBuilder::try_new(file_bytes)
                .map_err(|e| {
                    ConnectorError::ReadError(format!(
                        "failed to open Parquet file '{file_path}': {e}"
                    ))
                })?;

        // Rows still allowed, plus one look-ahead row.
        let remaining = max_records.saturating_sub(total_rows).saturating_add(1);
        let reader = parquet_reader.with_limit(remaining).build().map_err(|e| {
            ConnectorError::ReadError(format!("failed to build reader for '{file_path}': {e}"))
        })?;

        for result in reader {
            let batch: RecordBatch = result.map_err(|e| {
                ConnectorError::ReadError(format!("Parquet read error in '{file_path}': {e}"))
            })?;
            if batch.num_rows() == 0 {
                continue;
            }

            let batch = if batch.schema() == table_schema {
                batch
            } else {
                align_batch_to_schema(&batch, &table_schema)?
            };

            total_rows += batch.num_rows();
            batches.push(batch);

            if total_rows > max_records {
                break;
            }
        }
    }

    let fully_consumed = total_rows <= max_records;
    if !fully_consumed {
        // Only the final batch can overshoot, because reading stops as soon as
        // the cap is exceeded; trim the overshoot from it.
        let excess = total_rows - max_records;
        if let Some(last) = batches.pop() {
            let keep = last.num_rows().saturating_sub(excess);
            if keep > 0 {
                batches.push(last.slice(0, keep));
            }
        }
    }

    debug!(
        version,
        num_batches = batches.len(),
        fully_consumed,
        num_added_files = added_paths.len(),
        "Delta Lake: read version diff"
    );

    Ok((batches, fully_consumed))
}
686
#[cfg(feature = "delta-lake")]
/// Fetches the full contents of `path` from `store`, retrying failed `get`
/// calls with a fixed backoff of 200 ms, 1 s and 4 s (four attempts in total).
///
/// A missing object is not retried. It is recognised by the typed
/// `NotFound` error variant rather than by searching the error text, because
/// a substring test for `"404"` or `"not found"` also fires on unrelated
/// transient errors whose message merely contains those characters.
///
/// `display_path` is used only in log and error messages.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` when the object does not exist, when
/// reading the response body fails, or when every attempt fails.
async fn get_with_retry(
    store: &Arc<dyn deltalake::ObjectStore>,
    path: &deltalake::Path,
    display_path: &str,
) -> Result<bytes::Bytes, ConnectorError> {
    const BACKOFF_MS: [u64; 3] = [200, 1000, 4000];
    let mut last_err = None;

    for attempt in 0..=BACKOFF_MS.len() {
        match store.get(path).await {
            Ok(result) => {
                return result.bytes().await.map_err(|e| {
                    ConnectorError::ReadError(format!(
                        "failed to read bytes of '{display_path}': {e}"
                    ))
                });
            }
            // Permanent failure: retrying cannot make the object appear.
            Err(e) if matches!(e, deltalake::ObjectStoreError::NotFound { .. }) => {
                return Err(ConnectorError::ReadError(format!(
                    "file not found '{display_path}': {e}"
                )));
            }
            Err(e) => {
                // No delay is configured after the final attempt, so no sleep then.
                if let Some(&delay) = BACKOFF_MS.get(attempt) {
                    warn!(
                        attempt = attempt + 1,
                        delay_ms = delay,
                        error = %e,
                        path = display_path,
                        "object_store read failed, retrying"
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
                }
                last_err = Some(e);
            }
        }
    }

    Err(ConnectorError::ReadError(format!(
        "failed to read '{display_path}' after {} attempts: {}",
        BACKOFF_MS.len() + 1,
        last_err.map_or_else(|| "unknown".to_string(), |e| e.to_string())
    )))
}
735
736#[cfg(feature = "delta-lake")]
/// Keeps only the paths whose partition directories satisfy `filter`.
///
/// `filter` is a list of `column = value` clauses joined by ` AND `
/// (upper-case). Whitespace runs are collapsed, and surrounding single or
/// double quotes are stripped from values. Clauses without `=`, or with an
/// empty column or value, are ignored. If no usable clause remains, every
/// path is returned.
///
/// Each clause becomes a `column=value` directory name that must appear as a
/// **whole** `/`-separated segment of the path. A plain substring test is not
/// enough: `id=1` would also match `id=10/…` and `uid=1/…`.
fn filter_paths_by_partition(paths: &[String], filter: &str) -> Vec<String> {
    let normalized = filter.split_whitespace().collect::<Vec<_>>().join(" ");

    let required_segments: Vec<String> = normalized
        .split(" AND ")
        .filter_map(|clause| {
            let (col, val) = clause.split_once('=')?;
            let col = col.trim();
            let val = val.trim().trim_matches('\'').trim_matches('"');
            (!col.is_empty() && !val.is_empty()).then(|| format!("{col}={val}"))
        })
        .collect();

    if required_segments.is_empty() {
        return paths.to_vec();
    }

    paths
        .iter()
        .filter(|path| {
            required_segments
                .iter()
                .all(|wanted| path.split('/').any(|segment| segment == wanted))
        })
        .cloned()
        .collect()
}
772
#[cfg(feature = "delta-lake")]
/// Normalises a path taken from a Delta commit log entry.
///
/// The value is parsed as the path of a `file:///` URL and the URL's path
/// component is returned without its leading `/`. If parsing fails the input
/// is returned unchanged.
///
/// NOTE(review): `Url::path()` returns the percent-encoded form of the path,
/// so existing `%XX` escapes appear to be kept rather than decoded — confirm
/// this matches what the object-store lookup downstream expects.
fn decode_delta_path(encoded: &str) -> String {
    match url::Url::parse(&format!("file:///{encoded}")) {
        Ok(parsed) => {
            let url_path = parsed.path();
            url_path.strip_prefix('/').unwrap_or(url_path).to_string()
        }
        Err(_) => encoded.to_string(),
    }
}
787
#[cfg(feature = "delta-lake")]
/// Rebuilds `batch` so its columns follow `target_schema`.
///
/// For every field of the target schema, the column with the same name is
/// taken from `batch`; when no such column exists, an all-null column of the
/// field's data type is substituted. Columns of `batch` that are not in the
/// target schema are dropped.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` when the assembled columns are rejected
/// by `RecordBatch::try_new` for the target schema.
fn align_batch_to_schema(
    batch: &RecordBatch,
    target_schema: &SchemaRef,
) -> Result<RecordBatch, ConnectorError> {
    use arrow_array::new_null_array;

    let source_schema = batch.schema();
    let row_count = batch.num_rows();

    let aligned: Vec<_> = target_schema
        .fields()
        .iter()
        .map(|field| match source_schema.index_of(field.name()) {
            Ok(position) => batch.column(position).clone(),
            // Column missing from this file: fill it with nulls.
            Err(_) => new_null_array(field.data_type(), row_count),
        })
        .collect();

    match RecordBatch::try_new(target_schema.clone(), aligned) {
        Ok(rebuilt) => Ok(rebuilt),
        Err(e) => Err(ConnectorError::ReadError(format!(
            "failed to align batch to table schema: {e}"
        ))),
    }
}
811
#[cfg(feature = "delta-lake")]
/// Runs a Change Data Feed scan over `start_version..=end_version` (both
/// bounds are passed to the CDF builder) and collects every non-empty batch.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the scan plan cannot be built, the
/// stream cannot be started, or a batch fails while streaming.
pub async fn read_cdf_batches(
    table: DeltaTable,
    start_version: i64,
    end_version: i64,
) -> Result<Vec<RecordBatch>, ConnectorError> {
    use datafusion::prelude::SessionContext;
    use tokio_stream::StreamExt;

    debug!(start_version, end_version, "reading CDF batches");

    let ctx = SessionContext::new();
    let session_state = ctx.state();

    let plan = table
        .scan_cdf()
        .with_starting_version(start_version)
        .with_ending_version(end_version)
        .build(&session_state, None)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("CDF scan build failed: {e}")))?;

    let mut stream = datafusion::physical_plan::execute_stream(plan, ctx.task_ctx())
        .map_err(|e| ConnectorError::ReadError(format!("CDF stream execution failed: {e}")))?;

    let mut batches = Vec::new();
    loop {
        let batch: RecordBatch = match stream.next().await {
            None => break,
            Some(Ok(batch)) => batch,
            Some(Err(e)) => {
                return Err(ConnectorError::ReadError(format!(
                    "CDF stream batch failed: {e}"
                )))
            }
        };
        // Empty batches carry no changes; skip them.
        if batch.num_rows() > 0 {
            batches.push(batch);
        }
    }

    debug!(
        start_version,
        end_version,
        num_batches = batches.len(),
        "CDF scan complete"
    );

    Ok(batches)
}
869
#[cfg(feature = "delta-lake")]
/// Converts a Change Data Feed batch into a changelog batch carrying an `_op`
/// column.
///
/// Mapping of `_change_type` to `_op`:
/// * `update_postimage` -> `"U"`
/// * `delete` -> `"D"`
/// * `update_preimage` -> row is dropped
/// * anything else -> `"I"`
///
/// The CDF metadata columns `_change_type`, `_commit_version` and
/// `_commit_timestamp` are removed and a non-nullable Utf8 `_op` column is
/// appended last.
///
/// Returns `Ok(Some(batch.clone()))` unchanged when the batch has no
/// `_change_type` column, and `Ok(None)` when every row was dropped.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if `_change_type` is not a
/// `StringArray`, if filtering fails, or if the output batch cannot be built.
pub fn map_cdf_to_changelog(batch: &RecordBatch) -> Result<Option<RecordBatch>, ConnectorError> {
    use arrow_array::StringArray;

    const CDF_META_COLUMNS: [&str; 3] = ["_change_type", "_commit_version", "_commit_timestamp"];

    let ct_idx = match batch.schema().index_of("_change_type") {
        Ok(position) => position,
        Err(_) => return Ok(Some(batch.clone())),
    };

    let Some(change_type) = batch
        .column(ct_idx)
        .as_any()
        .downcast_ref::<StringArray>()
    else {
        return Err(ConnectorError::ReadError("_change_type is not Utf8".into()));
    };

    // Decide, row by row, whether to keep the row and which op code it gets.
    let row_count = batch.num_rows();
    let mut keep: Vec<bool> = Vec::with_capacity(row_count);
    let mut ops: Vec<Option<&str>> = Vec::with_capacity(row_count);
    for row in 0..row_count {
        let (retain, op) = match change_type.value(row) {
            "update_postimage" => (true, "U"),
            "delete" => (true, "D"),
            "update_preimage" => (false, ""),
            _ => (true, "I"),
        };
        keep.push(retain);
        ops.push(Some(op));
    }

    let filter = arrow_array::BooleanArray::from(keep);
    let filtered = arrow_select::filter::filter_record_batch(batch, &filter)
        .map_err(|e| ConnectorError::ReadError(format!("CDF filter failed: {e}")))?;
    if filtered.num_rows() == 0 {
        return Ok(None);
    }

    // Apply the same mask to the op codes so they stay aligned with the rows.
    let op_arr: StringArray = ops.into_iter().collect();
    let op_filtered = arrow_select::filter::filter(&op_arr, &filter)
        .map_err(|e| ConnectorError::ReadError(format!("CDF op filter: {e}")))?;

    let filtered_schema = filtered.schema();
    let (mut fields, mut columns): (Vec<_>, Vec<Arc<dyn arrow_array::Array>>) = filtered_schema
        .fields()
        .iter()
        .zip(filtered.columns())
        .filter(|(field, _)| !CDF_META_COLUMNS.contains(&field.name().as_str()))
        .map(|(field, column)| (field.clone(), column.clone()))
        .unzip();

    fields.push(Arc::new(arrow_schema::Field::new(
        "_op",
        arrow_schema::DataType::Utf8,
        false,
    )));
    columns.push(op_filtered);

    match RecordBatch::try_new(Arc::new(arrow_schema::Schema::new(fields)), columns) {
        Ok(changelog) => Ok(Some(changelog)),
        Err(e) => Err(ConnectorError::ReadError(format!(
            "CDF batch rebuild: {e}"
        ))),
    }
}
935
#[cfg(feature = "delta-lake")]
/// Row counts reported by a changelog MERGE (see `merge_changelog`).
#[derive(Debug)]
pub struct MergeResult {
    /// Target rows inserted, taken from the merge metrics' `num_target_rows_inserted`.
    pub rows_inserted: usize,
    /// Target rows updated, taken from the merge metrics' `num_target_rows_updated`.
    pub rows_updated: usize,
    /// Target rows deleted, taken from the merge metrics' `num_target_rows_deleted`.
    pub rows_deleted: usize,
}
947
#[cfg(feature = "delta-lake")]
/// Applies a changelog batch to `table` with a single Delta MERGE, tagged with
/// the application transaction `(writer_id, epoch)`.
///
/// `source_batch` must carry an `_op` column. Rows are joined to the target on
/// equality of every column in `key_columns`, then:
/// * matched and `_op` in `I`/`U`/`r`: all non-key, non-CDC columns are updated;
/// * matched and `_op == "D"`: the target row is deleted;
/// * not matched and `_op` in `I`/`U`/`r`: a row is inserted with all non-CDC
///   columns.
///
/// The CDC columns `_op` and `_ts_ms` are never written to the target.
/// `schema_evolution` enables schema merging and `writer_properties` is
/// forwarded when present. An empty `source_batch` returns immediately with
/// zeroed counts.
///
/// # Errors
///
/// * `ConnectorError::ConfigurationError` if `key_columns` is empty.
/// * `ConnectorError::WriteError` if the source DataFrame cannot be created, a
///   merge clause is rejected, or the MERGE itself fails.
#[allow(clippy::too_many_lines)]
#[allow(clippy::too_many_arguments)]
pub async fn merge_changelog(
    table: DeltaTable,
    source_batch: RecordBatch,
    key_columns: &[String],
    writer_id: &str,
    epoch: u64,
    schema_evolution: bool,
    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
    ctx: &datafusion::prelude::SessionContext,
) -> Result<(DeltaTable, MergeResult), ConnectorError> {
    use datafusion::prelude::*;
    use deltalake::kernel::transaction::CommitProperties;
    use deltalake::kernel::Transaction;

    const CDC_COLUMNS: &[&str] = &["_op", "_ts_ms"];

    if source_batch.num_rows() == 0 {
        let nothing_merged = MergeResult {
            rows_inserted: 0,
            rows_updated: 0,
            rows_deleted: 0,
        };
        return Ok((table, nothing_merged));
    }

    debug!(
        key_columns = ?key_columns,
        source_rows = source_batch.num_rows(),
        "performing atomic changelog MERGE"
    );

    let source_df = match ctx.read_batch(source_batch) {
        Ok(df) => df,
        Err(e) => {
            return Err(ConnectorError::WriteError(format!(
                "failed to create source DataFrame: {e}"
            )))
        }
    };

    // Join condition: target.k == source.k for every key column, AND-ed together.
    let mut key_matches = key_columns
        .iter()
        .map(|k| col(format!("target.{k}")).eq(col(format!("source.{k}"))));
    let Some(first_match) = key_matches.next() else {
        return Err(ConnectorError::ConfigurationError(
            "merge requires at least one key column".into(),
        ));
    };
    let predicate = key_matches.fold(first_match, Expr::and);

    // Split the source columns: inserts write every user column, updates skip keys.
    let mut insert_columns: Vec<String> = Vec::new();
    let mut update_columns: Vec<String> = Vec::new();
    for field in source_df.schema().fields().iter() {
        let name = field.name();
        if CDC_COLUMNS.contains(&name.as_str()) {
            continue;
        }
        if !key_columns.iter().any(|key| key == name) {
            update_columns.push(name.clone());
        }
        insert_columns.push(name.clone());
    }

    let upsert_ops = col("source._op").in_list(vec![lit("I"), lit("U"), lit("r")], false);
    let delete_op = col("source._op").eq(lit("D"));

    #[allow(clippy::cast_possible_wrap)]
    let txn_version = epoch as i64;
    let commit_properties = CommitProperties::default()
        .with_application_transaction(Transaction::new(writer_id, txn_version));

    let builder = table
        .merge(source_df, predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .with_commit_properties(commit_properties);

    let builder = builder
        .when_matched_update(|update| {
            update_columns
                .iter()
                .fold(update.predicate(upsert_ops.clone()), |clause, name| {
                    clause.update(name.as_str(), col(format!("source.{name}")))
                })
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-update failed: {e}")))?;

    let builder = builder
        .when_matched_delete(|delete| delete.predicate(delete_op))
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-delete failed: {e}")))?;

    let mut builder = builder
        .when_not_matched_insert(|insert| {
            insert_columns
                .iter()
                .fold(insert.predicate(upsert_ops), |clause, name| {
                    clause.set(name.as_str(), col(format!("source.{name}")))
                })
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge not-matched-insert failed: {e}")))?;

    if schema_evolution {
        builder = builder.with_merge_schema(true);
    }

    if let Some(props) = writer_properties {
        builder = builder.with_writer_properties(props);
    }

    let (table, metrics) = match builder.await {
        Ok(outcome) => outcome,
        Err(e) => {
            return Err(ConnectorError::WriteError(format!(
                "Delta Lake changelog MERGE failed: {e}"
            )))
        }
    };

    let result = MergeResult {
        rows_inserted: metrics.num_target_rows_inserted,
        rows_updated: metrics.num_target_rows_updated,
        rows_deleted: metrics.num_target_rows_deleted,
    };

    info!(
        writer_id,
        epoch,
        rows_inserted = result.rows_inserted,
        rows_updated = result.rows_updated,
        rows_deleted = result.rows_deleted,
        "Delta Lake changelog MERGE complete"
    );

    Ok((table, result))
}
1092
#[cfg(feature = "delta-lake")]
/// File and partition counts reported by a compaction run (see `run_compaction`).
#[derive(Debug)]
pub struct CompactionResult {
    /// Files added, taken from the optimize metrics' `num_files_added`.
    pub files_added: u64,
    /// Files removed, taken from the optimize metrics' `num_files_removed`.
    pub files_removed: u64,
    /// Partitions optimized, taken from the optimize metrics' `partitions_optimized`.
    pub partitions_optimized: u64,
}
1104
#[cfg(feature = "delta-lake")]
/// Runs Delta Lake OPTIMIZE on `table`.
///
/// Plain compaction is used when `z_order_columns` is empty; otherwise the
/// table is Z-ordered on those columns. `target_file_size` is passed as the
/// optimize target size and `writer_properties` is forwarded when present.
///
/// Returns the updated table together with the counts reported by the
/// optimize metrics.
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if the optimize operation fails.
pub async fn run_compaction(
    table: DeltaTable,
    target_file_size: u64,
    z_order_columns: &[String],
    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
) -> Result<(DeltaTable, CompactionResult), ConnectorError> {
    use deltalake::operations::optimize::OptimizeType;

    info!(target_file_size, "running Delta Lake compaction (OPTIMIZE)");

    let optimize_type = match z_order_columns {
        [] => OptimizeType::Compact,
        columns => OptimizeType::ZOrder(columns.to_vec()),
    };

    let base_builder = table
        .optimize()
        .with_type(optimize_type)
        .with_target_size(target_file_size);
    let builder = match writer_properties {
        Some(props) => base_builder.with_writer_properties(props),
        None => base_builder,
    };

    let (table, metrics) = match builder.await {
        Ok(outcome) => outcome,
        Err(e) => return Err(ConnectorError::Internal(format!("compaction failed: {e}"))),
    };

    let result = CompactionResult {
        files_added: metrics.num_files_added,
        files_removed: metrics.num_files_removed,
        partitions_optimized: metrics.partitions_optimized,
    };

    info!(
        files_added = result.files_added,
        files_removed = result.files_removed,
        partitions_optimized = result.partitions_optimized,
        "Delta Lake compaction complete"
    );

    Ok((table, result))
}
1158
#[cfg(feature = "delta-lake")]
/// Runs Delta Lake VACUUM on `table` with the given retention period.
///
/// If `retention` cannot be represented as a `chrono::Duration`, a retention
/// of 168 hours is used instead.
///
/// Returns the updated table and the number of files the vacuum reported as
/// deleted.
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if the vacuum operation fails.
pub async fn run_vacuum(
    table: DeltaTable,
    retention: std::time::Duration,
) -> Result<(DeltaTable, usize), ConnectorError> {
    let retention_hours = retention.as_secs() / 3600;
    info!(retention_hours, "running Delta Lake VACUUM");

    let retention_period = match chrono::Duration::from_std(retention) {
        Ok(period) => period,
        // Out-of-range durations fall back to 168 hours.
        Err(_) => chrono::Duration::hours(168),
    };

    let vacuum = table.vacuum().with_retention_period(retention_period);
    let (table, metrics) = match vacuum.await {
        Ok(outcome) => outcome,
        Err(e) => return Err(ConnectorError::Internal(format!("vacuum failed: {e}"))),
    };

    let files_deleted = metrics.files_deleted.len();

    info!(files_deleted, "Delta Lake VACUUM complete");

    Ok((table, files_deleted))
}
1187
#[cfg(feature = "delta-lake")]
/// Resolves the storage location of a table through the configured catalog.
///
/// Returns `(resolved_table_path, storage_options)`. In every successful
/// branch the storage options are a clone of `base_storage_options`.
///
/// * `DeltaCatalogType::None`: `table_path` is returned unchanged.
/// * `DeltaCatalogType::Glue` (feature `delta-lake-glue`): `table_path` is
///   looked up as a table name in `catalog_database`, with `catalog_name`
///   passed as the optional catalog id.
/// * `DeltaCatalogType::Unity` (feature `delta-lake-unity`): `table_path`,
///   minus an optional `uc://` prefix, is looked up via the Unity helper.
///
/// `_catalog_schema` is not used by any branch. `catalog_database` and
/// `catalog_name` are only read when `delta-lake-glue` is enabled, hence the
/// `unused_variables` allowances on them.
///
/// # Errors
///
/// * `ConnectorError::ConfigurationError` if Glue is selected without
///   `catalog_database`, or a catalog is selected whose feature is not
///   compiled in.
/// * `ConnectorError::ConnectionFailed` if the Glue client cannot be created
///   or the Glue lookup fails.
/// * Whatever error the Unity lookup helper returns.
#[allow(clippy::implicit_hasher, clippy::unused_async)]
pub async fn resolve_catalog_options(
    catalog: &super::delta_config::DeltaCatalogType,
    #[allow(unused_variables)] catalog_database: Option<&str>,
    #[allow(unused_variables)] catalog_name: Option<&str>,
    _catalog_schema: Option<&str>,
    table_path: &str,
    base_storage_options: &HashMap<String, String>,
) -> Result<(String, HashMap<String, String>), ConnectorError> {
    use super::delta_config::DeltaCatalogType;

    match catalog {
        // No catalog: the configured path is already the storage location.
        DeltaCatalogType::None => Ok((table_path.to_string(), base_storage_options.clone())),
        #[cfg(feature = "delta-lake-glue")]
        DeltaCatalogType::Glue => {
            use deltalake::DataCatalog;
            let database = catalog_database.ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "Glue catalog requires 'catalog.database'".into(),
                )
            })?;
            let glue = deltalake_catalog_glue::GlueDataCatalog::from_env()
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to init Glue catalog: {e}"))
                })?;
            // Here `table_path` is used as the table name inside `database`.
            let resolved = glue
                .get_table_storage_location(catalog_name.map(String::from), database, table_path)
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!(
                        "Glue catalog lookup failed for '{database}.{table_path}': {e}"
                    ))
                })?;
            info!(
                glue_database = database,
                table = table_path,
                resolved_path = %resolved,
                "resolved table path via Glue catalog"
            );
            Ok((resolved, base_storage_options.clone()))
        }
        // Glue requested but support was not compiled in.
        #[cfg(not(feature = "delta-lake-glue"))]
        DeltaCatalogType::Glue => Err(ConnectorError::ConfigurationError(
            "Glue catalog requires the 'delta-lake-glue' feature. \
             Build with: cargo build --features delta-lake-glue"
                .into(),
        )),
        #[cfg(feature = "delta-lake-unity")]
        DeltaCatalogType::Unity {
            workspace_url,
            access_token,
        } => {
            let full_name = table_path.strip_prefix("uc://").unwrap_or(table_path);
            // The optional `uc://` prefix is dropped; the remainder is passed
            // to the Unity lookup as the table's full name.
            let storage_location = super::unity_catalog::get_table_storage_location(
                workspace_url,
                access_token,
                full_name,
            )
            .await?;

            Ok((storage_location, base_storage_options.clone()))
        }
        // Unity requested but support was not compiled in.
        #[cfg(not(feature = "delta-lake-unity"))]
        DeltaCatalogType::Unity { .. } => Err(ConnectorError::ConfigurationError(
            "Unity catalog requires the 'delta-lake-unity' feature. \
             Build with: cargo build --features delta-lake-unity"
                .into(),
        )),
    }
}
1275
1276#[cfg(all(test, feature = "delta-lake"))]
1281mod tests {
1282 use super::*;
1283 use arrow_array::{Float64Array, Int64Array, StringArray};
1284 use arrow_schema::{DataType, Field, Schema};
1285 use std::sync::Arc;
1286 use tempfile::TempDir;
1287
1288 fn test_schema() -> SchemaRef {
1289 Arc::new(Schema::new(vec![
1290 Field::new("id", DataType::Int64, false),
1291 Field::new("name", DataType::Utf8, true),
1292 Field::new("value", DataType::Float64, true),
1293 ]))
1294 }
1295
1296 fn test_batch(n: usize) -> RecordBatch {
1297 let ids: Vec<i64> = (0..n as i64).collect();
1298 let names: Vec<&str> = (0..n).map(|_| "test").collect();
1299 let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
1300
1301 RecordBatch::try_new(
1302 test_schema(),
1303 vec![
1304 Arc::new(Int64Array::from(ids)),
1305 Arc::new(StringArray::from(names)),
1306 Arc::new(Float64Array::from(values)),
1307 ],
1308 )
1309 .unwrap()
1310 }
1311
1312 #[tokio::test]
1313 async fn test_open_creates_table() {
1314 let temp_dir = TempDir::new().unwrap();
1315 let table_path = temp_dir.path().to_str().unwrap();
1316
1317 let schema = test_schema();
1319 let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1320 .await
1321 .unwrap();
1322
1323 assert_eq!(table.version(), Some(0));
1324
1325 let delta_log = temp_dir.path().join("_delta_log");
1327 assert!(delta_log.exists(), "_delta_log directory should exist");
1328 }
1329
1330 #[tokio::test]
1331 async fn test_open_existing_table() {
1332 let temp_dir = TempDir::new().unwrap();
1333 let table_path = temp_dir.path().to_str().unwrap();
1334
1335 let schema = test_schema();
1337 let _ = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1338 .await
1339 .unwrap();
1340
1341 let table = open_or_create_table(table_path, HashMap::new(), None)
1343 .await
1344 .unwrap();
1345
1346 assert_eq!(table.version(), Some(0));
1347 }
1348
1349 #[tokio::test]
1350 async fn test_open_nonexistent_without_schema_defers() {
1351 let temp_dir = TempDir::new().unwrap();
1352 let nonexistent_table = temp_dir.path().join("nonexistent");
1353 std::fs::create_dir_all(&nonexistent_table).unwrap();
1354 let table_path = nonexistent_table.to_str().unwrap();
1355
1356 let result = open_or_create_table(table_path, HashMap::new(), None).await;
1358 assert!(result.is_ok());
1359 let table = result.unwrap();
1360 assert!(table.version().is_none(), "table should be uninitialized");
1361 }
1362
1363 #[tokio::test]
1364 async fn test_write_batch_creates_parquet() {
1365 let temp_dir = TempDir::new().unwrap();
1366 let table_path = temp_dir.path().to_str().unwrap();
1367
1368 let schema = test_schema();
1370 let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1371 .await
1372 .unwrap();
1373
1374 let batch = test_batch(100);
1376 let (table, version) = write_batches(
1377 table,
1378 vec![batch],
1379 "test-writer",
1380 1,
1381 SaveMode::Append,
1382 None,
1383 false,
1384 None,
1385 false,
1386 None,
1387 )
1388 .await
1389 .unwrap();
1390
1391 assert_eq!(version, 1);
1392 assert_eq!(table.version(), Some(1));
1393
1394 let parquet_files: Vec<_> = std::fs::read_dir(temp_dir.path())
1396 .unwrap()
1397 .filter_map(Result::ok)
1398 .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
1399 .collect();
1400
1401 assert!(
1402 !parquet_files.is_empty(),
1403 "should have created Parquet files"
1404 );
1405 }
1406
1407 #[tokio::test]
1408 async fn test_exactly_once_epoch_skip() {
1409 let temp_dir = TempDir::new().unwrap();
1410 let table_path = temp_dir.path().to_str().unwrap();
1411 let writer_id = "exactly-once-writer";
1412
1413 let schema = test_schema();
1415 let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1416 .await
1417 .unwrap();
1418
1419 let batch = test_batch(10);
1420 let (table, _) = write_batches(
1421 table,
1422 vec![batch.clone()],
1423 writer_id,
1424 1,
1425 SaveMode::Append,
1426 None,
1427 false,
1428 None,
1429 false,
1430 None,
1431 )
1432 .await
1433 .unwrap();
1434
1435 let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1437 assert_eq!(last_epoch, 1);
1438
1439 let reopened_table = open_or_create_table(table_path, HashMap::new(), None)
1441 .await
1442 .unwrap();
1443
1444 let recovered_epoch = get_last_committed_epoch(&reopened_table, writer_id).await;
1446 assert_eq!(recovered_epoch, 1);
1447 }
1448
1449 #[tokio::test]
1450 async fn test_multiple_epochs_sequential() {
1451 let temp_dir = TempDir::new().unwrap();
1452 let table_path = temp_dir.path().to_str().unwrap();
1453 let writer_id = "sequential-writer";
1454
1455 let schema = test_schema();
1456 let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1457 .await
1458 .unwrap();
1459
1460 for epoch in 1..=3 {
1462 let batch = test_batch(10);
1463 let result = write_batches(
1464 table,
1465 vec![batch],
1466 writer_id,
1467 epoch,
1468 SaveMode::Append,
1469 None,
1470 false,
1471 None,
1472 false,
1473 None,
1474 )
1475 .await
1476 .unwrap();
1477 table = result.0;
1478 assert_eq!(result.1, epoch as i64);
1479 }
1480
1481 assert_eq!(table.version(), Some(3));
1483
1484 let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1486 assert_eq!(last_epoch, 3);
1487 }
1488
1489 #[tokio::test]
1490 async fn test_get_table_schema() {
1491 let temp_dir = TempDir::new().unwrap();
1492 let table_path = temp_dir.path().to_str().unwrap();
1493
1494 let expected_schema = test_schema();
1495 let table = open_or_create_table(table_path, HashMap::new(), Some(&expected_schema))
1496 .await
1497 .unwrap();
1498
1499 let actual_schema = get_table_schema(&table).unwrap();
1500
1501 assert_eq!(actual_schema.fields().len(), expected_schema.fields().len());
1503 for (expected, actual) in expected_schema.fields().iter().zip(actual_schema.fields()) {
1504 assert_eq!(expected.name(), actual.name());
1505 }
1506 }
1507
1508 #[tokio::test]
1509 async fn test_write_empty_batches() {
1510 let temp_dir = TempDir::new().unwrap();
1511 let table_path = temp_dir.path().to_str().unwrap();
1512
1513 let schema = test_schema();
1514 let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1515 .await
1516 .unwrap();
1517
1518 let (table, version) = write_batches(
1520 table,
1521 vec![],
1522 "test-writer",
1523 1,
1524 SaveMode::Append,
1525 None,
1526 false,
1527 None,
1528 false,
1529 None,
1530 )
1531 .await
1532 .unwrap();
1533
1534 assert_eq!(version, 0);
1536 assert_eq!(table.version(), Some(0));
1537 }
1538
1539 #[tokio::test]
1540 async fn test_write_multiple_batches() {
1541 let temp_dir = TempDir::new().unwrap();
1543 let table_path = temp_dir.path().to_str().unwrap();
1544
1545 let schema = test_schema();
1546 let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1547 .await
1548 .unwrap();
1549
1550 let batch1 = test_batch(50);
1552 let batch2 = test_batch(50);
1553 let (table, version) = write_batches(
1554 table,
1555 vec![batch1, batch2],
1556 "multi-batch-writer",
1557 1,
1558 SaveMode::Append,
1559 None,
1560 false,
1561 None,
1562 false,
1563 None,
1564 )
1565 .await
1566 .unwrap();
1567
1568 assert_eq!(version, 1);
1569 assert_eq!(table.version(), Some(1));
1570
1571 let reopened = open_or_create_table(table_path, HashMap::new(), None)
1573 .await
1574 .unwrap();
1575 assert_eq!(reopened.version(), Some(1));
1576 }
1577
/// A local filesystem path (no `://`) should be converted to a `file` URL.
#[test]
fn test_path_to_url_local() {
    let temp_dir = TempDir::new().unwrap();
    let path = temp_dir.path().to_str().unwrap();

    let url = path_to_url(path).unwrap();
    // `assert_eq!` prints the actual scheme on failure and matches the sibling URL tests.
    assert_eq!(url.scheme(), "file");
}
1586
/// An `s3://` path should parse as-is, keeping the scheme and using the bucket
/// name as the URL host.
#[test]
fn test_path_to_url_s3() {
    let parsed = path_to_url("s3://my-bucket/path/to/table").unwrap();

    assert_eq!(parsed.scheme(), "s3");
    assert_eq!(parsed.host_str(), Some("my-bucket"));
}
1593
/// An `az://` path should parse with its scheme preserved.
#[test]
fn test_path_to_url_azure() {
    let parsed = path_to_url("az://my-container/path/to/table").unwrap();

    assert_eq!(parsed.scheme(), "az");
}
1599
/// A `gs://` path should parse with its scheme preserved.
#[test]
fn test_path_to_url_gcs() {
    let parsed = path_to_url("gs://my-bucket/path/to/table").unwrap();

    assert_eq!(parsed.scheme(), "gs");
}
1605
1606 #[tokio::test]
1609 async fn test_get_latest_version() {
1610 let temp_dir = TempDir::new().unwrap();
1611 let table_path = temp_dir.path().to_str().unwrap();
1612
1613 let schema = test_schema();
1614 let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1615 .await
1616 .unwrap();
1617
1618 let v = get_latest_version(&mut table).await.unwrap();
1620 assert_eq!(v, 0);
1621
1622 let batch = test_batch(10);
1624 let (returned_table, version) = write_batches(
1625 table,
1626 vec![batch],
1627 "writer",
1628 1,
1629 SaveMode::Append,
1630 None,
1631 false,
1632 None,
1633 false,
1634 None,
1635 )
1636 .await
1637 .unwrap();
1638 assert_eq!(version, 1);
1639 table = returned_table;
1640
1641 let v = get_latest_version(&mut table).await.unwrap();
1642 assert_eq!(v, 1);
1643 }
1644
1645 #[tokio::test]
1646 async fn test_read_batches_at_version() {
1647 let temp_dir = TempDir::new().unwrap();
1648 let table_path = temp_dir.path().to_str().unwrap();
1649
1650 let schema = test_schema();
1651 let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1652 .await
1653 .unwrap();
1654
1655 let batch = test_batch(50);
1657 let (table, _) = write_batches(
1658 table,
1659 vec![batch],
1660 "writer",
1661 1,
1662 SaveMode::Append,
1663 None,
1664 false,
1665 None,
1666 false,
1667 None,
1668 )
1669 .await
1670 .unwrap();
1671
1672 let batch = test_batch(30);
1674 let (_table, _) = write_batches(
1675 table,
1676 vec![batch],
1677 "writer",
1678 2,
1679 SaveMode::Append,
1680 None,
1681 false,
1682 None,
1683 false,
1684 None,
1685 )
1686 .await
1687 .unwrap();
1688
1689 let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1691 .await
1692 .unwrap();
1693 let (batches, _) = read_batches_at_version(&mut read_table, 1, 10000)
1694 .await
1695 .unwrap();
1696 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1697 assert_eq!(total_rows, 50);
1698
1699 let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
1701 .await
1702 .unwrap();
1703 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1704 assert_eq!(total_rows, 80);
1705 }
1706
1707 #[tokio::test]
1708 async fn test_sink_source_roundtrip() {
1709 use super::super::delta::DeltaLakeSink;
1710 use super::super::delta_config::DeltaLakeSinkConfig;
1711 use super::super::delta_source::DeltaSource;
1712 use super::super::delta_source_config::DeltaSourceConfig;
1713 use crate::config::ConnectorConfig;
1714 use crate::connector::{SinkConnector, SourceConnector};
1715
1716 let temp_dir = TempDir::new().unwrap();
1717 let table_path = temp_dir.path().to_str().unwrap();
1718
1719 let sink_config = DeltaLakeSinkConfig::new(table_path);
1721 let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());
1722 let connector_config = ConnectorConfig::new("delta-lake");
1723 sink.open(&connector_config).await.unwrap();
1724
1725 sink.begin_epoch(1).await.unwrap();
1726 let batch = test_batch(25);
1727 sink.write_batch(&batch).await.unwrap();
1728 sink.pre_commit(1).await.unwrap();
1729 sink.commit_epoch(1).await.unwrap();
1730 sink.close().await.unwrap();
1731
1732 let mut source_config = DeltaSourceConfig::new(table_path);
1734 source_config.starting_version = Some(0);
1735 let mut source = DeltaSource::new(source_config, None);
1736 let source_connector_config = ConnectorConfig::new("delta-lake");
1737 source.open(&source_connector_config).await.unwrap();
1738
1739 let result = source.poll_batch(10000).await.unwrap();
1741 assert!(result.is_some(), "should have received a batch");
1742 let total_rows: usize = {
1743 let mut rows = result.unwrap().records.num_rows();
1744 while let Ok(Some(batch)) = source.poll_batch(10000).await {
1746 rows += batch.records.num_rows();
1747 }
1748 rows
1749 };
1750 assert_eq!(total_rows, 25);
1751
1752 source.close().await.unwrap();
1753 }
1754
1755 #[tokio::test]
1756 async fn test_source_checkpoint_restore() {
1757 use super::super::delta_source::DeltaSource;
1758 use super::super::delta_source_config::DeltaSourceConfig;
1759 use crate::config::ConnectorConfig;
1760 use crate::connector::SourceConnector;
1761
1762 let temp_dir = TempDir::new().unwrap();
1763 let table_path = temp_dir.path().to_str().unwrap();
1764
1765 let schema = test_schema();
1767 let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1768 .await
1769 .unwrap();
1770
1771 let (table, _) = write_batches(
1772 table,
1773 vec![test_batch(10)],
1774 "writer",
1775 1,
1776 SaveMode::Append,
1777 None,
1778 false,
1779 None,
1780 false,
1781 None,
1782 )
1783 .await
1784 .unwrap();
1785 let (_table, _) = write_batches(
1786 table,
1787 vec![test_batch(20)],
1788 "writer",
1789 2,
1790 SaveMode::Append,
1791 None,
1792 false,
1793 None,
1794 false,
1795 None,
1796 )
1797 .await
1798 .unwrap();
1799
1800 let mut source_config = DeltaSourceConfig::new(table_path);
1803 source_config.starting_version = Some(0);
1804 let mut source = DeltaSource::new(source_config.clone(), None);
1805 let connector_config = ConnectorConfig::new("delta-lake");
1806 source.open(&connector_config).await.unwrap();
1807
1808 let _ = source.poll_batch(10000).await.unwrap();
1810 while let Ok(Some(_)) = source.poll_batch(10000).await {}
1812
1813 let cp = source.checkpoint();
1815 assert_eq!(cp.get_offset("delta_version"), Some("2"));
1816 source.close().await.unwrap();
1817
1818 let mut source2 = DeltaSource::new(source_config, None);
1820 source2.open(&connector_config).await.unwrap();
1821 source2.restore(&cp).await.unwrap();
1822
1823 assert_eq!(source2.current_version(), 2);
1824
1825 let result = source2.poll_batch(10000).await.unwrap();
1827 assert!(result.is_none());
1828
1829 source2.close().await.unwrap();
1830 }
1831
1832 #[tokio::test]
1833 async fn test_auto_flush_writes_data() {
1834 use super::super::delta::DeltaLakeSink;
1835 use super::super::delta_config::DeltaLakeSinkConfig;
1836 use crate::config::ConnectorConfig;
1837 use crate::connector::SinkConnector;
1838
1839 let temp_dir = TempDir::new().unwrap();
1840 let table_path = temp_dir.path().to_str().unwrap();
1841
1842 let mut sink_config = DeltaLakeSinkConfig::new(table_path);
1844 sink_config.max_buffer_records = 10;
1845 let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());
1846
1847 let connector_config = ConnectorConfig::new("delta-lake");
1848 sink.open(&connector_config).await.unwrap();
1849
1850 sink.begin_epoch(1).await.unwrap();
1851
1852 let batch = test_batch(25);
1854 sink.write_batch(&batch).await.unwrap();
1855
1856 sink.pre_commit(1).await.unwrap();
1858 sink.commit_epoch(1).await.unwrap();
1859 sink.close().await.unwrap();
1860
1861 let mut table = open_or_create_table(table_path, HashMap::new(), None)
1863 .await
1864 .unwrap();
1865 let latest = get_latest_version(&mut table).await.unwrap();
1866 assert!(latest >= 1, "should have at least 1 version");
1867
1868 let (batches, _) = read_batches_at_version(&mut table, latest, 10000)
1869 .await
1870 .unwrap();
1871 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1872 assert_eq!(
1873 total_rows, 25,
1874 "all 25 rows should be written, not dropped by auto-flush"
1875 );
1876 }
1877
1878 #[tokio::test]
1879 async fn test_sink_exactly_once_epoch() {
1880 let temp_dir = TempDir::new().unwrap();
1881 let table_path = temp_dir.path().to_str().unwrap();
1882 let writer_id = "exactly-once-test";
1883
1884 let schema = test_schema();
1885 let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
1886 .await
1887 .unwrap();
1888
1889 let (table, v1) = write_batches(
1891 table,
1892 vec![test_batch(10)],
1893 writer_id,
1894 1,
1895 SaveMode::Append,
1896 None,
1897 false,
1898 None,
1899 false,
1900 None,
1901 )
1902 .await
1903 .unwrap();
1904 assert_eq!(v1, 1);
1905
1906 let (table, v2) = write_batches(
1908 table,
1909 vec![test_batch(15)],
1910 writer_id,
1911 2,
1912 SaveMode::Append,
1913 None,
1914 false,
1915 None,
1916 false,
1917 None,
1918 )
1919 .await
1920 .unwrap();
1921 assert_eq!(v2, 2);
1922
1923 let last_epoch = get_last_committed_epoch(&table, writer_id).await;
1925 assert_eq!(last_epoch, 2);
1926
1927 let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
1929 .await
1930 .unwrap();
1931 let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
1932 .await
1933 .unwrap();
1934 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1935 assert_eq!(total_rows, 25);
1936 }
1937
1938 #[tokio::test]
1939 async fn test_schema_evolution_adds_column() {
1940 let temp_dir = TempDir::new().unwrap();
1941 let table_path = temp_dir.path().to_str().unwrap();
1942
1943 let schema_v1 = Arc::new(Schema::new(vec![
1945 Field::new("id", DataType::Int64, false),
1946 Field::new("name", DataType::Utf8, true),
1947 ]));
1948 let table = open_or_create_table(table_path, HashMap::new(), Some(&schema_v1))
1949 .await
1950 .unwrap();
1951
1952 let batch_v1 = RecordBatch::try_new(
1954 schema_v1.clone(),
1955 vec![
1956 Arc::new(Int64Array::from(vec![1, 2])),
1957 Arc::new(StringArray::from(vec!["a", "b"])),
1958 ],
1959 )
1960 .unwrap();
1961 let (table, _) = write_batches(
1962 table,
1963 vec![batch_v1],
1964 "evo-writer",
1965 1,
1966 SaveMode::Append,
1967 None,
1968 true, None,
1970 false,
1971 None,
1972 )
1973 .await
1974 .unwrap();
1975
1976 let schema_v2 = Arc::new(Schema::new(vec![
1978 Field::new("id", DataType::Int64, false),
1979 Field::new("name", DataType::Utf8, true),
1980 Field::new("score", DataType::Float64, true),
1981 ]));
1982 let batch_v2 = RecordBatch::try_new(
1983 schema_v2,
1984 vec![
1985 Arc::new(Int64Array::from(vec![3])),
1986 Arc::new(StringArray::from(vec!["c"])),
1987 Arc::new(Float64Array::from(vec![99.5])),
1988 ],
1989 )
1990 .unwrap();
1991 let (table, _) = write_batches(
1992 table,
1993 vec![batch_v2],
1994 "evo-writer",
1995 2,
1996 SaveMode::Append,
1997 None,
1998 true,
1999 None,
2000 false,
2001 None,
2002 )
2003 .await
2004 .unwrap();
2005
2006 let final_schema = get_table_schema(&table).unwrap();
2008 assert_eq!(final_schema.fields().len(), 3);
2009 assert_eq!(final_schema.field(0).name(), "id");
2010 assert_eq!(final_schema.field(1).name(), "name");
2011 assert_eq!(final_schema.field(2).name(), "score");
2012
2013 let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
2015 .await
2016 .unwrap();
2017 let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
2018 .await
2019 .unwrap();
2020 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
2021 assert_eq!(total_rows, 3);
2022 }
2023
2024 #[tokio::test]
2025 async fn test_compaction_reduces_files() {
2026 let temp_dir = TempDir::new().unwrap();
2027 let table_path = temp_dir.path().to_str().unwrap();
2028
2029 let schema = test_schema();
2030 let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
2031 .await
2032 .unwrap();
2033
2034 for epoch in 1..=10u64 {
2036 let batch = test_batch(5);
2037 let (t, _) = write_batches(
2038 table,
2039 vec![batch],
2040 "compaction-writer",
2041 epoch,
2042 SaveMode::Append,
2043 None,
2044 false,
2045 None,
2046 false,
2047 None,
2048 )
2049 .await
2050 .unwrap();
2051 table = t;
2052 }
2053
2054 let parquet_before: Vec<_> = std::fs::read_dir(temp_dir.path())
2056 .unwrap()
2057 .filter_map(Result::ok)
2058 .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
2059 .collect();
2060 assert!(
2061 parquet_before.len() >= 10,
2062 "should have at least 10 Parquet files before compaction, got {}",
2063 parquet_before.len()
2064 );
2065
2066 let (table, result) = run_compaction(table, 128 * 1024 * 1024, &[], None)
2068 .await
2069 .unwrap();
2070 assert!(
2071 result.files_removed > 0,
2072 "compaction should have removed files"
2073 );
2074
2075 drop(table);
2079 }
2080}