1use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use arrow_array::{Array, RecordBatch};
27use arrow_schema::SchemaRef;
28use async_trait::async_trait;
29use tracing::{debug, info, warn};
30
31#[cfg(feature = "delta-lake")]
32use deltalake::DeltaTable;
33
34#[cfg(feature = "delta-lake")]
35use deltalake::protocol::SaveMode;
36
37use crate::config::{ConnectorConfig, ConnectorState};
38use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
39use crate::error::ConnectorError;
40
41use super::delta_config::{DeltaLakeSinkConfig, DeltaWriteMode};
42use super::delta_metrics::DeltaLakeSinkMetrics;
43use crate::connector::DeliveryGuarantee;
44
45#[cfg(feature = "delta-lake")]
49fn count_collapsed_ops(batch: &RecordBatch) -> (u64, u64) {
50 let Ok(idx) = batch.schema().index_of("_op") else {
51 return (0, 0);
52 };
53 let Some(ops) = batch
54 .column(idx)
55 .as_any()
56 .downcast_ref::<arrow_array::StringArray>()
57 else {
58 return (0, 0);
59 };
60 let deletes = (0..ops.len())
61 .filter(|&i| !ops.is_null(i) && ops.value(i) == "D")
62 .count() as u64;
63 let upserts = ops.len() as u64 - deletes;
64 (upserts, deletes)
65}
66
/// Delta Lake sink connector.
///
/// Data moves through the sink in stages:
/// 1. `write_batch` appends batches to `buffer`.
/// 2. `pre_commit`, or a size/time-triggered flush, moves them to `staged_batches`.
/// 3. `flush_staged_to_delta` commits the staged batches to the Delta table.
///
/// Without the `delta-lake` feature the sink can be constructed, but `open` fails.
pub struct DeltaLakeSink {
    /// Sink configuration; replaced in `open` when connector properties are supplied.
    config: DeltaLakeSinkConfig,
    /// Schema written to Delta. In upsert mode the changelog metadata columns are
    /// removed (see `target_schema`). It may be adopted from the table or from
    /// the first incoming batch when not provided up front.
    schema: Option<SchemaRef>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Epoch most recently started by `begin_epoch` (not updated for skipped epochs).
    current_epoch: u64,
    /// Last epoch committed by this sink. Under exactly-once it is recovered from
    /// Delta transaction metadata when the table is initialised.
    last_committed_epoch: u64,
    /// Batches accepted by `write_batch` that have not been staged yet.
    buffer: Vec<RecordBatch>,
    /// Row count of `buffer`.
    buffered_rows: usize,
    /// Estimated in-memory size of `buffer` (see `estimate_batch_size`).
    buffered_bytes: u64,
    /// Table version observed after the last open or successful commit.
    delta_version: u64,
    /// When the first batch entered an empty `buffer`; drives the time-based flush.
    buffer_start_time: Option<Instant>,
    /// Sink metrics.
    metrics: DeltaLakeSinkMetrics,
    /// Open table handle.
    ///
    /// It is `None` in these cases:
    /// - while a write attempt owns the table;
    /// - after a failed attempt (the next attempt reopens it);
    /// - while a background reopen is pending.
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    /// Set by `begin_epoch` when the epoch was already committed (exactly-once);
    /// `write_batch` then drops incoming batches.
    epoch_skipped: bool,
    /// Batches moved out of `buffer` and awaiting a Delta commit.
    ///
    /// Not cleared when a write attempt fails. In upsert mode they are
    /// replaced by their collapsed form before writing.
    staged_batches: Vec<RecordBatch>,
    /// Row count of `staged_batches` (counted before any upsert collapse).
    staged_rows: usize,
    /// Estimated in-memory size of `staged_batches`.
    staged_bytes: u64,
    /// Table location after catalog resolution; reused for reopens.
    #[cfg(feature = "delta-lake")]
    resolved_table_path: String,
    /// Storage options after catalog resolution plus default timeouts; reused for reopens.
    #[cfg(feature = "delta-lake")]
    resolved_storage_options: std::collections::HashMap<String, String>,
    /// Cancels the background compaction task, if one was started.
    #[cfg(feature = "delta-lake")]
    compaction_cancel: Option<tokio_util::sync::CancellationToken>,
    /// Handle of the background compaction task, if one was started.
    #[cfg(feature = "delta-lake")]
    compaction_handle: Option<tokio::task::JoinHandle<()>>,
    /// Table initialisation still has to run once a schema is known.
    #[cfg(feature = "delta-lake")]
    needs_deferred_delta_init: bool,
    /// Background reopen scheduled after a commit landed on a checkpoint-interval version.
    #[cfg(feature = "delta-lake")]
    pending_reopen: Option<tokio::task::JoinHandle<Result<DeltaTable, ConnectorError>>>,
    /// Parquet writer properties built once from the config (`None` if conversion failed).
    #[cfg(feature = "delta-lake")]
    cached_writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
    /// DataFusion session reused for MERGE operations in upsert mode.
    #[cfg(feature = "delta-lake")]
    merge_session: Option<datafusion::prelude::SessionContext>,
}
166
167impl DeltaLakeSink {
168 #[must_use]
170 pub fn new(config: DeltaLakeSinkConfig, registry: Option<&prometheus::Registry>) -> Self {
171 Self {
172 config,
173 schema: None,
174 state: ConnectorState::Created,
175 current_epoch: 0,
176 last_committed_epoch: 0,
177 buffer: Vec::with_capacity(16),
178 buffered_rows: 0,
179 buffered_bytes: 0,
180 delta_version: 0,
181 buffer_start_time: None,
182 metrics: DeltaLakeSinkMetrics::new(registry),
183 epoch_skipped: false,
184 staged_batches: Vec::new(),
185 staged_rows: 0,
186 staged_bytes: 0,
187 #[cfg(feature = "delta-lake")]
188 table: None,
189 #[cfg(feature = "delta-lake")]
190 resolved_table_path: String::new(),
191 #[cfg(feature = "delta-lake")]
192 resolved_storage_options: std::collections::HashMap::new(),
193 #[cfg(feature = "delta-lake")]
194 compaction_cancel: None,
195 #[cfg(feature = "delta-lake")]
196 compaction_handle: None,
197 #[cfg(feature = "delta-lake")]
198 needs_deferred_delta_init: false,
199 #[cfg(feature = "delta-lake")]
200 pending_reopen: None,
201 #[cfg(feature = "delta-lake")]
202 cached_writer_properties: None,
203 #[cfg(feature = "delta-lake")]
204 merge_session: None,
205 }
206 }
207
208 #[must_use]
214 pub fn with_schema(config: DeltaLakeSinkConfig, schema: SchemaRef) -> Self {
215 let write_mode = config.write_mode;
216 let mut sink = Self::new(config, None);
217 sink.schema = Some(if write_mode == DeltaWriteMode::Upsert {
218 Self::target_schema(&schema, write_mode)
219 } else {
220 schema
221 });
222 sink
223 }
224
225 #[cfg(feature = "delta-lake")]
230 async fn init_delta_table(&mut self) -> Result<(), ConnectorError> {
231 use super::delta_io;
232
233 #[cfg(feature = "delta-lake-unity")]
236 ensure_uc_table_exists(&self.config, self.schema.as_ref()).await?;
237
238 let (resolved_path, mut merged_options) = delta_io::resolve_catalog_options(
241 &self.config.catalog_type,
242 self.config.catalog_database.as_deref(),
243 self.config.catalog_name.as_deref(),
244 self.config.catalog_schema.as_deref(),
245 &self.config.table_path,
246 &self.config.storage_options,
247 )
248 .await?;
249
250 merged_options
254 .entry("timeout".to_string())
255 .or_insert_with(|| "120s".to_string());
256 merged_options
257 .entry("connect_timeout".to_string())
258 .or_insert_with(|| "30s".to_string());
259 merged_options
260 .entry("pool_idle_timeout".to_string())
261 .or_insert_with(|| "60s".to_string());
262
263 self.resolved_table_path.clone_from(&resolved_path);
265 self.resolved_storage_options.clone_from(&merged_options);
266
267 let init_timeout = self
268 .config
269 .write_timeout
270 .max(std::time::Duration::from_secs(120));
271 let table = tokio::time::timeout(
272 init_timeout,
273 delta_io::open_or_create_table(
274 &resolved_path,
275 merged_options.clone(),
276 self.schema.as_ref(),
277 ),
278 )
279 .await
280 .map_err(|_| {
281 ConnectorError::ConnectionFailed(format!(
282 "Delta table init timed out after {}s",
283 init_timeout.as_secs()
284 ))
285 })??;
286
287 if self.schema.is_none() {
289 if let Ok(schema) = delta_io::get_table_schema(&table) {
290 self.schema = Some(schema);
291 }
292 }
293
294 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
296 self.last_committed_epoch =
297 delta_io::get_last_committed_epoch(&table, &self.config.writer_id).await;
298 if self.last_committed_epoch > 0 {
299 info!(
300 writer_id = %self.config.writer_id,
301 last_committed_epoch = self.last_committed_epoch,
302 "recovered last committed epoch from Delta Lake txn metadata"
303 );
304 }
305 }
306
307 #[allow(clippy::cast_sign_loss)]
309 {
310 self.delta_version = table.version().unwrap_or(0) as u64;
311 }
312 self.table = Some(table);
313
314 self.cached_writer_properties = self.config.parquet.to_writer_properties().ok();
322 if self.config.write_mode == DeltaWriteMode::Upsert {
323 self.merge_session = Some(datafusion::prelude::SessionContext::new());
324 }
325
326 if self.config.compaction.enabled {
330 let cancel = tokio_util::sync::CancellationToken::new();
331 let compaction_props = self.config.parquet.compaction_writer_properties().ok();
332 let handle = tokio::spawn(compaction_loop(
333 resolved_path.clone(),
334 Arc::new(merged_options),
335 self.config.compaction.clone(),
336 self.config.vacuum_retention,
337 compaction_props,
338 cancel.clone(),
339 ));
340 self.compaction_cancel = Some(cancel);
341 self.compaction_handle = Some(handle);
342 }
343
344 Ok(())
345 }
346
347 #[must_use]
349 pub fn state(&self) -> ConnectorState {
350 self.state
351 }
352
353 #[must_use]
355 pub fn current_epoch(&self) -> u64 {
356 self.current_epoch
357 }
358
359 #[must_use]
361 pub fn last_committed_epoch(&self) -> u64 {
362 self.last_committed_epoch
363 }
364
365 #[must_use]
367 pub fn buffered_rows(&self) -> usize {
368 self.buffered_rows
369 }
370
371 #[must_use]
373 pub fn buffered_bytes(&self) -> u64 {
374 self.buffered_bytes
375 }
376
377 #[must_use]
379 pub fn delta_version(&self) -> u64 {
380 self.delta_version
381 }
382
383 #[must_use]
385 pub fn sink_metrics(&self) -> &DeltaLakeSinkMetrics {
386 &self.metrics
387 }
388
389 #[must_use]
391 pub fn config(&self) -> &DeltaLakeSinkConfig {
392 &self.config
393 }
394
395 #[must_use]
397 pub fn should_flush(&self) -> bool {
398 if self.buffered_rows >= self.config.max_buffer_records {
399 return true;
400 }
401 if self.buffered_bytes >= self.config.target_file_size as u64 {
402 return true;
403 }
404 if let Some(start) = self.buffer_start_time {
405 if start.elapsed() >= self.config.max_buffer_duration {
406 return true;
407 }
408 }
409 false
410 }
411
412 const CHANGELOG_METADATA_COLUMNS: &'static [&'static str] =
417 &["_op", "_ts_ms", laminar_core::changelog::WEIGHT_COLUMN];
418
419 fn target_schema(batch_schema: &SchemaRef, write_mode: DeltaWriteMode) -> SchemaRef {
420 if write_mode == DeltaWriteMode::Upsert {
421 let fields: Vec<_> = batch_schema
422 .fields()
423 .iter()
424 .filter(|f| !Self::CHANGELOG_METADATA_COLUMNS.contains(&f.name().as_str()))
425 .cloned()
426 .collect();
427 Arc::new(arrow_schema::Schema::new(fields))
428 } else {
429 batch_schema.clone()
430 }
431 }
432
433 #[must_use]
435 pub fn estimate_batch_size(batch: &RecordBatch) -> u64 {
436 batch
437 .columns()
438 .iter()
439 .map(|col| col.get_array_memory_size() as u64)
440 .sum()
441 }
442
443 #[cfg(feature = "delta-lake")]
447 fn is_conflict_error(err: &ConnectorError) -> bool {
448 let msg = err.to_string().to_lowercase();
449 msg.contains("conflicting commit")
450 || msg.contains("version already exists")
451 || msg.contains("concurrent")
452 || (msg.contains("conflict") && !msg.contains("log") && !msg.contains("corrupt"))
453 }
454
455 #[cfg(feature = "delta-lake")]
459 async fn reopen_table(&mut self) -> Result<(), ConnectorError> {
460 use super::delta_io;
461
462 let table = delta_io::open_or_create_table(
463 &self.resolved_table_path,
464 self.resolved_storage_options.clone(),
465 self.schema.as_ref(),
466 )
467 .await?;
468
469 #[allow(clippy::cast_sign_loss)]
470 {
471 self.delta_version = table.version().unwrap_or(0) as u64;
472 }
473 self.table = Some(table);
474 Ok(())
475 }
476
477 #[cfg(feature = "delta-lake")]
481 fn schedule_background_reopen(&mut self) {
482 if let Some(prev) = self.pending_reopen.take() {
483 prev.abort();
484 }
485 let path = self.resolved_table_path.clone();
486 let opts = self.resolved_storage_options.clone();
487 let schema = self.schema.clone();
488 self.pending_reopen = Some(tokio::spawn(async move {
489 super::delta_io::open_or_create_table(&path, opts, schema.as_ref()).await
490 }));
491 }
492
493 #[cfg(feature = "delta-lake")]
497 async fn try_install_pending_reopen(&mut self, timeout: std::time::Duration) -> bool {
498 let Some(mut pending) = self.pending_reopen.take() else {
499 return false;
500 };
501 let table = match tokio::time::timeout(timeout, &mut pending).await {
502 Ok(Ok(Ok(t))) => t,
503 Ok(Ok(Err(e))) => {
504 warn!(error = %e, "Delta background reopen failed");
505 return false;
506 }
507 Ok(Err(e)) => {
508 warn!(error = %e, "Delta background reopen task ended unexpectedly");
509 return false;
510 }
511 Err(_) => {
512 warn!(
513 timeout_secs = timeout.as_secs(),
514 "Delta background reopen timed out"
515 );
516 pending.abort();
517 return false;
518 }
519 };
520 #[allow(clippy::cast_sign_loss)]
521 {
522 self.delta_version = table.version().unwrap_or(0) as u64;
523 }
524 self.table = Some(table);
525 true
526 }
527
528 #[cfg(feature = "delta-lake")]
    /// Performs a single write attempt of `staged_batches` against `table`.
    ///
    /// The table handle is consumed:
    /// - On success the new handle is returned.
    /// - On a concat failure the handle is put back into `self.table`.
    /// - On any other error it is gone and `self.table` stays `None`; the
    ///   caller reopens before the next attempt.
    ///
    /// Upsert mode:
    /// - Concatenates the staged batches into one.
    /// - Calls `merge_changelog` with the merge keys, writer id and current epoch.
    /// - Records merge and delete metrics on success.
    /// - NOTE(review): no checkpoint flag is passed on this path, so
    ///   `checkpoint_interval` is not consulted for upserts here.
    ///
    /// Append/Overwrite mode:
    /// - Calls `write_batches` with the matching `SaveMode`, partition columns
    ///   and target file size.
    /// - Requests a checkpoint when the version this commit is expected to
    ///   produce (`delta_version + 1`) is a multiple of `checkpoint_interval`.
    ///
    /// # Errors
    /// `Internal` on concat failure, otherwise whatever the delta_io call returns.
    ///
    /// # Panics
    /// In upsert mode if `merge_session` was never built (it is created by
    /// `init_delta_table` for upsert).
    async fn attempt_delta_write(
        &mut self,
        table: DeltaTable,
    ) -> Result<DeltaTable, ConnectorError> {
        // Cloned (not taken) so the staged data survives a failed attempt and
        // can be retried; RecordBatch clones share their column buffers.
        let batches: Vec<RecordBatch> = self.staged_batches.clone();

        if self.config.write_mode == DeltaWriteMode::Upsert {
            let combined = if batches.len() == 1 {
                batches.into_iter().next().expect("len == 1 checked")
            } else {
                match arrow_select::concat::concat_batches(&batches[0].schema(), &batches) {
                    Ok(c) => c,
                    Err(e) => {
                        // Nothing was sent to Delta: hand the handle back so the
                        // caller does not need to reopen.
                        self.table = Some(table);
                        return Err(ConnectorError::Internal(format!(
                            "failed to concat batches: {e}"
                        )));
                    }
                }
            };

            let writer_props = self.cached_writer_properties.clone();
            let merge_session = self
                .merge_session
                .as_ref()
                .expect("merge_session built in init_delta_table for Upsert mode");

            super::delta_io::merge_changelog(
                table,
                combined,
                &self.config.merge_key_columns,
                &self.config.writer_id,
                self.current_epoch,
                self.config.schema_evolution,
                writer_props,
                merge_session,
            )
            .await
            .map(|(t, result)| {
                self.metrics.record_merge();
                if result.rows_deleted > 0 {
                    self.metrics.record_deletes(result.rows_deleted as u64);
                }
                t
            })
        } else {
            let save_mode = match self.config.write_mode {
                DeltaWriteMode::Append => SaveMode::Append,
                DeltaWriteMode::Overwrite => SaveMode::Overwrite,
                DeltaWriteMode::Upsert => unreachable!("handled by the upsert branch above"),
            };

            let partition_cols = if self.config.partition_columns.is_empty() {
                None
            } else {
                Some(self.config.partition_columns.as_slice())
            };

            // Predicts the version this commit will create (current + 1).
            let should_checkpoint = self.config.checkpoint_interval > 0
                && self.delta_version > 0
                && (self.delta_version + 1).is_multiple_of(self.config.checkpoint_interval);

            super::delta_io::write_batches(
                table,
                batches,
                &self.config.writer_id,
                self.current_epoch,
                save_mode,
                partition_cols,
                self.config.schema_evolution,
                Some(self.config.target_file_size),
                should_checkpoint,
                self.cached_writer_properties.clone(),
            )
            .await
            .map(|(t, _version)| t)
        }
    }
