1use std::sync::Arc;
24use std::time::Instant;
25
26use arrow_array::{Array, RecordBatch};
27use arrow_schema::SchemaRef;
28use async_trait::async_trait;
29use tracing::{debug, info, warn};
30
31#[cfg(feature = "delta-lake")]
32use deltalake::DeltaTable;
33
34#[cfg(feature = "delta-lake")]
35use deltalake::protocol::SaveMode;
36
37use crate::config::{ConnectorConfig, ConnectorState};
38use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
39use crate::error::ConnectorError;
40use crate::health::HealthStatus;
41use crate::metrics::ConnectorMetrics;
42
43use super::delta_config::{DeliveryGuarantee, DeltaLakeSinkConfig, DeltaWriteMode};
44use super::delta_metrics::DeltaLakeSinkMetrics;
45
/// Streaming sink that buffers Arrow [`RecordBatch`]es and commits them to a
/// Delta Lake table using an epoch-based two-phase protocol:
/// `begin_epoch` → `write_batch`* → `pre_commit` (stage) → `commit_epoch` (write).
pub struct DeltaLakeSink {
    // Sink configuration (table path, write mode, delivery guarantee, ...).
    config: DeltaLakeSinkConfig,
    // Target Arrow schema; populated lazily from the first batch or from the
    // opened table when not supplied up front.
    schema: Option<SchemaRef>,
    // Connector lifecycle state.
    state: ConnectorState,
    // Epoch currently being written.
    current_epoch: u64,
    // Highest epoch known to be durably committed (drives exactly-once skips).
    last_committed_epoch: u64,
    // Batches accumulated since the last `pre_commit`.
    buffer: Vec<RecordBatch>,
    // Total rows currently held in `buffer`.
    buffered_rows: usize,
    // Estimated in-memory bytes currently held in `buffer`.
    buffered_bytes: u64,
    // Pre-committed-but-uncommitted unit counter; only advanced in the
    // no-feature fallback path.
    pending_files: usize,
    // Last observed Delta table version.
    delta_version: u64,
    // When the current buffer started filling (drives time-based flushes).
    buffer_start_time: Option<Instant>,
    // Sink-specific metrics (flushes, commits, merges, rollbacks, ...).
    metrics: DeltaLakeSinkMetrics,
    // Open Delta table handle; `None` until `open` succeeds.
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    // True while replaying an epoch that was already committed; writes become
    // no-ops until the next `begin_epoch` resets it.
    epoch_skipped: bool,
    // Batches staged by `pre_commit`, awaiting `commit_epoch` or rollback.
    staged_batches: Vec<RecordBatch>,
    // Total rows in `staged_batches`.
    staged_rows: usize,
    // Estimated bytes in `staged_batches`.
    staged_bytes: u64,
    // Cancellation token for the background compaction task.
    #[cfg(feature = "delta-lake")]
    compaction_cancel: Option<tokio_util::sync::CancellationToken>,
    // Join handle for the background compaction task, awaited on close.
    #[cfg(feature = "delta-lake")]
    compaction_handle: Option<tokio::task::JoinHandle<()>>,
}
118
impl DeltaLakeSink {
    /// Creates a sink in the `Created` state with empty buffers.
    /// No I/O happens until `open` is called.
    #[must_use]
    pub fn new(config: DeltaLakeSinkConfig) -> Self {
        Self {
            config,
            schema: None,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            buffer: Vec::with_capacity(16),
            buffered_rows: 0,
            buffered_bytes: 0,
            pending_files: 0,
            delta_version: 0,
            buffer_start_time: None,
            metrics: DeltaLakeSinkMetrics::new(),
            epoch_skipped: false,
            staged_batches: Vec::new(),
            staged_rows: 0,
            staged_bytes: 0,
            #[cfg(feature = "delta-lake")]
            table: None,
            #[cfg(feature = "delta-lake")]
            compaction_cancel: None,
            #[cfg(feature = "delta-lake")]
            compaction_handle: None,
        }
    }

    /// Like [`Self::new`], but with the Arrow schema fixed up front instead of
    /// being inferred from the first batch or the existing table.
    #[must_use]
    pub fn with_schema(config: DeltaLakeSinkConfig, schema: SchemaRef) -> Self {
        let mut sink = Self::new(config);
        sink.schema = Some(schema);
        sink
    }

    /// Current connector lifecycle state.
    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    /// Epoch currently being written.
    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Highest epoch known to be durably committed.
    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    /// Rows currently buffered (not yet staged or committed).
    #[must_use]
    pub fn buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Estimated in-memory bytes currently buffered.
    #[must_use]
    pub fn buffered_bytes(&self) -> u64 {
        self.buffered_bytes
    }

    /// Last observed Delta table version.
    #[must_use]
    pub fn delta_version(&self) -> u64 {
        self.delta_version
    }

    /// Sink-specific metrics accessor.
    #[must_use]
    pub fn sink_metrics(&self) -> &DeltaLakeSinkMetrics {
        &self.metrics
    }

    /// Effective sink configuration.
    #[must_use]
    pub fn config(&self) -> &DeltaLakeSinkConfig {
        &self.config
    }

    /// Returns `true` when the buffer should be flushed: row count reached
    /// `max_buffer_records`, estimated bytes reached `target_file_size`, or
    /// the buffer has been filling longer than `max_buffer_duration`.
    #[must_use]
    pub fn should_flush(&self) -> bool {
        if self.buffered_rows >= self.config.max_buffer_records {
            return true;
        }
        if self.buffered_bytes >= self.config.target_file_size as u64 {
            return true;
        }
        if let Some(start) = self.buffer_start_time {
            if start.elapsed() >= self.config.max_buffer_duration {
                return true;
            }
        }
        false
    }

