#[cfg(feature = "delta-lake")]
use std::collections::HashMap;

#[cfg(feature = "delta-lake")]
use std::sync::Arc;

#[cfg(feature = "delta-lake")]
use arrow_array::RecordBatch;

#[cfg(feature = "delta-lake")]
use arrow_schema::SchemaRef;

#[cfg(feature = "delta-lake")]
use deltalake::kernel::engine::arrow_conversion::TryIntoKernel as _;

#[cfg(feature = "delta-lake")]
use deltalake::kernel::transaction::CommitProperties;

#[cfg(feature = "delta-lake")]
use deltalake::kernel::Transaction;

#[cfg(feature = "delta-lake")]
use deltalake::operations::write::SchemaMode;

#[cfg(feature = "delta-lake")]
use deltalake::protocol::SaveMode;

#[cfg(feature = "delta-lake")]
use deltalake::DeltaTable;

#[cfg(feature = "delta-lake")]
use tracing::{debug, info, warn};

#[cfg(feature = "delta-lake")]
use url::Url;

#[cfg(feature = "delta-lake")]
use crate::error::ConnectorError;

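/// Converts a table location into a [`Url`]: strings that already carry a
/// scheme (e.g. `s3://`, `az://`, `gs://`) are parsed as-is, while bare
/// filesystem paths are made absolute and converted to `file://` directory URLs.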
#[cfg(feature = "delta-lake")]
fn path_to_url(path: &str) -> Result<Url, ConnectorError> {
    if path.contains("://") {
        Url::parse(path)
            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid URL '{path}': {e}")))
    } else {
        let path_buf = std::path::Path::new(path);
        let normalized = if path_buf.exists() {
            std::fs::canonicalize(path_buf).map_err(|e| {
                ConnectorError::ConfigurationError(format!("invalid path '{path}': {e}"))
            })?
        } else {
            if path_buf.is_absolute() {
                path_buf.to_path_buf()
            } else {
                std::env::current_dir()
                    .map_err(|e| {
                        ConnectorError::ConfigurationError(format!("cannot get current dir: {e}"))
                    })?
                    .join(path_buf)
            }
        };

        Url::from_directory_path(&normalized).map_err(|()| {
            ConnectorError::ConfigurationError(format!(
                "cannot convert path to URL: {}",
                normalized.display()
            ))
        })
    }
}

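/// Opens the Delta Lake table at `table_path`, creating it with `schema` when it
/// does not exist yet. When no schema is supplied, the uninitialized handle is
/// returned so the table can be created on first write.
///
/// Minimal usage sketch (illustrative only, not compiled as a doctest; assumes
/// the caller already has a `schema` and some `batches`):
///
/// ```ignore
/// let table = open_or_create_table("/tmp/events", HashMap::new(), Some(&schema)).await?;
/// let (table, version) = write_batches(
///     table, batches, "writer-1", 1, SaveMode::Append,
///     None, false, None, false, None,
/// ).await?;
/// ```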
#[cfg(feature = "delta-lake")]
#[allow(clippy::implicit_hasher)]
pub async fn open_or_create_table(
    table_path: &str,
    storage_options: HashMap<String, String>,
    schema: Option<&SchemaRef>,
) -> Result<DeltaTable, ConnectorError> {
    info!(table_path, "opening Delta Lake table");

    let url = path_to_url(table_path)?;

    let table = DeltaTable::try_from_url_with_storage_options(url, storage_options)
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to open table: {e}")))?;

    if table.version().is_some() {
        info!(
            table_path,
            version = table.version(),
            "opened existing Delta Lake table"
        );
        return Ok(table);
    }

    let Some(schema) = schema else {
        info!(
            table_path,
            "table does not exist yet; will create on first write"
        );
        return Ok(table);
    };

    info!(table_path, "creating new Delta Lake table");

    let delta_schema: deltalake::kernel::StructType = schema
        .as_ref()
        .try_into_kernel()
        .map_err(|e| ConnectorError::SchemaMismatch(format!("schema conversion failed: {e}")))?;

    let table = table
        .create()
        .with_columns(delta_schema.fields().cloned())
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to create table: {e}")))?;

    info!(
        table_path,
        version = table.version(),
        "created new Delta Lake table"
    );

    Ok(table)
}

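/// Writes the given record batches in a single Delta commit using `save_mode`,
/// stamping the commit with an application transaction (`writer_id`, `epoch`)
/// so a replayed epoch can be detected on recovery. Returns the table together
/// with the committed version (or the current version when `batches` is empty).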
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_arguments)]
pub async fn write_batches(
    table: DeltaTable,
    batches: Vec<RecordBatch>,
    writer_id: &str,
    epoch: u64,
    save_mode: SaveMode,
    partition_columns: Option<&[String]>,
    schema_evolution: bool,
    target_file_size: Option<usize>,
    create_checkpoint: bool,
    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
) -> Result<(DeltaTable, i64), ConnectorError> {
    if batches.is_empty() {
        debug!("no batches to write, skipping");
        let version = table.version().unwrap_or(0);
        return Ok((table, version));
    }

    let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();

    debug!(
        writer_id,
        epoch,
        total_rows,
        num_batches = batches.len(),
        "writing batches to Delta Lake"
    );

    #[allow(clippy::cast_possible_wrap)]
    let epoch_i64 = epoch as i64;

    let mut write_builder = table
        .write(batches)
        .with_save_mode(save_mode)
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, epoch_i64))
                .with_create_checkpoint(create_checkpoint),
        );

    if let Some(size) = target_file_size {
        write_builder = write_builder.with_target_file_size(size);
    }

    if schema_evolution {
        write_builder = write_builder.with_schema_mode(SchemaMode::Merge);
    }

    if let Some(cols) = partition_columns {
        if !cols.is_empty() {
            write_builder = write_builder.with_partition_columns(cols.to_vec());
        }
    }

    if let Some(props) = writer_properties {
        write_builder = write_builder.with_writer_properties(props);
    }

    let table = write_builder
        .await
        .map_err(|e| ConnectorError::WriteError(format!("Delta Lake write failed: {e}")))?;

    let version = table.version().unwrap_or(0);

    info!(
        writer_id,
        epoch, version, total_rows, "committed Delta Lake transaction"
    );

    Ok((table, version))
}

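/// Returns the last epoch committed by `writer_id`, read from the table's
/// application transaction (`txn`) metadata. Falls back to 0 when no snapshot
/// or transaction metadata is available.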
#[cfg(feature = "delta-lake")]
pub async fn get_last_committed_epoch(table: &DeltaTable, writer_id: &str) -> u64 {
    let Ok(snapshot) = table.snapshot() else {
        debug!(writer_id, "no snapshot available, assuming epoch 0");
        return 0;
    };

    match snapshot
        .transaction_version(&table.log_store(), writer_id)
        .await
    {
        Ok(Some(version)) => {
            #[allow(clippy::cast_sign_loss)]
            let epoch = version as u64;
            debug!(
                writer_id,
                epoch, "found last committed epoch from txn metadata"
            );
            epoch
        }
        Ok(None) => {
            debug!(
                writer_id,
                "no txn metadata found for writer, assuming epoch 0"
            );
            0
        }
        Err(e) => {
            warn!(writer_id, error = %e, "failed to read txn metadata, assuming epoch 0");
            0
        }
    }
}

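/// Returns the Arrow schema of the table's current snapshot.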
#[cfg(feature = "delta-lake")]
pub fn get_table_schema(table: &DeltaTable) -> Result<SchemaRef, ConnectorError> {
    let state = table
        .snapshot()
        .map_err(|e| ConnectorError::SchemaMismatch(format!("table has no snapshot: {e}")))?;

    Ok(state.snapshot().arrow_schema())
}

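/// Queries the log store for the latest version of the table.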
#[cfg(feature = "delta-lake")]
pub async fn get_latest_version(table: &mut DeltaTable) -> Result<i64, ConnectorError> {
    let log_store = table.log_store();
    let current = table.version().unwrap_or(0);
    log_store
        .get_latest_version(current)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to get latest version: {e}")))
}

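/// Loads `version` and scans the full table through DataFusion, returning at
/// most `max_records` rows plus a flag indicating whether the version was
/// fully consumed.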
#[cfg(feature = "delta-lake")]
pub async fn read_batches_at_version(
    table: &mut DeltaTable,
    version: i64,
    max_records: usize,
) -> Result<(Vec<RecordBatch>, bool), ConnectorError> {
    use datafusion::prelude::SessionContext;
    use tokio_stream::StreamExt;

    table
        .load_version(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;

    debug!(version, "Delta Lake: loaded version for reading");

    let provider = table.table_provider().build().await.map_err(|e| {
        ConnectorError::ReadError(format!("failed to build table provider: {e}"))
    })?;

    let ctx = SessionContext::new();
    ctx.register_table("delta_source_scan", Arc::new(provider))
        .map_err(|e| ConnectorError::ReadError(format!("failed to register scan table: {e}")))?;

    let df = ctx
        .sql("SELECT * FROM delta_source_scan")
        .await
        .map_err(|e| ConnectorError::ReadError(format!("scan query failed: {e}")))?;

    let df = if max_records < usize::MAX {
        df.limit(0, Some(max_records))
            .map_err(|e| ConnectorError::ReadError(format!("limit failed: {e}")))?
    } else {
        df
    };

    let mut stream = df
        .execute_stream()
        .await
        .map_err(|e| ConnectorError::ReadError(format!("stream execution failed: {e}")))?;

    let mut batches = Vec::new();
    let mut total_rows: usize = 0;

    while let Some(result) = stream.next().await {
        let batch =
            result.map_err(|e| ConnectorError::ReadError(format!("stream batch failed: {e}")))?;
        if batch.num_rows() == 0 {
            continue;
        }
        total_rows += batch.num_rows();
        batches.push(batch);

        if total_rows >= max_records {
            break;
        }
    }

    let fully_consumed = if total_rows >= max_records {
        stream.next().await.is_none()
    } else {
        true
    };

    debug!(
        version,
        num_batches = batches.len(),
        total_rows,
        fully_consumed,
        "Delta Lake: scanned version"
    );

    Ok((batches, fully_consumed))
}

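/// Reads only the data added by a single commit: parses the commit log entry
/// for `version`, fetches the net-new Parquet files directly from object
/// storage, and falls back to a full DataFusion scan for oversized files.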
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_lines)]
pub async fn read_version_diff(
    table: &mut DeltaTable,
    version: i64,
    max_records: usize,
    partition_filter: Option<&str>,
) -> Result<(Vec<RecordBatch>, bool), ConnectorError> {
    const MAX_DIRECT_READ_BYTES: u64 = 256 * 1024 * 1024;

    if version <= 0 {
        return read_batches_at_version(table, version, max_records).await;
    }

    let log_store = table.log_store();
    let store = log_store.object_store(None);

    let commit_data = log_store
        .read_commit_entry(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("read commit {version}: {e}")))?
        .ok_or_else(|| {
            ConnectorError::ReadError(format!(
                "version {version} not available (cleaned up or never existed)"
            ))
        })?;
    let commit_str = std::str::from_utf8(&commit_data)
        .map_err(|e| ConnectorError::ReadError(format!("commit log is not valid UTF-8: {e}")))?;

    let mut added_paths = Vec::new();
    let mut removed_paths = std::collections::HashSet::new();
    for line in commit_str.lines() {
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
            if let Some(add) = obj.get("add") {
                if let Some(path) = add.get("path").and_then(|p| p.as_str()) {
                    added_paths.push(decode_delta_path(path));
                }
            }
            if let Some(remove) = obj.get("remove") {
                if let Some(path) = remove.get("path").and_then(|p| p.as_str()) {
                    removed_paths.insert(decode_delta_path(path));
                }
            }
        }
    }

    added_paths.retain(|p| !removed_paths.contains(p));

    if added_paths.is_empty() {
        debug!(
            version,
            num_removed = removed_paths.len(),
            "Delta Lake: no net-new add actions in version"
        );
        return Ok((Vec::new(), true));
    }

    debug!(
        version,
        num_added_files = added_paths.len(),
        num_removed_files = removed_paths.len(),
        "Delta Lake: reading added files"
    );

    table
        .load_version(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;

    let table_schema = table
        .snapshot()
        .map(|s| s.snapshot().arrow_schema())
        .map_err(|e| ConnectorError::ReadError(format!("no snapshot at version {version}: {e}")))?;

    let added_paths = if let Some(filter) = partition_filter {
        filter_paths_by_partition(&added_paths, filter)
    } else {
        added_paths
    };

    let mut batches = Vec::new();
    let mut total_rows: usize = 0;

    for file_path in &added_paths {
        if total_rows >= max_records {
            break;
        }

        let obj_path = deltalake::Path::from(file_path.as_str());

        let file_meta = store
            .head(&obj_path)
            .await
            .map_err(|e| ConnectorError::ReadError(format!("failed to stat '{file_path}': {e}")))?;
        if file_meta.size > MAX_DIRECT_READ_BYTES {
            warn!(
                file_path,
                file_size = file_meta.size,
                "file too large for direct read, falling back to DataFusion scan"
            );
            return read_batches_at_version(table, version, max_records).await;
        }

        let file_bytes = get_with_retry(&store, &obj_path, file_path).await?;

        let parquet_reader =
            deltalake::parquet::arrow::arrow_reader::ArrowReaderBuilder::try_new(file_bytes)
                .map_err(|e| {
                    ConnectorError::ReadError(format!(
                        "failed to open Parquet file '{file_path}': {e}"
                    ))
                })?;

        let remaining = max_records.saturating_sub(total_rows).saturating_add(1);
        let reader = parquet_reader.with_limit(remaining).build().map_err(|e| {
            ConnectorError::ReadError(format!("failed to build reader for '{file_path}': {e}"))
        })?;

        for result in reader {
            let batch: RecordBatch = result.map_err(|e| {
                ConnectorError::ReadError(format!("Parquet read error in '{file_path}': {e}"))
            })?;
            if batch.num_rows() == 0 {
                continue;
            }

            let batch = if batch.schema() == table_schema {
                batch
            } else {
                align_batch_to_schema(&batch, &table_schema)?
            };

            total_rows += batch.num_rows();
            batches.push(batch);

            if total_rows >= max_records {
                break;
            }
        }
    }

    let fully_consumed = total_rows <= max_records;
    if !fully_consumed {
        let excess = total_rows - max_records;
        let len = batches.len();
        if len > 0 {
            let last = &batches[len - 1];
            if last.num_rows() > excess {
                batches[len - 1] = last.slice(0, last.num_rows() - excess);
            } else {
                batches.pop();
            }
        }
    }

    debug!(
        version,
        num_batches = batches.len(),
        fully_consumed,
        num_added_files = added_paths.len(),
        "Delta Lake: read version diff"
    );

    Ok((batches, fully_consumed))
}

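/// Fetches an object with a small fixed backoff between attempts, bailing out
/// immediately on "not found" errors.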
#[cfg(feature = "delta-lake")]
async fn get_with_retry(
    store: &Arc<dyn deltalake::ObjectStore>,
    path: &deltalake::Path,
    display_path: &str,
) -> Result<bytes::Bytes, ConnectorError> {
    let backoff = [200u64, 1000, 4000];
    let mut last_err = None;

    for attempt in 0..=backoff.len() {
        match store.get(path).await {
            Ok(result) => {
                return result.bytes().await.map_err(|e| {
                    ConnectorError::ReadError(format!(
                        "failed to read bytes of '{display_path}': {e}"
                    ))
                });
            }
            Err(e) => {
                let msg = e.to_string();
                if msg.contains("not found") || msg.contains("404") {
                    return Err(ConnectorError::ReadError(format!(
                        "file not found '{display_path}': {e}"
                    )));
                }
                if let Some(&delay) = backoff.get(attempt) {
                    warn!(
                        attempt = attempt + 1,
                        delay_ms = delay,
                        error = %e,
                        path = display_path,
                        "object_store read failed, retrying"
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
                }
                last_err = Some(e);
            }
        }
    }

    Err(ConnectorError::ReadError(format!(
        "failed to read '{display_path}' after {} attempts: {}",
        backoff.len() + 1,
        last_err.map_or_else(|| "unknown".to_string(), |e| e.to_string())
    )))
}

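/// Keeps only paths whose Hive-style partition segments (`col=value`) satisfy
/// every `AND`-joined equality clause in `filter`; unparseable filters keep all paths.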
#[cfg(feature = "delta-lake")]
fn filter_paths_by_partition(paths: &[String], filter: &str) -> Vec<String> {
    let mut required_segments: Vec<String> = Vec::new();
    for clause in filter
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
        .split(" AND ")
    {
        let clause = clause.trim();
        if let Some((col, val)) = clause.split_once('=') {
            let col = col.trim();
            let val = val.trim().trim_matches('\'').trim_matches('"');
            if !col.is_empty() && !val.is_empty() {
                required_segments.push(format!("{col}={val}"));
            }
        }
    }

    if required_segments.is_empty() {
        return paths.to_vec();
    }

    paths
        .iter()
        .filter(|path| required_segments.iter().all(|seg| path.contains(seg)))
        .cloned()
        .collect()
}

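/// Normalizes a file path taken from the Delta log into a plain relative path,
/// falling back to the original string when it cannot be parsed as a URL path.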
#[cfg(feature = "delta-lake")]
fn decode_delta_path(encoded: &str) -> String {
    url::Url::parse(&format!("file:///{encoded}")).map_or_else(
        |_| encoded.to_string(),
        |u| {
            let p = u.path();
            p.strip_prefix('/').unwrap_or(p).to_string()
        },
    )
}

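/// Reorders a batch's columns to match `target_schema`, filling columns missing
/// from the batch with nulls.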
#[cfg(feature = "delta-lake")]
fn align_batch_to_schema(
    batch: &RecordBatch,
    target_schema: &SchemaRef,
) -> Result<RecordBatch, ConnectorError> {
    use arrow_array::new_null_array;

    let mut columns = Vec::with_capacity(target_schema.fields().len());
    for field in target_schema.fields() {
        if let Ok(col_idx) = batch.schema().index_of(field.name()) {
            columns.push(batch.column(col_idx).clone());
        } else {
            columns.push(new_null_array(field.data_type(), batch.num_rows()));
        }
    }

    RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| {
        ConnectorError::ReadError(format!("failed to align batch to table schema: {e}"))
    })
}

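/// Scans the table's Change Data Feed from `start_version` to `end_version`
/// and collects the non-empty batches.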
#[cfg(feature = "delta-lake")]
pub async fn read_cdf_batches(
    table: DeltaTable,
    start_version: i64,
    end_version: i64,
) -> Result<Vec<RecordBatch>, ConnectorError> {
    use datafusion::prelude::SessionContext;
    use tokio_stream::StreamExt;

    debug!(start_version, end_version, "reading CDF batches");

    let ctx = SessionContext::new();

    let session_state = ctx.state();

    let cdf_builder = table
        .scan_cdf()
        .with_starting_version(start_version)
        .with_ending_version(end_version);

    let plan = cdf_builder
        .build(&session_state, None)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("CDF scan build failed: {e}")))?;

    let task_ctx = ctx.task_ctx();
    let mut stream = datafusion::physical_plan::execute_stream(plan, task_ctx)
        .map_err(|e| ConnectorError::ReadError(format!("CDF stream execution failed: {e}")))?;

    let mut batches = Vec::new();
    while let Some(result) = stream.next().await {
        let batch: RecordBatch = result
            .map_err(|e| ConnectorError::ReadError(format!("CDF stream batch failed: {e}")))?;
        if batch.num_rows() > 0 {
            batches.push(batch);
        }
    }

    debug!(
        start_version,
        end_version,
        num_batches = batches.len(),
        "CDF scan complete"
    );

    Ok(batches)
}

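/// Converts a CDF batch into changelog form: drops `update_preimage` rows,
/// strips the CDF metadata columns, and appends an `_op` column (`I`/`U`/`D`).
/// Batches without a `_change_type` column are passed through unchanged.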
#[cfg(feature = "delta-lake")]
pub fn map_cdf_to_changelog(batch: &RecordBatch) -> Result<Option<RecordBatch>, ConnectorError> {
    use arrow_array::StringArray;

    let schema = batch.schema();
    let Ok(ct_idx) = schema.index_of("_change_type") else {
        return Ok(Some(batch.clone()));
    };

    let change_type = batch
        .column(ct_idx)
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| ConnectorError::ReadError("_change_type is not Utf8".into()))?;

    let (keep, ops): (Vec<bool>, Vec<Option<&str>>) = (0..batch.num_rows())
        .map(|i| match change_type.value(i) {
            "update_postimage" => (true, Some("U")),
            "delete" => (true, Some("D")),
            "update_preimage" => (false, Some("")),
            _ => (true, Some("I")),
        })
        .unzip();

    let filter = arrow_array::BooleanArray::from(keep);
    let filtered = arrow_select::filter::filter_record_batch(batch, &filter)
        .map_err(|e| ConnectorError::ReadError(format!("CDF filter failed: {e}")))?;
    if filtered.num_rows() == 0 {
        return Ok(None);
    }

    let op_arr: StringArray = ops.into_iter().collect();
    let op_filtered = arrow_select::filter::filter(&op_arr, &filter)
        .map_err(|e| ConnectorError::ReadError(format!("CDF op filter: {e}")))?;

    let cdf_meta = ["_change_type", "_commit_version", "_commit_timestamp"];
    let mut fields = Vec::new();
    let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::new();
    for (i, field) in filtered.schema().fields().iter().enumerate() {
        if !cdf_meta.contains(&field.name().as_str()) {
            fields.push(field.clone());
            columns.push(filtered.column(i).clone());
        }
    }
    fields.push(Arc::new(arrow_schema::Field::new(
        "_op",
        arrow_schema::DataType::Utf8,
        false,
    )));
    columns.push(op_filtered);

    RecordBatch::try_new(Arc::new(arrow_schema::Schema::new(fields)), columns)
        .map(Some)
        .map_err(|e| ConnectorError::ReadError(format!("CDF batch rebuild: {e}")))
}

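/// Row counts reported by a changelog MERGE.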
#[cfg(feature = "delta-lake")]
#[derive(Debug)]
pub struct MergeResult {
    pub rows_inserted: usize,
    pub rows_updated: usize,
    pub rows_deleted: usize,
}

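/// Applies a changelog batch (rows tagged with an `_op` column) to the table as
/// a single MERGE: upserts for `I`/`U`/`r`, deletes for `D`, keyed on
/// `key_columns` and committed with the (`writer_id`, `epoch`) application
/// transaction.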
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_lines)]
#[allow(clippy::too_many_arguments)]
pub async fn merge_changelog(
    table: DeltaTable,
    source_batch: RecordBatch,
    key_columns: &[String],
    writer_id: &str,
    epoch: u64,
    schema_evolution: bool,
    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
    ctx: &datafusion::prelude::SessionContext,
) -> Result<(DeltaTable, MergeResult), ConnectorError> {
    use datafusion::prelude::*;
    use deltalake::kernel::transaction::CommitProperties;
    use deltalake::kernel::Transaction;

    const CDC_COLUMNS: &[&str] = &["_op", "_ts_ms"];

    if source_batch.num_rows() == 0 {
        return Ok((
            table,
            MergeResult {
                rows_inserted: 0,
                rows_updated: 0,
                rows_deleted: 0,
            },
        ));
    }

    debug!(
        key_columns = ?key_columns,
        source_rows = source_batch.num_rows(),
        "performing atomic changelog MERGE"
    );

    let source_df = ctx.read_batch(source_batch).map_err(|e| {
        ConnectorError::WriteError(format!("failed to create source DataFrame: {e}"))
    })?;

    let predicate = key_columns
        .iter()
        .map(|k| col(format!("target.{k}")).eq(col(format!("source.{k}"))))
        .reduce(Expr::and)
        .ok_or_else(|| {
            ConnectorError::ConfigurationError("merge requires at least one key column".into())
        })?;

    let source_schema = source_df.schema().clone();
    let key_set: std::collections::HashSet<&str> = key_columns.iter().map(String::as_str).collect();

    let all_user_columns: Vec<String> = source_schema
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .filter(|name| !CDC_COLUMNS.contains(&name.as_str()))
        .collect();

    let non_key_user_columns: Vec<String> = all_user_columns
        .iter()
        .filter(|c| !key_set.contains(c.as_str()))
        .cloned()
        .collect();

    let upsert_pred = col("source._op").in_list(vec![lit("I"), lit("U"), lit("r")], false);
    let delete_pred = col("source._op").eq(lit("D"));

    #[allow(clippy::cast_possible_wrap)]
    let epoch_i64 = epoch as i64;

    let non_key_for_update = non_key_user_columns;
    let all_for_insert = all_user_columns;

    let mut merge_builder = table
        .merge(source_df, predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, epoch_i64)),
        )
        .when_matched_update(|update| {
            let mut u = update.predicate(upsert_pred.clone());
            for col_name in &non_key_for_update {
                u = u.update(col_name.as_str(), col(format!("source.{col_name}")));
            }
            u
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-update failed: {e}")))?
        .when_matched_delete(|delete| delete.predicate(delete_pred))
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-delete failed: {e}")))?
        .when_not_matched_insert(|insert| {
            let mut ins = insert.predicate(upsert_pred);
            for col_name in &all_for_insert {
                ins = ins.set(col_name.as_str(), col(format!("source.{col_name}")));
            }
            ins
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge not-matched-insert failed: {e}")))?;

    if schema_evolution {
        merge_builder = merge_builder.with_merge_schema(true);
    }

    if let Some(props) = writer_properties {
        merge_builder = merge_builder.with_writer_properties(props);
    }

    let (table, metrics) = merge_builder.await.map_err(|e| {
        ConnectorError::WriteError(format!("Delta Lake changelog MERGE failed: {e}"))
    })?;

    let result = MergeResult {
        rows_inserted: metrics.num_target_rows_inserted,
        rows_updated: metrics.num_target_rows_updated,
        rows_deleted: metrics.num_target_rows_deleted,
    };

    info!(
        writer_id,
        epoch,
        rows_inserted = result.rows_inserted,
        rows_updated = result.rows_updated,
        rows_deleted = result.rows_deleted,
        "Delta Lake changelog MERGE complete"
    );

    Ok((table, result))
}

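/// File counts reported by an OPTIMIZE run.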
#[cfg(feature = "delta-lake")]
#[derive(Debug)]
pub struct CompactionResult {
    pub files_added: u64,
    pub files_removed: u64,
    pub partitions_optimized: u64,
}

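/// Runs OPTIMIZE on the table, using Z-order when `z_order_columns` is
/// non-empty and plain compaction otherwise.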
#[cfg(feature = "delta-lake")]
pub async fn run_compaction(
    table: DeltaTable,
    target_file_size: u64,
    z_order_columns: &[String],
    writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
) -> Result<(DeltaTable, CompactionResult), ConnectorError> {
    use deltalake::operations::optimize::OptimizeType;

    info!(target_file_size, "running Delta Lake compaction (OPTIMIZE)");

    let optimize_type = if z_order_columns.is_empty() {
        OptimizeType::Compact
    } else {
        OptimizeType::ZOrder(z_order_columns.to_vec())
    };

    let mut optimize_builder = table
        .optimize()
        .with_type(optimize_type)
        .with_target_size(target_file_size);

    if let Some(props) = writer_properties {
        optimize_builder = optimize_builder.with_writer_properties(props);
    }

    let (table, metrics) = optimize_builder
        .await
        .map_err(|e| ConnectorError::Internal(format!("compaction failed: {e}")))?;

    let result = CompactionResult {
        files_added: metrics.num_files_added,
        files_removed: metrics.num_files_removed,
        partitions_optimized: metrics.partitions_optimized,
    };

    info!(
        files_added = result.files_added,
        files_removed = result.files_removed,
        partitions_optimized = result.partitions_optimized,
        "Delta Lake compaction complete"
    );

    Ok((table, result))
}

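/// Runs VACUUM with the given retention period and returns the number of files
/// deleted.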
#[cfg(feature = "delta-lake")]
pub async fn run_vacuum(
    table: DeltaTable,
    retention: std::time::Duration,
) -> Result<(DeltaTable, usize), ConnectorError> {
    let retention_hours = retention.as_secs() / 3600;
    info!(retention_hours, "running Delta Lake VACUUM");

    let chrono_duration =
        chrono::Duration::from_std(retention).unwrap_or_else(|_| chrono::Duration::hours(168));

    let (table, metrics) = table
        .vacuum()
        .with_retention_period(chrono_duration)
        .await
        .map_err(|e| ConnectorError::Internal(format!("vacuum failed: {e}")))?;

    let files_deleted = metrics.files_deleted.len();

    info!(files_deleted, "Delta Lake VACUUM complete");

    Ok((table, files_deleted))
}

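/// Resolves the physical table location (and storage options) for the
/// configured catalog type; Glue and Unity lookups require their respective
/// cargo features.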
#[cfg(feature = "delta-lake")]
#[allow(clippy::implicit_hasher, clippy::unused_async)]
pub async fn resolve_catalog_options(
    catalog: &super::delta_config::DeltaCatalogType,
    #[allow(unused_variables)] catalog_database: Option<&str>,
    #[allow(unused_variables)] catalog_name: Option<&str>,
    _catalog_schema: Option<&str>,
    table_path: &str,
    base_storage_options: &HashMap<String, String>,
) -> Result<(String, HashMap<String, String>), ConnectorError> {
    use super::delta_config::DeltaCatalogType;

    match catalog {
        DeltaCatalogType::None => Ok((table_path.to_string(), base_storage_options.clone())),
        #[cfg(feature = "delta-lake-glue")]
        DeltaCatalogType::Glue => {
            use deltalake::DataCatalog;
            let database = catalog_database.ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "Glue catalog requires 'catalog.database'".into(),
                )
            })?;
            let glue = deltalake_catalog_glue::GlueDataCatalog::from_env()
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to init Glue catalog: {e}"))
                })?;
            let resolved = glue
                .get_table_storage_location(catalog_name.map(String::from), database, table_path)
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!(
                        "Glue catalog lookup failed for '{database}.{table_path}': {e}"
                    ))
                })?;
            info!(
                glue_database = database,
                table = table_path,
                resolved_path = %resolved,
                "resolved table path via Glue catalog"
            );
            Ok((resolved, base_storage_options.clone()))
        }
        #[cfg(not(feature = "delta-lake-glue"))]
        DeltaCatalogType::Glue => Err(ConnectorError::ConfigurationError(
            "Glue catalog requires the 'delta-lake-glue' feature. \
             Build with: cargo build --features delta-lake-glue"
                .into(),
        )),
        #[cfg(feature = "delta-lake-unity")]
        DeltaCatalogType::Unity {
            workspace_url,
            access_token,
        } => {
            let full_name = table_path.strip_prefix("uc://").unwrap_or(table_path);

            let storage_location = super::unity_catalog::get_table_storage_location(
                workspace_url,
                access_token,
                full_name,
            )
            .await?;

            Ok((storage_location, base_storage_options.clone()))
        }
        #[cfg(not(feature = "delta-lake-unity"))]
        DeltaCatalogType::Unity { .. } => Err(ConnectorError::ConfigurationError(
            "Unity catalog requires the 'delta-lake-unity' feature. \
             Build with: cargo build --features delta-lake-unity"
                .into(),
        )),
    }
}

#[cfg(all(test, feature = "delta-lake"))]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;
    use tempfile::TempDir;

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_open_creates_table() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        assert_eq!(table.version(), Some(0));

        let delta_log = temp_dir.path().join("_delta_log");
        assert!(delta_log.exists(), "_delta_log directory should exist");
    }

    #[tokio::test]
    async fn test_open_existing_table() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let _ = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();

        assert_eq!(table.version(), Some(0));
    }

    #[tokio::test]
    async fn test_open_nonexistent_without_schema_defers() {
        let temp_dir = TempDir::new().unwrap();
        let nonexistent_table = temp_dir.path().join("nonexistent");
        std::fs::create_dir_all(&nonexistent_table).unwrap();
        let table_path = nonexistent_table.to_str().unwrap();

        let result = open_or_create_table(table_path, HashMap::new(), None).await;
        assert!(result.is_ok());
        let table = result.unwrap();
        assert!(table.version().is_none(), "table should be uninitialized");
    }

    #[tokio::test]
    async fn test_write_batch_creates_parquet() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let batch = test_batch(100);
        let (table, version) = write_batches(
            table,
            vec![batch],
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        assert_eq!(version, 1);
        assert_eq!(table.version(), Some(1));

        let parquet_files: Vec<_> = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
            .collect();

        assert!(
            !parquet_files.is_empty(),
            "should have created Parquet files"
        );
    }

    #[tokio::test]
    async fn test_exactly_once_epoch_skip() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();
        let writer_id = "exactly-once-writer";

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let batch = test_batch(10);
        let (table, _) = write_batches(
            table,
            vec![batch.clone()],
            writer_id,
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
        assert_eq!(last_epoch, 1);

        let reopened_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();

        let recovered_epoch = get_last_committed_epoch(&reopened_table, writer_id).await;
        assert_eq!(recovered_epoch, 1);
    }

    #[tokio::test]
    async fn test_multiple_epochs_sequential() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();
        let writer_id = "sequential-writer";

        let schema = test_schema();
        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        for epoch in 1..=3 {
            let batch = test_batch(10);
            let result = write_batches(
                table,
                vec![batch],
                writer_id,
                epoch,
                SaveMode::Append,
                None,
                false,
                None,
                false,
                None,
            )
            .await
            .unwrap();
            table = result.0;
            assert_eq!(result.1, epoch as i64);
        }

        assert_eq!(table.version(), Some(3));

        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
        assert_eq!(last_epoch, 3);
    }

    #[tokio::test]
    async fn test_get_table_schema() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let expected_schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&expected_schema))
            .await
            .unwrap();

        let actual_schema = get_table_schema(&table).unwrap();

        assert_eq!(actual_schema.fields().len(), expected_schema.fields().len());
        for (expected, actual) in expected_schema.fields().iter().zip(actual_schema.fields()) {
            assert_eq!(expected.name(), actual.name());
        }
    }

    #[tokio::test]
    async fn test_write_empty_batches() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let (table, version) = write_batches(
            table,
            vec![],
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        assert_eq!(version, 0);
        assert_eq!(table.version(), Some(0));
    }

    #[tokio::test]
    async fn test_write_multiple_batches() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let batch1 = test_batch(50);
        let batch2 = test_batch(50);
        let (table, version) = write_batches(
            table,
            vec![batch1, batch2],
            "multi-batch-writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        assert_eq!(version, 1);
        assert_eq!(table.version(), Some(1));

        let reopened = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        assert_eq!(reopened.version(), Some(1));
    }

    #[test]
    fn test_path_to_url_local() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_str().unwrap();

        let url = path_to_url(path).unwrap();
        assert_eq!(url.scheme(), "file");
    }

    #[test]
    fn test_path_to_url_s3() {
        let url = path_to_url("s3://my-bucket/path/to/table").unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str(), Some("my-bucket"));
    }

    #[test]
    fn test_path_to_url_azure() {
        let url = path_to_url("az://my-container/path/to/table").unwrap();
        assert_eq!(url.scheme(), "az");
    }

    #[test]
    fn test_path_to_url_gcs() {
        let url = path_to_url("gs://my-bucket/path/to/table").unwrap();
        assert_eq!(url.scheme(), "gs");
    }

    #[tokio::test]
    async fn test_get_latest_version() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let v = get_latest_version(&mut table).await.unwrap();
        assert_eq!(v, 0);

        let batch = test_batch(10);
        let (returned_table, version) = write_batches(
            table,
            vec![batch],
            "writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        assert_eq!(version, 1);
        table = returned_table;

        let v = get_latest_version(&mut table).await.unwrap();
        assert_eq!(v, 1);
    }

    #[tokio::test]
    async fn test_read_batches_at_version() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let batch = test_batch(50);
        let (table, _) = write_batches(
            table,
            vec![batch],
            "writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        let batch = test_batch(30);
        let (_table, _) = write_batches(
            table,
            vec![batch],
            "writer",
            2,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let (batches, _) = read_batches_at_version(&mut read_table, 1, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 50);

        let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 80);
    }

    #[tokio::test]
    async fn test_sink_source_roundtrip() {
        use super::super::delta::DeltaLakeSink;
        use super::super::delta_config::DeltaLakeSinkConfig;
        use super::super::delta_source::DeltaSource;
        use super::super::delta_source_config::DeltaSourceConfig;
        use crate::config::ConnectorConfig;
        use crate::connector::{SinkConnector, SourceConnector};

        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let sink_config = DeltaLakeSinkConfig::new(table_path);
        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());
        let connector_config = ConnectorConfig::new("delta-lake");
        sink.open(&connector_config).await.unwrap();

        sink.begin_epoch(1).await.unwrap();
        let batch = test_batch(25);
        sink.write_batch(&batch).await.unwrap();
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        sink.close().await.unwrap();

        let mut source_config = DeltaSourceConfig::new(table_path);
        source_config.starting_version = Some(0);
        let mut source = DeltaSource::new(source_config, None);
        let source_connector_config = ConnectorConfig::new("delta-lake");
        source.open(&source_connector_config).await.unwrap();

        let result = source.poll_batch(10000).await.unwrap();
        assert!(result.is_some(), "should have received a batch");
        let total_rows: usize = {
            let mut rows = result.unwrap().records.num_rows();
            while let Ok(Some(batch)) = source.poll_batch(10000).await {
                rows += batch.records.num_rows();
            }
            rows
        };
        assert_eq!(total_rows, 25);

        source.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_source_checkpoint_restore() {
        use super::super::delta_source::DeltaSource;
        use super::super::delta_source_config::DeltaSourceConfig;
        use crate::config::ConnectorConfig;
        use crate::connector::SourceConnector;

        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let (table, _) = write_batches(
            table,
            vec![test_batch(10)],
            "writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        let (_table, _) = write_batches(
            table,
            vec![test_batch(20)],
            "writer",
            2,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        let mut source_config = DeltaSourceConfig::new(table_path);
        source_config.starting_version = Some(0);
        let mut source = DeltaSource::new(source_config.clone(), None);
        let connector_config = ConnectorConfig::new("delta-lake");
        source.open(&connector_config).await.unwrap();

        let _ = source.poll_batch(10000).await.unwrap();
        while let Ok(Some(_)) = source.poll_batch(10000).await {}

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("delta_version"), Some("2"));
        source.close().await.unwrap();

        let mut source2 = DeltaSource::new(source_config, None);
        source2.open(&connector_config).await.unwrap();
        source2.restore(&cp).await.unwrap();

        assert_eq!(source2.current_version(), 2);

        let result = source2.poll_batch(10000).await.unwrap();
        assert!(result.is_none());

        source2.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_auto_flush_writes_data() {
        use super::super::delta::DeltaLakeSink;
        use super::super::delta_config::DeltaLakeSinkConfig;
        use crate::config::ConnectorConfig;
        use crate::connector::SinkConnector;

        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let mut sink_config = DeltaLakeSinkConfig::new(table_path);
        sink_config.max_buffer_records = 10;
        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());

        let connector_config = ConnectorConfig::new("delta-lake");
        sink.open(&connector_config).await.unwrap();

        sink.begin_epoch(1).await.unwrap();

        let batch = test_batch(25);
        sink.write_batch(&batch).await.unwrap();

        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        sink.close().await.unwrap();

        let mut table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let latest = get_latest_version(&mut table).await.unwrap();
        assert!(latest >= 1, "should have at least 1 version");

        let (batches, _) = read_batches_at_version(&mut table, latest, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(
            total_rows, 25,
            "all 25 rows should be written, not dropped by auto-flush"
        );
    }

    #[tokio::test]
    async fn test_sink_exactly_once_epoch() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();
        let writer_id = "exactly-once-test";

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let (table, v1) = write_batches(
            table,
            vec![test_batch(10)],
            writer_id,
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        assert_eq!(v1, 1);

        let (table, v2) = write_batches(
            table,
            vec![test_batch(15)],
            writer_id,
            2,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        assert_eq!(v2, 2);

        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
        assert_eq!(last_epoch, 2);

        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 25);
    }

    #[tokio::test]
    async fn test_schema_evolution_adds_column() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema_v1 = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema_v1))
            .await
            .unwrap();

        let batch_v1 = RecordBatch::try_new(
            schema_v1.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();
        let (table, _) = write_batches(
            table,
            vec![batch_v1],
            "evo-writer",
            1,
            SaveMode::Append,
            None,
            true,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        let schema_v2 = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
        ]));
        let batch_v2 = RecordBatch::try_new(
            schema_v2,
            vec![
                Arc::new(Int64Array::from(vec![3])),
                Arc::new(StringArray::from(vec!["c"])),
                Arc::new(Float64Array::from(vec![99.5])),
            ],
        )
        .unwrap();
        let (table, _) = write_batches(
            table,
            vec![batch_v2],
            "evo-writer",
            2,
            SaveMode::Append,
            None,
            true,
            None,
            false,
            None,
        )
        .await
        .unwrap();

        let final_schema = get_table_schema(&table).unwrap();
        assert_eq!(final_schema.fields().len(), 3);
        assert_eq!(final_schema.field(0).name(), "id");
        assert_eq!(final_schema.field(1).name(), "name");
        assert_eq!(final_schema.field(2).name(), "score");

        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let (batches, _) = read_batches_at_version(&mut read_table, 2, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_compaction_reduces_files() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        for epoch in 1..=10u64 {
            let batch = test_batch(5);
            let (t, _) = write_batches(
                table,
                vec![batch],
                "compaction-writer",
                epoch,
                SaveMode::Append,
                None,
                false,
                None,
                false,
                None,
            )
            .await
            .unwrap();
            table = t;
        }

        let parquet_before: Vec<_> = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
            .collect();
        assert!(
            parquet_before.len() >= 10,
            "should have at least 10 Parquet files before compaction, got {}",
            parquet_before.len()
        );

        let (table, result) = run_compaction(table, 128 * 1024 * 1024, &[], None)
            .await
            .unwrap();
        assert!(
            result.files_removed > 0,
            "compaction should have removed files"
        );

        drop(table);
    }
}