620
621 #[cfg(feature = "delta-lake")]
    /// Commits `staged_batches` to the Delta table, retrying on commit conflicts.
    ///
    /// Upsert preparation:
    /// - Concatenates the staged batches and collapses the changelog on
    ///   `merge_key_columns`, recording collapse metrics.
    /// - Replaces `staged_batches` with the single collapsed batch. A later
    ///   call after a failed flush therefore collapses that batch again.
    ///
    /// Attempts (up to `max_commit_retries + 1`):
    /// - Each attempt first ensures a table handle exists. It prefers a pending
    ///   background reopen and otherwise reopens synchronously, bounded by
    ///   `write_timeout`. A failed or timed-out reopen is returned immediately.
    /// - It then runs `attempt_delta_write` under `write_timeout`.
    ///
    /// Retry policy:
    /// - Only errors classified by `is_conflict_error` are retried.
    /// - The backoff base is 100 ms, 500 ms, then 2 s for every later attempt.
    ///   It goes through `jittered_backoff_ms`, which is defined elsewhere and
    ///   presumably adds jitter.
    /// - Any other error is returned at once.
    ///
    /// On success:
    /// - The staged state is cleared and metrics are recorded.
    /// - When the new version is a multiple of `checkpoint_interval`, the handle
    ///   is dropped and a background reopen is scheduled.
    ///
    /// Returns the staged row count (counted before any upsert collapse) and
    /// the byte estimate.
    ///
    /// # Errors
    /// - Concat and collapse errors.
    /// - Reopen failures and timeouts.
    /// - `WriteError` on a write timeout.
    /// - The last write error once retries are exhausted.
    #[allow(clippy::too_many_lines)]
    async fn flush_staged_to_delta(&mut self) -> Result<WriteResult, ConnectorError> {
        if self.staged_batches.is_empty() {
            return Ok(WriteResult::new(0, 0));
        }

        if self.config.write_mode == DeltaWriteMode::Upsert {
            // Collapse once, before the retry loop, so every attempt merges the
            // same (already de-duplicated) batch.
            let combined = if self.staged_batches.len() == 1 {
                self.staged_batches[0].clone()
            } else {
                let schema = self.staged_batches[0].schema();
                arrow_select::concat::concat_batches(&schema, &self.staged_batches).map_err(
                    |e| ConnectorError::Internal(format!("failed to concat staged batches: {e}")),
                )?
            };
            let rows_in = combined.num_rows() as u64;
            let collapse_start = Instant::now();
            let collapsed =
                crate::changelog::collapse_changelog(&combined, &self.config.merge_key_columns)?;
            let (upserts_out, deletes_out) = count_collapsed_ops(&collapsed);
            self.metrics.observe_collapse(
                rows_in,
                upserts_out,
                deletes_out,
                collapse_start.elapsed().as_secs_f64(),
            );
            self.staged_batches.clear();
            self.staged_batches.push(collapsed);
        }

        let total_rows = self.staged_rows;
        let estimated_bytes = self.staged_bytes;
        let flush_start = Instant::now();

        let backoff_ms = [100u64, 500, 2000];
        // At least one attempt even when `max_commit_retries` is 0.
        let max_attempts = (self.config.max_commit_retries as usize).saturating_add(1);
        let mut last_error: Option<ConnectorError> = None;

        for attempt in 0..max_attempts {
            // The handle is missing after a failed attempt (it was consumed) or
            // while a post-checkpoint background reopen is pending.
            if self.table.is_none() {
                let reopen_timeout = self.config.write_timeout;
                if !self.try_install_pending_reopen(reopen_timeout).await {
                    tokio::time::timeout(reopen_timeout, self.reopen_table())
                        .await
                        .map_err(|_| {
                            ConnectorError::ConnectionFailed(format!(
                                "Delta table reopen timed out after {}s",
                                reopen_timeout.as_secs()
                            ))
                        })??;
                }
            }

            let table = self
                .table
                .take()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "table initialized".into(),
                    actual: "table not initialized".into(),
                })?;

            let write_timeout = self.config.write_timeout;
            // If the timeout fires, the in-flight future (and the table it owns)
            // is dropped; the next attempt reopens.
            let write_result =
                tokio::time::timeout(write_timeout, self.attempt_delta_write(table)).await;

            let write_result = match write_result {
                Ok(inner) => inner,
                Err(_elapsed) => Err(ConnectorError::WriteError(format!(
                    "Delta write timed out after {}s",
                    write_timeout.as_secs()
                ))),
            };

            match write_result {
                Ok(table) => {
                    #[allow(clippy::cast_sign_loss)]
                    {
                        self.delta_version = table.version().unwrap_or(0) as u64;
                    }

                    // When this commit landed on a checkpoint-interval version the
                    // handle is replaced by a background reopen — presumably so the
                    // next write starts from a handle that loaded the new
                    // checkpoint. NOTE(review): confirm intent.
                    let crossed_checkpoint = self.config.checkpoint_interval > 0
                        && self.delta_version > 0
                        && self
                            .delta_version
                            .is_multiple_of(self.config.checkpoint_interval);
                    if crossed_checkpoint {
                        self.table = None;
                        self.schedule_background_reopen();
                    } else {
                        self.table = Some(table);
                    }

                    self.staged_batches.clear();
                    self.staged_rows = 0;
                    self.staged_bytes = 0;

                    self.metrics
                        .record_flush(total_rows as u64, estimated_bytes);
                    self.metrics.record_commit(self.delta_version);
                    self.metrics
                        .observe_flush_duration(flush_start.elapsed().as_secs_f64());

                    debug!(
                        rows = total_rows,
                        bytes = estimated_bytes,
                        delta_version = self.delta_version,
                        attempt = attempt + 1,
                        reopened = crossed_checkpoint,
                        "Delta Lake: committed staged data to Delta"
                    );

                    return Ok(WriteResult::new(total_rows, estimated_bytes));
                }
                Err(e) => {
                    if Self::is_conflict_error(&e) && attempt + 1 < max_attempts {
                        self.metrics.record_conflict();
                        self.metrics.record_retry();
                        let base = backoff_ms.get(attempt).copied().unwrap_or(2000);
                        let delay_ms = jittered_backoff_ms(base);
                        warn!(
                            attempt = attempt + 1,
                            max_attempts,
                            delay_ms,
                            error = %e,
                            "Delta Lake: conflict error, retrying after backoff"
                        );
                        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                        last_error = Some(e);
                        continue;
                    }
                    self.metrics
                        .observe_flush_duration(flush_start.elapsed().as_secs_f64());
                    return Err(e);
                }
            }
        }

        // Only reachable if the loop body never ran; kept as a defensive fallback.
        Err(last_error.unwrap_or_else(|| {
            ConnectorError::Internal("flush_staged_to_delta: no attempts made".into())
        }))
    }