    // CDC metadata columns that are stripped from the target schema in
    // upsert mode (they drive the merge but are not written to the table).
    const CDC_METADATA_COLUMNS: &'static [&'static str] = &["_op", "_ts_ms"];

    /// Derives the schema to write: in upsert mode the CDC metadata columns
    /// are dropped from the batch schema; otherwise it is used as-is.
    fn target_schema(batch_schema: &SchemaRef, write_mode: DeltaWriteMode) -> SchemaRef {
        if write_mode == DeltaWriteMode::Upsert {
            let fields: Vec<_> = batch_schema
                .fields()
                .iter()
                .filter(|f| !Self::CDC_METADATA_COLUMNS.contains(&f.name().as_str()))
                .cloned()
                .collect();
            Arc::new(arrow_schema::Schema::new(fields))
        } else {
            batch_schema.clone()
        }
    }

    /// Estimates a batch's size by summing the in-memory footprint of its
    /// columns. This measures Arrow buffer memory, not on-disk file size.
    #[must_use]
    pub fn estimate_batch_size(batch: &RecordBatch) -> u64 {
        batch
            .columns()
            .iter()
            .map(|col| col.get_array_memory_size() as u64)
            .sum()
    }

    /// Writes all staged batches to the Delta table and clears the staging
    /// area on success.
    ///
    /// Upsert mode concatenates the staged batches and runs a changelog
    /// MERGE; append/overwrite modes write the batches directly. The staged
    /// batches are cloned so they survive a failed write attempt.
    ///
    /// # Errors
    /// Returns `InvalidState` when the table handle is missing, or the
    /// underlying I/O error when the write fails.
    #[cfg(feature = "delta-lake")]
    async fn flush_staged_to_delta(&mut self) -> Result<WriteResult, ConnectorError> {
        if self.staged_batches.is_empty() {
            return Ok(WriteResult::new(0, 0));
        }

        let total_rows = self.staged_rows;
        let estimated_bytes = self.staged_bytes;

        // Take ownership of the table; the delta_io helpers consume it and
        // hand back the updated handle on success.
        let table = self
            .table
            .take()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "table initialized".into(),
                actual: "table not initialized".into(),
            })?;

        // Clone so staged data is retained for retry if the write fails.
        let batches: Vec<RecordBatch> = self.staged_batches.clone();

        let write_result = if self.config.write_mode == DeltaWriteMode::Upsert {
            let combined =
                match arrow_select::concat::concat_batches(&batches[0].schema(), &batches) {
                    Ok(c) => c,
                    Err(e) => {
                        // Concat failed before the table was consumed; put
                        // the handle back so a later flush can retry.
                        self.table = Some(table);
                        return Err(ConnectorError::Internal(format!(
                            "failed to concat batches: {e}"
                        )));
                    }
                };

            super::delta_io::merge_changelog(
                table,
                combined,
                &self.config.merge_key_columns,
                &self.config.writer_id,
                self.current_epoch,
                self.config.schema_evolution,
            )
            .await
            .map(|(t, result)| {
                self.metrics.record_merge();
                if result.rows_deleted > 0 {
                    self.metrics.record_deletes(result.rows_deleted as u64);
                }
                t
            })
        } else {
            let save_mode = match self.config.write_mode {
                DeltaWriteMode::Append => SaveMode::Append,
                DeltaWriteMode::Overwrite => SaveMode::Overwrite,
                DeltaWriteMode::Upsert => unreachable!("handled by the upsert branch above"),
            };

            let partition_cols = if self.config.partition_columns.is_empty() {
                None
            } else {
                Some(self.config.partition_columns.as_slice())
            };

            super::delta_io::write_batches(
                table,
                batches,
                &self.config.writer_id,
                self.current_epoch,
                save_mode,
                partition_cols,
                self.config.schema_evolution,
            )
            .await
            .map(|(t, _version)| t)
        };

        let table = match write_result {
            Ok(t) => t,
            Err(e) => {
                // NOTE(review): on a write error the table handle was already
                // consumed by the delta_io helper and `self.table` stays
                // `None`, so the next flush fails with "table not
                // initialized" until the connector is reopened — confirm
                // this is the intended recovery path.
                return Err(e);
            }
        };

        #[allow(clippy::cast_sign_loss)]
        {
            self.delta_version = table.version().unwrap_or(0) as u64;
        }
        self.table = Some(table);
        self.pending_files = 0;

        // Success: the staged data is now durable; drop it.
        self.staged_batches.clear();
        self.staged_rows = 0;
        self.staged_bytes = 0;

        self.metrics
            .record_flush(total_rows as u64, estimated_bytes);
        self.metrics.record_commit(self.delta_version);

        debug!(
            rows = total_rows,
            bytes = estimated_bytes,
            delta_version = self.delta_version,
            "Delta Lake: committed staged data to Delta"
        );

        Ok(WriteResult::new(total_rows, estimated_bytes))
    }

    /// No-feature fallback commit: bumps the local version counter and
    /// records metrics without touching any storage.
    #[cfg(not(feature = "delta-lake"))]
    fn commit_local(&mut self, epoch: u64) {
        self.delta_version += 1;
        self.pending_files = 0;

        self.metrics
            .record_flush(self.staged_rows as u64, self.staged_bytes);
        self.metrics.record_commit(self.delta_version);

        debug!(
            epoch,
            delta_version = self.delta_version,
            "Delta Lake: committed transaction"
        );
    }

    /// Splits a CDC changelog batch into `(inserts, deletes)` based on the
    /// `_op` column: `"I"`, `"U"`, `"r"` rows go to the insert side, `"D"`
    /// rows to the delete side; null or unrecognized ops are dropped from
    /// both. Columns whose names start with `_` (metadata) are projected
    /// away from the returned batches.
    ///
    /// # Errors
    /// Returns `ConfigurationError` when `_op` is missing or not Utf8, and
    /// `Internal` when the Arrow filter/projection fails.
    pub fn split_changelog_batch(
        batch: &RecordBatch,
    ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
        let op_idx = batch.schema().index_of("_op").map_err(|_| {
            ConnectorError::ConfigurationError(
                "upsert mode requires '_op' column in input schema".into(),
            )
        })?;

        let op_array = batch
            .column(op_idx)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .ok_or_else(|| {
                ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
            })?;

        let len = op_array.len();
        let mut insert_mask = Vec::with_capacity(len);
        let mut delete_mask = Vec::with_capacity(len);

        for i in 0..len {
            if op_array.is_null(i) {
                // Null op: row excluded from both outputs.
                insert_mask.push(false);
                delete_mask.push(false);
                continue;
            }
            match op_array.value(i) {
                "I" | "U" | "r" => {
                    insert_mask.push(true);
                    delete_mask.push(false);
                }
                "D" => {
                    insert_mask.push(false);
                    delete_mask.push(true);
                }
                _ => {
                    // Unknown op codes are silently dropped.
                    insert_mask.push(false);
                    delete_mask.push(false);
                }
            }
        }

        // Keep only user columns (names not starting with '_').
        let user_col_indices: Vec<usize> = batch
            .schema()
            .fields()
            .iter()
            .enumerate()
            .filter(|(_, f)| !f.name().starts_with('_'))
            .map(|(i, _)| i)
            .collect();

        let insert_batch = filter_and_project(batch, &insert_mask, &user_col_indices)?;
        let delete_batch = filter_and_project(batch, &delete_mask, &user_col_indices)?;

        Ok((insert_batch, delete_batch))
    }
}
471
/// Background task that periodically runs OPTIMIZE (and, on success, VACUUM)
/// against the Delta table until `cancel` fires.
///
/// Each tick re-opens the table from `table_path` so the latest commits are
/// visible, skips the tick when the snapshot has fewer than
/// `min_files_for_compaction` files, and logs-and-continues on every error so
/// a transient failure never terminates the loop.
#[cfg(feature = "delta-lake")]
async fn compaction_loop(
    table_path: String,
    storage_options: Arc<std::collections::HashMap<String, String>>,
    config: super::delta_config::CompactionConfig,
    vacuum_retention: std::time::Duration,
    cancel: tokio_util::sync::CancellationToken,
) {
    use super::delta_io;

    info!(
        table_path = %table_path,
        check_interval_secs = config.check_interval.as_secs(),
        "compaction background task started"
    );

    let mut interval = tokio::time::interval(config.check_interval);
    // tokio's first interval tick completes immediately; consume it so the
    // first real compaction check only happens after a full interval.
    interval.tick().await;

    loop {
        tokio::select! {
            () = cancel.cancelled() => {
                info!("compaction background task cancelled");
                return;
            }
            _ = interval.tick() => {
                // Re-open the table each tick to pick up commits made by the
                // writer since the previous check.
                let table = match delta_io::open_or_create_table(
                    &table_path,
                    (*storage_options).clone(),
                    None,
                )
                .await
                {
                    Ok(t) => t,
                    Err(e) => {
                        warn!(error = %e, "compaction: failed to open table, will retry");
                        continue;
                    }
                };

                // Only compact once enough small files have accumulated.
                match table.snapshot() {
                    Ok(snapshot) => {
                        let file_count = snapshot.log_data().num_files();
                        if file_count < config.min_files_for_compaction {
                            debug!(
                                file_count,
                                min = config.min_files_for_compaction,
                                "compaction: skipping, not enough files"
                            );
                            continue;
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "compaction: snapshot failed, skipping tick");
                        continue;
                    }
                }

                let target_size = config.target_file_size as u64;
                match delta_io::run_compaction(table, target_size, &config.z_order_columns).await {
                    Ok((table, result)) => {
                        debug!(
                            files_added = result.files_added,
                            files_removed = result.files_removed,
                            "compaction: OPTIMIZE complete"
                        );

                        // VACUUM only after a successful OPTIMIZE; a vacuum
                        // failure is logged and retried on a later tick.
                        match delta_io::run_vacuum(table, vacuum_retention).await {
                            Ok((_table, files_deleted)) => {
                                debug!(files_deleted, "compaction: VACUUM complete");
                            }
                            Err(e) => {
                                warn!(error = %e, "compaction: VACUUM failed");
                            }
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "compaction: OPTIMIZE failed");
                    }
                }
            }
        }
    }
}
566
#[async_trait]
impl SinkConnector for DeltaLakeSink {
    /// Opens the sink: resolves catalog/storage options, opens (or creates)
    /// the Delta table, adopts the table schema if none was supplied,
    /// recovers the last committed epoch for exactly-once resume, and spawns
    /// the background compaction task when enabled.
    ///
    /// # Errors
    /// Always fails with `ConfigurationError` when built without the
    /// `delta-lake` feature; otherwise propagates catalog/table I/O errors.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // A non-empty runtime config replaces the one given at construction.
        if !config.properties().is_empty() {
            self.config = DeltaLakeSinkConfig::from_config(config)?;
        }

