use std::sync::Arc;
use std::time::{Duration, Instant};

use arrow_array::{Array, RecordBatch};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use tracing::{debug, info, warn};

#[cfg(feature = "delta-lake")]
use deltalake::DeltaTable;

#[cfg(feature = "delta-lake")]
use deltalake::protocol::SaveMode;

use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;

use super::delta_config::{DeltaLakeSinkConfig, DeltaWriteMode};
use super::delta_metrics::DeltaLakeSinkMetrics;
use crate::connector::DeliveryGuarantee;

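/// Streaming sink that writes Arrow `RecordBatch`es to a Delta Lake table.
///
/// Batches are buffered per epoch, staged at `pre_commit`, and committed to
/// the Delta log at `commit_epoch` (exactly-once, tracked via per-writer txn
/// metadata) or flushed as soon as a size/age threshold is crossed
/// (at-least-once). Supports append, overwrite, and CDC-style upsert (MERGE)
/// write modes, plus an optional background OPTIMIZE/VACUUM compaction task.
///
/// A minimal construction sketch (the table path is a placeholder):
///
/// ```ignore
/// let config = DeltaLakeSinkConfig::new("s3://bucket/events_table");
/// let mut sink = DeltaLakeSink::new(config, None);
/// sink.open(&ConnectorConfig::new("delta-lake")).await?;
/// ```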
pub struct DeltaLakeSink {
    config: DeltaLakeSinkConfig,
    schema: Option<SchemaRef>,
    state: ConnectorState,
    current_epoch: u64,
    last_committed_epoch: u64,
    buffer: Vec<RecordBatch>,
    buffered_rows: usize,
    buffered_bytes: u64,
    delta_version: u64,
    buffer_start_time: Option<Instant>,
    metrics: DeltaLakeSinkMetrics,
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    epoch_skipped: bool,
    staged_batches: Vec<RecordBatch>,
    staged_rows: usize,
    staged_bytes: u64,
    #[cfg(feature = "delta-lake")]
    resolved_table_path: String,
    #[cfg(feature = "delta-lake")]
    resolved_storage_options: std::collections::HashMap<String, String>,
    #[cfg(feature = "delta-lake")]
    compaction_cancel: Option<tokio_util::sync::CancellationToken>,
    #[cfg(feature = "delta-lake")]
    compaction_handle: Option<tokio::task::JoinHandle<()>>,
    #[cfg(feature = "delta-lake")]
    needs_deferred_delta_init: bool,
    #[cfg(feature = "delta-lake")]
    pending_reopen: Option<tokio::task::JoinHandle<Result<DeltaTable, ConnectorError>>>,
    #[cfg(feature = "delta-lake")]
    cached_writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
    #[cfg(feature = "delta-lake")]
    merge_session: Option<datafusion::prelude::SessionContext>,
}

impl DeltaLakeSink {
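    /// Creates a sink from `config`; metrics are registered with `registry`
    /// when one is provided.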
    #[must_use]
    pub fn new(config: DeltaLakeSinkConfig, registry: Option<&prometheus::Registry>) -> Self {
        Self {
            config,
            schema: None,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            buffer: Vec::with_capacity(16),
            buffered_rows: 0,
            buffered_bytes: 0,
            delta_version: 0,
            buffer_start_time: None,
            metrics: DeltaLakeSinkMetrics::new(registry),
            epoch_skipped: false,
            staged_batches: Vec::new(),
            staged_rows: 0,
            staged_bytes: 0,
            #[cfg(feature = "delta-lake")]
            table: None,
            #[cfg(feature = "delta-lake")]
            resolved_table_path: String::new(),
            #[cfg(feature = "delta-lake")]
            resolved_storage_options: std::collections::HashMap::new(),
            #[cfg(feature = "delta-lake")]
            compaction_cancel: None,
            #[cfg(feature = "delta-lake")]
            compaction_handle: None,
            #[cfg(feature = "delta-lake")]
            needs_deferred_delta_init: false,
            #[cfg(feature = "delta-lake")]
            pending_reopen: None,
            #[cfg(feature = "delta-lake")]
            cached_writer_properties: None,
            #[cfg(feature = "delta-lake")]
            merge_session: None,
        }
    }

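    /// Creates a sink with the schema known up front, so the Delta table can
    /// be created during `open` instead of being deferred to the first batch.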
    #[must_use]
    pub fn with_schema(config: DeltaLakeSinkConfig, schema: SchemaRef) -> Self {
        let mut sink = Self::new(config, None);
        sink.schema = Some(schema);
        sink
    }

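    /// Resolves the table location from the configured catalog, opens (or
    /// creates) the Delta table, recovers the last committed epoch for
    /// exactly-once writers, caches Parquet writer properties, and spawns the
    /// background compaction task when enabled.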
    #[cfg(feature = "delta-lake")]
    async fn init_delta_table(&mut self) -> Result<(), ConnectorError> {
        use super::delta_io;

        #[cfg(feature = "delta-lake-unity")]
        ensure_uc_table_exists(&self.config, self.schema.as_ref()).await?;

        let (resolved_path, mut merged_options) = delta_io::resolve_catalog_options(
            &self.config.catalog_type,
            self.config.catalog_database.as_deref(),
            self.config.catalog_name.as_deref(),
            self.config.catalog_schema.as_deref(),
            &self.config.table_path,
            &self.config.storage_options,
        )
        .await?;

        merged_options
            .entry("timeout".to_string())
            .or_insert_with(|| "120s".to_string());
        merged_options
            .entry("connect_timeout".to_string())
            .or_insert_with(|| "30s".to_string());
        merged_options
            .entry("pool_idle_timeout".to_string())
            .or_insert_with(|| "60s".to_string());

        self.resolved_table_path.clone_from(&resolved_path);
        self.resolved_storage_options.clone_from(&merged_options);

        let init_timeout = self
            .config
            .write_timeout
            .max(std::time::Duration::from_secs(120));
        let table = tokio::time::timeout(
            init_timeout,
            delta_io::open_or_create_table(
                &resolved_path,
                merged_options.clone(),
                self.schema.as_ref(),
            ),
        )
        .await
        .map_err(|_| {
            ConnectorError::ConnectionFailed(format!(
                "Delta table init timed out after {}s",
                init_timeout.as_secs()
            ))
        })??;

        if self.schema.is_none() {
            if let Ok(schema) = delta_io::get_table_schema(&table) {
                self.schema = Some(schema);
            }
        }

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            self.last_committed_epoch =
                delta_io::get_last_committed_epoch(&table, &self.config.writer_id).await;
            if self.last_committed_epoch > 0 {
                info!(
                    writer_id = %self.config.writer_id,
                    last_committed_epoch = self.last_committed_epoch,
                    "recovered last committed epoch from Delta Lake txn metadata"
                );
            }
        }

        #[allow(clippy::cast_sign_loss)]
        {
            self.delta_version = table.version().unwrap_or(0) as u64;
        }
        self.table = Some(table);

        self.cached_writer_properties = self.config.parquet.to_writer_properties().ok();
        if self.config.write_mode == DeltaWriteMode::Upsert {
            self.merge_session = Some(datafusion::prelude::SessionContext::new());
        }

        if self.config.compaction.enabled {
            let cancel = tokio_util::sync::CancellationToken::new();
            let compaction_props = self.config.parquet.compaction_writer_properties().ok();
            let handle = tokio::spawn(compaction_loop(
                resolved_path.clone(),
                Arc::new(merged_options),
                self.config.compaction.clone(),
                self.config.vacuum_retention,
                compaction_props,
                cancel.clone(),
            ));
            self.compaction_cancel = Some(cancel);
            self.compaction_handle = Some(handle);
        }

        Ok(())
    }

    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    #[must_use]
    pub fn buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    #[must_use]
    pub fn buffered_bytes(&self) -> u64 {
        self.buffered_bytes
    }

    #[must_use]
    pub fn delta_version(&self) -> u64 {
        self.delta_version
    }

    #[must_use]
    pub fn sink_metrics(&self) -> &DeltaLakeSinkMetrics {
        &self.metrics
    }

    #[must_use]
    pub fn config(&self) -> &DeltaLakeSinkConfig {
        &self.config
    }

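    /// Returns `true` once the buffer crosses the configured row-count,
    /// byte-size, or age threshold.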
    #[must_use]
    pub fn should_flush(&self) -> bool {
        if self.buffered_rows >= self.config.max_buffer_records {
            return true;
        }
        if self.buffered_bytes >= self.config.target_file_size as u64 {
            return true;
        }
        if let Some(start) = self.buffer_start_time {
            if start.elapsed() >= self.config.max_buffer_duration {
                return true;
            }
        }
        false
    }

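    /// CDC metadata columns that are stripped from the target schema in
    /// upsert mode.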
    const CDC_METADATA_COLUMNS: &'static [&'static str] = &["_op", "_ts_ms"];

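    /// Derives the schema persisted to Delta: upsert mode drops the CDC
    /// metadata columns, other modes keep the batch schema as-is.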
    fn target_schema(batch_schema: &SchemaRef, write_mode: DeltaWriteMode) -> SchemaRef {
        if write_mode == DeltaWriteMode::Upsert {
            let fields: Vec<_> = batch_schema
                .fields()
                .iter()
                .filter(|f| !Self::CDC_METADATA_COLUMNS.contains(&f.name().as_str()))
                .cloned()
                .collect();
            Arc::new(arrow_schema::Schema::new(fields))
        } else {
            batch_schema.clone()
        }
    }

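    /// Estimates the in-memory size of a batch from its Arrow array buffers.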
    #[must_use]
    pub fn estimate_batch_size(batch: &RecordBatch) -> u64 {
        batch
            .columns()
            .iter()
            .map(|col| col.get_array_memory_size() as u64)
            .sum()
    }

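    /// Heuristically classifies an error as an optimistic-concurrency commit
    /// conflict (and therefore retryable) based on its message.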
    #[cfg(feature = "delta-lake")]
    fn is_conflict_error(err: &ConnectorError) -> bool {
        let msg = err.to_string().to_lowercase();
        msg.contains("conflicting commit")
            || msg.contains("version already exists")
            || msg.contains("concurrent")
            || (msg.contains("conflict") && !msg.contains("log") && !msg.contains("corrupt"))
    }

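    /// Reopens the table from the resolved path and refreshes the cached
    /// Delta version.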
    #[cfg(feature = "delta-lake")]
    async fn reopen_table(&mut self) -> Result<(), ConnectorError> {
        use super::delta_io;

        let table = delta_io::open_or_create_table(
            &self.resolved_table_path,
            self.resolved_storage_options.clone(),
            self.schema.as_ref(),
        )
        .await?;

        #[allow(clippy::cast_sign_loss)]
        {
            self.delta_version = table.version().unwrap_or(0) as u64;
        }
        self.table = Some(table);
        Ok(())
    }

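    /// Spawns a background task that reopens the table, so a post-checkpoint
    /// reopen does not block the commit path. Any previously pending reopen
    /// is aborted.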
    #[cfg(feature = "delta-lake")]
    fn schedule_background_reopen(&mut self) {
        if let Some(prev) = self.pending_reopen.take() {
            prev.abort();
        }
        let path = self.resolved_table_path.clone();
        let opts = self.resolved_storage_options.clone();
        let schema = self.schema.clone();
        self.pending_reopen = Some(tokio::spawn(async move {
            super::delta_io::open_or_create_table(&path, opts, schema.as_ref()).await
        }));
    }

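    /// Waits up to `timeout` for a pending background reopen and installs the
    /// resulting table. Returns `false` if nothing was pending or the reopen
    /// failed or timed out.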
    #[cfg(feature = "delta-lake")]
    async fn try_install_pending_reopen(&mut self, timeout: std::time::Duration) -> bool {
        let Some(mut pending) = self.pending_reopen.take() else {
            return false;
        };
        let table = match tokio::time::timeout(timeout, &mut pending).await {
            Ok(Ok(Ok(t))) => t,
            Ok(Ok(Err(e))) => {
                warn!(error = %e, "Delta background reopen failed");
                return false;
            }
            Ok(Err(e)) => {
                warn!(error = %e, "Delta background reopen task ended unexpectedly");
                return false;
            }
            Err(_) => {
                warn!(
                    timeout_secs = timeout.as_secs(),
                    "Delta background reopen timed out"
                );
                pending.abort();
                return false;
            }
        };
        #[allow(clippy::cast_sign_loss)]
        {
            self.delta_version = table.version().unwrap_or(0) as u64;
        }
        self.table = Some(table);
        true
    }

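    /// Performs a single write attempt against `table`: a MERGE for upsert
    /// mode, otherwise an append/overwrite write that may also write a
    /// checkpoint. Ownership of the table is returned on success.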
    #[cfg(feature = "delta-lake")]
    async fn attempt_delta_write(
        &mut self,
        table: DeltaTable,
    ) -> Result<DeltaTable, ConnectorError> {
        let batches: Vec<RecordBatch> = self.staged_batches.clone();

        if self.config.write_mode == DeltaWriteMode::Upsert {
            let combined = if batches.len() == 1 {
                batches.into_iter().next().expect("len == 1 checked")
            } else {
                match arrow_select::concat::concat_batches(&batches[0].schema(), &batches) {
                    Ok(c) => c,
                    Err(e) => {
                        self.table = Some(table);
                        return Err(ConnectorError::Internal(format!(
                            "failed to concat batches: {e}"
                        )));
                    }
                }
            };

            let writer_props = self.cached_writer_properties.clone();
            let merge_session = self
                .merge_session
                .as_ref()
                .expect("merge_session built in init_delta_table for Upsert mode");

            super::delta_io::merge_changelog(
                table,
                combined,
                &self.config.merge_key_columns,
                &self.config.writer_id,
                self.current_epoch,
                self.config.schema_evolution,
                writer_props,
                merge_session,
            )
            .await
            .map(|(t, result)| {
                self.metrics.record_merge();
                if result.rows_deleted > 0 {
                    self.metrics.record_deletes(result.rows_deleted as u64);
                }
                t
            })
        } else {
            let save_mode = match self.config.write_mode {
                DeltaWriteMode::Append => SaveMode::Append,
                DeltaWriteMode::Overwrite => SaveMode::Overwrite,
                DeltaWriteMode::Upsert => unreachable!("handled by the upsert branch above"),
            };

            let partition_cols = if self.config.partition_columns.is_empty() {
                None
            } else {
                Some(self.config.partition_columns.as_slice())
            };

            let should_checkpoint = self.config.checkpoint_interval > 0
                && self.delta_version > 0
                && (self.delta_version + 1).is_multiple_of(self.config.checkpoint_interval);

            super::delta_io::write_batches(
                table,
                batches,
                &self.config.writer_id,
                self.current_epoch,
                save_mode,
                partition_cols,
                self.config.schema_evolution,
                Some(self.config.target_file_size),
                should_checkpoint,
                self.cached_writer_properties.clone(),
            )
            .await
            .map(|(t, _version)| t)
        }
    }

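    /// Commits all staged batches to Delta, retrying commit conflicts with
    /// jittered backoff for up to `max_commit_retries` additional attempts.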
    #[cfg(feature = "delta-lake")]
    #[allow(clippy::too_many_lines)]
    async fn flush_staged_to_delta(&mut self) -> Result<WriteResult, ConnectorError> {
        if self.staged_batches.is_empty() {
            return Ok(WriteResult::new(0, 0));
        }

        if self.config.write_mode == DeltaWriteMode::Upsert && self.staged_batches.len() > 1 {
            let schema = self.staged_batches[0].schema();
            let combined = arrow_select::concat::concat_batches(&schema, &self.staged_batches)
                .map_err(|e| {
                    ConnectorError::Internal(format!("failed to concat staged batches: {e}"))
                })?;
            self.staged_batches.clear();
            self.staged_batches.push(combined);
        }

        let total_rows = self.staged_rows;
        let estimated_bytes = self.staged_bytes;
        let flush_start = Instant::now();

        let backoff_ms = [100u64, 500, 2000];
        let max_attempts = (self.config.max_commit_retries as usize).saturating_add(1);
        let mut last_error: Option<ConnectorError> = None;

        for attempt in 0..max_attempts {
            if self.table.is_none() {
                let reopen_timeout = self.config.write_timeout;
                if !self.try_install_pending_reopen(reopen_timeout).await {
                    tokio::time::timeout(reopen_timeout, self.reopen_table())
                        .await
                        .map_err(|_| {
                            ConnectorError::ConnectionFailed(format!(
                                "Delta table reopen timed out after {}s",
                                reopen_timeout.as_secs()
                            ))
                        })??;
                }
            }

            let table = self
                .table
                .take()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "table initialized".into(),
                    actual: "table not initialized".into(),
                })?;

            let write_timeout = self.config.write_timeout;
            let write_result =
                tokio::time::timeout(write_timeout, self.attempt_delta_write(table)).await;

            let write_result = match write_result {
                Ok(inner) => inner,
                Err(_elapsed) => Err(ConnectorError::WriteError(format!(
                    "Delta write timed out after {}s",
                    write_timeout.as_secs()
                ))),
            };

            match write_result {
                Ok(table) => {
                    #[allow(clippy::cast_sign_loss)]
                    {
                        self.delta_version = table.version().unwrap_or(0) as u64;
                    }

                    let crossed_checkpoint = self.config.checkpoint_interval > 0
                        && self.delta_version > 0
                        && self
                            .delta_version
                            .is_multiple_of(self.config.checkpoint_interval);
                    if crossed_checkpoint {
                        self.table = None;
                        self.schedule_background_reopen();
                    } else {
                        self.table = Some(table);
                    }

                    self.staged_batches.clear();
                    self.staged_rows = 0;
                    self.staged_bytes = 0;

                    self.metrics
                        .record_flush(total_rows as u64, estimated_bytes);
                    self.metrics.record_commit(self.delta_version);
                    self.metrics
                        .observe_flush_duration(flush_start.elapsed().as_secs_f64());

                    debug!(
                        rows = total_rows,
                        bytes = estimated_bytes,
                        delta_version = self.delta_version,
                        attempt = attempt + 1,
                        reopened = crossed_checkpoint,
                        "Delta Lake: committed staged data to Delta"
                    );

                    return Ok(WriteResult::new(total_rows, estimated_bytes));
                }
                Err(e) => {
                    if Self::is_conflict_error(&e) && attempt + 1 < max_attempts {
                        self.metrics.record_conflict();
                        self.metrics.record_retry();
                        let base = backoff_ms.get(attempt).copied().unwrap_or(2000);
                        let delay_ms = jittered_backoff_ms(base);
                        warn!(
                            attempt = attempt + 1,
                            max_attempts,
                            delay_ms,
                            error = %e,
                            "Delta Lake: conflict error, retrying after backoff"
                        );
                        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                        last_error = Some(e);
                        continue;
                    }
                    self.metrics
                        .observe_flush_duration(flush_start.elapsed().as_secs_f64());
                    return Err(e);
                }
            }
        }

        Err(last_error.unwrap_or_else(|| {
            ConnectorError::Internal("flush_staged_to_delta: no attempts made".into())
        }))
    }

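    /// Splits a changelog batch on its `_op` column into (upserts, deletes),
    /// projecting away metadata columns (names prefixed with `_`). Rows
    /// marked `I`, `U`, or `r` become upserts, `D` rows become deletes, and
    /// anything else (including nulls) is dropped.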
    pub fn split_changelog_batch(
        batch: &RecordBatch,
    ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
        let op_idx = batch.schema().index_of("_op").map_err(|_| {
            ConnectorError::ConfigurationError(
                "upsert mode requires '_op' column in input schema".into(),
            )
        })?;

        let op_array = batch
            .column(op_idx)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .ok_or_else(|| {
                ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
            })?;

        let len = op_array.len();
        let mut insert_mask = Vec::with_capacity(len);
        let mut delete_mask = Vec::with_capacity(len);

        for i in 0..len {
            if op_array.is_null(i) {
                insert_mask.push(false);
                delete_mask.push(false);
                continue;
            }
            match op_array.value(i) {
                "I" | "U" | "r" => {
                    insert_mask.push(true);
                    delete_mask.push(false);
                }
                "D" => {
                    insert_mask.push(false);
                    delete_mask.push(true);
                }
                _ => {
                    insert_mask.push(false);
                    delete_mask.push(false);
                }
            }
        }

        let user_col_indices: Vec<usize> = batch
            .schema()
            .fields()
            .iter()
            .enumerate()
            .filter(|(_, f)| !f.name().starts_with('_'))
            .map(|(i, _)| i)
            .collect();

        let insert_batch = filter_and_project(batch, insert_mask, &user_col_indices)?;
        let delete_batch = filter_and_project(batch, delete_mask, &user_col_indices)?;

        Ok((insert_batch, delete_batch))
    }
}

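/// Background OPTIMIZE/VACUUM loop with an adaptive interval: the check
/// interval halves (down to one minute) after a compaction runs and doubles
/// back toward the configured interval after repeated skips.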
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_lines)]
async fn compaction_loop(
    table_path: String,
    storage_options: Arc<std::collections::HashMap<String, String>>,
    config: super::delta_config::CompactionConfig,
    vacuum_retention: std::time::Duration,
    compaction_props: Option<deltalake::parquet::file::properties::WriterProperties>,
    cancel: tokio_util::sync::CancellationToken,
) {
    use super::delta_io;

    const MIN_COMPACTION_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);

    let base_interval = config.check_interval;
    let mut current_interval = base_interval;
    let mut consecutive_skips: u32 = 0;

    info!(
        table_path = %table_path,
        check_interval_secs = base_interval.as_secs(),
        "compaction background task started (adaptive interval)"
    );

    tokio::select! {
        () = cancel.cancelled() => {
            info!("compaction background task cancelled");
            return;
        }
        () = tokio::time::sleep(current_interval) => {}
    }

    loop {
        let table =
            match delta_io::open_or_create_table(&table_path, (*storage_options).clone(), None)
                .await
            {
                Ok(t) => t,
                Err(e) => {
                    warn!(error = %e, "compaction: failed to open table, will retry");
                    tokio::select! {
                        () = cancel.cancelled() => {
                            info!("compaction background task cancelled");
                            return;
                        }
                        () = tokio::time::sleep(current_interval) => {}
                    }
                    continue;
                }
            };

        let should_compact = match table.snapshot() {
            Ok(snapshot) => {
                let file_count = snapshot.log_data().num_files();
                if file_count < config.min_files_for_compaction {
                    debug!(
                        file_count,
                        min = config.min_files_for_compaction,
                        "compaction: skipping, not enough files"
                    );
                    false
                } else {
                    true
                }
            }
            Err(e) => {
                warn!(error = %e, "compaction: snapshot failed, skipping tick");
                false
            }
        };

        if should_compact {
            consecutive_skips = 0;
            current_interval = (current_interval / 2).max(MIN_COMPACTION_INTERVAL);

            let target_size = config.target_file_size as u64;
            match delta_io::run_compaction(
                table,
                target_size,
                &config.z_order_columns,
                compaction_props.clone(),
            )
            .await
            {
                Ok((table, result)) => {
                    debug!(
                        files_added = result.files_added,
                        files_removed = result.files_removed,
                        interval_secs = current_interval.as_secs(),
                        "compaction: OPTIMIZE complete"
                    );

                    match delta_io::run_vacuum(table, vacuum_retention).await {
                        Ok((_table, files_deleted)) => {
                            debug!(files_deleted, "compaction: VACUUM complete");
                        }
                        Err(e) => {
                            warn!(error = %e, "compaction: VACUUM failed");
                        }
                    }
                }
                Err(e) => {
                    warn!(error = %e, "compaction: OPTIMIZE failed");
                }
            }
        } else {
            consecutive_skips = consecutive_skips.saturating_add(1);
            if consecutive_skips >= 2 {
                current_interval = (current_interval * 2).min(base_interval);
            }
        }

        tokio::select! {
            () = cancel.cancelled() => {
                info!("compaction background task cancelled");
                return;
            }
            () = tokio::time::sleep(current_interval) => {}
        }
    }
}

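/// Auto-creates the target table in Unity Catalog when a storage location is
/// configured and a schema is available; otherwise this is a no-op.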
#[cfg(all(feature = "delta-lake", feature = "delta-lake-unity"))]
async fn ensure_uc_table_exists(
    config: &DeltaLakeSinkConfig,
    schema: Option<&SchemaRef>,
) -> Result<(), ConnectorError> {
    let super::delta_config::DeltaCatalogType::Unity {
        ref workspace_url,
        ref access_token,
    } = config.catalog_type
    else {
        return Ok(());
    };

    let Some(ref storage_location) = config.catalog_storage_location else {
        return Ok(());
    };

    let Some(arrow_schema) = schema else {
        warn!(
            "catalog.storage.location is set but no schema available — \
             skipping Unity Catalog auto-create"
        );
        return Ok(());
    };

    let catalog = config.catalog_name.as_deref().unwrap_or_default();
    let schema_name = config.catalog_schema.as_deref().unwrap_or_default();
    let table_name = config
        .table_path
        .strip_prefix("uc://")
        .and_then(|s| s.rsplit('.').next())
        .unwrap_or(&config.table_path);

    let columns = super::unity_catalog::arrow_to_uc_columns(arrow_schema);
    super::unity_catalog::create_uc_table(
        workspace_url,
        access_token,
        catalog,
        schema_name,
        table_name,
        storage_location,
        &columns,
    )
    .await
}

#[async_trait]
impl SinkConnector for DeltaLakeSink {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        if !config.properties().is_empty() {
            self.config = DeltaLakeSinkConfig::from_config(config)?;
        }

        info!(
            table_path = %self.config.table_path,
            mode = %self.config.write_mode,
            guarantee = %self.config.delivery_guarantee,
            "opening Delta Lake sink connector"
        );

        #[cfg(feature = "delta-lake")]
        {
            let should_defer = matches!(
                self.config.catalog_type,
                super::delta_config::DeltaCatalogType::Unity { .. }
            ) && self.config.catalog_storage_location.is_some()
                && self.schema.is_none();

            if should_defer {
                info!(
                    "Unity Catalog auto-create configured but pipeline schema not yet \
                     available — deferring Delta table init to first begin_epoch"
                );
                self.needs_deferred_delta_init = true;
                self.state = ConnectorState::Initializing;
                return Ok(());
            }

            self.init_delta_table().await?;

            if self.table.as_ref().is_some_and(|t| t.version().is_none()) && self.schema.is_none() {
                self.needs_deferred_delta_init = true;
                self.state = ConnectorState::Initializing;
                return Ok(());
            }
        }

        #[cfg(not(feature = "delta-lake"))]
        {
            self.state = ConnectorState::Failed;
            return Err(ConnectorError::ConfigurationError(
                "Delta Lake sink requires the 'delta-lake' feature to be enabled. \
                 Build with: cargo build --features delta-lake"
                    .into(),
            ));
        }

        #[cfg(feature = "delta-lake")]
        {
            self.state = ConnectorState::Running;
            info!("Delta Lake sink connector opened successfully");
            Ok(())
        }
    }

    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running && self.state != ConnectorState::Initializing {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        if self.epoch_skipped {
            return Ok(WriteResult::new(0, 0));
        }

        if self.schema.is_none() {
            self.schema = Some(Self::target_schema(&batch.schema(), self.config.write_mode));
        }

        #[cfg(feature = "delta-lake")]
        if self.needs_deferred_delta_init {
            info!("schema now available from first batch — completing deferred Delta table init");
            match self.init_delta_table().await {
                Ok(()) => {
                    self.needs_deferred_delta_init = false;
                    self.state = ConnectorState::Running;
                    info!("Delta Lake sink connector opened successfully (deferred)");
                }
                Err(e) => {
                    self.state = ConnectorState::Failed;
                    return Err(e);
                }
            }
        }

        let num_rows = batch.num_rows();
        let estimated_bytes = Self::estimate_batch_size(batch);

        // Hard backpressure cap: allow up to 4x the configured flush
        // thresholds of pending (buffered + staged) data before rejecting the
        // write, so a stalled commit cannot grow memory without bound. The
        // `.max(...)` terms ensure a single oversized batch is still admitted.
        let pending_rows = self.buffered_rows + self.staged_rows + num_rows;
        let pending_bytes = self.buffered_bytes + self.staged_bytes + estimated_bytes;
        let row_cap = self
            .config
            .max_buffer_records
            .saturating_mul(4)
            .max(num_rows);
        let byte_cap = (self.config.target_file_size as u64)
            .saturating_mul(4)
            .max(estimated_bytes);
        if pending_rows > row_cap || pending_bytes > byte_cap {
            return Err(ConnectorError::WriteError(format!(
                "delta sink buffer full ({pending_rows} rows, \
                 {pending_bytes} bytes pending; cap {row_cap} rows, \
                 {byte_cap} bytes) — backpressure until next flush/commit"
            )));
        }

        if self.buffer_start_time.is_none() {
            self.buffer_start_time = Some(Instant::now());
        }
        self.buffer.push(batch.clone());
        self.buffered_rows += num_rows;
        self.buffered_bytes += estimated_bytes;

        #[cfg(feature = "delta-lake")]
        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce && self.should_flush() {
            if !self.staged_batches.is_empty() {
                self.flush_staged_to_delta().await?;
            }
            self.staged_batches = std::mem::take(&mut self.buffer);
            self.staged_rows = self.buffered_rows;
            self.staged_bytes = self.buffered_bytes;
            self.buffered_rows = 0;
            self.buffered_bytes = 0;
            self.buffer_start_time = None;
            self.flush_staged_to_delta().await?;
        }

        Ok(WriteResult::new(0, 0))
    }

    fn schema(&self) -> SchemaRef {
        self.schema
            .clone()
            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        #[cfg(feature = "delta-lake")]
        if self.needs_deferred_delta_init {
            if self.schema.is_some() {
                info!("schema available — completing deferred Delta table init");
                match self.init_delta_table().await {
                    Ok(()) => {
                        self.needs_deferred_delta_init = false;
                        self.state = ConnectorState::Running;
                        info!("Delta Lake sink connector opened successfully (deferred)");
                    }
                    Err(e) => {
                        self.state = ConnectorState::Failed;
                        return Err(e);
                    }
                }
            }
        }

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && epoch <= self.last_committed_epoch
        {
            warn!(
                epoch,
                last_committed = self.last_committed_epoch,
                "Delta Lake: skipping already-committed epoch"
            );
            self.epoch_skipped = true;
            return Ok(());
        }

        self.epoch_skipped = false;
        self.current_epoch = epoch;
        self.buffer.clear();
        self.buffered_rows = 0;
        self.buffered_bytes = 0;
        self.buffer_start_time = None;

        debug!(epoch, "Delta Lake: began epoch");
        Ok(())
    }

    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && epoch <= self.last_committed_epoch
        {
            return Ok(());
        }

        if !self.buffer.is_empty() {
            self.staged_batches = std::mem::take(&mut self.buffer);
            self.staged_rows = self.buffered_rows;
            self.staged_bytes = self.buffered_bytes;
            self.buffered_rows = 0;
            self.buffered_bytes = 0;
            self.buffer_start_time = None;
        }

        debug!(epoch, "Delta Lake: pre-committed (batches staged)");
        Ok(())
    }

    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && epoch <= self.last_committed_epoch
        {
            return Ok(());
        }

        #[cfg(feature = "delta-lake")]
        {
            if !self.staged_batches.is_empty() {
                self.flush_staged_to_delta().await?;
            }
        }

        self.last_committed_epoch = epoch;

        info!(
            epoch,
            delta_version = self.delta_version,
            "Delta Lake: committed epoch"
        );

        Ok(())
    }

    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.buffer.clear();
        self.buffered_rows = 0;
        self.buffered_bytes = 0;
        self.buffer_start_time = None;
        self.staged_batches.clear();
        self.staged_rows = 0;
        self.staged_bytes = 0;

        self.epoch_skipped = false;
        self.metrics.record_rollback();
        warn!(epoch, "Delta Lake: rolled back epoch");
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    fn capabilities(&self) -> SinkConnectorCapabilities {
        let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(180)).with_idempotent();

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }
        if self.config.write_mode == DeltaWriteMode::Upsert {
            caps = caps.with_upsert().with_changelog();
        }
        if self.config.schema_evolution {
            caps = caps.with_schema_evolution();
        }
        if !self.config.partition_columns.is_empty() {
            caps = caps.with_partitioned();
        }

        caps
    }

    async fn flush(&mut self) -> Result<(), ConnectorError> {
        #[cfg(feature = "delta-lake")]
        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce {
            if !self.staged_batches.is_empty() {
                self.flush_staged_to_delta().await?;
            }

            if !self.buffer.is_empty() {
                self.staged_batches = std::mem::take(&mut self.buffer);
                self.staged_rows = self.buffered_rows;
                self.staged_bytes = self.buffered_bytes;
                self.buffered_rows = 0;
                self.buffered_bytes = 0;
                self.buffer_start_time = None;

                self.flush_staged_to_delta().await?;
            }
            return Ok(());
        }

        if self.buffer.is_empty() {
            return Ok(());
        }

        if self.buffer.len() > 1 {
            let schema = self.buffer[0].schema();
            let combined = arrow_select::concat::concat_batches(&schema, &self.buffer)
                .map_err(|e| ConnectorError::Internal(format!("concat failed: {e}")))?;
            self.buffer.clear();
            self.buffer.push(combined);
        }
        Ok(())
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing Delta Lake sink connector");

        #[cfg(feature = "delta-lake")]
        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce {
            self.flush().await?;
        } else if !self.buffer.is_empty() {
            self.pre_commit(self.current_epoch).await?;
            self.commit_epoch(self.current_epoch).await?;
        }

        #[cfg(feature = "delta-lake")]
        {
            if let Some(cancel) = self.compaction_cancel.take() {
                cancel.cancel();
            }
            if let Some(handle) = self.compaction_handle.take() {
                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
            }
        }

        #[cfg(feature = "delta-lake")]
        if let Some(pending) = self.pending_reopen.take() {
            pending.abort();
        }

        #[cfg(feature = "delta-lake")]
        {
            self.table = None;
        }

        self.state = ConnectorState::Closed;

        info!(
            table_path = %self.config.table_path,
            delta_version = self.delta_version,
            "Delta Lake sink connector closed"
        );

        Ok(())
    }
}

#[cfg(feature = "delta-lake")]
impl Drop for DeltaLakeSink {
    fn drop(&mut self) {
        if let Some(cancel) = self.compaction_cancel.take() {
            cancel.cancel();
        }
        if let Some(handle) = self.compaction_handle.take() {
            handle.abort();
        }
        if let Some(handle) = self.pending_reopen.take() {
            handle.abort();
        }
    }
}

impl std::fmt::Debug for DeltaLakeSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DeltaLakeSink")
            .field("state", &self.state)
            .field("table_path", &self.config.table_path)
            .field("mode", &self.config.write_mode)
            .field("guarantee", &self.config.delivery_guarantee)
            .field("current_epoch", &self.current_epoch)
            .field("last_committed_epoch", &self.last_committed_epoch)
            .field("buffered_rows", &self.buffered_rows)
            .field("delta_version", &self.delta_version)
            .field("epoch_skipped", &self.epoch_skipped)
            .finish_non_exhaustive()
    }
}

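/// Applies +/-25% jitter to a base backoff delay so concurrent writers do
/// not retry in lockstep.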
#[cfg(feature = "delta-lake")]
fn jittered_backoff_ms(base_ms: u64) -> u64 {
    use rand::Rng as _;
    let factor: f64 = rand::rng().random_range(0.75_f64..=1.25_f64);
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    let jittered = (base_ms as f64 * factor) as u64;
    jittered.max(1)
}

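/// Projects `batch` down to `col_indices`, then filters rows by `mask`.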
fn filter_and_project(
    batch: &RecordBatch,
    mask: Vec<bool>,
    col_indices: &[usize],
) -> Result<RecordBatch, ConnectorError> {
    use arrow_array::BooleanArray;
    use arrow_select::filter::filter_record_batch;

    let bool_array = BooleanArray::from(mask);

    let projected = batch
        .project(col_indices)
        .map_err(|e| ConnectorError::Internal(format!("batch projection failed: {e}")))?;

    filter_record_batch(&projected, &bool_array)
        .map_err(|e| ConnectorError::Internal(format!("arrow filter failed: {e}")))
}

#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::float_cmp)]
mod tests {
    use super::super::delta_config::DeltaCatalogType;
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    fn test_config() -> DeltaLakeSinkConfig {
        #[cfg(unix)]
        let path = "/tmp/delta_test_nonexistent_8f3a";
        #[cfg(windows)]
        let path = "C:\\delta_test_nonexistent_8f3a";
        DeltaLakeSinkConfig::new(path)
    }

    fn upsert_config() -> DeltaLakeSinkConfig {
        let mut cfg = test_config();
        cfg.write_mode = DeltaWriteMode::Upsert;
        cfg.merge_key_columns = vec!["id".to_string()];
        cfg
    }

    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_new_defaults() {
        let sink = DeltaLakeSink::new(test_config(), None);
        assert_eq!(sink.state(), ConnectorState::Created);
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.buffered_bytes(), 0);
        assert_eq!(sink.delta_version(), 0);
        assert!(sink.schema.is_none());
    }

    #[test]
    fn test_with_schema() {
        let schema = test_schema();
        let sink = DeltaLakeSink::with_schema(test_config(), schema.clone());
        assert_eq!(sink.schema(), schema);
    }

    #[test]
    fn test_schema_empty_when_none() {
        let sink = DeltaLakeSink::new(test_config(), None);
        let schema = sink.schema();
        assert_eq!(schema.fields().len(), 0);
    }

    #[cfg(feature = "delta-lake")]
    #[test]
    fn test_deferred_init_flag_default_false() {
        let sink = DeltaLakeSink::new(test_config(), None);
        assert!(!sink.needs_deferred_delta_init);
    }

    fn unity_config() -> DeltaLakeSinkConfig {
        let mut config = test_config();
        config.catalog_type = DeltaCatalogType::Unity {
            workspace_url: "https://test.azuredatabricks.net".to_string(),
            access_token: "dapi123".to_string(),
        };
        config.catalog_name = Some("main".to_string());
        config.catalog_schema = Some("default".to_string());
        config.catalog_storage_location = Some("abfss://c@acct.dfs.core.windows.net/t".to_string());
        config
    }

    #[cfg(feature = "delta-lake")]
    #[tokio::test]
    async fn test_open_defers_init_for_unity_no_schema() {
        use crate::config::ConnectorConfig;

        let config = unity_config();
        let mut sink = DeltaLakeSink::new(config, None);

        let connector_config = ConnectorConfig::new("delta-lake");
        let result = sink.open(&connector_config).await;
        assert!(result.is_ok());

        assert!(sink.needs_deferred_delta_init);
        assert_eq!(sink.state(), ConnectorState::Initializing);
        assert!(sink.schema.is_none());
    }

    #[cfg(feature = "delta-lake")]
    #[tokio::test]
    async fn test_deferred_init_transitions_to_failed_on_error() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Initializing;
        sink.needs_deferred_delta_init = true;
        sink.schema = Some(test_schema());

        let result = sink.begin_epoch(1).await;
        assert!(result.is_err());
        assert_eq!(sink.state(), ConnectorState::Failed);
    }

    #[cfg(feature = "delta-lake")]
    #[tokio::test]
    async fn test_write_batch_accepts_initializing_state() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Initializing;
        sink.needs_deferred_delta_init = true;

        let batch = test_batch(5);
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
        assert_eq!(sink.state(), ConnectorState::Failed);
        assert!(sink.schema.is_some());
    }

    #[test]
    fn test_no_deferred_init_without_catalog_storage_location() {
        let mut config = unity_config();
        config.catalog_storage_location = None;
        let sink = DeltaLakeSink::new(config, None);

        let should_defer = matches!(sink.config.catalog_type, DeltaCatalogType::Unity { .. })
            && sink.config.catalog_storage_location.is_some()
            && sink.schema.is_none();
        assert!(!should_defer);
    }

    #[test]
    fn test_no_deferred_init_with_schema() {
        let config = unity_config();
        let sink = DeltaLakeSink::with_schema(config, test_schema());

        let should_defer = matches!(sink.config.catalog_type, DeltaCatalogType::Unity { .. })
            && sink.config.catalog_storage_location.is_some()
            && sink.schema.is_none();
        assert!(!should_defer);
    }

    #[test]
    fn test_health_check_initializing_during_deferred() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Initializing;
        assert!(matches!(sink.health_check(), HealthStatus::Unknown));
    }

    #[test]
    fn test_estimate_batch_size() {
        let batch = test_batch(100);
        let size = DeltaLakeSink::estimate_batch_size(&batch);
        assert!(size > 0);
    }

    #[test]
    fn test_estimate_batch_size_empty() {
        let batch = RecordBatch::new_empty(test_schema());
        let size = DeltaLakeSink::estimate_batch_size(&batch);
        assert!(size < 1024);
    }

    #[test]
    fn test_should_flush_by_rows() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.buffered_rows = 99;
        assert!(!sink.should_flush());
        sink.buffered_rows = 100;
        assert!(sink.should_flush());
    }

    #[test]
    fn test_should_flush_by_bytes() {
        let mut config = test_config();
        config.target_file_size = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.buffered_bytes = 999;
        assert!(!sink.should_flush());
        sink.buffered_bytes = 1000;
        assert!(sink.should_flush());
    }

    #[test]
    fn test_should_flush_empty() {
        let sink = DeltaLakeSink::new(test_config(), None);
        assert!(!sink.should_flush());
    }

    #[tokio::test]
    async fn test_exactly_once_buffer_backpressure() {
        let mut config = test_config();
        config.delivery_guarantee = crate::connector::DeliveryGuarantee::ExactlyOnce;
        config.max_buffer_records = 10;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let first = test_batch(50);
        sink.write_batch(&first)
            .await
            .expect("single oversized batch must be admitted");
        assert_eq!(sink.buffered_rows(), 50);

        let second = test_batch(5);
        let err = sink
            .write_batch(&second)
            .await
            .expect_err("should reject once cumulative buffer exceeds cap");
        let msg = err.to_string();
        assert!(
            msg.contains("buffer full"),
            "expected backpressure error, got: {msg}"
        );
        assert_eq!(sink.buffered_rows(), 50);
    }

    #[tokio::test]
    async fn test_write_batch_buffering() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.unwrap();

        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
        assert!(sink.buffered_bytes() > 0);
    }

    #[tokio::test]
    async fn test_write_batch_empty() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(0);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[tokio::test]
    async fn test_write_batch_not_running() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_write_batch_sets_schema() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;
        assert!(sink.schema.is_none());

        let batch = test_batch(5);
        sink.write_batch(&batch).await.unwrap();
        assert!(sink.schema.is_some());
        assert_eq!(sink.schema.as_ref().unwrap().fields().len(), 3);
    }

    #[tokio::test]
    async fn test_multiple_write_batches_accumulate() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();

        assert_eq!(sink.buffered_rows(), 30);
    }

    #[tokio::test]
    async fn test_rollback_clears_buffer() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(50);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 50);

        sink.rollback_epoch(0).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.buffered_bytes(), 0);
    }

    #[tokio::test]
    async fn test_rollback_after_pre_commit_discards_staged() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        let batch = test_batch(50);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 50);

        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.staged_rows, 50);
        assert!(!sink.staged_batches.is_empty());

        sink.rollback_epoch(1).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.staged_rows, 0);
        assert_eq!(sink.staged_bytes, 0);
        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.delta_version(), 0); // nothing was committed
    }

    #[tokio::test]
    async fn test_staged_data_preserved_until_commit_or_rollback() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(25)).await.unwrap();
        sink.write_batch(&test_batch(25)).await.unwrap();

        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.staged_rows, 50);
        assert_eq!(sink.staged_batches.len(), 2);
        assert_eq!(sink.buffered_rows(), 0);

        sink.rollback_epoch(1).await.unwrap();
        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.staged_rows, 0);
        assert_eq!(sink.staged_bytes, 0);
    }

    #[tokio::test]
    async fn test_commit_empty_epoch() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        assert_eq!(sink.last_committed_epoch(), 1);
        assert_eq!(sink.delta_version(), 0); // empty epoch writes nothing
    }

    #[tokio::test]
    async fn test_flush_coalesces_buffer() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        config.writer_id = "test-writer".to_string();
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffer.len(), 2);

        sink.flush().await.unwrap();
        assert_eq!(sink.buffer.len(), 1);
        assert_eq!(sink.buffered_rows(), 20);
    }

    #[tokio::test]
    async fn test_close() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;

        sink.close().await.unwrap();
        assert_eq!(sink.state(), ConnectorState::Closed);
    }

    #[test]
    fn test_health_check_created() {
        let sink = DeltaLakeSink::new(test_config(), None);
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_check_running() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;
        assert_eq!(sink.health_check(), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_check_closed() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Closed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_health_check_failed() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Failed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_health_check_paused() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Paused;
        assert!(matches!(sink.health_check(), HealthStatus::Degraded(_)));
    }

    #[test]
    fn test_capabilities_append_exactly_once() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = DeltaLakeSink::new(config, None);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.schema_evolution);
        assert!(!caps.partitioned);
    }

    #[test]
    fn test_capabilities_upsert() {
        let sink = DeltaLakeSink::new(upsert_config(), None);
        let caps = sink.capabilities();
        assert!(caps.upsert);
        assert!(caps.changelog);
        assert!(caps.idempotent);
    }

    #[test]
    fn test_capabilities_schema_evolution() {
        let mut config = test_config();
        config.schema_evolution = true;
        let sink = DeltaLakeSink::new(config, None);
        let caps = sink.capabilities();
        assert!(caps.schema_evolution);
    }

    #[test]
    fn test_capabilities_partitioned() {
        let mut config = test_config();
        config.partition_columns = vec!["trade_date".to_string()];
        let sink = DeltaLakeSink::new(config, None);
        let caps = sink.capabilities();
        assert!(caps.partitioned);
    }

    #[test]
    fn test_capabilities_at_least_once() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::AtLeastOnce;
        let sink = DeltaLakeSink::new(config, None);
        let caps = sink.capabilities();
        assert!(!caps.exactly_once);
        assert!(caps.idempotent);
    }

    #[test]
    fn test_metrics_initial() {
        let sink = DeltaLakeSink::new(test_config(), None);
        let m = sink.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    fn changelog_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
        ]))
    }

    fn changelog_batch() -> RecordBatch {
        RecordBatch::try_new(
            changelog_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_split_changelog_batch() {
        let batch = changelog_batch();
        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();

        assert_eq!(inserts.num_rows(), 3);
        assert_eq!(deletes.num_rows(), 2);

        assert_eq!(inserts.num_columns(), 2); // id, name (metadata columns dropped)
        assert_eq!(deletes.num_columns(), 2);

        let insert_ids = inserts
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(insert_ids.value(0), 1);
        assert_eq!(insert_ids.value(1), 2);
        assert_eq!(insert_ids.value(2), 4);

        let delete_ids = deletes
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(delete_ids.value(0), 3);
        assert_eq!(delete_ids.value(1), 5);
    }

    #[test]
    fn test_split_changelog_all_inserts() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["I", "I"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .unwrap();

        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
        assert_eq!(inserts.num_rows(), 2);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_split_changelog_all_deletes() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["D", "D"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .unwrap();

        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
        assert_eq!(inserts.num_rows(), 0);
        assert_eq!(deletes.num_rows(), 2);
    }

    #[test]
    fn test_split_changelog_missing_op_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();

        let result = DeltaLakeSink::split_changelog_batch(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_split_changelog_snapshot_read() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(StringArray::from(vec!["a"])),
                Arc::new(StringArray::from(vec!["r"])), // snapshot read
                Arc::new(Int64Array::from(vec![100])),
            ],
        )
        .unwrap();

        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
        assert_eq!(inserts.num_rows(), 1);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_debug_output() {
        let sink = DeltaLakeSink::new(test_config(), None);
        let debug = format!("{sink:?}");
        assert!(debug.contains("DeltaLakeSink"));
        assert!(debug.contains("delta_test_nonexistent_8f3a"));
    }
}