793
794 pub fn split_changelog_batch(
807 batch: &RecordBatch,
808 ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
809 let op_idx = batch.schema().index_of("_op").map_err(|_| {
810 ConnectorError::ConfigurationError(
811 "upsert mode requires '_op' column in input schema".into(),
812 )
813 })?;
814
815 let op_array = batch
816 .column(op_idx)
817 .as_any()
818 .downcast_ref::<arrow_array::StringArray>()
819 .ok_or_else(|| {
820 ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
821 })?;
822
823 let len = op_array.len();
825 let mut insert_mask = Vec::with_capacity(len);
826 let mut delete_mask = Vec::with_capacity(len);
827
828 for i in 0..len {
829 if op_array.is_null(i) {
830 insert_mask.push(false);
831 delete_mask.push(false);
832 continue;
833 }
834 match op_array.value(i) {
835 "I" | "U" | "r" => {
836 insert_mask.push(true);
837 delete_mask.push(false);
838 }
839 "D" => {
840 insert_mask.push(false);
841 delete_mask.push(true);
842 }
843 _ => {
844 insert_mask.push(false);
845 delete_mask.push(false);
846 }
847 }
848 }
849
850 let user_col_indices: Vec<usize> = batch
852 .schema()
853 .fields()
854 .iter()
855 .enumerate()
856 .filter(|(_, f)| !f.name().starts_with('_'))
857 .map(|(i, _)| i)
858 .collect();
859
860 let insert_batch = filter_and_project(batch, insert_mask, &user_col_indices)?;
861 let delete_batch = filter_and_project(batch, delete_mask, &user_col_indices)?;
862
863 Ok((insert_batch, delete_batch))
864 }
865}
866
867#[cfg(feature = "delta-lake")]
871#[allow(clippy::too_many_lines)]
872async fn compaction_loop(
873 table_path: String,
874 storage_options: Arc<std::collections::HashMap<String, String>>,
875 config: super::delta_config::CompactionConfig,
876 vacuum_retention: std::time::Duration,
877 compaction_props: Option<deltalake::parquet::file::properties::WriterProperties>,
878 cancel: tokio_util::sync::CancellationToken,
879) {
880 use super::delta_io;
881
882 const MIN_COMPACTION_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
884
885 let base_interval = config.check_interval;
886 let mut current_interval = base_interval;
887 let mut consecutive_skips: u32 = 0;
888
889 info!(
890 table_path = %table_path,
891 check_interval_secs = base_interval.as_secs(),
892 "compaction background task started (adaptive interval)"
893 );
894
895 tokio::select! {
897 () = cancel.cancelled() => {
898 info!("compaction background task cancelled");
899 return;
900 }
901 () = tokio::time::sleep(current_interval) => {}
902 }
903
904 loop {
905 let table =
907 match delta_io::open_or_create_table(&table_path, (*storage_options).clone(), None)
908 .await
909 {
910 Ok(t) => t,
911 Err(e) => {
912 warn!(error = %e, "compaction: failed to open table, will retry");
913 tokio::select! {
914 () = cancel.cancelled() => {
915 info!("compaction background task cancelled");
916 return;
917 }
918 () = tokio::time::sleep(current_interval) => {}
919 }
920 continue;
921 }
922 };
923
924 let should_compact = match table.snapshot() {
926 Ok(snapshot) => {
927 let file_count = snapshot.log_data().num_files();
928 if file_count < config.min_files_for_compaction {
929 debug!(
930 file_count,
931 min = config.min_files_for_compaction,
932 "compaction: skipping, not enough files"
933 );
934 false
935 } else {
936 true
937 }
938 }
939 Err(e) => {
940 warn!(error = %e, "compaction: snapshot failed, skipping tick");
941 false
942 }
943 };
944
945 if should_compact {
946 consecutive_skips = 0;
947 current_interval = (current_interval / 2).max(MIN_COMPACTION_INTERVAL);
949
950 let target_size = config.target_file_size as u64;
951 match delta_io::run_compaction(
954 table,
955 target_size,
956 &config.z_order_columns,
957 compaction_props.clone(),
958 )
959 .await
960 {
961 Ok((table, result)) => {
962 debug!(
963 files_added = result.files_added,
964 files_removed = result.files_removed,
965 interval_secs = current_interval.as_secs(),
966 "compaction: OPTIMIZE complete"
967 );
968
969 match delta_io::run_vacuum(table, vacuum_retention).await {
971 Ok((_table, files_deleted)) => {
972 debug!(files_deleted, "compaction: VACUUM complete");
973 }
974 Err(e) => {
975 warn!(error = %e, "compaction: VACUUM failed");
976 }
977 }
978 }
979 Err(e) => {
980 warn!(error = %e, "compaction: OPTIMIZE failed");
981 }
982 }
983 } else {
984 consecutive_skips = consecutive_skips.saturating_add(1);
985 if consecutive_skips >= 2 {
987 current_interval = (current_interval * 2).min(base_interval);
988 }
989 }
990
991 tokio::select! {
993 () = cancel.cancelled() => {
994 info!("compaction background task cancelled");
995 return;
996 }
997 () = tokio::time::sleep(current_interval) => {}
998 }
999 }
1000}
1001
1002#[cfg(all(feature = "delta-lake", feature = "delta-lake-unity"))]
1006async fn ensure_uc_table_exists(
1007 config: &DeltaLakeSinkConfig,
1008 schema: Option<&SchemaRef>,
1009) -> Result<(), ConnectorError> {
1010 let super::delta_config::DeltaCatalogType::Unity {
1011 ref workspace_url,
1012 ref access_token,
1013 } = config.catalog_type
1014 else {
1015 return Ok(());
1016 };
1017
1018 let Some(ref storage_location) = config.catalog_storage_location else {
1019 return Ok(());
1020 };
1021
1022 let Some(arrow_schema) = schema else {
1023 warn!(
1024 "catalog.storage.location is set but no schema available — \
1025 skipping Unity Catalog auto-create"
1026 );
1027 return Ok(());
1028 };
1029
1030 let catalog = config.catalog_name.as_deref().unwrap_or_default();
1031 let schema_name = config.catalog_schema.as_deref().unwrap_or_default();
1032 let table_name = config
1033 .table_path
1034 .strip_prefix("uc://")
1035 .and_then(|s| s.rsplit('.').next())
1036 .unwrap_or(&config.table_path);
1037
1038 let columns = super::unity_catalog::arrow_to_uc_columns(arrow_schema);
1039 super::unity_catalog::create_uc_table(
1040 workspace_url,
1041 access_token,
1042 catalog,
1043 schema_name,
1044 table_name,
1045 storage_location,
1046 &columns,
1047 )
1048 .await
1049}
1050
1051#[async_trait]
1052impl SinkConnector for DeltaLakeSink {
1053 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
1054 self.state = ConnectorState::Initializing;
1055
1056 if !config.properties().is_empty() {
1058 self.config = DeltaLakeSinkConfig::from_config(config)?;
1059 }
1060
1061 info!(
1062 table_path = %self.config.table_path,
1063 mode = %self.config.write_mode,
1064 guarantee = %self.config.delivery_guarantee,
1065 "opening Delta Lake sink connector"
1066 );
1067
1068 #[cfg(feature = "delta-lake")]
1072 {
1073 let should_defer = matches!(
1074 self.config.catalog_type,
1075 super::delta_config::DeltaCatalogType::Unity { .. }
1076 ) && self.config.catalog_storage_location.is_some()
1077 && self.schema.is_none();
1078
1079 if should_defer {
1080 info!(
1081 "Unity Catalog auto-create configured but pipeline schema not yet \
1082 available — deferring Delta table init to first begin_epoch"
1083 );
1084 self.needs_deferred_delta_init = true;
1085 self.state = ConnectorState::Initializing;
1086 return Ok(());
1087 }
1088
1089 self.init_delta_table().await?;
1090
1091 if self.table.as_ref().is_some_and(|t| t.version().is_none()) && self.schema.is_none() {
1094 self.needs_deferred_delta_init = true;
1095 self.state = ConnectorState::Initializing;
1096 return Ok(());
1097 }
1098 }
1099
1100 #[cfg(not(feature = "delta-lake"))]
1101 {
1102 self.state = ConnectorState::Failed;
1103 return Err(ConnectorError::ConfigurationError(
1104 "Delta Lake sink requires the 'delta-lake' feature to be enabled. \
1105 Build with: cargo build --features delta-lake"
1106 .into(),
1107 ));
1108 }
1109
1110 #[cfg(feature = "delta-lake")]
1111 {
1112 self.state = ConnectorState::Running;
1113 info!("Delta Lake sink connector opened successfully");
1114 Ok(())
1115 }
1116 }
1117
    /// Accepts a batch into the in-memory buffer.
    ///
    /// Processing order:
    /// 1. Rejects calls unless the state is `Running` or `Initializing`.
    /// 2. Ignores empty batches and batches for a skipped (already-committed) epoch.
    /// 3. Adopts a schema from the batch if none is known, then completes any
    ///    deferred table init (failure moves the connector to `Failed`).
    /// 4. Applies backpressure: rejects the batch with `WriteError` when
    ///    buffered + staged + incoming data would exceed 4× `max_buffer_records`
    ///    rows or 4× `target_file_size` bytes. A single oversized batch is still
    ///    allowed.
    /// 5. Appends the batch to the buffer. When the guarantee is not
    ///    exactly-once and `should_flush` fires, it flushes leftovers from an
    ///    earlier failed flush, then stages and flushes the buffer.
    ///
    /// NOTE(review): `WriteResult::new(0, 0)` is returned even when this call
    /// flushed rows to Delta — confirm callers do not rely on the count.
    /// Under exactly-once nothing is flushed here, so an epoch larger than the
    /// 4× caps fails with `WriteError`.
    ///
    /// # Errors
    /// `InvalidState`, deferred-init errors, backpressure `WriteError`, and
    /// flush errors.
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        // `Initializing` is accepted because table init may still be deferred.
        if self.state != ConnectorState::Running && self.state != ConnectorState::Initializing {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        if self.epoch_skipped {
            return Ok(WriteResult::new(0, 0));
        }

        if self.schema.is_none() {
            // First batch defines the target schema (metadata columns stripped in upsert mode).
            self.schema = Some(Self::target_schema(&batch.schema(), self.config.write_mode));
        }

        #[cfg(feature = "delta-lake")]
        if self.needs_deferred_delta_init {
            info!("schema now available from first batch — completing deferred Delta table init");
            match self.init_delta_table().await {
                Ok(()) => {
                    self.needs_deferred_delta_init = false;
                    self.state = ConnectorState::Running;
                    info!("Delta Lake sink connector opened successfully (deferred)");
                }
                Err(e) => {
                    self.state = ConnectorState::Failed;
                    return Err(e);
                }
            }
        }

        let num_rows = batch.num_rows();
        let estimated_bytes = Self::estimate_batch_size(batch);

        // Backpressure: bound everything held in memory (buffered + staged).
        let pending_rows = self.buffered_rows + self.staged_rows + num_rows;
        let pending_bytes = self.buffered_bytes + self.staged_bytes + estimated_bytes;
        let row_cap = self
            .config
            .max_buffer_records
            .saturating_mul(4)
            .max(num_rows);
        let byte_cap = (self.config.target_file_size as u64)
            .saturating_mul(4)
            .max(estimated_bytes);
        if pending_rows > row_cap || pending_bytes > byte_cap {
            return Err(ConnectorError::WriteError(format!(
                "delta sink buffer full ({pending_rows} rows, \
                 {pending_bytes} bytes pending; cap {row_cap} rows, \
                 {byte_cap} bytes) — backpressure until next flush/commit"
            )));
        }

        if self.buffer_start_time.is_none() {
            // Starts the clock for the time-based flush trigger.
            self.buffer_start_time = Some(Instant::now());
        }
        self.buffer.push(batch.clone());
        self.buffered_rows += num_rows;
        self.buffered_bytes += estimated_bytes;

        #[cfg(feature = "delta-lake")]
        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce && self.should_flush() {
            // Retry leftovers from an earlier failed flush first so the
            // assignment below does not overwrite them.
            if !self.staged_batches.is_empty() {
                self.flush_staged_to_delta().await?;
            }
            self.staged_batches = std::mem::take(&mut self.buffer);
            self.staged_rows = self.buffered_rows;
            self.staged_bytes = self.buffered_bytes;
            self.buffered_rows = 0;
            self.buffered_bytes = 0;
            self.buffer_start_time = None;
            self.flush_staged_to_delta().await?;
        }

        Ok(WriteResult::new(0, 0))
    }