        info!(
            table_path = %self.config.table_path,
            mode = %self.config.write_mode,
            guarantee = %self.config.delivery_guarantee,
            "opening Delta Lake sink connector"
        );

        #[cfg(feature = "delta-lake")]
        {
            use super::delta_io;

            let (resolved_path, merged_options) = delta_io::resolve_catalog_options(
                &self.config.catalog_type,
                self.config.catalog_database.as_deref(),
                self.config.catalog_name.as_deref(),
                self.config.catalog_schema.as_deref(),
                &self.config.table_path,
                &self.config.storage_options,
                &self.config.catalog_properties,
            )
            .await?;

            let table = delta_io::open_or_create_table(
                &resolved_path,
                merged_options.clone(),
                self.schema.as_ref(),
            )
            .await?;

            // No schema supplied up front: adopt the table's schema
            // (best-effort; a failure here just leaves `schema` unset).
            if self.schema.is_none() {
                if let Ok(schema) = delta_io::get_table_schema(&table) {
                    self.schema = Some(schema);
                }
            }

            // Exactly-once: read back this writer's txn metadata so replayed
            // epochs can be detected and skipped in begin_epoch.
            if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
                self.last_committed_epoch =
                    delta_io::get_last_committed_epoch(&table, &self.config.writer_id).await;
                if self.last_committed_epoch > 0 {
                    info!(
                        writer_id = %self.config.writer_id,
                        last_committed_epoch = self.last_committed_epoch,
                        "recovered last committed epoch from Delta Lake txn metadata"
                    );
                }
            }

            #[allow(clippy::cast_sign_loss)]
            {
                self.delta_version = table.version().unwrap_or(0) as u64;
            }
            self.table = Some(table);

