#[cfg(feature = "delta-lake")]
use std::collections::HashMap;

#[cfg(feature = "delta-lake")]
use std::sync::Arc;

#[cfg(feature = "delta-lake")]
use arrow_array::RecordBatch;

#[cfg(feature = "delta-lake")]
use arrow_schema::SchemaRef;

#[cfg(feature = "delta-lake")]
use deltalake::kernel::engine::arrow_conversion::TryIntoKernel as _;

#[cfg(feature = "delta-lake")]
use deltalake::kernel::transaction::CommitProperties;

#[cfg(feature = "delta-lake")]
use deltalake::kernel::Transaction;

#[cfg(feature = "delta-lake")]
use deltalake::operations::write::SchemaMode;

#[cfg(feature = "delta-lake")]
use deltalake::protocol::SaveMode;

#[cfg(feature = "delta-lake")]
use deltalake::DeltaTable;

#[cfg(feature = "delta-lake")]
use tracing::{debug, info, warn};

#[cfg(feature = "delta-lake")]
use url::Url;

#[cfg(feature = "delta-lake")]
use crate::error::ConnectorError;

#[cfg(feature = "delta-lake")]
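/// Converts a table path string into a [`Url`].
///
/// Paths that already contain a scheme (for example `s3://`, `az://`, `gs://`)
/// are parsed as-is; local filesystem paths are canonicalized when they exist
/// (otherwise resolved against the current directory) and converted to a
/// `file://` directory URL.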
fn path_to_url(path: &str) -> Result<Url, ConnectorError> {
    if path.contains("://") {
        Url::parse(path)
            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid URL '{path}': {e}")))
    } else {
        let path_buf = std::path::Path::new(path);
        let normalized = if path_buf.exists() {
            std::fs::canonicalize(path_buf).map_err(|e| {
                ConnectorError::ConfigurationError(format!("invalid path '{path}': {e}"))
            })?
        } else if path_buf.is_absolute() {
            path_buf.to_path_buf()
        } else {
            std::env::current_dir()
                .map_err(|e| {
                    ConnectorError::ConfigurationError(format!("cannot get current dir: {e}"))
                })?
                .join(path_buf)
        };

        Url::from_directory_path(&normalized).map_err(|()| {
            ConnectorError::ConfigurationError(format!(
                "cannot convert path to URL: {}",
                normalized.display()
            ))
        })
    }
}

#[cfg(feature = "delta-lake")]
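/// Opens an existing Delta Lake table at `table_path`, or creates a new one
/// when no table exists yet.
///
/// Creating a table requires an Arrow `schema`; if the table is missing and no
/// schema is provided, a configuration error is returned.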
#[allow(clippy::implicit_hasher)]
pub async fn open_or_create_table(
    table_path: &str,
    storage_options: HashMap<String, String>,
    schema: Option<&SchemaRef>,
) -> Result<DeltaTable, ConnectorError> {
    info!(table_path, "opening Delta Lake table");

    let url = path_to_url(table_path)?;

    let table = DeltaTable::try_from_url_with_storage_options(url.clone(), storage_options.clone())
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to open table: {e}")))?;

    if table.version().is_some() {
        info!(
            table_path,
            version = table.version(),
            "opened existing Delta Lake table"
        );
        return Ok(table);
    }

    let schema = schema.ok_or_else(|| {
        ConnectorError::ConfigurationError(
            "cannot create Delta Lake table without schema - \
             write at least one batch first"
                .into(),
        )
    })?;

    info!(table_path, "creating new Delta Lake table");

    let delta_schema: deltalake::kernel::StructType = schema
        .as_ref()
        .try_into_kernel()
        .map_err(|e| ConnectorError::SchemaMismatch(format!("schema conversion failed: {e}")))?;

    let table = table
        .create()
        .with_columns(delta_schema.fields().cloned())
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to create table: {e}")))?;

    info!(
        table_path,
        version = table.version(),
        "created new Delta Lake table"
    );

    Ok(table)
}

#[cfg(feature = "delta-lake")]
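/// Writes a set of record batches to the table as a single Delta commit, using
/// the given `save_mode` and optional partition columns.
///
/// The commit carries an application transaction (`writer_id`, `epoch`) so the
/// last committed epoch can later be recovered via [`get_last_committed_epoch`].
/// Returns the updated table together with the new table version.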
pub async fn write_batches(
    table: DeltaTable,
    batches: Vec<RecordBatch>,
    writer_id: &str,
    epoch: u64,
    save_mode: SaveMode,
    partition_columns: Option<&[String]>,
    schema_evolution: bool,
) -> Result<(DeltaTable, i64), ConnectorError> {
    if batches.is_empty() {
        debug!("no batches to write, skipping");
        let version = table.version().unwrap_or(0);
        return Ok((table, version));
    }

    let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();

    debug!(
        writer_id,
        epoch,
        total_rows,
        num_batches = batches.len(),
        "writing batches to Delta Lake"
    );

    #[allow(clippy::cast_possible_wrap)]
    let epoch_i64 = epoch as i64;

    let mut write_builder = table
        .write(batches)
        .with_save_mode(save_mode)
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, epoch_i64)),
        );

    if schema_evolution {
        write_builder = write_builder.with_schema_mode(SchemaMode::Merge);
    }

    if let Some(cols) = partition_columns {
        if !cols.is_empty() {
            write_builder = write_builder.with_partition_columns(cols.to_vec());
        }
    }

    let table = write_builder
        .await
        .map_err(|e| ConnectorError::WriteError(format!("Delta Lake write failed: {e}")))?;

    let version = table.version().unwrap_or(0);

    info!(
        writer_id,
        epoch, version, total_rows, "committed Delta Lake transaction"
    );

    Ok((table, version))
}

#[cfg(feature = "delta-lake")]
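/// Returns the last epoch committed by `writer_id`, as recorded in the table's
/// application transaction (`txn`) metadata.
///
/// Falls back to epoch 0 when there is no snapshot, no transaction metadata for
/// this writer, or the metadata cannot be read, so callers can always resume.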
pub async fn get_last_committed_epoch(table: &DeltaTable, writer_id: &str) -> u64 {
    let Ok(snapshot) = table.snapshot() else {
        debug!(writer_id, "no snapshot available, assuming epoch 0");
        return 0;
    };

    match snapshot
        .transaction_version(&table.log_store(), writer_id)
        .await
    {
        Ok(Some(version)) => {
            #[allow(clippy::cast_sign_loss)]
            let epoch = version as u64;
            debug!(
                writer_id,
                epoch, "found last committed epoch from txn metadata"
            );
            epoch
        }
        Ok(None) => {
            debug!(
                writer_id,
                "no txn metadata found for writer, assuming epoch 0"
            );
            0
        }
        Err(e) => {
            warn!(writer_id, error = %e, "failed to read txn metadata, assuming epoch 0");
            0
        }
    }
}

#[cfg(feature = "delta-lake")]
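/// Returns the table's current schema as an Arrow [`SchemaRef`].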
pub fn get_table_schema(table: &DeltaTable) -> Result<SchemaRef, ConnectorError> {
    let state = table
        .snapshot()
        .map_err(|e| ConnectorError::SchemaMismatch(format!("table has no snapshot: {e}")))?;

    Ok(state.snapshot().arrow_schema())
}

#[cfg(feature = "delta-lake")]
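/// Refreshes the table to its latest state and returns the latest version,
/// replacing `table` with the refreshed handle.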
pub async fn get_latest_version(table: &mut DeltaTable) -> Result<i64, ConnectorError> {
    let (updated, _metrics) =
        table.clone().update().await.map_err(|e| {
            ConnectorError::ReadError(format!("failed to refresh Delta table: {e}"))
        })?;

    *table = updated;
    Ok(table.version().unwrap_or(0))
}

#[cfg(feature = "delta-lake")]
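/// Loads the table at the given `version` and scans it through DataFusion,
/// returning up to `max_records` rows as Arrow record batches.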
pub async fn read_batches_at_version(
    table: &mut DeltaTable,
    version: i64,
    max_records: usize,
) -> Result<Vec<RecordBatch>, ConnectorError> {
    use datafusion::prelude::SessionContext;
    use tokio_stream::StreamExt;

    table
        .load_version(version)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("failed to load version {version}: {e}")))?;

    debug!(version, "Delta Lake: loaded version for reading");

    let provider =
        table.table_provider().build().await.map_err(|e| {
            ConnectorError::ReadError(format!("failed to build table provider: {e}"))
        })?;

    let ctx = SessionContext::new();
    ctx.register_table("delta_source_scan", Arc::new(provider))
        .map_err(|e| ConnectorError::ReadError(format!("failed to register scan table: {e}")))?;

    let df = ctx
        .sql("SELECT * FROM delta_source_scan")
        .await
        .map_err(|e| ConnectorError::ReadError(format!("scan query failed: {e}")))?;

    let df = if max_records < usize::MAX {
        df.limit(0, Some(max_records))
            .map_err(|e| ConnectorError::ReadError(format!("limit failed: {e}")))?
    } else {
        df
    };

    let mut stream = df
        .execute_stream()
        .await
        .map_err(|e| ConnectorError::ReadError(format!("stream execution failed: {e}")))?;

    let mut batches = Vec::new();
    let mut total_rows: usize = 0;

    while let Some(result) = stream.next().await {
        let batch =
            result.map_err(|e| ConnectorError::ReadError(format!("stream batch failed: {e}")))?;
        if batch.num_rows() == 0 {
            continue;
        }
        total_rows += batch.num_rows();
        batches.push(batch);

        if total_rows >= max_records {
            break;
        }
    }

    debug!(
        version,
        num_batches = batches.len(),
        total_rows,
        "Delta Lake: scanned version"
    );

    Ok(batches)
}

#[cfg(feature = "delta-lake")]
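/// Row-level metrics reported by a Delta Lake MERGE operation.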
#[derive(Debug)]
pub struct MergeResult {
    pub rows_inserted: usize,
    pub rows_updated: usize,
    pub rows_deleted: usize,
}

#[cfg(feature = "delta-lake")]
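/// Upserts `source_batch` into the table with a Delta Lake MERGE keyed on
/// `key_columns`: matched rows have their non-key columns updated, unmatched
/// rows are inserted.
///
/// The commit carries the (`writer_id`, `epoch`) application transaction, and
/// schema evolution can optionally merge new source columns into the table.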
#[allow(clippy::too_many_lines)]
pub async fn merge_batches(
    table: DeltaTable,
    source_batch: RecordBatch,
    key_columns: &[String],
    writer_id: &str,
    epoch: u64,
    schema_evolution: bool,
) -> Result<(DeltaTable, MergeResult), ConnectorError> {
    use datafusion::prelude::*;
    use deltalake::kernel::transaction::CommitProperties;
    use deltalake::kernel::Transaction;

    if source_batch.num_rows() == 0 {
        return Ok((
            table,
            MergeResult {
                rows_inserted: 0,
                rows_updated: 0,
                rows_deleted: 0,
            },
        ));
    }

    debug!(
        key_columns = ?key_columns,
        source_rows = source_batch.num_rows(),
        "performing Delta Lake MERGE"
    );

    let ctx = SessionContext::new();
    let source_df = ctx.read_batch(source_batch).map_err(|e| {
        ConnectorError::WriteError(format!("failed to create source DataFrame: {e}"))
    })?;

    let predicate = key_columns
        .iter()
        .map(|k| col(format!("target.{k}")).eq(col(format!("source.{k}"))))
        .reduce(Expr::and)
        .ok_or_else(|| {
            ConnectorError::ConfigurationError("merge requires at least one key column".into())
        })?;

    #[allow(clippy::cast_possible_wrap)]
    let epoch_i64 = epoch as i64;

    let source_schema = source_df.schema().clone();

    let key_set: std::collections::HashSet<&str> =
        key_columns.iter().map(String::as_str).collect();

    let all_columns: Vec<String> = source_schema
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect();

    let non_key_columns: Vec<String> = all_columns
        .iter()
        .filter(|c| !key_set.contains(c.as_str()))
        .cloned()
        .collect();

    let non_key_for_update = non_key_columns;
    let all_for_insert = all_columns;

    let mut merge_builder = table
        .merge(source_df, predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, epoch_i64)),
        )
        .when_matched_update(|update| {
            let mut u = update;
            for col_name in &non_key_for_update {
                u = u.update(col_name.as_str(), col(format!("source.{col_name}")));
            }
            u
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-update failed: {e}")))?
        .when_not_matched_insert(|insert| {
            let mut ins = insert;
            for col_name in &all_for_insert {
                ins = ins.set(col_name.as_str(), col(format!("source.{col_name}")));
            }
            ins
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge not-matched-insert failed: {e}")))?;

    if schema_evolution {
        merge_builder = merge_builder.with_merge_schema(true);
    }

    let (table, metrics) = merge_builder
        .await
        .map_err(|e| ConnectorError::WriteError(format!("Delta Lake MERGE failed: {e}")))?;

    let result = MergeResult {
        rows_inserted: metrics.num_target_rows_inserted,
        rows_updated: metrics.num_target_rows_updated,
        rows_deleted: metrics.num_target_rows_deleted,
    };

    info!(
        writer_id,
        epoch,
        rows_inserted = result.rows_inserted,
        rows_updated = result.rows_updated,
        rows_deleted = result.rows_deleted,
        "Delta Lake MERGE complete"
    );

    Ok((table, result))
}

#[cfg(feature = "delta-lake")]
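/// Deletes all rows whose key columns match a row in `delete_batch`, using a
/// MERGE with a matched-delete clause. Returns the number of deleted rows.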
pub async fn delete_by_merge(
    table: DeltaTable,
    delete_batch: RecordBatch,
    key_columns: &[String],
    writer_id: &str,
    epoch: u64,
) -> Result<(DeltaTable, usize), ConnectorError> {
    use datafusion::prelude::*;
    use deltalake::kernel::transaction::CommitProperties;
    use deltalake::kernel::Transaction;

    if delete_batch.num_rows() == 0 {
        return Ok((table, 0));
    }

    debug!(
        delete_rows = delete_batch.num_rows(),
        "performing Delta Lake delete-by-merge"
    );

    let ctx = SessionContext::new();
    let source_df = ctx.read_batch(delete_batch).map_err(|e| {
        ConnectorError::WriteError(format!("failed to create delete DataFrame: {e}"))
    })?;

    let predicate = key_columns
        .iter()
        .map(|k| col(format!("target.{k}")).eq(col(format!("source.{k}"))))
        .reduce(Expr::and)
        .ok_or_else(|| {
            ConnectorError::ConfigurationError("delete requires at least one key column".into())
        })?;

    #[allow(clippy::cast_possible_wrap)]
    let epoch_i64 = epoch as i64;

    let (table, metrics) = table
        .merge(source_df, predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, epoch_i64)),
        )
        .when_matched_delete(|delete| delete)
        .map_err(|e| ConnectorError::WriteError(format!("delete-merge setup failed: {e}")))?
        .await
        .map_err(|e| ConnectorError::WriteError(format!("Delta Lake delete-merge failed: {e}")))?;

    let rows_deleted = metrics.num_target_rows_deleted;
    info!(rows_deleted, "Delta Lake delete-by-merge complete");

    Ok((table, rows_deleted))
}

#[cfg(feature = "delta-lake")]
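/// Applies a changelog batch (carrying `_op` and `_ts_ms` CDC columns) to the
/// table in a single atomic MERGE: rows with `_op` of `I`, `U`, or `r` are
/// upserted, rows with `_op` of `D` are deleted. The CDC columns themselves are
/// not written to the target table.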
#[allow(clippy::too_many_lines)]
pub async fn merge_changelog(
    table: DeltaTable,
    source_batch: RecordBatch,
    key_columns: &[String],
    writer_id: &str,
    epoch: u64,
    schema_evolution: bool,
) -> Result<(DeltaTable, MergeResult), ConnectorError> {
    use datafusion::prelude::*;
    use deltalake::kernel::transaction::CommitProperties;
    use deltalake::kernel::Transaction;

    const CDC_COLUMNS: &[&str] = &["_op", "_ts_ms"];

    if source_batch.num_rows() == 0 {
        return Ok((
            table,
            MergeResult {
                rows_inserted: 0,
                rows_updated: 0,
                rows_deleted: 0,
            },
        ));
    }

    debug!(
        key_columns = ?key_columns,
        source_rows = source_batch.num_rows(),
        "performing atomic changelog MERGE"
    );

    let ctx = SessionContext::new();
    let source_df = ctx.read_batch(source_batch).map_err(|e| {
        ConnectorError::WriteError(format!("failed to create source DataFrame: {e}"))
    })?;

    let predicate = key_columns
        .iter()
        .map(|k| col(format!("target.{k}")).eq(col(format!("source.{k}"))))
        .reduce(Expr::and)
        .ok_or_else(|| {
            ConnectorError::ConfigurationError("merge requires at least one key column".into())
        })?;

    let source_schema = source_df.schema().clone();
    let key_set: std::collections::HashSet<&str> =
        key_columns.iter().map(String::as_str).collect();

    let all_user_columns: Vec<String> = source_schema
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .filter(|name| !CDC_COLUMNS.contains(&name.as_str()))
        .collect();

    let non_key_user_columns: Vec<String> = all_user_columns
        .iter()
        .filter(|c| !key_set.contains(c.as_str()))
        .cloned()
        .collect();

    let upsert_pred = col("source._op").in_list(vec![lit("I"), lit("U"), lit("r")], false);
    let delete_pred = col("source._op").eq(lit("D"));

    #[allow(clippy::cast_possible_wrap)]
    let epoch_i64 = epoch as i64;

    let non_key_for_update = non_key_user_columns;
    let all_for_insert = all_user_columns;

    let mut merge_builder = table
        .merge(source_df, predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .with_commit_properties(
            CommitProperties::default()
                .with_application_transaction(Transaction::new(writer_id, epoch_i64)),
        )
        .when_matched_update(|update| {
            let mut u = update.predicate(upsert_pred.clone());
            for col_name in &non_key_for_update {
                u = u.update(col_name.as_str(), col(format!("source.{col_name}")));
            }
            u
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-update failed: {e}")))?
        .when_matched_delete(|delete| delete.predicate(delete_pred))
        .map_err(|e| ConnectorError::WriteError(format!("merge matched-delete failed: {e}")))?
        .when_not_matched_insert(|insert| {
            let mut ins = insert.predicate(upsert_pred);
            for col_name in &all_for_insert {
                ins = ins.set(col_name.as_str(), col(format!("source.{col_name}")));
            }
            ins
        })
        .map_err(|e| ConnectorError::WriteError(format!("merge not-matched-insert failed: {e}")))?;

    if schema_evolution {
        merge_builder = merge_builder.with_merge_schema(true);
    }

    let (table, metrics) = merge_builder.await.map_err(|e| {
        ConnectorError::WriteError(format!("Delta Lake changelog MERGE failed: {e}"))
    })?;

    let result = MergeResult {
        rows_inserted: metrics.num_target_rows_inserted,
        rows_updated: metrics.num_target_rows_updated,
        rows_deleted: metrics.num_target_rows_deleted,
    };

    info!(
        writer_id,
        epoch,
        rows_inserted = result.rows_inserted,
        rows_updated = result.rows_updated,
        rows_deleted = result.rows_deleted,
        "Delta Lake changelog MERGE complete"
    );

    Ok((table, result))
}

#[cfg(feature = "delta-lake")]
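/// File-level metrics reported by a Delta Lake OPTIMIZE (compaction) run.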
#[derive(Debug)]
pub struct CompactionResult {
    pub files_added: u64,
    pub files_removed: u64,
    pub partitions_optimized: u64,
}

#[cfg(feature = "delta-lake")]
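/// Runs OPTIMIZE on the table, compacting small files toward
/// `target_file_size` bytes, or Z-ordering by `z_order_columns` when any are
/// given.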
pub async fn run_compaction(
    table: DeltaTable,
    target_file_size: u64,
    z_order_columns: &[String],
) -> Result<(DeltaTable, CompactionResult), ConnectorError> {
    use deltalake::operations::optimize::OptimizeType;

    info!(target_file_size, "running Delta Lake compaction (OPTIMIZE)");

    let optimize_type = if z_order_columns.is_empty() {
        OptimizeType::Compact
    } else {
        OptimizeType::ZOrder(z_order_columns.to_vec())
    };

    let (table, metrics) = table
        .optimize()
        .with_type(optimize_type)
        .with_target_size(target_file_size)
        .await
        .map_err(|e| ConnectorError::Internal(format!("compaction failed: {e}")))?;

    let result = CompactionResult {
        files_added: metrics.num_files_added,
        files_removed: metrics.num_files_removed,
        partitions_optimized: metrics.partitions_optimized,
    };

    info!(
        files_added = result.files_added,
        files_removed = result.files_removed,
        partitions_optimized = result.partitions_optimized,
        "Delta Lake compaction complete"
    );

    Ok((table, result))
}

#[cfg(feature = "delta-lake")]
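/// Runs VACUUM on the table with the given retention period, returning the
/// number of files deleted. Retention enforcement is disabled, so retention
/// periods shorter than the table default are accepted.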
pub async fn run_vacuum(
    table: DeltaTable,
    retention: std::time::Duration,
) -> Result<(DeltaTable, usize), ConnectorError> {
    let retention_hours = retention.as_secs() / 3600;
    info!(retention_hours, "running Delta Lake VACUUM");

    let chrono_duration =
        chrono::Duration::from_std(retention).unwrap_or_else(|_| chrono::Duration::hours(168));

    let (table, metrics) = table
        .vacuum()
        .with_retention_period(chrono_duration)
        .with_enforce_retention_duration(false)
        .await
        .map_err(|e| ConnectorError::Internal(format!("vacuum failed: {e}")))?;

    let files_deleted = metrics.files_deleted.len();

    info!(files_deleted, "Delta Lake VACUUM complete");

    Ok((table, files_deleted))
}

#[cfg(feature = "delta-lake")]
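/// Resolves the physical table path and storage options for the configured
/// catalog type: no catalog (path used as-is), AWS Glue (path looked up via the
/// Glue Data Catalog), or Unity Catalog (Databricks host/token added to the
/// storage options). Catalog variants whose feature is not compiled in return a
/// configuration error.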
#[allow(clippy::implicit_hasher, clippy::unused_async)]
pub async fn resolve_catalog_options(
    catalog: &super::delta_config::DeltaCatalogType,
    #[allow(unused_variables)] catalog_database: Option<&str>,
    #[allow(unused_variables)] catalog_name: Option<&str>,
    _catalog_schema: Option<&str>,
    table_path: &str,
    base_storage_options: &HashMap<String, String>,
    catalog_properties: &HashMap<String, String>,
) -> Result<(String, HashMap<String, String>), ConnectorError> {
    use super::delta_config::DeltaCatalogType;

    match catalog {
        DeltaCatalogType::None => Ok((table_path.to_string(), base_storage_options.clone())),
        #[cfg(feature = "delta-lake-glue")]
        DeltaCatalogType::Glue => {
            use deltalake::DataCatalog;
            let database = catalog_database.ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "Glue catalog requires 'catalog.database'".into(),
                )
            })?;
            let glue = deltalake_catalog_glue::GlueDataCatalog::from_env()
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to init Glue catalog: {e}"))
                })?;
            let resolved = glue
                .get_table_storage_location(catalog_name.map(String::from), database, table_path)
                .await
                .map_err(|e| {
                    ConnectorError::ConnectionFailed(format!(
                        "Glue catalog lookup failed for '{database}.{table_path}': {e}"
                    ))
                })?;
            info!(
                glue_database = database,
                table = table_path,
                resolved_path = %resolved,
                "resolved table path via Glue catalog"
            );
            let mut opts = base_storage_options.clone();
            opts.extend(
                catalog_properties
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone())),
            );
            Ok((resolved, opts))
        }
        #[cfg(not(feature = "delta-lake-glue"))]
        DeltaCatalogType::Glue => Err(ConnectorError::ConfigurationError(
            "Glue catalog requires the 'delta-lake-glue' feature. \
             Build with: cargo build --features delta-lake-glue"
                .into(),
        )),
        #[cfg(feature = "delta-lake-unity")]
        DeltaCatalogType::Unity {
            workspace_url,
            access_token,
        } => {
            let mut opts = base_storage_options.clone();
            opts.extend(
                catalog_properties
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone())),
            );
            if !workspace_url.is_empty() {
                opts.insert("databricks_host".to_string(), workspace_url.clone());
            }
            if !access_token.is_empty() {
                opts.insert("databricks_token".to_string(), access_token.clone());
            }
            Ok((table_path.to_string(), opts))
        }
        #[cfg(not(feature = "delta-lake-unity"))]
        DeltaCatalogType::Unity { .. } => Err(ConnectorError::ConfigurationError(
            "Unity catalog requires the 'delta-lake-unity' feature. \
             Build with: cargo build --features delta-lake-unity"
                .into(),
        )),
    }
}

#[cfg(all(test, feature = "delta-lake"))]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;
    use tempfile::TempDir;

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_open_creates_table() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        assert_eq!(table.version(), Some(0));

        let delta_log = temp_dir.path().join("_delta_log");
        assert!(delta_log.exists(), "_delta_log directory should exist");
    }

    #[tokio::test]
    async fn test_open_existing_table() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let _ = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();

        assert_eq!(table.version(), Some(0));
    }

    #[tokio::test]
    async fn test_open_nonexistent_without_schema_fails() {
        let temp_dir = TempDir::new().unwrap();
        let nonexistent_table = temp_dir.path().join("nonexistent");
        std::fs::create_dir_all(&nonexistent_table).unwrap();
        let table_path = nonexistent_table.to_str().unwrap();

        let result = open_or_create_table(table_path, HashMap::new(), None).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("schema"), "error should mention schema: {err}");
    }

    #[tokio::test]
    async fn test_write_batch_creates_parquet() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let batch = test_batch(100);
        let (table, version) = write_batches(
            table,
            vec![batch],
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        assert_eq!(version, 1);
        assert_eq!(table.version(), Some(1));

        let parquet_files: Vec<_> = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
            .collect();

        assert!(
            !parquet_files.is_empty(),
            "should have created Parquet files"
        );
    }

    #[tokio::test]
    async fn test_exactly_once_epoch_skip() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();
        let writer_id = "exactly-once-writer";

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let batch = test_batch(10);
        let (table, _) = write_batches(
            table,
            vec![batch.clone()],
            writer_id,
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
        assert_eq!(last_epoch, 1);

        let reopened_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();

        let recovered_epoch = get_last_committed_epoch(&reopened_table, writer_id).await;
        assert_eq!(recovered_epoch, 1);
    }

    #[tokio::test]
    async fn test_multiple_epochs_sequential() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();
        let writer_id = "sequential-writer";

        let schema = test_schema();
        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        for epoch in 1..=3 {
            let batch = test_batch(10);
            let result = write_batches(
                table,
                vec![batch],
                writer_id,
                epoch,
                SaveMode::Append,
                None,
                false,
            )
            .await
            .unwrap();
            table = result.0;
            assert_eq!(result.1, epoch as i64);
        }

        assert_eq!(table.version(), Some(3));

        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
        assert_eq!(last_epoch, 3);
    }

    #[tokio::test]
    async fn test_get_table_schema() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let expected_schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&expected_schema))
            .await
            .unwrap();

        let actual_schema = get_table_schema(&table).unwrap();

        assert_eq!(actual_schema.fields().len(), expected_schema.fields().len());
        for (expected, actual) in expected_schema.fields().iter().zip(actual_schema.fields()) {
            assert_eq!(expected.name(), actual.name());
        }
    }

    #[tokio::test]
    async fn test_write_empty_batches() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let (table, version) = write_batches(
            table,
            vec![],
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        assert_eq!(version, 0);
        assert_eq!(table.version(), Some(0));
    }

    #[tokio::test]
    async fn test_write_multiple_batches() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let batch1 = test_batch(50);
        let batch2 = test_batch(50);
        let (table, version) = write_batches(
            table,
            vec![batch1, batch2],
            "multi-batch-writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        assert_eq!(version, 1);
        assert_eq!(table.version(), Some(1));

        let reopened = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        assert_eq!(reopened.version(), Some(1));
    }

    #[test]
    fn test_path_to_url_local() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_str().unwrap();

        let url = path_to_url(path).unwrap();
        assert!(url.scheme() == "file");
    }

    #[test]
    fn test_path_to_url_s3() {
        let url = path_to_url("s3://my-bucket/path/to/table").unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str(), Some("my-bucket"));
    }

    #[test]
    fn test_path_to_url_azure() {
        let url = path_to_url("az://my-container/path/to/table").unwrap();
        assert_eq!(url.scheme(), "az");
    }

    #[test]
    fn test_path_to_url_gcs() {
        let url = path_to_url("gs://my-bucket/path/to/table").unwrap();
        assert_eq!(url.scheme(), "gs");
    }

    #[tokio::test]
    async fn test_get_latest_version() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let v = get_latest_version(&mut table).await.unwrap();
        assert_eq!(v, 0);

        let batch = test_batch(10);
        let (returned_table, version) = write_batches(
            table,
            vec![batch],
            "writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();
        assert_eq!(version, 1);
        table = returned_table;

        let v = get_latest_version(&mut table).await.unwrap();
        assert_eq!(v, 1);
    }

    #[tokio::test]
    async fn test_read_batches_at_version() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let batch = test_batch(50);
        let (table, _) = write_batches(
            table,
            vec![batch],
            "writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        let batch = test_batch(30);
        let (_table, _) = write_batches(
            table,
            vec![batch],
            "writer",
            2,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let batches = read_batches_at_version(&mut read_table, 1, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 50);

        let batches = read_batches_at_version(&mut read_table, 2, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 80);
    }

    #[tokio::test]
    async fn test_sink_source_roundtrip() {
        use super::super::delta::DeltaLakeSink;
        use super::super::delta_config::DeltaLakeSinkConfig;
        use super::super::delta_source::DeltaSource;
        use super::super::delta_source_config::DeltaSourceConfig;
        use crate::config::ConnectorConfig;
        use crate::connector::{SinkConnector, SourceConnector};

        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let sink_config = DeltaLakeSinkConfig::new(table_path);
        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());
        let connector_config = ConnectorConfig::new("delta-lake");
        sink.open(&connector_config).await.unwrap();

        sink.begin_epoch(1).await.unwrap();
        let batch = test_batch(25);
        sink.write_batch(&batch).await.unwrap();
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        sink.close().await.unwrap();

        let mut source_config = DeltaSourceConfig::new(table_path);
        source_config.starting_version = Some(0);
        let mut source = DeltaSource::new(source_config);
        let source_connector_config = ConnectorConfig::new("delta-lake");
        source.open(&source_connector_config).await.unwrap();

        let result = source.poll_batch(10000).await.unwrap();
        assert!(result.is_some(), "should have received a batch");
        let total_rows: usize = {
            let mut rows = result.unwrap().records.num_rows();
            while let Ok(Some(batch)) = source.poll_batch(10000).await {
                rows += batch.records.num_rows();
            }
            rows
        };
        assert_eq!(total_rows, 25);

        source.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_source_checkpoint_restore() {
        use super::super::delta_source::DeltaSource;
        use super::super::delta_source_config::DeltaSourceConfig;
        use crate::config::ConnectorConfig;
        use crate::connector::SourceConnector;

        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let (table, _) = write_batches(
            table,
            vec![test_batch(10)],
            "writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();
        let (_table, _) = write_batches(
            table,
            vec![test_batch(20)],
            "writer",
            2,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        let mut source_config = DeltaSourceConfig::new(table_path);
        source_config.starting_version = Some(0);
        let mut source = DeltaSource::new(source_config.clone());
        let connector_config = ConnectorConfig::new("delta-lake");
        source.open(&connector_config).await.unwrap();

        let _ = source.poll_batch(10000).await.unwrap();
        while let Ok(Some(_)) = source.poll_batch(10000).await {}

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("delta_version"), Some("2"));
        source.close().await.unwrap();

        let mut source2 = DeltaSource::new(source_config);
        source2.open(&connector_config).await.unwrap();
        source2.restore(&cp).await.unwrap();

        assert_eq!(source2.current_version(), 2);

        let result = source2.poll_batch(10000).await.unwrap();
        assert!(result.is_none());

        source2.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_auto_flush_writes_data() {
        use super::super::delta::DeltaLakeSink;
        use super::super::delta_config::DeltaLakeSinkConfig;
        use crate::config::ConnectorConfig;
        use crate::connector::SinkConnector;

        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let mut sink_config = DeltaLakeSinkConfig::new(table_path);
        sink_config.max_buffer_records = 10;
        let mut sink = DeltaLakeSink::with_schema(sink_config, test_schema());

        let connector_config = ConnectorConfig::new("delta-lake");
        sink.open(&connector_config).await.unwrap();

        sink.begin_epoch(1).await.unwrap();

        let batch = test_batch(25);
        sink.write_batch(&batch).await.unwrap();

        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        sink.close().await.unwrap();

        let mut table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let latest = get_latest_version(&mut table).await.unwrap();
        assert!(latest >= 1, "should have at least 1 version");

        let batches = read_batches_at_version(&mut table, latest, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(
            total_rows, 25,
            "all 25 rows should be written, not dropped by auto-flush"
        );
    }

    #[tokio::test]
    async fn test_sink_exactly_once_epoch() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();
        let writer_id = "exactly-once-test";

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let (table, v1) = write_batches(
            table,
            vec![test_batch(10)],
            writer_id,
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();
        assert_eq!(v1, 1);

        let (table, v2) = write_batches(
            table,
            vec![test_batch(15)],
            writer_id,
            2,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();
        assert_eq!(v2, 2);

        let last_epoch = get_last_committed_epoch(&table, writer_id).await;
        assert_eq!(last_epoch, 2);

        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let batches = read_batches_at_version(&mut read_table, 2, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 25);
    }

    #[tokio::test]
    async fn test_schema_evolution_adds_column() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema_v1 = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema_v1))
            .await
            .unwrap();

        let batch_v1 = RecordBatch::try_new(
            schema_v1.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();
        let (table, _) = write_batches(
            table,
            vec![batch_v1],
            "evo-writer",
            1,
            SaveMode::Append,
            None,
            true,
        )
        .await
        .unwrap();

        let schema_v2 = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
        ]));
        let batch_v2 = RecordBatch::try_new(
            schema_v2,
            vec![
                Arc::new(Int64Array::from(vec![3])),
                Arc::new(StringArray::from(vec!["c"])),
                Arc::new(Float64Array::from(vec![99.5])),
            ],
        )
        .unwrap();
        let (table, _) = write_batches(
            table,
            vec![batch_v2],
            "evo-writer",
            2,
            SaveMode::Append,
            None,
            true,
        )
        .await
        .unwrap();

        let final_schema = get_table_schema(&table).unwrap();
        assert_eq!(final_schema.fields().len(), 3);
        assert_eq!(final_schema.field(0).name(), "id");
        assert_eq!(final_schema.field(1).name(), "name");
        assert_eq!(final_schema.field(2).name(), "score");

        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let batches = read_batches_at_version(&mut read_table, 2, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_compaction_reduces_files() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let mut table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        for epoch in 1..=10u64 {
            let batch = test_batch(5);
            let (t, _) = write_batches(
                table,
                vec![batch],
                "compaction-writer",
                epoch,
                SaveMode::Append,
                None,
                false,
            )
            .await
            .unwrap();
            table = t;
        }

        let parquet_before: Vec<_> = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
            .collect();
        assert!(
            parquet_before.len() >= 10,
            "should have at least 10 Parquet files before compaction, got {}",
            parquet_before.len()
        );

        let (table, result) = run_compaction(table, 128 * 1024 * 1024, &[]).await.unwrap();
        assert!(
            result.files_removed > 0,
            "compaction should have removed files"
        );

        let (_table, files_deleted) = run_vacuum(table, std::time::Duration::from_secs(0))
            .await
            .unwrap();

        let parquet_after: Vec<_> = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
            .collect();
        assert!(
            parquet_after.len() < parquet_before.len(),
            "should have fewer files after compaction+vacuum: before={}, after={}, vacuumed={}",
            parquet_before.len(),
            parquet_after.len(),
            files_deleted
        );
    }

    #[tokio::test]
    async fn test_merge_insert_only() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();
        let initial = test_batch(3);
        let (table, _) = write_batches(
            table,
            vec![initial],
            "merge-writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        let source = RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![10, 11])),
                Arc::new(StringArray::from(vec!["x", "y"])),
                Arc::new(Float64Array::from(vec![10.0, 11.0])),
            ],
        )
        .unwrap();

        let (table, result) =
            merge_batches(table, source, &["id".to_string()], "merge-writer", 2, false)
                .await
                .unwrap();

        assert_eq!(result.rows_inserted, 2);
        assert_eq!(result.rows_updated, 0);

        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let latest = get_latest_version(&mut read_table).await.unwrap();
        let batches = read_batches_at_version(&mut read_table, latest, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);

        drop(table);
    }

    #[tokio::test]
    async fn test_merge_update() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();
        let initial = RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        let (table, _) = write_batches(
            table,
            vec![initial],
            "merge-writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        let source = RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a_updated", "b_updated"])),
                Arc::new(Float64Array::from(vec![100.0, 200.0])),
            ],
        )
        .unwrap();

        let (table, result) =
            merge_batches(table, source, &["id".to_string()], "merge-writer", 2, false)
                .await
                .unwrap();

        assert_eq!(result.rows_updated, 2);
        assert_eq!(result.rows_inserted, 0);

        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let latest = get_latest_version(&mut read_table).await.unwrap();
        let batches = read_batches_at_version(&mut read_table, latest, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        drop(table);
    }

    #[tokio::test]
    async fn test_merge_delete() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();
        let initial = RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        let (table, _) = write_batches(
            table,
            vec![initial],
            "merge-writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();

        let delete_batch = RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 3])),
                Arc::new(StringArray::from(vec!["a", "c"])),
                Arc::new(Float64Array::from(vec![1.0, 3.0])),
            ],
        )
        .unwrap();

        let (table, rows_deleted) =
            delete_by_merge(table, delete_batch, &["id".to_string()], "merge-writer", 2)
                .await
                .unwrap();

        assert_eq!(rows_deleted, 2);

        let mut read_table = open_or_create_table(table_path, HashMap::new(), None)
            .await
            .unwrap();
        let latest = get_latest_version(&mut read_table).await.unwrap();
        let batches = read_batches_at_version(&mut read_table, latest, 10000)
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 1);

        drop(table);
    }

    #[tokio::test]
    async fn test_merge_empty_batch_noop() {
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        let schema = test_schema();
        let table = open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let empty = RecordBatch::new_empty(test_schema());
        let (table, result) =
            merge_batches(table, empty, &["id".to_string()], "merge-writer", 1, false)
                .await
                .unwrap();
        assert_eq!(result.rows_inserted, 0);
        assert_eq!(result.rows_updated, 0);

        let empty_del = RecordBatch::new_empty(test_schema());
        let (_table, deleted) =
            delete_by_merge(table, empty_del, &["id".to_string()], "merge-writer", 2)
                .await
                .unwrap();
        assert_eq!(deleted, 0);
    }
}