1219
1220 fn schema(&self) -> SchemaRef {
1221 self.schema
1222 .clone()
1223 .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
1224 }
1225
1226 async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1227 #[cfg(feature = "delta-lake")]
1233 if self.needs_deferred_delta_init {
1234 if self.schema.is_some() {
1239 info!("schema available — completing deferred Delta table init");
1240 match self.init_delta_table().await {
1241 Ok(()) => {
1242 self.needs_deferred_delta_init = false;
1243 self.state = ConnectorState::Running;
1244 info!("Delta Lake sink connector opened successfully (deferred)");
1245 }
1246 Err(e) => {
1247 self.state = ConnectorState::Failed;
1248 return Err(e);
1249 }
1250 }
1251 }
1252 }
1253
1254 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
1256 && epoch <= self.last_committed_epoch
1257 {
1258 warn!(
1259 epoch,
1260 last_committed = self.last_committed_epoch,
1261 "Delta Lake: skipping already-committed epoch"
1262 );
1263 self.epoch_skipped = true;
1264 return Ok(());
1265 }
1266
1267 self.epoch_skipped = false;
1268 self.current_epoch = epoch;
1269 self.buffer.clear();
1270 self.buffered_rows = 0;
1271 self.buffered_bytes = 0;
1272 self.buffer_start_time = None;
1273
1274 debug!(epoch, "Delta Lake: began epoch");
1275 Ok(())
1276 }
1277
1278 async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1279 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
1281 && epoch <= self.last_committed_epoch
1282 {
1283 return Ok(());
1284 }
1285
1286 if !self.buffer.is_empty() {
1290 self.staged_batches = std::mem::take(&mut self.buffer);
1291 self.staged_rows = self.buffered_rows;
1292 self.staged_bytes = self.buffered_bytes;
1293 self.buffered_rows = 0;
1294 self.buffered_bytes = 0;
1295 self.buffer_start_time = None;
1296 }
1297
1298 debug!(epoch, "Delta Lake: pre-committed (batches staged)");
1299 Ok(())
1300 }
1301
1302 async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1303 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
1305 && epoch <= self.last_committed_epoch
1306 {
1307 return Ok(());
1308 }
1309
1310 #[cfg(feature = "delta-lake")]
1312 {
1313 if !self.staged_batches.is_empty() {
1314 self.flush_staged_to_delta().await?;
1315 }
1316 }
1317
1318 self.last_committed_epoch = epoch;
1319
1320 info!(
1321 epoch,
1322 delta_version = self.delta_version,
1323 "Delta Lake: committed epoch"
1324 );
1325
1326 Ok(())
1327 }
1328
1329 async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1330 self.buffer.clear();
1333 self.buffered_rows = 0;
1334 self.buffered_bytes = 0;
1335 self.buffer_start_time = None;
1336 self.staged_batches.clear();
1337 self.staged_rows = 0;
1338 self.staged_bytes = 0;
1339
1340 self.epoch_skipped = false;
1341 self.metrics.record_rollback();
1342 warn!(epoch, "Delta Lake: rolled back epoch");
1343 Ok(())
1344 }
1345
1346 fn capabilities(&self) -> SinkConnectorCapabilities {
1347 let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(180)).with_idempotent();
1349
1350 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1351 caps = caps.with_exactly_once().with_two_phase_commit();
1352 }
1353 if self.config.write_mode == DeltaWriteMode::Upsert {
1354 caps = caps.with_upsert().with_changelog();
1355 }
1356 if self.config.schema_evolution {
1357 caps = caps.with_schema_evolution();
1358 }
1359 if !self.config.partition_columns.is_empty() {
1360 caps = caps.with_partitioned();
1361 }
1362
1363 caps
1364 }
1365
1366 async fn flush(&mut self) -> Result<(), ConnectorError> {
1367 #[cfg(feature = "delta-lake")]
1371 if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce {
1372 if !self.staged_batches.is_empty() {
1376 self.flush_staged_to_delta().await?;
1377 }
1378
1379 if !self.buffer.is_empty() {
1381 self.staged_batches = std::mem::take(&mut self.buffer);
1382 self.staged_rows = self.buffered_rows;
1383 self.staged_bytes = self.buffered_bytes;
1384 self.buffered_rows = 0;
1385 self.buffered_bytes = 0;
1386 self.buffer_start_time = None;
1387
1388 self.flush_staged_to_delta().await?;
1389 }
1390 return Ok(());
1391 }
1392
1393 if self.buffer.is_empty() {
1394 return Ok(());
1395 }
1396
1397 if self.buffer.len() > 1 {
1400 let schema = self.buffer[0].schema();
1401 let combined = arrow_select::concat::concat_batches(&schema, &self.buffer)
1402 .map_err(|e| ConnectorError::Internal(format!("concat failed: {e}")))?;
1403 self.buffer.clear();
1404 self.buffer.push(combined);
1405 }
1406 Ok(())
1407 }
1408
1409 async fn close(&mut self) -> Result<(), ConnectorError> {
1410 info!("closing Delta Lake sink connector");
1411
1412 #[cfg(feature = "delta-lake")]
1416 if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce {
1417 self.flush().await?;
1418 } else if !self.buffer.is_empty() {
1419 self.pre_commit(self.current_epoch).await?;
1420 self.commit_epoch(self.current_epoch).await?;
1421 }
1422
1423 #[cfg(feature = "delta-lake")]
1425 {
1426 if let Some(cancel) = self.compaction_cancel.take() {
1427 cancel.cancel();
1428 }
1429 if let Some(handle) = self.compaction_handle.take() {
1430 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
1432 }
1433 }
1434
1435 #[cfg(feature = "delta-lake")]
1436 if let Some(pending) = self.pending_reopen.take() {
1437 pending.abort();
1438 }
1439
1440 #[cfg(feature = "delta-lake")]
1442 {
1443 self.table = None;
1444 }
1445
1446 self.state = ConnectorState::Closed;
1447
1448 info!(
1449 table_path = %self.config.table_path,
1450 delta_version = self.delta_version,
1451 "Delta Lake sink connector closed"
1452 );
1453
1454 Ok(())
1455 }
1456}
1457
#[cfg(feature = "delta-lake")]
impl Drop for DeltaLakeSink {
    /// Last-resort teardown of background tasks still owned by the sink.
    ///
    /// `Drop` is synchronous and cannot await a graceful shutdown. So the
    /// compaction task is sent its cancellation signal and then aborted
    /// outright, and any pending reopen task is aborted too. Each handle is
    /// `take()`n, so this is a no-op for handles `close` already consumed.
    fn drop(&mut self) {
        if let Some(token) = self.compaction_cancel.take() {
            token.cancel();
        }
        if let Some(task) = self.compaction_handle.take() {
            task.abort();
        }
        if let Some(task) = self.pending_reopen.take() {
            task.abort();
        }
    }
}
1475
1476impl std::fmt::Debug for DeltaLakeSink {
1477 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1478 f.debug_struct("DeltaLakeSink")
1479 .field("state", &self.state)
1480 .field("table_path", &self.config.table_path)
1481 .field("mode", &self.config.write_mode)
1482 .field("guarantee", &self.config.delivery_guarantee)
1483 .field("current_epoch", &self.current_epoch)
1484 .field("last_committed_epoch", &self.last_committed_epoch)
1485 .field("buffered_rows", &self.buffered_rows)
1486 .field("delta_version", &self.delta_version)
1487 .field("epoch_skipped", &self.epoch_skipped)
1488 .finish_non_exhaustive()
1489 }
1490}
1491
#[cfg(feature = "delta-lake")]
/// Scales `base_ms` by a uniformly random factor drawn from `[0.75, 1.25]`.
///
/// The product is truncated to whole milliseconds; float-to-int `as`
/// saturates, so huge inputs clamp instead of wrapping. The result is never
/// below 1 ms.
fn jittered_backoff_ms(base_ms: u64) -> u64 {
    use rand::RngExt as _;

    let mut rng = rand::rng();
    let factor = rng.random_range(0.75_f64..=1.25_f64);

    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    let scaled = (base_ms as f64 * factor) as u64;

    std::cmp::max(scaled, 1)
}
1511
1512fn filter_and_project(
1518 batch: &RecordBatch,
1519 mask: Vec<bool>,
1520 col_indices: &[usize],
1521) -> Result<RecordBatch, ConnectorError> {
1522 use arrow_array::BooleanArray;
1523 use arrow_select::filter::filter_record_batch;
1524
1525 let bool_array = BooleanArray::from(mask);
1526
1527 let projected = batch
1528 .project(col_indices)
1529 .map_err(|e| ConnectorError::Internal(format!("batch projection failed: {e}")))?;
1530
1531 filter_record_batch(&projected, &bool_array)
1532 .map_err(|e| ConnectorError::Internal(format!("arrow filter failed: {e}")))
1533}
1534
#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::float_cmp)]
mod tests {
    use super::super::delta_config::DeltaCatalogType;
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Schema shared by the non-changelog tests: `id` (Int64, non-null),
    /// `name` (Utf8), `value` (Float64).
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Default sink config aimed at a table path that should not exist.
    fn test_config() -> DeltaLakeSinkConfig {
        #[cfg(unix)]
        let path = "/tmp/delta_test_nonexistent_8f3a";
        #[cfg(windows)]
        let path = "C:\\delta_test_nonexistent_8f3a";
        DeltaLakeSinkConfig::new(path)
    }