            if self.config.compaction.enabled {
                let cancel = tokio_util::sync::CancellationToken::new();
                let handle = tokio::spawn(compaction_loop(
                    resolved_path.clone(),
                    Arc::new(merged_options),
                    self.config.compaction.clone(),
                    self.config.vacuum_retention,
                    cancel.clone(),
                ));
                self.compaction_cancel = Some(cancel);
                self.compaction_handle = Some(handle);
            }
        }

        #[cfg(not(feature = "delta-lake"))]
        {
            self.state = ConnectorState::Failed;
            return Err(ConnectorError::ConfigurationError(
                "Delta Lake sink requires the 'delta-lake' feature to be enabled. \
                 Build with: cargo build --features delta-lake"
                    .into(),
            ));
        }

        #[cfg(feature = "delta-lake")]
        {
            self.state = ConnectorState::Running;
            info!("Delta Lake sink connector opened successfully");
            Ok(())
        }
    }

    /// Buffers a batch in memory. Rows are reported as written only when the
    /// epoch commits, so the returned `WriteResult` is always `(0, 0)` here.
    /// Writes are silently dropped while replaying an already-committed epoch.
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        // Replaying a committed epoch: discard the data (exactly-once).
        if self.epoch_skipped {
            return Ok(WriteResult::new(0, 0));
        }

        // First batch fixes the schema (CDC metadata columns stripped in
        // upsert mode).
        if self.schema.is_none() {
            self.schema = Some(Self::target_schema(&batch.schema(), self.config.write_mode));
        }

        let num_rows = batch.num_rows();
        let estimated_bytes = Self::estimate_batch_size(batch);

        // Start the buffer-age clock on the first batch of the buffer.
        if self.buffer_start_time.is_none() {
            self.buffer_start_time = Some(Instant::now());
        }
        self.buffer.push(batch.clone());
        self.buffered_rows += num_rows;
        self.buffered_bytes += estimated_bytes;

        Ok(WriteResult::new(0, 0))
    }

    /// Current schema, or an empty schema when none has been set yet.
    fn schema(&self) -> SchemaRef {
        self.schema
            .clone()
            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
    }

    /// Starts a new epoch, resetting the buffer. Under exactly-once delivery
    /// an epoch at or below the last committed one is flagged as skipped so
    /// subsequent writes for it are discarded.
    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && epoch <= self.last_committed_epoch
        {
            warn!(
                epoch,
                last_committed = self.last_committed_epoch,
                "Delta Lake: skipping already-committed epoch"
            );
            self.epoch_skipped = true;
            return Ok(());
        }

        self.epoch_skipped = false;
        self.current_epoch = epoch;
        self.buffer.clear();
        self.buffered_rows = 0;
        self.buffered_bytes = 0;
        self.pending_files = 0;
        self.buffer_start_time = None;

        debug!(epoch, "Delta Lake: began epoch");
        Ok(())
    }

    /// Phase one of the two-phase commit: moves the buffered batches into
    /// the staging area. No storage I/O happens here; the actual write is
    /// deferred to `commit_epoch`. No-op for already-committed epochs under
    /// exactly-once delivery.
    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && epoch <= self.last_committed_epoch
        {
            return Ok(());
        }

        if !self.buffer.is_empty() {
            self.staged_batches = std::mem::take(&mut self.buffer);
            self.staged_rows = self.buffered_rows;
            self.staged_bytes = self.buffered_bytes;
            self.buffered_rows = 0;
            self.buffered_bytes = 0;
            self.buffer_start_time = None;
        }

        #[cfg(not(feature = "delta-lake"))]
        {
            self.pending_files += 1;
        }

        debug!(epoch, "Delta Lake: pre-committed (batches staged)");
        Ok(())
    }

    /// Phase two: writes the staged batches to the Delta table (or bumps the
    /// local version in the no-feature fallback) and records the epoch as
    /// committed. No-op for already-committed epochs under exactly-once.
    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && epoch <= self.last_committed_epoch
        {
            return Ok(());
        }

        #[cfg(feature = "delta-lake")]
        {
            if !self.staged_batches.is_empty() {
                self.flush_staged_to_delta().await?;
            }
        }
        #[cfg(not(feature = "delta-lake"))]
        {
            if self.pending_files > 0 || !self.staged_batches.is_empty() {
                self.commit_local(epoch);
                self.staged_batches.clear();
                self.staged_rows = 0;
                self.staged_bytes = 0;
            }
        }

        self.last_committed_epoch = epoch;

        info!(
            epoch,
            delta_version = self.delta_version,
            "Delta Lake: committed epoch"
        );

        Ok(())
    }

    /// Discards all buffered and staged data for the epoch. Nothing was
    /// written to the table yet (writes happen only in `commit_epoch`), so
    /// no storage-side undo is needed.
    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.buffer.clear();
        self.buffered_rows = 0;
        self.buffered_bytes = 0;
        self.pending_files = 0;
        self.buffer_start_time = None;
        self.staged_batches.clear();
        self.staged_rows = 0;
        self.staged_bytes = 0;

        self.epoch_skipped = false;
        self.metrics.record_rollback();
        warn!(epoch, "Delta Lake: rolled back epoch");
        Ok(())
    }

    /// Maps the connector lifecycle state to a health status.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    /// Generic connector metrics derived from the sink-specific metrics.
    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    /// Advertises capabilities based on the effective configuration
    /// (exactly-once/2PC, upsert/changelog, schema evolution, partitioning).
    fn capabilities(&self) -> SinkConnectorCapabilities {
        let mut caps = SinkConnectorCapabilities::default().with_idempotent();

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }
        if self.config.write_mode == DeltaWriteMode::Upsert {
            caps = caps.with_upsert().with_changelog();
        }
        if self.config.schema_evolution {
            caps = caps.with_schema_evolution();
        }
        if !self.config.partition_columns.is_empty() {
            caps = caps.with_partitioned();
        }

        caps
    }

    /// Coalesces the in-memory buffer into a single batch. This does not
    /// write to storage; it only compacts the buffer in place.
    async fn flush(&mut self) -> Result<(), ConnectorError> {
        if self.buffer.len() > 1 {
            let schema = self.buffer[0].schema();
            let combined = arrow_select::concat::concat_batches(&schema, &self.buffer)
                .map_err(|e| ConnectorError::Internal(format!("concat failed: {e}")))?;
            self.buffer.clear();
            self.buffer.push(combined);
        }
        Ok(())
    }

    /// Commits any remaining buffered data, stops the background compaction
    /// task (waiting up to 5 s for it), drops the table handle, and moves
    /// the connector to `Closed`.
    ///
    /// NOTE(review): only a non-empty `buffer` triggers the final commit
    /// here; batches that were already staged by a `pre_commit` without a
    /// matching `commit_epoch` are not flushed on close — confirm intended.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing Delta Lake sink connector");

        if !self.buffer.is_empty() {
            self.pre_commit(self.current_epoch).await?;
            self.commit_epoch(self.current_epoch).await?;
        }

        #[cfg(feature = "delta-lake")]
        {
            if let Some(cancel) = self.compaction_cancel.take() {
                cancel.cancel();
            }
            if let Some(handle) = self.compaction_handle.take() {
                // Bounded wait; an unresponsive task is abandoned, not awaited
                // forever.
                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
            }
        }

        #[cfg(feature = "delta-lake")]
        {
            self.table = None;
        }

        self.state = ConnectorState::Closed;

        info!(
            table_path = %self.config.table_path,
            delta_version = self.delta_version,
            "Delta Lake sink connector closed"
        );

        Ok(())
    }
}
908
909impl std::fmt::Debug for DeltaLakeSink {
910 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
911 f.debug_struct("DeltaLakeSink")
912 .field("state", &self.state)
913 .field("table_path", &self.config.table_path)
914 .field("mode", &self.config.write_mode)
915 .field("guarantee", &self.config.delivery_guarantee)
916 .field("current_epoch", &self.current_epoch)
917 .field("last_committed_epoch", &self.last_committed_epoch)
918 .field("buffered_rows", &self.buffered_rows)
919 .field("delta_version", &self.delta_version)
920 .field("epoch_skipped", &self.epoch_skipped)
921 .finish_non_exhaustive()
922 }
923}
924
925fn filter_and_project(
931 batch: &RecordBatch,
932 mask: &[bool],
933 col_indices: &[usize],
934) -> Result<RecordBatch, ConnectorError> {
935 use arrow_array::BooleanArray;
936 use arrow_select::filter::filter_record_batch;
937
938 let bool_array = BooleanArray::from(mask.to_vec());
939
940 let filtered = filter_record_batch(batch, &bool_array)
942 .map_err(|e| ConnectorError::Internal(format!("arrow filter failed: {e}")))?;
943
944 filtered
945 .project(col_indices)
946 .map_err(|e| ConnectorError::Internal(format!("batch projection failed: {e}")))
947}
948
949#[cfg(test)]
950#[allow(clippy::cast_possible_wrap)]
951#[allow(clippy::cast_precision_loss)]
952#[allow(clippy::float_cmp)]
953mod tests {
954 use super::*;
955 use arrow_array::{Float64Array, Int64Array, StringArray};
956 use arrow_schema::{DataType, Field, Schema};
957
958 fn test_schema() -> SchemaRef {
959 Arc::new(Schema::new(vec![
960 Field::new("id", DataType::Int64, false),
961 Field::new("name", DataType::Utf8, true),
962 Field::new("value", DataType::Float64, true),
963 ]))
964 }
965
966 fn test_config() -> DeltaLakeSinkConfig {
967 DeltaLakeSinkConfig::new("/tmp/delta_test")
968 }
969
970 fn upsert_config() -> DeltaLakeSinkConfig {
971 let mut cfg = test_config();
972 cfg.write_mode = DeltaWriteMode::Upsert;
973 cfg.merge_key_columns = vec!["id".to_string()];
974 cfg
975 }
976
977 fn test_batch(n: usize) -> RecordBatch {
978 let ids: Vec<i64> = (0..n as i64).collect();
979 let names: Vec<&str> = (0..n).map(|_| "test").collect();
980 let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
981
982 RecordBatch::try_new(
983 test_schema(),
984 vec![
985 Arc::new(Int64Array::from(ids)),
986 Arc::new(StringArray::from(names)),
987 Arc::new(Float64Array::from(values)),
988 ],
989 )
990 .unwrap()
991 }
992
993 #[test]
996 fn test_new_defaults() {
997 let sink = DeltaLakeSink::new(test_config());
998 assert_eq!(sink.state(), ConnectorState::Created);
999 assert_eq!(sink.current_epoch(), 0);
1000 assert_eq!(sink.last_committed_epoch(), 0);
1001 assert_eq!(sink.buffered_rows(), 0);
1002 assert_eq!(sink.buffered_bytes(), 0);
1003 assert_eq!(sink.delta_version(), 0);
1004 assert!(sink.schema.is_none());
1005 }
1006
1007 #[test]
1008 fn test_with_schema() {
1009 let schema = test_schema();
1010 let sink = DeltaLakeSink::with_schema(test_config(), schema.clone());
1011 assert_eq!(sink.schema(), schema);
1012 }
1013
1014 #[test]
1015 fn test_schema_empty_when_none() {
1016 let sink = DeltaLakeSink::new(test_config());
1017 let schema = sink.schema();
1018 assert_eq!(schema.fields().len(), 0);
1019 }
1020
1021 #[test]
1024 fn test_estimate_batch_size() {
1025 let batch = test_batch(100);
1026 let size = DeltaLakeSink::estimate_batch_size(&batch);
1027 assert!(size > 0);
1028 }
1029
1030 #[test]
1031 fn test_estimate_batch_size_empty() {
1032 let batch = RecordBatch::new_empty(test_schema());
1033 let size = DeltaLakeSink::estimate_batch_size(&batch);
1034 assert!(size < 1024);
1037 }
1038
1039 #[test]
1042 fn test_should_flush_by_rows() {
1043 let mut config = test_config();
1044 config.max_buffer_records = 100;
1045 let mut sink = DeltaLakeSink::new(config);
1046 sink.buffered_rows = 99;
1047 assert!(!sink.should_flush());
1048 sink.buffered_rows = 100;
1049 assert!(sink.should_flush());
1050 }
1051
1052 #[test]
1053 fn test_should_flush_by_bytes() {
1054 let mut config = test_config();
1055 config.target_file_size = 1000;
1056 let mut sink = DeltaLakeSink::new(config);
1057 sink.buffered_bytes = 999;
1058 assert!(!sink.should_flush());
1059 sink.buffered_bytes = 1000;
1060 assert!(sink.should_flush());
1061 }
1062
1063 #[test]
1064 fn test_should_flush_empty() {
1065 let sink = DeltaLakeSink::new(test_config());
1066 assert!(!sink.should_flush());
1067 }
1068
1069 #[tokio::test]
1072 async fn test_write_batch_buffering() {
1073 let mut config = test_config();
1074 config.max_buffer_records = 100;
1075 let mut sink = DeltaLakeSink::new(config);
1076 sink.state = ConnectorState::Running;
1077
1078 let batch = test_batch(10);
1079 let result = sink.write_batch(&batch).await.unwrap();
1080
1081 assert_eq!(result.records_written, 0);
1083 assert_eq!(sink.buffered_rows(), 10);
1084 assert!(sink.buffered_bytes() > 0);
1085 }
1086
1087 #[tokio::test]
1088 async fn test_write_batch_buffers_without_commit() {
1089 let mut config = test_config();
1090 config.max_buffer_records = 10;
1091 let mut sink = DeltaLakeSink::new(config);
1092 sink.state = ConnectorState::Running;
1093
1094 let batch = test_batch(6);
1096 sink.write_batch(&batch).await.unwrap();
1097 sink.write_batch(&batch).await.unwrap();
1098
1099 assert_eq!(sink.buffered_rows(), 12);
1100 assert_eq!(sink.buffer.len(), 2);
1101 assert_eq!(sink.pending_files, 0);
1102 }
1103
1104 #[tokio::test]
1105 async fn test_write_batch_empty() {
1106 let mut sink = DeltaLakeSink::new(test_config());
1107 sink.state = ConnectorState::Running;
1108
1109 let batch = test_batch(0);
1110 let result = sink.write_batch(&batch).await.unwrap();
1111 assert_eq!(result.records_written, 0);
1112 assert_eq!(sink.buffered_rows(), 0);
1113 }
1114
1115 #[tokio::test]
1116 async fn test_write_batch_not_running() {
1117 let mut sink = DeltaLakeSink::new(test_config());
1118 let batch = test_batch(10);
1121 let result = sink.write_batch(&batch).await;
1122 assert!(result.is_err());
1123 }
1124
1125 #[tokio::test]
1126 async fn test_write_batch_sets_schema() {
1127 let mut sink = DeltaLakeSink::new(test_config());
1128 sink.state = ConnectorState::Running;
1129 assert!(sink.schema.is_none());
1130
1131 let batch = test_batch(5);
1132 sink.write_batch(&batch).await.unwrap();
1133 assert!(sink.schema.is_some());
1134 assert_eq!(sink.schema.unwrap().fields().len(), 3);
1135 }
1136
1137 #[tokio::test]
1138 async fn test_multiple_write_batches_accumulate() {
1139 let mut config = test_config();
1140 config.max_buffer_records = 100;
1141 let mut sink = DeltaLakeSink::new(config);
1142 sink.state = ConnectorState::Running;
1143
1144 let batch = test_batch(10);
1145 sink.write_batch(&batch).await.unwrap();
1146 sink.write_batch(&batch).await.unwrap();
1147 sink.write_batch(&batch).await.unwrap();
1148
1149 assert_eq!(sink.buffered_rows(), 30);
1150 }
1151
1152 #[cfg(not(feature = "delta-lake"))]
1158 #[tokio::test]
1159 async fn test_epoch_lifecycle() {
1160 let mut sink = DeltaLakeSink::new(test_config());
1161 sink.state = ConnectorState::Running;
1162
1163 sink.begin_epoch(1).await.unwrap();
1165 assert_eq!(sink.current_epoch(), 1);
1166
1167 let batch = test_batch(10);
1169 sink.write_batch(&batch).await.unwrap();
1170
1171 sink.pre_commit(1).await.unwrap();
1173 assert_eq!(sink.buffered_rows(), 0);
1174 sink.commit_epoch(1).await.unwrap();
1175 assert_eq!(sink.last_committed_epoch(), 1);
1176 assert_eq!(sink.delta_version(), 1);
1177
1178 let m = sink.metrics();
1180 let commits = m.custom.iter().find(|(k, _)| k == "delta.commits");
1181 assert_eq!(commits.unwrap().1, 1.0);
1182 }
1183
1184 #[cfg(not(feature = "delta-lake"))]
1185 #[tokio::test]
1186 async fn test_epoch_skip_already_committed() {
1187 let mut config = test_config();
1188 config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
1189 let mut sink = DeltaLakeSink::new(config);
1190 sink.state = ConnectorState::Running;
1191
1192 sink.begin_epoch(1).await.unwrap();
1194 let batch = test_batch(5);
1195 sink.write_batch(&batch).await.unwrap();
1196 sink.pre_commit(1).await.unwrap();
1197 sink.commit_epoch(1).await.unwrap();
1198 assert_eq!(sink.last_committed_epoch(), 1);
1199
1200 sink.begin_epoch(1).await.unwrap();
1202 sink.pre_commit(1).await.unwrap();
1207 sink.commit_epoch(1).await.unwrap();
1208 assert_eq!(sink.last_committed_epoch(), 1);
1209 assert_eq!(sink.delta_version(), 1); }
1211
1212 #[cfg(not(feature = "delta-lake"))]
1213 #[tokio::test]
1214 async fn test_epoch_at_least_once_no_skip() {
1215 let mut config = test_config();
1216 config.delivery_guarantee = DeliveryGuarantee::AtLeastOnce;
1217 let mut sink = DeltaLakeSink::new(config);
1218 sink.state = ConnectorState::Running;
1219
1220 sink.begin_epoch(1).await.unwrap();
1221 let batch = test_batch(5);
1222 sink.write_batch(&batch).await.unwrap();
1223 sink.pre_commit(1).await.unwrap();
1224 sink.commit_epoch(1).await.unwrap();
1225
1226 sink.begin_epoch(1).await.unwrap();
1228 assert_eq!(sink.current_epoch(), 1);
1229 assert_eq!(sink.buffered_rows(), 0); }
1231
1232 #[tokio::test]
1233 async fn test_rollback_clears_buffer() {
1234 let mut config = test_config();
1235 config.max_buffer_records = 1000;
1236 let mut sink = DeltaLakeSink::new(config);
1237 sink.state = ConnectorState::Running;
1238
1239 let batch = test_batch(50);
1240 sink.write_batch(&batch).await.unwrap();
1241 assert_eq!(sink.buffered_rows(), 50);
1242
1243 sink.rollback_epoch(0).await.unwrap();
1244 assert_eq!(sink.buffered_rows(), 0);
1245 assert_eq!(sink.buffered_bytes(), 0);
1246 assert_eq!(sink.pending_files, 0);
1247 }
1248
1249 #[tokio::test]
1252 async fn test_rollback_after_pre_commit_discards_staged() {
1253 let mut config = test_config();
1254 config.max_buffer_records = 1000;
1255 let mut sink = DeltaLakeSink::new(config);
1256 sink.state = ConnectorState::Running;
1257
1258 sink.begin_epoch(1).await.unwrap();
1259 let batch = test_batch(50);
1260 sink.write_batch(&batch).await.unwrap();
1261 assert_eq!(sink.buffered_rows(), 50);
1262
1263 sink.pre_commit(1).await.unwrap();
1265 assert_eq!(sink.buffered_rows(), 0);
1266 assert_eq!(sink.staged_rows, 50);
1267 assert!(!sink.staged_batches.is_empty());
1268
1269 sink.rollback_epoch(1).await.unwrap();
1271 assert_eq!(sink.buffered_rows(), 0);
1272 assert_eq!(sink.staged_rows, 0);
1273 assert_eq!(sink.staged_bytes, 0);
1274 assert!(sink.staged_batches.is_empty());
1275 assert_eq!(sink.delta_version(), 0); }
1277
1278 #[tokio::test]
1282 async fn test_staged_data_preserved_until_commit_or_rollback() {
1283 let mut config = test_config();
1284 config.max_buffer_records = 1000;
1285 let mut sink = DeltaLakeSink::new(config);
1286 sink.state = ConnectorState::Running;
1287
1288 sink.begin_epoch(1).await.unwrap();
1289 sink.write_batch(&test_batch(25)).await.unwrap();
1290 sink.write_batch(&test_batch(25)).await.unwrap();
1291
1292 sink.pre_commit(1).await.unwrap();
1294 assert_eq!(sink.staged_rows, 50);
1295 assert_eq!(sink.staged_batches.len(), 2);
1296 assert_eq!(sink.buffered_rows(), 0);
1297
1298 sink.rollback_epoch(1).await.unwrap();
1306 assert!(sink.staged_batches.is_empty());
1307 assert_eq!(sink.staged_rows, 0);
1308 assert_eq!(sink.staged_bytes, 0);
1309 }
1310
1311 #[tokio::test]
1312 async fn test_commit_empty_epoch() {
1313 let mut sink = DeltaLakeSink::new(test_config());
1314 sink.state = ConnectorState::Running;
1315
1316 sink.begin_epoch(1).await.unwrap();
1317 sink.commit_epoch(1).await.unwrap();
1319 assert_eq!(sink.last_committed_epoch(), 1);
1320 assert_eq!(sink.delta_version(), 0); }
1322
1323 #[cfg(not(feature = "delta-lake"))]
1324 #[tokio::test]
1325 async fn test_sequential_epochs() {
1326 let mut sink = DeltaLakeSink::new(test_config());
1327 sink.state = ConnectorState::Running;
1328
1329 for epoch in 1..=5 {
1330 sink.begin_epoch(epoch).await.unwrap();
1331 let batch = test_batch(10);
1332 sink.write_batch(&batch).await.unwrap();
1333 sink.pre_commit(epoch).await.unwrap();
1334 sink.commit_epoch(epoch).await.unwrap();
1335 }
1336
1337 assert_eq!(sink.last_committed_epoch(), 5);
1338 assert_eq!(sink.delta_version(), 5);
1339 }
1340
1341 #[tokio::test]
1345 async fn test_flush_coalesces_buffer() {
1346 let mut sink = DeltaLakeSink::new(test_config());
1347 sink.state = ConnectorState::Running;
1348
1349 let batch = test_batch(10);
1350 sink.write_batch(&batch).await.unwrap();
1351 sink.write_batch(&batch).await.unwrap();
1352 assert_eq!(sink.buffer.len(), 2);
1353
1354 sink.flush().await.unwrap();
1356 assert_eq!(sink.buffer.len(), 1);
1357 assert_eq!(sink.buffered_rows(), 20);
1358 }
1359
1360 #[cfg(not(feature = "delta-lake"))]
1366 #[tokio::test]
1367 async fn test_open_requires_feature() {
1368 let mut sink = DeltaLakeSink::new(test_config());
1369
1370 let connector_config = ConnectorConfig::new("delta-lake");
1371 let result = sink.open(&connector_config).await;
1372
1373 assert!(result.is_err());
1374 let err = result.unwrap_err().to_string();
1375 assert!(err.contains("delta-lake"), "error: {err}");
1376 }
1377
1378 #[tokio::test]
1379 async fn test_close() {
1380 let mut sink = DeltaLakeSink::new(test_config());
1381 sink.state = ConnectorState::Running;
1382
1383 sink.close().await.unwrap();
1384 assert_eq!(sink.state(), ConnectorState::Closed);
1385 }
1386
1387 #[cfg(not(feature = "delta-lake"))]
1388 #[tokio::test]
1389 async fn test_close_flushes_remaining() {
1390 let mut config = test_config();
1391 config.max_buffer_records = 1000;
1392 let mut sink = DeltaLakeSink::new(config);
1393 sink.state = ConnectorState::Running;
1394
1395 let batch = test_batch(30);
1396 sink.write_batch(&batch).await.unwrap();
1397 assert_eq!(sink.buffered_rows(), 30);
1398
1399 sink.close().await.unwrap();
1400 assert_eq!(sink.buffered_rows(), 0);
1401
1402 let m = sink.metrics();
1403 assert_eq!(m.records_total, 30);
1404 }
1405
1406 #[test]
1409 fn test_health_check_created() {
1410 let sink = DeltaLakeSink::new(test_config());
1411 assert_eq!(sink.health_check(), HealthStatus::Unknown);
1412 }
1413
1414 #[test]
1415 fn test_health_check_running() {
1416 let mut sink = DeltaLakeSink::new(test_config());
1417 sink.state = ConnectorState::Running;
1418 assert_eq!(sink.health_check(), HealthStatus::Healthy);
1419 }
1420
1421 #[test]
1422 fn test_health_check_closed() {
1423 let mut sink = DeltaLakeSink::new(test_config());
1424 sink.state = ConnectorState::Closed;
1425 assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
1426 }
1427
1428 #[test]
1429 fn test_health_check_failed() {
1430 let mut sink = DeltaLakeSink::new(test_config());
1431 sink.state = ConnectorState::Failed;
1432 assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
1433 }
1434
1435 #[test]
1436 fn test_health_check_paused() {
1437 let mut sink = DeltaLakeSink::new(test_config());
1438 sink.state = ConnectorState::Paused;
1439 assert!(matches!(sink.health_check(), HealthStatus::Degraded(_)));
1440 }
1441
1442 #[test]
1445 fn test_capabilities_append_exactly_once() {
1446 let mut config = test_config();
1447 config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
1448 let sink = DeltaLakeSink::new(config);
1449 let caps = sink.capabilities();
1450 assert!(caps.exactly_once);
1451 assert!(caps.idempotent);
1452 assert!(!caps.upsert);
1453 assert!(!caps.changelog);
1454 assert!(!caps.schema_evolution);
1455 assert!(!caps.partitioned);
1456 }
1457
1458 #[test]
1459 fn test_capabilities_upsert() {
1460 let sink = DeltaLakeSink::new(upsert_config());
1461 let caps = sink.capabilities();
1462 assert!(caps.upsert);
1463 assert!(caps.changelog);
1464 assert!(caps.idempotent);
1465 }
1466
1467 #[test]
1468 fn test_capabilities_schema_evolution() {
1469 let mut config = test_config();
1470 config.schema_evolution = true;
1471 let sink = DeltaLakeSink::new(config);
1472 let caps = sink.capabilities();
1473 assert!(caps.schema_evolution);
1474 }
1475
1476 #[test]
1477 fn test_capabilities_partitioned() {
1478 let mut config = test_config();
1479 config.partition_columns = vec!["trade_date".to_string()];
1480 let sink = DeltaLakeSink::new(config);
1481 let caps = sink.capabilities();
1482 assert!(caps.partitioned);
1483 }
1484
1485 #[test]
1486 fn test_capabilities_at_least_once() {
1487 let mut config = test_config();
1488 config.delivery_guarantee = DeliveryGuarantee::AtLeastOnce;
1489 let sink = DeltaLakeSink::new(config);
1490 let caps = sink.capabilities();
1491 assert!(!caps.exactly_once);
1492 assert!(caps.idempotent);
1493 }
1494
1495 #[test]
1498 fn test_metrics_initial() {
1499 let sink = DeltaLakeSink::new(test_config());
1500 let m = sink.metrics();
1501 assert_eq!(m.records_total, 0);
1502 assert_eq!(m.bytes_total, 0);
1503 assert_eq!(m.errors_total, 0);
1504 }
1505
1506 #[cfg(not(feature = "delta-lake"))]
1509 #[tokio::test]
1510 async fn test_metrics_after_commit() {
1511 let mut sink = DeltaLakeSink::new(test_config());
1512 sink.state = ConnectorState::Running;
1513
1514 let batch = test_batch(10);
1515 sink.write_batch(&batch).await.unwrap();
1516 assert_eq!(sink.buffered_rows(), 10);
1517
1518 sink.pre_commit(0).await.unwrap();
1520 sink.commit_epoch(0).await.unwrap();
1521 let m = sink.metrics();
1522 assert_eq!(m.records_total, 10);
1523 assert!(m.bytes_total > 0);
1524 }
1525
1526 fn changelog_schema() -> SchemaRef {
1529 Arc::new(Schema::new(vec![
1530 Field::new("id", DataType::Int64, false),
1531 Field::new("name", DataType::Utf8, true),
1532 Field::new("_op", DataType::Utf8, false),
1533 Field::new("_ts_ms", DataType::Int64, false),
1534 ]))
1535 }
1536
1537 fn changelog_batch() -> RecordBatch {
1538 RecordBatch::try_new(
1539 changelog_schema(),
1540 vec![
1541 Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
1542 Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
1543 Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
1544 Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
1545 ],
1546 )
1547 .unwrap()
1548 }
1549
1550 #[test]
1551 fn test_split_changelog_batch() {
1552 let batch = changelog_batch();
1553 let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
1554
1555 assert_eq!(inserts.num_rows(), 3);
1557 assert_eq!(deletes.num_rows(), 2);
1559
1560 assert_eq!(inserts.num_columns(), 2); assert_eq!(deletes.num_columns(), 2);
1563
1564 let insert_ids = inserts
1566 .column(0)
1567 .as_any()
1568 .downcast_ref::<Int64Array>()
1569 .unwrap();
1570 assert_eq!(insert_ids.value(0), 1);
1571 assert_eq!(insert_ids.value(1), 2);
1572 assert_eq!(insert_ids.value(2), 4);
1573
1574 let delete_ids = deletes
1576 .column(0)
1577 .as_any()
1578 .downcast_ref::<Int64Array>()
1579 .unwrap();
1580 assert_eq!(delete_ids.value(0), 3);
1581 assert_eq!(delete_ids.value(1), 5);
1582 }
1583
1584 #[test]
1585 fn test_split_changelog_all_inserts() {
1586 let schema = changelog_schema();
1587 let batch = RecordBatch::try_new(
1588 schema,
1589 vec![
1590 Arc::new(Int64Array::from(vec![1, 2])),
1591 Arc::new(StringArray::from(vec!["a", "b"])),
1592 Arc::new(StringArray::from(vec!["I", "I"])),
1593 Arc::new(Int64Array::from(vec![100, 200])),
1594 ],
1595 )
1596 .unwrap();
1597
1598 let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
1599 assert_eq!(inserts.num_rows(), 2);
1600 assert_eq!(deletes.num_rows(), 0);
1601 }
1602
1603 #[test]
1604 fn test_split_changelog_all_deletes() {
1605 let schema = changelog_schema();
1606 let batch = RecordBatch::try_new(
1607 schema,
1608 vec![
1609 Arc::new(Int64Array::from(vec![1, 2])),
1610 Arc::new(StringArray::from(vec!["a", "b"])),
1611 Arc::new(StringArray::from(vec!["D", "D"])),
1612 Arc::new(Int64Array::from(vec![100, 200])),
1613 ],
1614 )
1615 .unwrap();
1616
1617 let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
1618 assert_eq!(inserts.num_rows(), 0);
1619 assert_eq!(deletes.num_rows(), 2);
1620 }
1621
1622 #[test]
1623 fn test_split_changelog_missing_op_column() {
1624 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1625 let batch =
1626 RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
1627
1628 let result = DeltaLakeSink::split_changelog_batch(&batch);
1629 assert!(result.is_err());
1630 }
1631
1632 #[test]
1633 fn test_split_changelog_snapshot_read() {
1634 let schema = changelog_schema();
1635 let batch = RecordBatch::try_new(
1636 schema,
1637 vec![
1638 Arc::new(Int64Array::from(vec![1])),
1639 Arc::new(StringArray::from(vec!["a"])),
1640 Arc::new(StringArray::from(vec!["r"])), Arc::new(Int64Array::from(vec![100])),
1642 ],
1643 )
1644 .unwrap();
1645
1646 let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
1647 assert_eq!(inserts.num_rows(), 1);
1648 assert_eq!(deletes.num_rows(), 0);
1649 }
1650
1651 #[test]
1654 fn test_debug_output() {
1655 let sink = DeltaLakeSink::new(test_config());
1656 let debug = format!("{sink:?}");
1657 assert!(debug.contains("DeltaLakeSink"));
1658 assert!(debug.contains("/tmp/delta_test"));
1659 }
1660}