    /// `test_config` switched to upsert mode, keyed on `id`.
    fn upsert_config() -> DeltaLakeSinkConfig {
        let mut cfg = test_config();
        cfg.write_mode = DeltaWriteMode::Upsert;
        cfg.merge_key_columns = vec!["id".to_string()];
        cfg
    }

    /// Batch of `n` rows: ids `0..n`, every name `"test"`, value `i * 1.5`.
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    // ---- construction ----

    #[test]
    fn test_new_defaults() {
        let sink = DeltaLakeSink::new(test_config(), None);
        assert_eq!(sink.state(), ConnectorState::Created);
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.buffered_bytes(), 0);
        assert_eq!(sink.delta_version(), 0);
        assert!(sink.schema.is_none());
    }

    #[test]
    fn test_with_schema() {
        let schema = test_schema();
        let sink = DeltaLakeSink::with_schema(test_config(), schema.clone());
        assert_eq!(sink.schema(), schema);
    }

    #[test]
    fn test_schema_empty_when_none() {
        let sink = DeltaLakeSink::new(test_config(), None);
        let schema = sink.schema();
        assert_eq!(schema.fields().len(), 0);
    }

    // ---- deferred (Unity catalog) initialization ----

    #[cfg(feature = "delta-lake")]
    #[test]
    fn test_deferred_init_flag_default_false() {
        let sink = DeltaLakeSink::new(test_config(), None);
        assert!(!sink.needs_deferred_delta_init);
    }

    /// `test_config` registered against a fake Unity catalog with an explicit
    /// storage location.
    fn unity_config() -> DeltaLakeSinkConfig {
        let mut config = test_config();
        config.catalog_type = DeltaCatalogType::Unity {
            workspace_url: "https://test.azuredatabricks.net".to_string(),
            access_token: "dapi123".to_string(),
        };
        config.catalog_name = Some("main".to_string());
        config.catalog_schema = Some("default".to_string());
        config.catalog_storage_location = Some("abfss://c@acct.dfs.core.windows.net/t".to_string());
        config
    }

    #[cfg(feature = "delta-lake")]
    #[tokio::test]
    async fn test_open_defers_init_for_unity_no_schema() {
        use crate::config::ConnectorConfig;

        let config = unity_config();
        let mut sink = DeltaLakeSink::new(config, None);

        let connector_config = ConnectorConfig::new("delta-lake");
        let result = sink.open(&connector_config).await;
        assert!(result.is_ok());

        // Without a schema, open() postpones table setup and stays Initializing.
        assert!(sink.needs_deferred_delta_init);
        assert_eq!(sink.state(), ConnectorState::Initializing);
        assert!(sink.schema.is_none());
    }

    #[cfg(feature = "delta-lake")]
    #[tokio::test]
    async fn test_deferred_init_transitions_to_failed_on_error() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Initializing;
        sink.needs_deferred_delta_init = true;
        sink.schema = Some(test_schema());

        // The deferred init cannot succeed against the test path; the sink
        // must surface the error and move to Failed.
        let result = sink.begin_epoch(1).await;
        assert!(result.is_err());
        assert_eq!(sink.state(), ConnectorState::Failed);
    }

    #[cfg(feature = "delta-lake")]
    #[tokio::test]
    async fn test_write_batch_accepts_initializing_state() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Initializing;
        sink.needs_deferred_delta_init = true;

        // write_batch is accepted in Initializing (it captures the schema) and
        // then fails in the deferred init, moving the sink to Failed.
        let batch = test_batch(5);
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
        assert_eq!(sink.state(), ConnectorState::Failed);
        assert!(sink.schema.is_some());
    }

    // NOTE(review): the two tests below restate the deferral predicate inline
    // instead of exercising `open()`, so they cannot catch a regression in the
    // production condition. Consider driving them through the real code path.

    #[test]
    fn test_no_deferred_init_without_catalog_storage_location() {
        let mut config = unity_config();
        config.catalog_storage_location = None;
        let sink = DeltaLakeSink::new(config, None);

        let should_defer = matches!(sink.config.catalog_type, DeltaCatalogType::Unity { .. })
            && sink.config.catalog_storage_location.is_some()
            && sink.schema.is_none();
        assert!(!should_defer);
    }

    #[test]
    fn test_no_deferred_init_with_schema() {
        let config = unity_config();
        let sink = DeltaLakeSink::with_schema(config, test_schema());

        let should_defer = matches!(sink.config.catalog_type, DeltaCatalogType::Unity { .. })
            && sink.config.catalog_storage_location.is_some()
            && sink.schema.is_none();
        assert!(!should_defer);
    }

    // ---- buffer sizing / flush thresholds ----

    #[test]
    fn test_estimate_batch_size() {
        let batch = test_batch(100);
        let size = DeltaLakeSink::estimate_batch_size(&batch);
        assert!(size > 0);
    }

    #[test]
    fn test_estimate_batch_size_empty() {
        let batch = RecordBatch::new_empty(test_schema());
        let size = DeltaLakeSink::estimate_batch_size(&batch);
        assert!(size < 1024);
    }

    #[test]
    fn test_should_flush_by_rows() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.buffered_rows = 99;
        assert!(!sink.should_flush());
        sink.buffered_rows = 100;
        assert!(sink.should_flush());
    }

    #[test]
    fn test_should_flush_by_bytes() {
        let mut config = test_config();
        config.target_file_size = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.buffered_bytes = 999;
        assert!(!sink.should_flush());
        sink.buffered_bytes = 1000;
        assert!(sink.should_flush());
    }

    #[test]
    fn test_should_flush_empty() {
        let sink = DeltaLakeSink::new(test_config(), None);
        assert!(!sink.should_flush());
    }

    #[tokio::test]
    async fn test_exactly_once_buffer_backpressure() {
        let mut config = test_config();
        config.delivery_guarantee = crate::connector::DeliveryGuarantee::ExactlyOnce;
        config.max_buffer_records = 10;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let first = test_batch(50);
        sink.write_batch(&first)
            .await
            .expect("single oversized batch must be admitted");
        assert_eq!(sink.buffered_rows(), 50);

        let second = test_batch(5);
        let err = sink
            .write_batch(&second)
            .await
            .expect_err("should reject once cumulative buffer exceeds cap");
        let msg = err.to_string();
        assert!(
            msg.contains("buffer full"),
            "expected backpressure error, got: {msg}"
        );
        // The rejected batch must not have been partially buffered.
        assert_eq!(sink.buffered_rows(), 50);
    }

    // ---- write_batch ----

    #[tokio::test]
    async fn test_write_batch_buffering() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.unwrap();

        // Below the threshold nothing is written yet; rows stay buffered.
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
        assert!(sink.buffered_bytes() > 0);
    }

    #[tokio::test]
    async fn test_write_batch_empty() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(0);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[tokio::test]
    async fn test_write_batch_not_running() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        let batch = test_batch(10);
        // A freshly created (never opened) sink must reject writes.
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_write_batch_sets_schema() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;
        assert!(sink.schema.is_none());

        let batch = test_batch(5);
        sink.write_batch(&batch).await.unwrap();
        assert!(sink.schema.is_some());
        assert_eq!(sink.schema.as_ref().unwrap().fields().len(), 3);
    }

    #[tokio::test]
    async fn test_multiple_write_batches_accumulate() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();

        assert_eq!(sink.buffered_rows(), 30);
    }

    // ---- epoch lifecycle ----

    #[tokio::test]
    async fn test_rollback_clears_buffer() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(50);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 50);

        sink.rollback_epoch(0).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.buffered_bytes(), 0);
    }

    #[tokio::test]
    async fn test_rollback_after_pre_commit_discards_staged() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        let batch = test_batch(50);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 50);

        // pre_commit moves the buffer into the staging area.
        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.staged_rows, 50);
        assert!(!sink.staged_batches.is_empty());

        // Rollback drops the staged data without producing a Delta version.
        sink.rollback_epoch(1).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.staged_rows, 0);
        assert_eq!(sink.staged_bytes, 0);
        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.delta_version(), 0);
    }

    #[tokio::test]
    async fn test_staged_data_preserved_until_commit_or_rollback() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(25)).await.unwrap();
        sink.write_batch(&test_batch(25)).await.unwrap();

        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.staged_rows, 50);
        assert_eq!(sink.staged_batches.len(), 2);
        assert_eq!(sink.buffered_rows(), 0);

        sink.rollback_epoch(1).await.unwrap();
        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.staged_rows, 0);
        assert_eq!(sink.staged_bytes, 0);
    }

    #[tokio::test]
    async fn test_commit_empty_epoch() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        // The epoch is recorded even though no data (and no Delta version) was written.
        assert_eq!(sink.last_committed_epoch(), 1);
        assert_eq!(sink.delta_version(), 0);
    }

    #[tokio::test]
    async fn test_flush_coalesces_buffer() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        config.writer_id = "test-writer".to_string();
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffer.len(), 2);

        // In exactly-once mode flush only merges the buffered batches.
        sink.flush().await.unwrap();
        assert_eq!(sink.buffer.len(), 1);
        assert_eq!(sink.buffered_rows(), 20);
    }

    #[tokio::test]
    async fn test_close() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;

        sink.close().await.unwrap();
        assert_eq!(sink.state(), ConnectorState::Closed);
    }

    // ---- capabilities ----

    #[test]
    fn test_capabilities_append_exactly_once() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = DeltaLakeSink::new(config, None);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.schema_evolution);
        assert!(!caps.partitioned);
    }

    #[test]
    fn test_capabilities_upsert() {
        let sink = DeltaLakeSink::new(upsert_config(), None);
        let caps = sink.capabilities();
        assert!(caps.upsert);
        assert!(caps.changelog);
        assert!(caps.idempotent);
    }

    #[test]
    fn test_capabilities_schema_evolution() {
        let mut config = test_config();
        config.schema_evolution = true;
        let sink = DeltaLakeSink::new(config, None);
        let caps = sink.capabilities();
        assert!(caps.schema_evolution);
    }

    #[test]
    fn test_capabilities_partitioned() {
        let mut config = test_config();
        config.partition_columns = vec!["trade_date".to_string()];
        let sink = DeltaLakeSink::new(config, None);
        let caps = sink.capabilities();
        assert!(caps.partitioned);
    }

    #[test]
    fn test_capabilities_at_least_once() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::AtLeastOnce;
        let sink = DeltaLakeSink::new(config, None);
        let caps = sink.capabilities();
        assert!(!caps.exactly_once);
        assert!(caps.idempotent);
    }

    // ---- changelog splitting ----

    /// CDC-style schema: two data columns plus `_op` and `_ts_ms` metadata.
    fn changelog_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
        ]))
    }

    /// Five rows with ops I, U, D, I, D (ids 1..=5).
    fn changelog_batch() -> RecordBatch {
        RecordBatch::try_new(
            changelog_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_split_changelog_batch() {
        let batch = changelog_batch();
        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();

        // I and U rows go to inserts, D rows to deletes.
        assert_eq!(inserts.num_rows(), 3);
        assert_eq!(deletes.num_rows(), 2);

        // Metadata columns (`_op`, `_ts_ms`) are projected away.
        assert_eq!(inserts.num_columns(), 2);
        assert_eq!(deletes.num_columns(), 2);

        let insert_ids = inserts
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(insert_ids.value(0), 1);
        assert_eq!(insert_ids.value(1), 2);
        assert_eq!(insert_ids.value(2), 4);

        let delete_ids = deletes
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(delete_ids.value(0), 3);
        assert_eq!(delete_ids.value(1), 5);
    }

    #[test]
    fn test_split_changelog_all_inserts() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["I", "I"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .unwrap();

        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
        assert_eq!(inserts.num_rows(), 2);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_split_changelog_all_deletes() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["D", "D"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .unwrap();

        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
        assert_eq!(inserts.num_rows(), 0);
        assert_eq!(deletes.num_rows(), 2);
    }

    #[test]
    fn test_split_changelog_missing_op_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();

        let result = DeltaLakeSink::split_changelog_batch(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_split_changelog_snapshot_read() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(StringArray::from(vec!["a"])),
                // Snapshot-read op "r" is treated as an insert.
                Arc::new(StringArray::from(vec!["r"])),
                Arc::new(Int64Array::from(vec![100])),
            ],
        )
        .unwrap();

        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
        assert_eq!(inserts.num_rows(), 1);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_debug_output() {
        let sink = DeltaLakeSink::new(test_config(), None);
        let debug = format!("{sink:?}");
        assert!(debug.contains("DeltaLakeSink"));
        assert!(debug.contains("delta_test_nonexistent_8f3a"));
    }

    // ---- end-to-end upsert against a real local Delta table ----

    /// Z-set changelog batch of `(region, total, weight)` rows. The weight
    /// column uses the core `WEIGHT_COLUMN` name.
    #[cfg(feature = "delta-lake")]
    fn zset_changelog(rows: &[(&str, i64, i64)]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("region", DataType::Utf8, false),
            Field::new("total", DataType::Int64, false),
            Field::new(
                laminar_core::changelog::WEIGHT_COLUMN,
                DataType::Int64,
                false,
            ),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(
                    rows.iter().map(|r| r.0).collect::<Vec<_>>(),
                )),
                Arc::new(Int64Array::from(
                    rows.iter().map(|r| r.1).collect::<Vec<_>>(),
                )),
                Arc::new(Int64Array::from(
                    rows.iter().map(|r| r.2).collect::<Vec<_>>(),
                )),
            ],
        )
        .unwrap()
    }

    /// Drives one full epoch: begin, write `batch`, pre-commit, commit.
    #[cfg(feature = "delta-lake")]
    async fn run_epoch(sink: &mut DeltaLakeSink, epoch: u64, batch: &RecordBatch) {
        sink.begin_epoch(epoch).await.unwrap();
        sink.write_batch(batch).await.unwrap();
        sink.pre_commit(epoch).await.unwrap();
        sink.commit_epoch(epoch).await.unwrap();
    }

    /// Reads `(region, total)` pairs from the Delta table at `path`, sorted.
    #[cfg(feature = "delta-lake")]
    async fn read_regions(path: &str) -> Vec<(String, i64)> {
        let ctx = datafusion::prelude::SessionContext::new();
        crate::lakehouse::delta_table_provider::register_delta_table(
            &ctx,
            "t",
            path,
            std::collections::HashMap::new(),
        )
        .await
        .unwrap();
        let batches = ctx
            .sql("SELECT region, total FROM t")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        let mut out = Vec::new();
        for b in &batches {
            // Cast to plain Utf8/Int64 so the downcasts below are valid
            // whatever physical types the reader returns.
            let region_arr = arrow_cast::cast(
                b.column(b.schema().index_of("region").unwrap()),
                &DataType::Utf8,
            )
            .unwrap();
            let total_arr = arrow_cast::cast(
                b.column(b.schema().index_of("total").unwrap()),
                &DataType::Int64,
            )
            .unwrap();
            let regions = region_arr.as_any().downcast_ref::<StringArray>().unwrap();
            let totals = total_arr.as_any().downcast_ref::<Int64Array>().unwrap();
            for i in 0..b.num_rows() {
                out.push((regions.value(i).to_string(), totals.value(i)));
            }
        }
        out.sort();
        out
    }

    #[cfg(feature = "delta-lake")]
    #[tokio::test]
    async fn upsert_collapses_aggregating_mv_to_current_state() {
        let dir = tempfile::tempdir().unwrap();
        let table_dir = dir.path().join("agg");
        std::fs::create_dir_all(&table_dir).unwrap();
        let path = table_dir.to_string_lossy().to_string();

        let mut cfg = DeltaLakeSinkConfig::new(&path);
        cfg.write_mode = DeltaWriteMode::Upsert;
        cfg.merge_key_columns = vec!["region".to_string()];
        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;

        let mut sink = DeltaLakeSink::new(cfg, None);
        sink.open(&ConnectorConfig::new("delta-lake"))
            .await
            .unwrap();

        // Epoch 1: initial inserts.
        run_epoch(
            &mut sink,
            1,
            &zset_changelog(&[("east", 10, 1), ("west", 5, 1)]),
        )
        .await;
        assert_eq!(
            read_regions(&path).await,
            vec![("east".into(), 10), ("west".into(), 5)]
        );

        // Epoch 2: update east, retract west, insert north.
        run_epoch(
            &mut sink,
            2,
            &zset_changelog(&[
                ("east", 10, -1),
                ("east", 30, 1),
                ("west", 5, -1),
                ("north", 7, 1),
            ]),
        )
        .await;
        assert_eq!(
            read_regions(&path).await,
            vec![("east".into(), 30), ("north".into(), 7)]
        );

        // Epoch 3: two successive updates of the same key in one epoch must
        // collapse to the last value.
        run_epoch(
            &mut sink,
            3,
            &zset_changelog(&[
                ("east", 30, -1),
                ("east", 40, 1),
                ("east", 40, -1),
                ("east", 55, 1),
            ]),
        )
        .await;
        assert_eq!(
            read_regions(&path).await,
            vec![("east".into(), 55), ("north".into(), 7)]
        );

        // The Z-set weight column must not leak into the sink's schema. Use the
        // shared constant so this check tracks the name used by `zset_changelog`.
        assert!(sink
            .schema
            .as_ref()
            .unwrap()
            .index_of(laminar_core::changelog::WEIGHT_COLUMN)
            .is_err());

        // 2 + 4 + 4 changelog rows went into the collapse step.
        let m = sink.sink_metrics();
        assert_eq!(m.collapse_rows_in.get(), 10);
        assert!(m.collapse_deletes_out.get() >= 1, "west was dropped");
        assert!(m.collapse_upserts_out.get() >= 4);

        sink.close().await.unwrap();
    }

    #[cfg(feature = "delta-lake")]
    #[tokio::test]
    async fn upsert_replay_of_committed_epoch_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let table_dir = dir.path().join("agg_replay");
        std::fs::create_dir_all(&table_dir).unwrap();
        let path = table_dir.to_string_lossy().to_string();

        let mk_cfg = || {
            let mut cfg = DeltaLakeSinkConfig::new(&path);
            cfg.write_mode = DeltaWriteMode::Upsert;
            cfg.merge_key_columns = vec!["region".to_string()];
            cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
            cfg.writer_id = "replay-writer".to_string();
            cfg
        };
        let epoch2 = || {
            zset_changelog(&[
                ("east", 10, -1),
                ("east", 30, 1),
                ("west", 5, -1),
                ("north", 7, 1),
            ])
        };

        // First writer commits epochs 1 and 2, then shuts down.
        let mut sink_a = DeltaLakeSink::new(mk_cfg(), None);
        sink_a
            .open(&ConnectorConfig::new("delta-lake"))
            .await
            .unwrap();
        run_epoch(
            &mut sink_a,
            1,
            &zset_changelog(&[("east", 10, 1), ("west", 5, 1)]),
        )
        .await;
        run_epoch(&mut sink_a, 2, &epoch2()).await;
        let expected = vec![("east".to_string(), 30), ("north".to_string(), 7)];
        assert_eq!(read_regions(&path).await, expected);
        let version_after_2 = sink_a.delta_version();
        sink_a.close().await.unwrap();

        // A fresh writer with the same writer_id recovers the last epoch.
        let mut sink_b = DeltaLakeSink::new(mk_cfg(), None);
        sink_b
            .open(&ConnectorConfig::new("delta-lake"))
            .await
            .unwrap();
        assert_eq!(
            sink_b.last_committed_epoch(),
            2,
            "epoch must be recovered from Delta txn metadata"
        );

        // Replaying the already-committed epoch 2 must be a no-op.
        sink_b.begin_epoch(2).await.unwrap();
        sink_b.write_batch(&epoch2()).await.unwrap();
        sink_b.pre_commit(2).await.unwrap();
        sink_b.commit_epoch(2).await.unwrap();

        assert_eq!(
            read_regions(&path).await,
            expected,
            "replay must not change state"
        );
        assert_eq!(
            sink_b.delta_version(),
            version_after_2,
            "replayed epoch must not create a new Delta version"
        );

        // New epochs continue to apply normally after the replay.
        run_epoch(
            &mut sink_b,
            3,
            &zset_changelog(&[("east", 30, -1), ("east", 55, 1)]),
        )
        .await;
        assert_eq!(
            read_regions(&path).await,
            vec![("east".to_string(), 55), ("north".to_string(), 7)]
        );

        sink_b.close().await.unwrap();
    }
}