Skip to main content

laminar_connectors/lakehouse/
delta.rs

1//! Delta Lake sink connector implementation.
2//!
3//! [`DeltaLakeSink`] implements [`SinkConnector`], writing Arrow `RecordBatch`
4//! data to Delta Lake tables with ACID transactions and at-least-once delivery
5//! (exactly-once opt-in via `delivery.guarantee = 'exactly-once'`).
6//!
7//! # Write Strategies
8//!
9//! - **Append mode**: Arrow-to-Parquet zero-copy writes for immutable streams
10//! - **Overwrite mode**: Replace partition contents for recomputation
11//! - **Upsert mode**: CDC MERGE via Z-set changelog integration
12//!
13//! Exactly-once semantics use epoch-to-Delta-version mapping: each `LaminarDB`
14//! epoch maps to exactly one Delta Lake transaction via `txn` application
15//! transaction metadata in the Delta log.
16//!
17//! # Ring Architecture
18//!
19//! - **Ring 0**: No sink code. Data arrives via SPSC channel (~5ns push).
20//! - **Ring 1**: Batch buffering, Parquet writes, Delta log commits.
21//! - **Ring 2**: Schema management, configuration, health checks.
22
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use arrow_array::{Array, RecordBatch};
27use arrow_schema::SchemaRef;
28use async_trait::async_trait;
29use tracing::{debug, info, warn};
30
31#[cfg(feature = "delta-lake")]
32use deltalake::DeltaTable;
33
34#[cfg(feature = "delta-lake")]
35use deltalake::protocol::SaveMode;
36
37use crate::config::{ConnectorConfig, ConnectorState};
38use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
39use crate::error::ConnectorError;
40
41use super::delta_config::{DeltaLakeSinkConfig, DeltaWriteMode};
42use super::delta_metrics::DeltaLakeSinkMetrics;
43use crate::connector::DeliveryGuarantee;
44
/// Counts `(upserts, deletes)` in a collapsed changelog batch's `_op` column.
///
/// A row is a delete iff `_op == "D"`. Everything else counts as an upsert:
/// a null op, any other op code, or a batch with no `_op` column at all.
/// Both `Utf8` and `LargeUtf8` op columns are understood. An `_op` column of
/// any other type yields `(0, 0)`, because its ops cannot be read.
/// Used only for collapse observability.
#[cfg(feature = "delta-lake")]
fn count_collapsed_ops(batch: &RecordBatch) -> (u64, u64) {
    use arrow_array::cast::AsArray;

    /// Number of `"D"` (delete) entries in an op-code column.
    fn count_deletes<'a>(ops: impl Iterator<Item = Option<&'a str>>) -> u64 {
        ops.filter(|op| *op == Some("D")).count() as u64
    }

    let total = batch.num_rows() as u64;
    let Ok(idx) = batch.schema().index_of("_op") else {
        // No op column: every row is an implicit upsert (see doc above).
        return (total, 0);
    };
    let ops = batch.column(idx);
    let deletes = if let Some(utf8) = ops.as_string_opt::<i32>() {
        count_deletes(utf8.iter())
    } else if let Some(large) = ops.as_string_opt::<i64>() {
        count_deletes(large.iter())
    } else {
        // Unreadable op encoding: report nothing rather than guess.
        return (0, 0);
    };
    (total - deletes, deletes)
}
66
/// Delta Lake sink connector.
///
/// Writes Arrow `RecordBatch` to Delta Lake tables with ACID transactions,
/// at-least-once delivery (exactly-once opt-in), partitioning, and
/// background compaction.
///
/// # Lifecycle
///
/// ```text
/// new() -> open() -> [begin_epoch() -> write_batch()* -> commit_epoch()] -> close()
///                          |                                    |
///                          +--- rollback_epoch() (on failure) --+
/// ```
///
/// # Exactly-Once Semantics
///
/// Each `LaminarDB` epoch maps to exactly one Delta Lake transaction (version).
/// On recovery, the sink checks `_delta_log/` for the last committed epoch
/// via `txn` (application transaction) metadata, keyed by the configured
/// `writer_id`. If an epoch was already committed, it is skipped (idempotent
/// commit). This recovery only runs when the delivery guarantee is
/// exactly-once. Under at-least-once, `last_committed_epoch` starts at 0.
///
/// # 2PC Protocol
///
/// `pre_commit()` coalesces the buffer but does NOT write to Delta.
/// `commit_epoch()` performs the actual Delta write+commit atomically.
/// `rollback_epoch()` discards the staged buffer without side effects.
/// This ensures that a rolled-back epoch never leaves data in Delta.
pub struct DeltaLakeSink {
    /// Sink configuration.
    config: DeltaLakeSinkConfig,
    /// Arrow schema for input batches (set on first write or from existing table).
    schema: Option<SchemaRef>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Current epoch being written.
    current_epoch: u64,
    /// Last successfully committed epoch (0 until one is committed or recovered).
    last_committed_epoch: u64,
    /// `RecordBatch` buffer for the current epoch.
    buffer: Vec<RecordBatch>,
    /// Total rows buffered in current epoch.
    buffered_rows: usize,
    /// Total bytes buffered (estimated) in current epoch.
    buffered_bytes: u64,
    /// Last observed Delta Lake table version. It is 0 before a table is
    /// opened, and also when delta-rs reports no version.
    delta_version: u64,
    /// Time when the current buffer started accumulating.
    buffer_start_time: Option<Instant>,
    /// Sink metrics.
    metrics: DeltaLakeSinkMetrics,
    /// Delta Lake table handle (present when `delta-lake` feature is enabled).
    /// delta-rs operations consume the table by value, so the handle is
    /// taken for each write and reinstalled afterwards. It is `None` after a
    /// failed write or a checkpoint-boundary drop, until a reopen succeeds.
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    /// Whether the current epoch was skipped (already committed).
    epoch_skipped: bool,
    /// Staged batches ready for commit (populated by `pre_commit()`, consumed
    /// by `commit_epoch()`). This separation ensures `rollback_epoch()` can
    /// discard prepared data without leaving orphan files in Delta.
    staged_batches: Vec<RecordBatch>,
    /// Rows staged for commit (mirrors `staged_batches`).
    staged_rows: usize,
    /// Estimated bytes staged for commit.
    staged_bytes: u64,
    /// Resolved table path after catalog lookup (may differ from `config.table_path`
    /// when using Unity/Glue catalogs). Used by `reopen_table()` so retries
    /// target the same resolved path that `open()` connected to.
    #[cfg(feature = "delta-lake")]
    resolved_table_path: String,
    /// Resolved storage options after catalog lookup, including the injected
    /// default connection timeouts.
    #[cfg(feature = "delta-lake")]
    resolved_storage_options: std::collections::HashMap<String, String>,
    /// Cancellation token for the background compaction task.
    #[cfg(feature = "delta-lake")]
    compaction_cancel: Option<tokio_util::sync::CancellationToken>,
    /// Handle for the background compaction task.
    #[cfg(feature = "delta-lake")]
    compaction_handle: Option<tokio::task::JoinHandle<()>>,
    /// When true, Delta table init is deferred until the first `write_batch()`
    /// provides a schema. This happens when Unity Catalog auto-create is
    /// configured but the pipeline schema is not yet available at `open()` time.
    #[cfg(feature = "delta-lake")]
    needs_deferred_delta_init: bool,
    /// Background reopen kicked off after a checkpoint-boundary drop, so
    /// the next flush doesn't pay the table-load cost on the commit path.
    #[cfg(feature = "delta-lake")]
    pending_reopen: Option<tokio::task::JoinHandle<Result<DeltaTable, ConnectorError>>>,
    /// Pre-built Parquet writer properties for hot-path writes. Built once
    /// in `init_delta_table()` from `config.parquet`; cloning this is far
    /// cheaper than rebuilding (string parsing, bloom-filter column setup)
    /// from scratch on every commit. It is `None` if the Parquet config could
    /// not be converted.
    #[cfg(feature = "delta-lake")]
    cached_writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
    /// Shared `DataFusion` session for upsert/merge operations. Creating a
    /// fresh `SessionContext` per merge allocated a runtime env, memory
    /// pool, and object-store registry each commit; reusing one flattens
    /// allocator churn under steady-state upsert load. Only built in upsert
    /// mode.
    #[cfg(feature = "delta-lake")]
    merge_session: Option<datafusion::prelude::SessionContext>,
}
166
167impl DeltaLakeSink {
168    /// Creates a new Delta Lake sink with the given configuration.
169    #[must_use]
170    pub fn new(config: DeltaLakeSinkConfig, registry: Option<&prometheus::Registry>) -> Self {
171        Self {
172            config,
173            schema: None,
174            state: ConnectorState::Created,
175            current_epoch: 0,
176            last_committed_epoch: 0,
177            buffer: Vec::with_capacity(16),
178            buffered_rows: 0,
179            buffered_bytes: 0,
180            delta_version: 0,
181            buffer_start_time: None,
182            metrics: DeltaLakeSinkMetrics::new(registry),
183            epoch_skipped: false,
184            staged_batches: Vec::new(),
185            staged_rows: 0,
186            staged_bytes: 0,
187            #[cfg(feature = "delta-lake")]
188            table: None,
189            #[cfg(feature = "delta-lake")]
190            resolved_table_path: String::new(),
191            #[cfg(feature = "delta-lake")]
192            resolved_storage_options: std::collections::HashMap::new(),
193            #[cfg(feature = "delta-lake")]
194            compaction_cancel: None,
195            #[cfg(feature = "delta-lake")]
196            compaction_handle: None,
197            #[cfg(feature = "delta-lake")]
198            needs_deferred_delta_init: false,
199            #[cfg(feature = "delta-lake")]
200            pending_reopen: None,
201            #[cfg(feature = "delta-lake")]
202            cached_writer_properties: None,
203            #[cfg(feature = "delta-lake")]
204            merge_session: None,
205        }
206    }
207
208    /// Creates a new Delta Lake sink with an explicit schema.
209    ///
210    /// In upsert mode the changelog metadata columns (`_op`, `_ts_ms`,
211    /// `__weight`) are stripped, matching the deferred first-write path, so the
212    /// target table holds only user data regardless of how the schema arrives.
213    #[must_use]
214    pub fn with_schema(config: DeltaLakeSinkConfig, schema: SchemaRef) -> Self {
215        let write_mode = config.write_mode;
216        let mut sink = Self::new(config, None);
217        sink.schema = Some(if write_mode == DeltaWriteMode::Upsert {
218            Self::target_schema(&schema, write_mode)
219        } else {
220            schema
221        });
222        sink
223    }
224
225    /// Initializes the Delta table: auto-creates in Unity Catalog if needed,
226    /// resolves the catalog path, opens or creates the Delta table, and spawns
227    /// the compaction loop. Called from `open()` or deferred to the first
228    /// `write_batch()` when the schema is not yet available at open time.
229    #[cfg(feature = "delta-lake")]
230    async fn init_delta_table(&mut self) -> Result<(), ConnectorError> {
231        use super::delta_io;
232
233        // For uc:// tables, pre-create in Unity Catalog if needed.
234        // Must run before resolve_catalog_options which calls GET on the table.
235        #[cfg(feature = "delta-lake-unity")]
236        ensure_uc_table_exists(&self.config, self.schema.as_ref()).await?;
237
238        // Resolve catalog path: for Unity this calls GET to get the
239        // storage_location, bypassing delta-rs credential vending.
240        let (resolved_path, mut merged_options) = delta_io::resolve_catalog_options(
241            &self.config.catalog_type,
242            self.config.catalog_database.as_deref(),
243            self.config.catalog_name.as_deref(),
244            self.config.catalog_schema.as_deref(),
245            &self.config.table_path,
246            &self.config.storage_options,
247        )
248        .await?;
249
250        // Inject default connection timeouts if not explicitly set.
251        // Azure load balancers close idle connections after ~4 minutes.
252        // Without these, a stale connection causes writes to hang forever.
253        merged_options
254            .entry("timeout".to_string())
255            .or_insert_with(|| "120s".to_string());
256        merged_options
257            .entry("connect_timeout".to_string())
258            .or_insert_with(|| "30s".to_string());
259        merged_options
260            .entry("pool_idle_timeout".to_string())
261            .or_insert_with(|| "60s".to_string());
262
263        // Persist resolved values for reopen_table() on conflict retry.
264        self.resolved_table_path.clone_from(&resolved_path);
265        self.resolved_storage_options.clone_from(&merged_options);
266
267        let init_timeout = self
268            .config
269            .write_timeout
270            .max(std::time::Duration::from_secs(120));
271        let table = tokio::time::timeout(
272            init_timeout,
273            delta_io::open_or_create_table(
274                &resolved_path,
275                merged_options.clone(),
276                self.schema.as_ref(),
277            ),
278        )
279        .await
280        .map_err(|_| {
281            ConnectorError::ConnectionFailed(format!(
282                "Delta table init timed out after {}s",
283                init_timeout.as_secs()
284            ))
285        })??;
286
287        // Read schema from existing table if we don't have one.
288        if self.schema.is_none() {
289            if let Ok(schema) = delta_io::get_table_schema(&table) {
290                self.schema = Some(schema);
291            }
292        }
293
294        // Resolve last committed epoch for exactly-once recovery.
295        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
296            self.last_committed_epoch =
297                delta_io::get_last_committed_epoch(&table, &self.config.writer_id).await;
298            if self.last_committed_epoch > 0 {
299                info!(
300                    writer_id = %self.config.writer_id,
301                    last_committed_epoch = self.last_committed_epoch,
302                    "recovered last committed epoch from Delta Lake txn metadata"
303                );
304            }
305        }
306
307        // Store the Delta version.
308        #[allow(clippy::cast_sign_loss)]
309        {
310            self.delta_version = table.version().unwrap_or(0) as u64;
311        }
312        self.table = Some(table);
313
314        // Pre-build caches used on every commit. Rebuilding WriterProperties
315        // from config strings per commit was pure churn (HashMap<ColumnPath>
316        // cloned for bloom filters, string lowercase allocs); a cached value
317        // clones cheaply. Similarly, a shared SessionContext avoids
318        // allocating a new RuntimeEnv + MemoryPool + ObjectStoreRegistry
319        // per merge — a significant source of allocator fragmentation on
320        // long-running upsert streams.
321        self.cached_writer_properties = self.config.parquet.to_writer_properties().ok();
322        if self.config.write_mode == DeltaWriteMode::Upsert {
323            self.merge_session = Some(datafusion::prelude::SessionContext::new());
324        }
325
326        // Spawn background compaction task if enabled. Pre-build the
327        // compaction writer properties once; the loop clones per tick
328        // instead of re-parsing config strings.
329        if self.config.compaction.enabled {
330            let cancel = tokio_util::sync::CancellationToken::new();
331            let compaction_props = self.config.parquet.compaction_writer_properties().ok();
332            let handle = tokio::spawn(compaction_loop(
333                resolved_path.clone(),
334                Arc::new(merged_options),
335                self.config.compaction.clone(),
336                self.config.vacuum_retention,
337                compaction_props,
338                cancel.clone(),
339            ));
340            self.compaction_cancel = Some(cancel);
341            self.compaction_handle = Some(handle);
342        }
343
344        Ok(())
345    }
346
347    /// Returns the current connector state.
348    #[must_use]
349    pub fn state(&self) -> ConnectorState {
350        self.state
351    }
352
353    /// Returns the current epoch.
354    #[must_use]
355    pub fn current_epoch(&self) -> u64 {
356        self.current_epoch
357    }
358
359    /// Returns the last committed epoch.
360    #[must_use]
361    pub fn last_committed_epoch(&self) -> u64 {
362        self.last_committed_epoch
363    }
364
365    /// Returns the number of buffered rows pending flush.
366    #[must_use]
367    pub fn buffered_rows(&self) -> usize {
368        self.buffered_rows
369    }
370
371    /// Returns the estimated buffered bytes.
372    #[must_use]
373    pub fn buffered_bytes(&self) -> u64 {
374        self.buffered_bytes
375    }
376
377    /// Returns the current Delta Lake table version.
378    #[must_use]
379    pub fn delta_version(&self) -> u64 {
380        self.delta_version
381    }
382
383    /// Returns a reference to the sink metrics.
384    #[must_use]
385    pub fn sink_metrics(&self) -> &DeltaLakeSinkMetrics {
386        &self.metrics
387    }
388
389    /// Returns the sink configuration.
390    #[must_use]
391    pub fn config(&self) -> &DeltaLakeSinkConfig {
392        &self.config
393    }
394
395    /// Checks if a buffer flush is needed based on size or time thresholds.
396    #[must_use]
397    pub fn should_flush(&self) -> bool {
398        if self.buffered_rows >= self.config.max_buffer_records {
399            return true;
400        }
401        if self.buffered_bytes >= self.config.target_file_size as u64 {
402            return true;
403        }
404        if let Some(start) = self.buffer_start_time {
405            if start.elapsed() >= self.config.max_buffer_duration {
406                return true;
407            }
408        }
409        false
410    }
411
412    /// Changelog metadata columns stripped from the target Delta table schema
413    /// in upsert mode, so the table holds only user data — not the CDC `_op`/
414    /// `_ts_ms` envelope or the Z-set `__weight` column. `collapse_changelog`
415    /// strips the same columns from the MERGE source, keeping the two in sync.
416    const CHANGELOG_METADATA_COLUMNS: &'static [&'static str] =
417        &["_op", "_ts_ms", laminar_core::changelog::WEIGHT_COLUMN];
418
419    fn target_schema(batch_schema: &SchemaRef, write_mode: DeltaWriteMode) -> SchemaRef {
420        if write_mode == DeltaWriteMode::Upsert {
421            let fields: Vec<_> = batch_schema
422                .fields()
423                .iter()
424                .filter(|f| !Self::CHANGELOG_METADATA_COLUMNS.contains(&f.name().as_str()))
425                .cloned()
426                .collect();
427            Arc::new(arrow_schema::Schema::new(fields))
428        } else {
429            batch_schema.clone()
430        }
431    }
432
433    /// Estimates the byte size of a `RecordBatch`.
434    #[must_use]
435    pub fn estimate_batch_size(batch: &RecordBatch) -> u64 {
436        batch
437            .columns()
438            .iter()
439            .map(|col| col.get_array_memory_size() as u64)
440            .sum()
441    }
442
443    /// Returns `true` if the error is a Delta Lake optimistic concurrency
444    /// conflict (retryable). Matches specific delta-rs conflict indicators
445    /// only — not generic "transaction" mentions.
446    #[cfg(feature = "delta-lake")]
447    fn is_conflict_error(err: &ConnectorError) -> bool {
448        let msg = err.to_string().to_lowercase();
449        msg.contains("conflicting commit")
450            || msg.contains("version already exists")
451            || msg.contains("concurrent")
452            || (msg.contains("conflict") && !msg.contains("log") && !msg.contains("corrupt"))
453    }
454
455    /// Re-opens the Delta Lake table after a conflict error destroys the handle.
456    /// Uses the resolved path/options from `open()`, not `config.*`, so that
457    /// catalog-resolved paths (Unity/Glue) are used correctly.
458    #[cfg(feature = "delta-lake")]
459    async fn reopen_table(&mut self) -> Result<(), ConnectorError> {
460        use super::delta_io;
461
462        let table = delta_io::open_or_create_table(
463            &self.resolved_table_path,
464            self.resolved_storage_options.clone(),
465            self.schema.as_ref(),
466        )
467        .await?;
468
469        #[allow(clippy::cast_sign_loss)]
470        {
471            self.delta_version = table.version().unwrap_or(0) as u64;
472        }
473        self.table = Some(table);
474        Ok(())
475    }
476
477    /// Spawn a background reopen of the Delta table so the next flush
478    /// doesn't pay the load cost on its hot path. An already-in-flight
479    /// reopen is aborted — the fresher one supersedes it.
480    #[cfg(feature = "delta-lake")]
481    fn schedule_background_reopen(&mut self) {
482        if let Some(prev) = self.pending_reopen.take() {
483            prev.abort();
484        }
485        let path = self.resolved_table_path.clone();
486        let opts = self.resolved_storage_options.clone();
487        let schema = self.schema.clone();
488        self.pending_reopen = Some(tokio::spawn(async move {
489            super::delta_io::open_or_create_table(&path, opts, schema.as_ref()).await
490        }));
491    }
492
493    /// Install a previously-scheduled background reopen. Returns `false`
494    /// on miss — no pending reopen, task failure, or timeout — so callers
495    /// fall through to the synchronous `reopen_table()` path.
496    #[cfg(feature = "delta-lake")]
497    async fn try_install_pending_reopen(&mut self, timeout: std::time::Duration) -> bool {
498        let Some(mut pending) = self.pending_reopen.take() else {
499            return false;
500        };
501        let table = match tokio::time::timeout(timeout, &mut pending).await {
502            Ok(Ok(Ok(t))) => t,
503            Ok(Ok(Err(e))) => {
504                warn!(error = %e, "Delta background reopen failed");
505                return false;
506            }
507            Ok(Err(e)) => {
508                warn!(error = %e, "Delta background reopen task ended unexpectedly");
509                return false;
510            }
511            Err(_) => {
512                warn!(
513                    timeout_secs = timeout.as_secs(),
514                    "Delta background reopen timed out"
515                );
516                pending.abort();
517                return false;
518            }
519        };
520        #[allow(clippy::cast_sign_loss)]
521        {
522            self.delta_version = table.version().unwrap_or(0) as u64;
523        }
524        self.table = Some(table);
525        true
526    }
527
528    /// Attempts a single Delta write/merge and returns the updated table on success.
529    #[cfg(feature = "delta-lake")]
530    async fn attempt_delta_write(
531        &mut self,
532        table: DeltaTable,
533    ) -> Result<DeltaTable, ConnectorError> {
534        // Clone batches for the write — staged_batches is only cleared on
535        // success. RecordBatch::clone is Arc-bump only (~16-48ns per batch).
536        let batches: Vec<RecordBatch> = self.staged_batches.clone();
537
538        if self.config.write_mode == DeltaWriteMode::Upsert {
539            // ── Upsert/Merge path ──
540            // flush_staged_to_delta pre-concats for upsert so retries don't
541            // pay a full O(rows × cols) copy each attempt. Handle len > 1
542            // defensively in case a future caller skips the pre-concat.
543            let combined = if batches.len() == 1 {
544                batches.into_iter().next().expect("len == 1 checked")
545            } else {
546                match arrow_select::concat::concat_batches(&batches[0].schema(), &batches) {
547                    Ok(c) => c,
548                    Err(e) => {
549                        // Concat is a local op — restore table and propagate.
550                        self.table = Some(table);
551                        return Err(ConnectorError::Internal(format!(
552                            "failed to concat batches: {e}"
553                        )));
554                    }
555                }
556            };
557
558            // Cached in init_delta_table — cloning is a small HashMap copy
559            // plus a handful of Arc bumps, far cheaper than rebuilding from
560            // config strings each attempt.
561            let writer_props = self.cached_writer_properties.clone();
562            let merge_session = self
563                .merge_session
564                .as_ref()
565                .expect("merge_session built in init_delta_table for Upsert mode");
566
567            super::delta_io::merge_changelog(
568                table,
569                combined,
570                &self.config.merge_key_columns,
571                &self.config.writer_id,
572                self.current_epoch,
573                self.config.schema_evolution,
574                writer_props,
575                merge_session,
576            )
577            .await
578            .map(|(t, result)| {
579                self.metrics.record_merge();
580                if result.rows_deleted > 0 {
581                    self.metrics.record_deletes(result.rows_deleted as u64);
582                }
583                t
584            })
585        } else {
586            // ── Append/Overwrite path ──
587            let save_mode = match self.config.write_mode {
588                DeltaWriteMode::Append => SaveMode::Append,
589                DeltaWriteMode::Overwrite => SaveMode::Overwrite,
590                DeltaWriteMode::Upsert => unreachable!("handled by the upsert branch above"),
591            };
592
593            let partition_cols = if self.config.partition_columns.is_empty() {
594                None
595            } else {
596                Some(self.config.partition_columns.as_slice())
597            };
598
599            // Create a Delta Lake checkpoint every N commits for read performance.
600            let should_checkpoint = self.config.checkpoint_interval > 0
601                && self.delta_version > 0
602                && (self.delta_version + 1).is_multiple_of(self.config.checkpoint_interval);
603
604            super::delta_io::write_batches(
605                table,
606                batches,
607                &self.config.writer_id,
608                self.current_epoch,
609                save_mode,
610                partition_cols,
611                self.config.schema_evolution,
612                Some(self.config.target_file_size),
613                should_checkpoint,
614                self.cached_writer_properties.clone(),
615            )
616            .await
617            .map(|(t, _version)| t)
618        }
619    }
620
621    /// Writes all staged data to Delta Lake as a single atomic transaction.
622    ///
623    /// Retries on optimistic concurrency conflicts with exponential backoff.
624    /// On non-conflict errors or exhausted retries, propagates the error.
625    #[cfg(feature = "delta-lake")]
626    #[allow(clippy::too_many_lines)]
627    async fn flush_staged_to_delta(&mut self) -> Result<WriteResult, ConnectorError> {
628        if self.staged_batches.is_empty() {
629            return Ok(WriteResult::new(0, 0));
630        }
631
632        // For upsert, concat the whole epoch and collapse the changelog to one
633        // row per merge key BEFORE the MERGE. This (a) makes the MERGE
634        // cardinality-safe — delta-rs rejects multiple source rows matching one
635        // target row, which every aggregate retract+insert would otherwise
636        // trigger — and (b) strips the Z-set `__weight` column the MERGE does
637        // not understand. Pre-concatenating also means conflict retries don't
638        // redo the O(rows × cols) copy each attempt. Append/overwrite passes
639        // the Vec straight to delta-rs, which handles multi-batch internally.
640        if self.config.write_mode == DeltaWriteMode::Upsert {
641            let combined = if self.staged_batches.len() == 1 {
642                self.staged_batches[0].clone()
643            } else {
644                let schema = self.staged_batches[0].schema();
645                arrow_select::concat::concat_batches(&schema, &self.staged_batches).map_err(
646                    |e| ConnectorError::Internal(format!("failed to concat staged batches: {e}")),
647                )?
648            };
649            let rows_in = combined.num_rows() as u64;
650            let collapse_start = Instant::now();
651            let collapsed =
652                crate::changelog::collapse_changelog(&combined, &self.config.merge_key_columns)?;
653            let (upserts_out, deletes_out) = count_collapsed_ops(&collapsed);
654            self.metrics.observe_collapse(
655                rows_in,
656                upserts_out,
657                deletes_out,
658                collapse_start.elapsed().as_secs_f64(),
659            );
660            self.staged_batches.clear();
661            self.staged_batches.push(collapsed);
662        }
663
664        let total_rows = self.staged_rows;
665        let estimated_bytes = self.staged_bytes;
666        let flush_start = Instant::now();
667
668        // Retry loop with exponential backoff for optimistic concurrency conflicts.
669        let backoff_ms = [100u64, 500, 2000];
670        let max_attempts = (self.config.max_commit_retries as usize).saturating_add(1);
671        let mut last_error: Option<ConnectorError> = None;
672
673        for attempt in 0..max_attempts {
674            // delta-rs consumes DeltaTable by value. If self.table is None
675            // (prior failure or checkpoint-boundary drop) reinstall it —
676            // prefer a scheduled background reopen, fall back to sync.
677            if self.table.is_none() {
678                let reopen_timeout = self.config.write_timeout;
679                if !self.try_install_pending_reopen(reopen_timeout).await {
680                    tokio::time::timeout(reopen_timeout, self.reopen_table())
681                        .await
682                        .map_err(|_| {
683                            ConnectorError::ConnectionFailed(format!(
684                                "Delta table reopen timed out after {}s",
685                                reopen_timeout.as_secs()
686                            ))
687                        })??;
688                }
689            }
690
691            let table = self
692                .table
693                .take()
694                .ok_or_else(|| ConnectorError::InvalidState {
695                    expected: "table initialized".into(),
696                    actual: "table not initialized".into(),
697                })?;
698
699            // Timeout the write to prevent hanging on stale connections.
700            // Azure LB drops idle connections after ~4 min; without this,
701            // a dead connection blocks the sink task forever.
702            let write_timeout = self.config.write_timeout;
703            let write_result =
704                tokio::time::timeout(write_timeout, self.attempt_delta_write(table)).await;
705
706            // Convert timeout to a write error. The table handle was consumed
707            // by attempt_delta_write; the retry loop will reopen via reopen_table().
708            let write_result = match write_result {
709                Ok(inner) => inner,
710                Err(_elapsed) => Err(ConnectorError::WriteError(format!(
711                    "Delta write timed out after {}s",
712                    write_timeout.as_secs()
713                ))),
714            };
715
716            match write_result {
717                Ok(table) => {
718                    #[allow(clippy::cast_sign_loss)]
719                    {
720                        self.delta_version = table.version().unwrap_or(0) as u64;
721                    }
722
723                    // delta-rs' in-memory Snapshot grows per commit and is not
724                    // compacted in place; drop on checkpoint boundaries so the
725                    // next flush re-opens from the checkpoint file. Pre-open
726                    // in the background so the next commit doesn't block.
727                    let crossed_checkpoint = self.config.checkpoint_interval > 0
728                        && self.delta_version > 0
729                        && self
730                            .delta_version
731                            .is_multiple_of(self.config.checkpoint_interval);
732                    if crossed_checkpoint {
733                        self.table = None;
734                        self.schedule_background_reopen();
735                    } else {
736                        self.table = Some(table);
737                    }
738
739                    self.staged_batches.clear();
740                    self.staged_rows = 0;
741                    self.staged_bytes = 0;
742
743                    self.metrics
744                        .record_flush(total_rows as u64, estimated_bytes);
745                    self.metrics.record_commit(self.delta_version);
746                    self.metrics
747                        .observe_flush_duration(flush_start.elapsed().as_secs_f64());
748
749                    debug!(
750                        rows = total_rows,
751                        bytes = estimated_bytes,
752                        delta_version = self.delta_version,
753                        attempt = attempt + 1,
754                        reopened = crossed_checkpoint,
755                        "Delta Lake: committed staged data to Delta"
756                    );
757
758                    return Ok(WriteResult::new(total_rows, estimated_bytes));
759                }
760                Err(e) => {
761                    if Self::is_conflict_error(&e) && attempt + 1 < max_attempts {
762                        self.metrics.record_conflict();
763                        self.metrics.record_retry();
764                        // ±25% jitter breaks up lockstep retries from
765                        // concurrent writers colliding on the same version.
766                        let base = backoff_ms.get(attempt).copied().unwrap_or(2000);
767                        let delay_ms = jittered_backoff_ms(base);
768                        warn!(
769                            attempt = attempt + 1,
770                            max_attempts,
771                            delay_ms,
772                            error = %e,
773                            "Delta Lake: conflict error, retrying after backoff"
774                        );
775                        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
776                        last_error = Some(e);
777                        // self.table is already None; loop will re-open.
778                        continue;
779                    }
780                    // Non-conflict error or exhausted retries — propagate.
781                    self.metrics
782                        .observe_flush_duration(flush_start.elapsed().as_secs_f64());
783                    return Err(e);
784                }
785            }
786        }
787
788        // Should not reach here, but if it does, return the last error.
789        Err(last_error.unwrap_or_else(|| {
790            ConnectorError::Internal("flush_staged_to_delta: no attempts made".into())
791        }))
792    }
793
794    /// Splits a changelog `RecordBatch` into insert and delete batches.
795    ///
796    /// Uses the `_op` metadata column:
797    /// - `"I"` (insert), `"U"` (update-after), `"r"` (snapshot read) -> insert
798    /// - `"D"` (delete) -> delete
799    ///
800    /// The returned batches exclude metadata columns (those starting with `_`).
801    ///
802    /// # Errors
803    ///
804    /// Returns `ConnectorError::ConfigurationError` if the `_op` column is
805    /// missing or not a string type.
806    pub fn split_changelog_batch(
807        batch: &RecordBatch,
808    ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
809        let op_idx = batch.schema().index_of("_op").map_err(|_| {
810            ConnectorError::ConfigurationError(
811                "upsert mode requires '_op' column in input schema".into(),
812            )
813        })?;
814
815        let op_array = batch
816            .column(op_idx)
817            .as_any()
818            .downcast_ref::<arrow_array::StringArray>()
819            .ok_or_else(|| {
820                ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
821            })?;
822
823        // Build boolean masks for insert vs delete rows.
824        let len = op_array.len();
825        let mut insert_mask = Vec::with_capacity(len);
826        let mut delete_mask = Vec::with_capacity(len);
827
828        for i in 0..len {
829            if op_array.is_null(i) {
830                insert_mask.push(false);
831                delete_mask.push(false);
832                continue;
833            }
834            match op_array.value(i) {
835                "I" | "U" | "r" => {
836                    insert_mask.push(true);
837                    delete_mask.push(false);
838                }
839                "D" => {
840                    insert_mask.push(false);
841                    delete_mask.push(true);
842                }
843                _ => {
844                    insert_mask.push(false);
845                    delete_mask.push(false);
846                }
847            }
848        }
849
850        // Compute user-column projection indices once (strip metadata columns).
851        let user_col_indices: Vec<usize> = batch
852            .schema()
853            .fields()
854            .iter()
855            .enumerate()
856            .filter(|(_, f)| !f.name().starts_with('_'))
857            .map(|(i, _)| i)
858            .collect();
859
860        let insert_batch = filter_and_project(batch, insert_mask, &user_col_indices)?;
861        let delete_batch = filter_and_project(batch, delete_mask, &user_col_indices)?;
862
863        Ok((insert_batch, delete_batch))
864    }
865}
866
/// Background compaction loop: OPTIMIZE + VACUUM with adaptive intervals.
///
/// Opens a fresh `DeltaTable` handle on every tick (no shared state with the
/// sink). A tick compacts only when the snapshot reports at least
/// `min_files_for_compaction` files. A successful OPTIMIZE is followed by a
/// VACUUM using `vacuum_retention`. Open, snapshot, OPTIMIZE and VACUUM
/// failures are logged, and the work is retried on a later tick.
///
/// Adaptive interval:
/// - A compacting tick halves the interval. The floor is
///   `MIN_COMPACTION_INTERVAL`, or `check_interval` itself when that is smaller.
/// - After two or more consecutive idle ticks, the interval doubles, capped
///   at `check_interval`.
///
/// `cancel` is only observed while sleeping between ticks. An in-flight
/// OPTIMIZE/VACUUM therefore runs to completion before the loop exits.
#[cfg(feature = "delta-lake")]
#[allow(clippy::too_many_lines)]
async fn compaction_loop(
    table_path: String,
    storage_options: Arc<std::collections::HashMap<String, String>>,
    config: super::delta_config::CompactionConfig,
    vacuum_retention: std::time::Duration,
    compaction_props: Option<deltalake::parquet::file::properties::WriterProperties>,
    cancel: tokio_util::sync::CancellationToken,
) {
    use super::delta_io;

    /// Minimum adaptive compaction interval (floor).
    const MIN_COMPACTION_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);

    /// Sleeps for `interval`. Returns `false` (after logging) if `cancel`
    /// fires first, meaning the loop should exit.
    async fn sleep_or_cancelled(
        cancel: &tokio_util::sync::CancellationToken,
        interval: std::time::Duration,
    ) -> bool {
        tokio::select! {
            () = cancel.cancelled() => {
                info!("compaction background task cancelled");
                false
            }
            () = tokio::time::sleep(interval) => true,
        }
    }

    let base_interval = config.check_interval;
    // The speed-up floor must never exceed the configured interval. Otherwise a
    // `check_interval` below MIN_COMPACTION_INTERVAL would be *raised* to the
    // floor exactly when compaction is busy.
    let min_interval = MIN_COMPACTION_INTERVAL.min(base_interval);
    let mut current_interval = base_interval;
    let mut consecutive_skips: u32 = 0;

    info!(
        table_path = %table_path,
        check_interval_secs = base_interval.as_secs(),
        "compaction background task started (adaptive interval)"
    );

    // Initial delay before the first check.
    if !sleep_or_cancelled(&cancel, current_interval).await {
        return;
    }

    loop {
        // Open a fresh table handle for compaction (no shared state).
        let table =
            match delta_io::open_or_create_table(&table_path, (*storage_options).clone(), None)
                .await
            {
                Ok(t) => t,
                Err(e) => {
                    warn!(error = %e, "compaction: failed to open table, will retry");
                    if !sleep_or_cancelled(&cancel, current_interval).await {
                        return;
                    }
                    continue;
                }
            };

        // Check if compaction is needed.
        let should_compact = match table.snapshot() {
            Ok(snapshot) => {
                let file_count = snapshot.log_data().num_files();
                if file_count < config.min_files_for_compaction {
                    debug!(
                        file_count,
                        min = config.min_files_for_compaction,
                        "compaction: skipping, not enough files"
                    );
                    false
                } else {
                    true
                }
            }
            Err(e) => {
                warn!(error = %e, "compaction: snapshot failed, skipping tick");
                false
            }
        };

        if should_compact {
            consecutive_skips = 0;
            // Speed up: halve the interval, never dropping below `min_interval`.
            current_interval = (current_interval / 2).max(min_interval);

            let target_size = config.target_file_size as u64;
            // Clone the pre-built properties (cheap) instead of re-parsing
            // from config strings each tick.
            match delta_io::run_compaction(
                table,
                target_size,
                &config.z_order_columns,
                compaction_props.clone(),
            )
            .await
            {
                Ok((table, result)) => {
                    debug!(
                        files_added = result.files_added,
                        files_removed = result.files_removed,
                        interval_secs = current_interval.as_secs(),
                        "compaction: OPTIMIZE complete"
                    );

                    // Run VACUUM after compaction.
                    match delta_io::run_vacuum(table, vacuum_retention).await {
                        Ok((_table, files_deleted)) => {
                            debug!(files_deleted, "compaction: VACUUM complete");
                        }
                        Err(e) => {
                            warn!(error = %e, "compaction: VACUUM failed");
                        }
                    }
                }
                Err(e) => {
                    warn!(error = %e, "compaction: OPTIMIZE failed");
                }
            }
        } else {
            consecutive_skips = consecutive_skips.saturating_add(1);
            // Slow down: double the interval after 2 consecutive idle ticks,
            // never exceeding the configured `check_interval`.
            if consecutive_skips >= 2 {
                current_interval = (current_interval * 2).min(base_interval);
            }
        }

        // Adaptive sleep: wait current_interval or cancel.
        if !sleep_or_cancelled(&cancel, current_interval).await {
            return;
        }
    }
}
1001
/// Registers the sink's Delta table in Unity Catalog as an external table
/// through the REST API, when auto-creation is configured.
///
/// Returns `Ok(())` without doing anything unless the catalog type is
/// `Unity` and `catalog.storage.location` is set. If no Arrow schema is
/// supplied, a warning is logged and creation is skipped.
///
/// Name resolution:
/// - The table name is the last `.`-separated segment of a `uc://` table
///   path, or the whole `table_path` when there is no `uc://` prefix.
/// - Catalog and schema names fall back to `""` when unset.
///
/// Idempotent: "already exists" (HTTP 409 / `ALREADY_EXISTS`) is treated as
/// success by `unity_catalog::create_uc_table`.
///
/// # Errors
///
/// Propagates whatever `unity_catalog::create_uc_table` returns.
#[cfg(all(feature = "delta-lake", feature = "delta-lake-unity"))]
async fn ensure_uc_table_exists(
    config: &DeltaLakeSinkConfig,
    schema: Option<&SchemaRef>,
) -> Result<(), ConnectorError> {
    let (workspace_url, access_token) = match &config.catalog_type {
        super::delta_config::DeltaCatalogType::Unity {
            workspace_url,
            access_token,
        } => (workspace_url, access_token),
        _ => return Ok(()),
    };

    let Some(storage_location) = config.catalog_storage_location.as_ref() else {
        return Ok(());
    };

    let Some(arrow_schema) = schema else {
        warn!(
            "catalog.storage.location is set but no schema available — \
             skipping Unity Catalog auto-create"
        );
        return Ok(());
    };

    // `uc://catalog.schema.table` -> `table`; non-UC paths are used verbatim.
    let table_name = match config.table_path.strip_prefix("uc://") {
        Some(qualified) => qualified
            .rsplit_once('.')
            .map_or(qualified, |(_, last)| last),
        None => config.table_path.as_str(),
    };
    let catalog = config.catalog_name.as_deref().unwrap_or("");
    let schema_name = config.catalog_schema.as_deref().unwrap_or("");

    let uc_columns = super::unity_catalog::arrow_to_uc_columns(arrow_schema);
    super::unity_catalog::create_uc_table(
        workspace_url,
        access_token,
        catalog,
        schema_name,
        table_name,
        storage_location,
        &uc_columns,
    )
    .await
}
1050
1051#[async_trait]
1052impl SinkConnector for DeltaLakeSink {
1053    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
1054        self.state = ConnectorState::Initializing;
1055
1056        // Re-parse config if properties provided.
1057        if !config.properties().is_empty() {
1058            self.config = DeltaLakeSinkConfig::from_config(config)?;
1059        }
1060
1061        info!(
1062            table_path = %self.config.table_path,
1063            mode = %self.config.write_mode,
1064            guarantee = %self.config.delivery_guarantee,
1065            "opening Delta Lake sink connector"
1066        );
1067
1068        // When delta-lake feature is enabled, open/create the actual table.
1069        // If Unity Catalog auto-create is configured but no schema is available
1070        // yet, defer initialization to the first write_batch() call.
1071        #[cfg(feature = "delta-lake")]
1072        {
1073            let should_defer = matches!(
1074                self.config.catalog_type,
1075                super::delta_config::DeltaCatalogType::Unity { .. }
1076            ) && self.config.catalog_storage_location.is_some()
1077                && self.schema.is_none();
1078
1079            if should_defer {
1080                info!(
1081                    "Unity Catalog auto-create configured but pipeline schema not yet \
1082                     available — deferring Delta table init to first begin_epoch"
1083                );
1084                self.needs_deferred_delta_init = true;
1085                self.state = ConnectorState::Initializing;
1086                return Ok(());
1087            }
1088
1089            self.init_delta_table().await?;
1090
1091            // If table still has no version after init (new table, no schema yet),
1092            // defer full creation to the first write_batch() when schema is available.
1093            if self.table.as_ref().is_some_and(|t| t.version().is_none()) && self.schema.is_none() {
1094                self.needs_deferred_delta_init = true;
1095                self.state = ConnectorState::Initializing;
1096                return Ok(());
1097            }
1098        }
1099
1100        #[cfg(not(feature = "delta-lake"))]
1101        {
1102            self.state = ConnectorState::Failed;
1103            return Err(ConnectorError::ConfigurationError(
1104                "Delta Lake sink requires the 'delta-lake' feature to be enabled. \
1105                 Build with: cargo build --features delta-lake"
1106                    .into(),
1107            ));
1108        }
1109
1110        #[cfg(feature = "delta-lake")]
1111        {
1112            self.state = ConnectorState::Running;
1113            info!("Delta Lake sink connector opened successfully");
1114            Ok(())
1115        }
1116    }
1117
1118    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
1119        // Accept both Running and Initializing (deferred init in progress).
1120        if self.state != ConnectorState::Running && self.state != ConnectorState::Initializing {
1121            return Err(ConnectorError::InvalidState {
1122                expected: "Running".into(),
1123                actual: self.state.to_string(),
1124            });
1125        }
1126
1127        if batch.num_rows() == 0 {
1128            return Ok(WriteResult::new(0, 0));
1129        }
1130
1131        if self.epoch_skipped {
1132            return Ok(WriteResult::new(0, 0));
1133        }
1134
1135        // Handle schema on first write. In upsert mode, strip metadata columns
1136        // (_op, _ts_ms) so the Delta table isn't created with changelog columns.
1137        if self.schema.is_none() {
1138            self.schema = Some(Self::target_schema(&batch.schema(), self.config.write_mode));
1139        }
1140
1141        // Fallback for deferred init: if begin_epoch() couldn't complete
1142        // init (schema was still None), complete it now that the first
1143        // batch provides a schema.
1144        #[cfg(feature = "delta-lake")]
1145        if self.needs_deferred_delta_init {
1146            info!("schema now available from first batch — completing deferred Delta table init");
1147            match self.init_delta_table().await {
1148                Ok(()) => {
1149                    self.needs_deferred_delta_init = false;
1150                    self.state = ConnectorState::Running;
1151                    info!("Delta Lake sink connector opened successfully (deferred)");
1152                }
1153                Err(e) => {
1154                    self.state = ConnectorState::Failed;
1155                    return Err(e);
1156                }
1157            }
1158        }
1159
1160        let num_rows = batch.num_rows();
1161        let estimated_bytes = Self::estimate_batch_size(batch);
1162
1163        // Hard cap on combined buffered + staged data. Applies to both
1164        // delivery guarantees:
1165        // - Exactly-once cannot flush opportunistically from write_batch
1166        //   (a mid-epoch flush would leak rows on rollback), so pending
1167        //   accumulates until the next checkpoint.
1168        // - At-least-once normally auto-flushes via `should_flush()`, but
1169        //   if flushes fail repeatedly, `staged_batches` holds data while
1170        //   `buffer` keeps growing — without this cap the sink can OOM.
1171        //
1172        // A single incoming batch may itself exceed the cap; we cannot
1173        // split or reject it (the rows would be lost), so we widen the
1174        // cap to `num_rows`/`estimated_bytes` when a batch alone is
1175        // larger. This still rejects the *next* batch if that one-shot
1176        // admission left us bloated.
1177        let pending_rows = self.buffered_rows + self.staged_rows + num_rows;
1178        let pending_bytes = self.buffered_bytes + self.staged_bytes + estimated_bytes;
1179        let row_cap = self
1180            .config
1181            .max_buffer_records
1182            .saturating_mul(4)
1183            .max(num_rows);
1184        let byte_cap = (self.config.target_file_size as u64)
1185            .saturating_mul(4)
1186            .max(estimated_bytes);
1187        if pending_rows > row_cap || pending_bytes > byte_cap {
1188            return Err(ConnectorError::WriteError(format!(
1189                "delta sink buffer full ({pending_rows} rows, \
1190                 {pending_bytes} bytes pending; cap {row_cap} rows, \
1191                 {byte_cap} bytes) — backpressure until next flush/commit"
1192            )));
1193        }
1194
1195        // Buffer the batch.
1196        if self.buffer_start_time.is_none() {
1197            self.buffer_start_time = Some(Instant::now());
1198        }
1199        self.buffer.push(batch.clone());
1200        self.buffered_rows += num_rows;
1201        self.buffered_bytes += estimated_bytes;
1202
1203        #[cfg(feature = "delta-lake")]
1204        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce && self.should_flush() {
1205            if !self.staged_batches.is_empty() {
1206                self.flush_staged_to_delta().await?;
1207            }
1208            self.staged_batches = std::mem::take(&mut self.buffer);
1209            self.staged_rows = self.buffered_rows;
1210            self.staged_bytes = self.buffered_bytes;
1211            self.buffered_rows = 0;
1212            self.buffered_bytes = 0;
1213            self.buffer_start_time = None;
1214            self.flush_staged_to_delta().await?;
1215        }
1216
1217        Ok(WriteResult::new(0, 0))
1218    }
1219
1220    fn schema(&self) -> SchemaRef {
1221        self.schema
1222            .clone()
1223            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
1224    }
1225
1226    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1227        // Complete deferred Delta table init on the first epoch.
1228        // This must run BEFORE the epoch skip check so that
1229        // last_committed_epoch is resolved from the Delta log.
1230        // Without this, exactly-once recovery would miss already-committed
1231        // epochs and produce duplicates.
1232        #[cfg(feature = "delta-lake")]
1233        if self.needs_deferred_delta_init {
1234            // Schema may not be available yet on the very first epoch.
1235            // If so, buffer the epoch — the write_batch will provide it.
1236            // But if the pipeline provided a schema via with_schema() or a
1237            // previous epoch's write_batch set it, complete init now.
1238            if self.schema.is_some() {
1239                info!("schema available — completing deferred Delta table init");
1240                match self.init_delta_table().await {
1241                    Ok(()) => {
1242                        self.needs_deferred_delta_init = false;
1243                        self.state = ConnectorState::Running;
1244                        info!("Delta Lake sink connector opened successfully (deferred)");
1245                    }
1246                    Err(e) => {
1247                        self.state = ConnectorState::Failed;
1248                        return Err(e);
1249                    }
1250                }
1251            }
1252        }
1253
1254        // For exactly-once, skip epochs already committed.
1255        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
1256            && epoch <= self.last_committed_epoch
1257        {
1258            warn!(
1259                epoch,
1260                last_committed = self.last_committed_epoch,
1261                "Delta Lake: skipping already-committed epoch"
1262            );
1263            self.epoch_skipped = true;
1264            return Ok(());
1265        }
1266
1267        self.epoch_skipped = false;
1268        self.current_epoch = epoch;
1269        self.buffer.clear();
1270        self.buffered_rows = 0;
1271        self.buffered_bytes = 0;
1272        self.buffer_start_time = None;
1273
1274        debug!(epoch, "Delta Lake: began epoch");
1275        Ok(())
1276    }
1277
1278    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1279        // Skip if already committed (exactly-once idempotency).
1280        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
1281            && epoch <= self.last_committed_epoch
1282        {
1283            return Ok(());
1284        }
1285
1286        // Stage buffered data for commit. The actual Delta write happens in
1287        // commit_epoch() — this ensures rollback_epoch() can discard the data
1288        // without leaving orphan files in Delta.
1289        if !self.buffer.is_empty() {
1290            self.staged_batches = std::mem::take(&mut self.buffer);
1291            self.staged_rows = self.buffered_rows;
1292            self.staged_bytes = self.buffered_bytes;
1293            self.buffered_rows = 0;
1294            self.buffered_bytes = 0;
1295            self.buffer_start_time = None;
1296        }
1297
1298        debug!(epoch, "Delta Lake: pre-committed (batches staged)");
1299        Ok(())
1300    }
1301
1302    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1303        // Skip if already committed (exactly-once idempotency).
1304        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
1305            && epoch <= self.last_committed_epoch
1306        {
1307            return Ok(());
1308        }
1309
1310        // Write staged data to Delta as a single atomic transaction.
1311        #[cfg(feature = "delta-lake")]
1312        {
1313            if !self.staged_batches.is_empty() {
1314                self.flush_staged_to_delta().await?;
1315            }
1316        }
1317
1318        self.last_committed_epoch = epoch;
1319
1320        info!(
1321            epoch,
1322            delta_version = self.delta_version,
1323            "Delta Lake: committed epoch"
1324        );
1325
1326        Ok(())
1327    }
1328
1329    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1330        // Discard both buffered and staged data. Because the actual Delta
1331        // write only happens in commit_epoch(), no orphan files are created.
1332        self.buffer.clear();
1333        self.buffered_rows = 0;
1334        self.buffered_bytes = 0;
1335        self.buffer_start_time = None;
1336        self.staged_batches.clear();
1337        self.staged_rows = 0;
1338        self.staged_bytes = 0;
1339
1340        self.epoch_skipped = false;
1341        self.metrics.record_rollback();
1342        warn!(epoch, "Delta Lake: rolled back epoch");
1343        Ok(())
1344    }
1345
1346    fn capabilities(&self) -> SinkConnectorCapabilities {
1347        // Delta commits can run long under compaction or contention.
1348        let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(180)).with_idempotent();
1349
1350        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1351            caps = caps.with_exactly_once().with_two_phase_commit();
1352        }
1353        if self.config.write_mode == DeltaWriteMode::Upsert {
1354            caps = caps.with_upsert().with_changelog();
1355        }
1356        if self.config.schema_evolution {
1357            caps = caps.with_schema_evolution();
1358        }
1359        if !self.config.partition_columns.is_empty() {
1360            caps = caps.with_partitioned();
1361        }
1362
1363        caps
1364    }
1365
1366    async fn flush(&mut self) -> Result<(), ConnectorError> {
1367        // For at-least-once delivery, flush() is the only commit trigger
1368        // because the checkpoint coordinator skips begin_epoch/pre_commit/
1369        // commit_epoch for non-exactly-once sinks. Write directly to Delta.
1370        #[cfg(feature = "delta-lake")]
1371        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce {
1372            // Retry any orphaned staged data from a prior failed flush
1373            // before moving new data in, to prevent silent data loss.
1374            // flush_staged_to_delta handles table == None via reopen_table().
1375            if !self.staged_batches.is_empty() {
1376                self.flush_staged_to_delta().await?;
1377            }
1378
1379            // Stage new buffered data and flush to Delta.
1380            if !self.buffer.is_empty() {
1381                self.staged_batches = std::mem::take(&mut self.buffer);
1382                self.staged_rows = self.buffered_rows;
1383                self.staged_bytes = self.buffered_bytes;
1384                self.buffered_rows = 0;
1385                self.buffered_bytes = 0;
1386                self.buffer_start_time = None;
1387
1388                self.flush_staged_to_delta().await?;
1389            }
1390            return Ok(());
1391        }
1392
1393        if self.buffer.is_empty() {
1394            return Ok(());
1395        }
1396
1397        // For exactly-once, just coalesce in memory — the 2PC path
1398        // (pre_commit + commit_epoch) handles the actual Delta write.
1399        if self.buffer.len() > 1 {
1400            let schema = self.buffer[0].schema();
1401            let combined = arrow_select::concat::concat_batches(&schema, &self.buffer)
1402                .map_err(|e| ConnectorError::Internal(format!("concat failed: {e}")))?;
1403            self.buffer.clear();
1404            self.buffer.push(combined);
1405        }
1406        Ok(())
1407    }
1408
1409    async fn close(&mut self) -> Result<(), ConnectorError> {
1410        info!("closing Delta Lake sink connector");
1411
1412        // Commit any remaining data before closing.
1413        // For at-least-once, use flush() which handles both orphaned
1414        // staged data and buffered data. For exactly-once, use 2PC.
1415        #[cfg(feature = "delta-lake")]
1416        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce {
1417            self.flush().await?;
1418        } else if !self.buffer.is_empty() {
1419            self.pre_commit(self.current_epoch).await?;
1420            self.commit_epoch(self.current_epoch).await?;
1421        }
1422
1423        // Cancel and join the background compaction task.
1424        #[cfg(feature = "delta-lake")]
1425        {
1426            if let Some(cancel) = self.compaction_cancel.take() {
1427                cancel.cancel();
1428            }
1429            if let Some(handle) = self.compaction_handle.take() {
1430                // Wait up to 5 seconds for the compaction task to finish.
1431                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
1432            }
1433        }
1434
1435        #[cfg(feature = "delta-lake")]
1436        if let Some(pending) = self.pending_reopen.take() {
1437            pending.abort();
1438        }
1439
1440        // Drop the table handle when closing.
1441        #[cfg(feature = "delta-lake")]
1442        {
1443            self.table = None;
1444        }
1445
1446        self.state = ConnectorState::Closed;
1447
1448        info!(
1449            table_path = %self.config.table_path,
1450            delta_version = self.delta_version,
1451            "Delta Lake sink connector closed"
1452        );
1453
1454        Ok(())
1455    }
1456}
1457
/// Last-resort cleanup for a sink dropped without `close()`, e.g. after a
/// panic, a config error, or an abrupt shutdown.
///
/// Fires the compaction cancellation token and aborts both the compaction
/// task and any pending background table reopen, so neither outlives the
/// sink.
#[cfg(feature = "delta-lake")]
impl Drop for DeltaLakeSink {
    fn drop(&mut self) {
        let cancel_token = self.compaction_cancel.take();
        let compaction_task = self.compaction_handle.take();
        let reopen_task = self.pending_reopen.take();

        // Cooperative stop signal first, then hard aborts.
        if let Some(token) = cancel_token {
            token.cancel();
        }
        if let Some(task) = compaction_task {
            task.abort();
        }
        if let Some(task) = reopen_task {
            task.abort();
        }
    }
}
1475
1476impl std::fmt::Debug for DeltaLakeSink {
1477    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1478        f.debug_struct("DeltaLakeSink")
1479            .field("state", &self.state)
1480            .field("table_path", &self.config.table_path)
1481            .field("mode", &self.config.write_mode)
1482            .field("guarantee", &self.config.delivery_guarantee)
1483            .field("current_epoch", &self.current_epoch)
1484            .field("last_committed_epoch", &self.last_committed_epoch)
1485            .field("buffered_rows", &self.buffered_rows)
1486            .field("delta_version", &self.delta_version)
1487            .field("epoch_skipped", &self.epoch_skipped)
1488            .finish_non_exhaustive()
1489    }
1490}
1491
1492// ── Helper functions ────────────────────────────────────────────────
1493
/// Applies ±25% jitter to a backoff delay in milliseconds.
///
/// Without jitter, multiple writers colliding on the same Delta version
/// retry in lockstep and keep colliding — a thundering herd. The delay is
/// scaled by a uniformly random factor in `0.75..=1.25`. For the default 2s
/// max backoff, retries land anywhere in 1.5s–2.5s: a 1s-wide window, or
/// ±500ms around the base.
///
/// The result is never below 1ms, so a `base_ms` of 0 still yields a
/// non-zero delay.
#[cfg(feature = "delta-lake")]
fn jittered_backoff_ms(base_ms: u64) -> u64 {
    use rand::RngExt as _;
    let factor: f64 = rand::rng().random_range(0.75_f64..=1.25_f64);
    // The float round-trip is intentional:
    // - backoff values are far below 2^53, so precision loss is irrelevant;
    // - `factor` is positive, so no sign is lost;
    // - float-to-int `as` saturates instead of wrapping, so an oversized
    //   product cannot overflow.
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    let jittered = (base_ms as f64 * factor) as u64;
    jittered.max(1)
}
1511
1512/// Filters a `RecordBatch` using a boolean mask and projects to the given column indices.
1513///
1514/// Takes `mask` by value to hand it straight to `BooleanArray::from` without
1515/// an intermediate `Vec<bool>` copy. Projects before filtering so the SIMD
1516/// kernel only walks user columns, not the dropped metadata columns.
1517fn filter_and_project(
1518    batch: &RecordBatch,
1519    mask: Vec<bool>,
1520    col_indices: &[usize],
1521) -> Result<RecordBatch, ConnectorError> {
1522    use arrow_array::BooleanArray;
1523    use arrow_select::filter::filter_record_batch;
1524
1525    let bool_array = BooleanArray::from(mask);
1526
1527    let projected = batch
1528        .project(col_indices)
1529        .map_err(|e| ConnectorError::Internal(format!("batch projection failed: {e}")))?;
1530
1531    filter_record_batch(&projected, &bool_array)
1532        .map_err(|e| ConnectorError::Internal(format!("arrow filter failed: {e}")))
1533}
1534
1535#[cfg(test)]
1536#[allow(clippy::cast_possible_wrap)]
1537#[allow(clippy::cast_precision_loss)]
1538#[allow(clippy::float_cmp)]
1539mod tests {
1540    use super::super::delta_config::DeltaCatalogType;
1541    use super::*;
1542    use arrow_array::{Float64Array, Int64Array, StringArray};
1543    use arrow_schema::{DataType, Field, Schema};
1544
1545    fn test_schema() -> SchemaRef {
1546        Arc::new(Schema::new(vec![
1547            Field::new("id", DataType::Int64, false),
1548            Field::new("name", DataType::Utf8, true),
1549            Field::new("value", DataType::Float64, true),
1550        ]))
1551    }
1552
1553    fn test_config() -> DeltaLakeSinkConfig {
1554        #[cfg(unix)]
1555        let path = "/tmp/delta_test_nonexistent_8f3a";
1556        #[cfg(windows)]
1557        let path = "C:\\delta_test_nonexistent_8f3a";
1558        DeltaLakeSinkConfig::new(path)
1559    }
1560
1561    fn upsert_config() -> DeltaLakeSinkConfig {
1562        let mut cfg = test_config();
1563        cfg.write_mode = DeltaWriteMode::Upsert;
1564        cfg.merge_key_columns = vec!["id".to_string()];
1565        cfg
1566    }
1567
1568    fn test_batch(n: usize) -> RecordBatch {
1569        let ids: Vec<i64> = (0..n as i64).collect();
1570        let names: Vec<&str> = (0..n).map(|_| "test").collect();
1571        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
1572
1573        RecordBatch::try_new(
1574            test_schema(),
1575            vec![
1576                Arc::new(Int64Array::from(ids)),
1577                Arc::new(StringArray::from(names)),
1578                Arc::new(Float64Array::from(values)),
1579            ],
1580        )
1581        .unwrap()
1582    }
1583
1584    // ── Constructor tests ──
1585
1586    #[test]
1587    fn test_new_defaults() {
1588        let sink = DeltaLakeSink::new(test_config(), None);
1589        assert_eq!(sink.state(), ConnectorState::Created);
1590        assert_eq!(sink.current_epoch(), 0);
1591        assert_eq!(sink.last_committed_epoch(), 0);
1592        assert_eq!(sink.buffered_rows(), 0);
1593        assert_eq!(sink.buffered_bytes(), 0);
1594        assert_eq!(sink.delta_version(), 0);
1595        assert!(sink.schema.is_none());
1596    }
1597
1598    #[test]
1599    fn test_with_schema() {
1600        let schema = test_schema();
1601        let sink = DeltaLakeSink::with_schema(test_config(), schema.clone());
1602        assert_eq!(sink.schema(), schema);
1603    }
1604
1605    #[test]
1606    fn test_schema_empty_when_none() {
1607        let sink = DeltaLakeSink::new(test_config(), None);
1608        let schema = sink.schema();
1609        assert_eq!(schema.fields().len(), 0);
1610    }
1611
1612    #[cfg(feature = "delta-lake")]
1613    #[test]
1614    fn test_deferred_init_flag_default_false() {
1615        let sink = DeltaLakeSink::new(test_config(), None);
1616        assert!(!sink.needs_deferred_delta_init);
1617    }
1618
1619    fn unity_config() -> DeltaLakeSinkConfig {
1620        let mut config = test_config();
1621        config.catalog_type = DeltaCatalogType::Unity {
1622            workspace_url: "https://test.azuredatabricks.net".to_string(),
1623            access_token: "dapi123".to_string(),
1624        };
1625        config.catalog_name = Some("main".to_string());
1626        config.catalog_schema = Some("default".to_string());
1627        config.catalog_storage_location = Some("abfss://c@acct.dfs.core.windows.net/t".to_string());
1628        config
1629    }
1630
1631    #[cfg(feature = "delta-lake")]
1632    #[tokio::test]
1633    async fn test_open_defers_init_for_unity_no_schema() {
1634        use crate::config::ConnectorConfig;
1635
1636        let config = unity_config();
1637        let mut sink = DeltaLakeSink::new(config, None);
1638
1639        // open() with empty ConnectorConfig (simulates factory path)
1640        let connector_config = ConnectorConfig::new("delta-lake");
1641        // open() will re-parse but table.path is "/tmp/delta_test" (local),
1642        // so it won't actually reach UC REST. However from_config requires
1643        // table.path, so we use the sink's existing config by passing empty.
1644        // The sink skips re-parse when properties are empty.
1645        let result = sink.open(&connector_config).await;
1646        assert!(result.is_ok());
1647
1648        // Should be in Initializing state with deferred flag set.
1649        assert!(sink.needs_deferred_delta_init);
1650        assert_eq!(sink.state(), ConnectorState::Initializing);
1651        assert!(sink.schema.is_none());
1652    }
1653
1654    #[cfg(feature = "delta-lake")]
1655    #[tokio::test]
1656    async fn test_deferred_init_transitions_to_failed_on_error() {
1657        // When deferred init fails, the sink must transition to Failed
1658        // to prevent an unbounded retry storm.
1659        let mut sink = DeltaLakeSink::new(test_config(), None);
1660        sink.state = ConnectorState::Initializing;
1661        sink.needs_deferred_delta_init = true;
1662        sink.schema = Some(test_schema());
1663
1664        // begin_epoch will try init_delta_table() which will fail
1665        // (no real Delta table at /tmp/delta_test). The sink should
1666        // transition to Failed.
1667        let result = sink.begin_epoch(1).await;
1668        assert!(result.is_err());
1669        assert_eq!(sink.state(), ConnectorState::Failed);
1670        // Flag may still be set, but Failed state prevents further usage.
1671    }
1672
    /// `write_batch` must accept the `Initializing` state during deferred init
    /// and adopt the batch's schema before attempting table creation.
    #[cfg(feature = "delta-lake")]
    #[tokio::test]
    async fn test_write_batch_accepts_initializing_state() {
        // During deferred init, write_batch must accept Initializing state
        // so the first batch can provide the schema.
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Initializing;
        sink.needs_deferred_delta_init = true;

        let batch = test_batch(5);
        // write_batch sets schema, then tries init_delta_table which fails
        // (no Delta table exists at the `test_config()` fixture path).
        // Sink transitions to Failed.
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
        assert_eq!(sink.state(), ConnectorState::Failed);
        // Schema was set before init was attempted.
        assert!(sink.schema.is_some());
    }
1691
1692    #[test]
1693    fn test_no_deferred_init_without_catalog_storage_location() {
1694        // Unity catalog without catalog.storage.location should NOT defer.
1695        let mut config = unity_config();
1696        config.catalog_storage_location = None;
1697        let sink = DeltaLakeSink::new(config, None);
1698
1699        let should_defer = matches!(sink.config.catalog_type, DeltaCatalogType::Unity { .. })
1700            && sink.config.catalog_storage_location.is_some()
1701            && sink.schema.is_none();
1702        assert!(!should_defer);
1703    }
1704
1705    #[test]
1706    fn test_no_deferred_init_with_schema() {
1707        // Unity catalog with schema already set should NOT defer.
1708        let config = unity_config();
1709        let sink = DeltaLakeSink::with_schema(config, test_schema());
1710
1711        let should_defer = matches!(sink.config.catalog_type, DeltaCatalogType::Unity { .. })
1712            && sink.config.catalog_storage_location.is_some()
1713            && sink.schema.is_none();
1714        assert!(!should_defer);
1715    }
1716
1717    // ── Batch size estimation ──
1718
1719    #[test]
1720    fn test_estimate_batch_size() {
1721        let batch = test_batch(100);
1722        let size = DeltaLakeSink::estimate_batch_size(&batch);
1723        assert!(size > 0);
1724    }
1725
1726    #[test]
1727    fn test_estimate_batch_size_empty() {
1728        let batch = RecordBatch::new_empty(test_schema());
1729        let size = DeltaLakeSink::estimate_batch_size(&batch);
1730        // Arrow arrays have baseline buffer allocation even with 0 rows,
1731        // so size may be small but not necessarily zero.
1732        assert!(size < 1024);
1733    }
1734
1735    // ── Should flush tests ──
1736
1737    #[test]
1738    fn test_should_flush_by_rows() {
1739        let mut config = test_config();
1740        config.max_buffer_records = 100;
1741        let mut sink = DeltaLakeSink::new(config, None);
1742        sink.buffered_rows = 99;
1743        assert!(!sink.should_flush());
1744        sink.buffered_rows = 100;
1745        assert!(sink.should_flush());
1746    }
1747
1748    #[test]
1749    fn test_should_flush_by_bytes() {
1750        let mut config = test_config();
1751        config.target_file_size = 1000;
1752        let mut sink = DeltaLakeSink::new(config, None);
1753        sink.buffered_bytes = 999;
1754        assert!(!sink.should_flush());
1755        sink.buffered_bytes = 1000;
1756        assert!(sink.should_flush());
1757    }
1758
1759    #[test]
1760    fn test_should_flush_empty() {
1761        let sink = DeltaLakeSink::new(test_config(), None);
1762        assert!(!sink.should_flush());
1763    }
1764
    /// Exactly-once backpressure: an oversized single batch is admitted, but
    /// any further batch is rejected with a "buffer full" error and is not
    /// buffered.
    #[tokio::test]
    async fn test_exactly_once_buffer_backpressure() {
        // Exactly-once mode rejects writes once the cumulative pending
        // buffer exceeds the hard cap (4× max_buffer_records). A single
        // incoming batch that by itself exceeds the cap is admitted (we
        // cannot split it without breaking exactly-once), but subsequent
        // batches hit backpressure.
        let mut config = test_config();
        config.delivery_guarantee = crate::connector::DeliveryGuarantee::ExactlyOnce;
        config.max_buffer_records = 10;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        // First batch of 50 rows exceeds 4× cap (40) but we admit it —
        // rejecting would wedge the pipeline with no way to split.
        let first = test_batch(50);
        sink.write_batch(&first)
            .await
            .expect("single oversized batch must be admitted");
        assert_eq!(sink.buffered_rows(), 50);

        // A second normal batch must now be rejected: cumulative pending
        // (50 + 5 = 55) exceeds the effective cap (max(40, 5) = 40).
        let second = test_batch(5);
        let err = sink
            .write_batch(&second)
            .await
            .expect_err("should reject once cumulative buffer exceeds cap");
        let msg = err.to_string();
        // Match on the message text so the test pins the backpressure path,
        // not just any write error.
        assert!(
            msg.contains("buffer full"),
            "expected backpressure error, got: {msg}"
        );
        // Rejected batch must NOT have been buffered.
        assert_eq!(sink.buffered_rows(), 50);
    }
1801
1802    // ── Batch buffering tests ──
1803
    /// A batch below the row threshold is buffered, not written: the
    /// `WriteResult` reports zero records written.
    #[tokio::test]
    async fn test_write_batch_buffering() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config, None);
        // Bypass open(); the tests only need the Running-state write path.
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.unwrap();

        // Should buffer, not flush (10 < 100)
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
        assert!(sink.buffered_bytes() > 0);
    }
1819
1820    #[tokio::test]
1821    async fn test_write_batch_empty() {
1822        let mut sink = DeltaLakeSink::new(test_config(), None);
1823        sink.state = ConnectorState::Running;
1824
1825        let batch = test_batch(0);
1826        let result = sink.write_batch(&batch).await.unwrap();
1827        assert_eq!(result.records_written, 0);
1828        assert_eq!(sink.buffered_rows(), 0);
1829    }
1830
1831    #[tokio::test]
1832    async fn test_write_batch_not_running() {
1833        let mut sink = DeltaLakeSink::new(test_config(), None);
1834        // state is Created, not Running
1835
1836        let batch = test_batch(10);
1837        let result = sink.write_batch(&batch).await;
1838        assert!(result.is_err());
1839    }
1840
1841    #[tokio::test]
1842    async fn test_write_batch_sets_schema() {
1843        let mut sink = DeltaLakeSink::new(test_config(), None);
1844        sink.state = ConnectorState::Running;
1845        assert!(sink.schema.is_none());
1846
1847        let batch = test_batch(5);
1848        sink.write_batch(&batch).await.unwrap();
1849        assert!(sink.schema.is_some());
1850        assert_eq!(sink.schema.as_ref().unwrap().fields().len(), 3);
1851    }
1852
1853    #[tokio::test]
1854    async fn test_multiple_write_batches_accumulate() {
1855        let mut config = test_config();
1856        config.max_buffer_records = 100;
1857        let mut sink = DeltaLakeSink::new(config, None);
1858        sink.state = ConnectorState::Running;
1859
1860        let batch = test_batch(10);
1861        sink.write_batch(&batch).await.unwrap();
1862        sink.write_batch(&batch).await.unwrap();
1863        sink.write_batch(&batch).await.unwrap();
1864
1865        assert_eq!(sink.buffered_rows(), 30);
1866    }
1867
1868    // ── Epoch lifecycle tests ──
1869    // Note: Epoch lifecycle with real I/O is tested in delta_io.rs integration tests.
1870
    /// Rolling back discards all buffered (not yet staged) rows and bytes.
    #[tokio::test]
    async fn test_rollback_clears_buffer() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(50);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 50);

        // NOTE(review): no begin_epoch() precedes this; epoch 0 matches the
        // sink's initial `current_epoch` (see test_new_defaults).
        sink.rollback_epoch(0).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.buffered_bytes(), 0);
    }
1886
    /// D001: Rollback after `pre_commit` must discard staged data.
    /// `pre_commit` stages batches; rollback discards them without writing to Delta.
    #[tokio::test]
    async fn test_rollback_after_pre_commit_discards_staged() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        let batch = test_batch(50);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 50);

        // pre_commit stages the buffer
        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.staged_rows, 50);
        assert!(!sink.staged_batches.is_empty());

        // rollback discards both buffer and staged data
        sink.rollback_epoch(1).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.staged_rows, 0);
        assert_eq!(sink.staged_bytes, 0);
        assert!(sink.staged_batches.is_empty());
        // The version is still 0, showing rollback never reached Delta.
        assert_eq!(sink.delta_version(), 0); // no Delta write occurred
    }
1915
    /// `pre_commit` moves every buffered batch into the staged set intact
    /// (both batches, all 50 rows), and a subsequent rollback discards that
    /// staged state cleanly.
    ///
    /// NOTE(review): despite the name, no commit — successful or failed — is
    /// exercised here; only the `pre_commit` → `rollback_epoch` path is covered.
    #[tokio::test]
    async fn test_staged_data_preserved_until_commit_or_rollback() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(25)).await.unwrap();
        sink.write_batch(&test_batch(25)).await.unwrap();

        // pre_commit moves buffer → staged
        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.staged_rows, 50);
        assert_eq!(sink.staged_batches.len(), 2);
        assert_eq!(sink.buffered_rows(), 0);

        // Rollback must discard everything pre_commit staged.
        sink.rollback_epoch(1).await.unwrap();
        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.staged_rows, 0);
        assert_eq!(sink.staged_bytes, 0);
    }
1948
    /// Committing an epoch with no writes still advances
    /// `last_committed_epoch`, but leaves the Delta version unchanged.
    #[tokio::test]
    async fn test_commit_empty_epoch() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        // No writes
        sink.commit_epoch(1).await.unwrap();
        assert_eq!(sink.last_committed_epoch(), 1);
        assert_eq!(sink.delta_version(), 0); // No version bump (no files)
    }
1960
1961    // ── Flush tests ──
1962    // Note: These tests bypass open() and test business logic only.
1963
    /// In exactly-once mode, `flush()` merges buffered batches into one, keeps
    /// the row count, and performs no Delta write.
    #[tokio::test]
    async fn test_flush_coalesces_buffer() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        // NOTE(review): writer_id is presumably needed by the exactly-once
        // path; confirm whether this test actually depends on it.
        config.writer_id = "test-writer".to_string();
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffer.len(), 2);

        // flush() coalesces batches but does not write to Delta.
        sink.flush().await.unwrap();
        assert_eq!(sink.buffer.len(), 1);
        assert_eq!(sink.buffered_rows(), 20);
    }
1982
1983    // ── Open and close tests ──
1984    // Note: These tests use fake paths that don't exist.
1985    // With the delta-lake feature, open() tries to actually access the path.
1986    // See delta_io.rs for integration tests with real I/O.
1987
1988    #[tokio::test]
1989    async fn test_close() {
1990        let mut sink = DeltaLakeSink::new(test_config(), None);
1991        sink.state = ConnectorState::Running;
1992
1993        sink.close().await.unwrap();
1994        assert_eq!(sink.state(), ConnectorState::Closed);
1995    }
1996
1997    // ── Capabilities tests ──
1998
1999    #[test]
2000    fn test_capabilities_append_exactly_once() {
2001        let mut config = test_config();
2002        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
2003        let sink = DeltaLakeSink::new(config, None);
2004        let caps = sink.capabilities();
2005        assert!(caps.exactly_once);
2006        assert!(caps.idempotent);
2007        assert!(!caps.upsert);
2008        assert!(!caps.changelog);
2009        assert!(!caps.schema_evolution);
2010        assert!(!caps.partitioned);
2011    }
2012
2013    #[test]
2014    fn test_capabilities_upsert() {
2015        let sink = DeltaLakeSink::new(upsert_config(), None);
2016        let caps = sink.capabilities();
2017        assert!(caps.upsert);
2018        assert!(caps.changelog);
2019        assert!(caps.idempotent);
2020    }
2021
2022    #[test]
2023    fn test_capabilities_schema_evolution() {
2024        let mut config = test_config();
2025        config.schema_evolution = true;
2026        let sink = DeltaLakeSink::new(config, None);
2027        let caps = sink.capabilities();
2028        assert!(caps.schema_evolution);
2029    }
2030
2031    #[test]
2032    fn test_capabilities_partitioned() {
2033        let mut config = test_config();
2034        config.partition_columns = vec!["trade_date".to_string()];
2035        let sink = DeltaLakeSink::new(config, None);
2036        let caps = sink.capabilities();
2037        assert!(caps.partitioned);
2038    }
2039
2040    #[test]
2041    fn test_capabilities_at_least_once() {
2042        let mut config = test_config();
2043        config.delivery_guarantee = DeliveryGuarantee::AtLeastOnce;
2044        let sink = DeltaLakeSink::new(config, None);
2045        let caps = sink.capabilities();
2046        assert!(!caps.exactly_once);
2047        assert!(caps.idempotent);
2048    }
2049
2050    // ── Changelog splitting tests ──
2051
2052    fn changelog_schema() -> SchemaRef {
2053        Arc::new(Schema::new(vec![
2054            Field::new("id", DataType::Int64, false),
2055            Field::new("name", DataType::Utf8, true),
2056            Field::new("_op", DataType::Utf8, false),
2057            Field::new("_ts_ms", DataType::Int64, false),
2058        ]))
2059    }
2060
2061    fn changelog_batch() -> RecordBatch {
2062        RecordBatch::try_new(
2063            changelog_schema(),
2064            vec![
2065                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
2066                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
2067                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
2068                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
2069            ],
2070        )
2071        .unwrap()
2072    }
2073
2074    #[test]
2075    fn test_split_changelog_batch() {
2076        let batch = changelog_batch();
2077        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
2078
2079        // Inserts: rows 0 (I), 1 (U), 3 (I) = 3 rows
2080        assert_eq!(inserts.num_rows(), 3);
2081        // Deletes: rows 2 (D), 4 (D) = 2 rows
2082        assert_eq!(deletes.num_rows(), 2);
2083
2084        // Metadata columns should be stripped
2085        assert_eq!(inserts.num_columns(), 2); // id, name only
2086        assert_eq!(deletes.num_columns(), 2);
2087
2088        // Verify insert values
2089        let insert_ids = inserts
2090            .column(0)
2091            .as_any()
2092            .downcast_ref::<Int64Array>()
2093            .unwrap();
2094        assert_eq!(insert_ids.value(0), 1);
2095        assert_eq!(insert_ids.value(1), 2);
2096        assert_eq!(insert_ids.value(2), 4);
2097
2098        // Verify delete values
2099        let delete_ids = deletes
2100            .column(0)
2101            .as_any()
2102            .downcast_ref::<Int64Array>()
2103            .unwrap();
2104        assert_eq!(delete_ids.value(0), 3);
2105        assert_eq!(delete_ids.value(1), 5);
2106    }
2107
2108    #[test]
2109    fn test_split_changelog_all_inserts() {
2110        let schema = changelog_schema();
2111        let batch = RecordBatch::try_new(
2112            schema,
2113            vec![
2114                Arc::new(Int64Array::from(vec![1, 2])),
2115                Arc::new(StringArray::from(vec!["a", "b"])),
2116                Arc::new(StringArray::from(vec!["I", "I"])),
2117                Arc::new(Int64Array::from(vec![100, 200])),
2118            ],
2119        )
2120        .unwrap();
2121
2122        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
2123        assert_eq!(inserts.num_rows(), 2);
2124        assert_eq!(deletes.num_rows(), 0);
2125    }
2126
2127    #[test]
2128    fn test_split_changelog_all_deletes() {
2129        let schema = changelog_schema();
2130        let batch = RecordBatch::try_new(
2131            schema,
2132            vec![
2133                Arc::new(Int64Array::from(vec![1, 2])),
2134                Arc::new(StringArray::from(vec!["a", "b"])),
2135                Arc::new(StringArray::from(vec!["D", "D"])),
2136                Arc::new(Int64Array::from(vec![100, 200])),
2137            ],
2138        )
2139        .unwrap();
2140
2141        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
2142        assert_eq!(inserts.num_rows(), 0);
2143        assert_eq!(deletes.num_rows(), 2);
2144    }
2145
2146    #[test]
2147    fn test_split_changelog_missing_op_column() {
2148        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
2149        let batch =
2150            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
2151
2152        let result = DeltaLakeSink::split_changelog_batch(&batch);
2153        assert!(result.is_err());
2154    }
2155
2156    #[test]
2157    fn test_split_changelog_snapshot_read() {
2158        let schema = changelog_schema();
2159        let batch = RecordBatch::try_new(
2160            schema,
2161            vec![
2162                Arc::new(Int64Array::from(vec![1])),
2163                Arc::new(StringArray::from(vec!["a"])),
2164                Arc::new(StringArray::from(vec!["r"])), // snapshot read
2165                Arc::new(Int64Array::from(vec![100])),
2166            ],
2167        )
2168        .unwrap();
2169
2170        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
2171        assert_eq!(inserts.num_rows(), 1);
2172        assert_eq!(deletes.num_rows(), 0);
2173    }
2174
2175    // ── Debug output test ──
2176
2177    #[test]
2178    fn test_debug_output() {
2179        let sink = DeltaLakeSink::new(test_config(), None);
2180        let debug = format!("{sink:?}");
2181        assert!(debug.contains("DeltaLakeSink"));
2182        assert!(debug.contains("delta_test_nonexistent_8f3a"));
2183    }
2184
2185    // ── End-to-end upsert collapse (aggregating-MV changelog → Delta table) ──
2186
2187    /// A Z-set changelog batch shaped like aggregating-MV output:
2188    /// `[region: Utf8, total: Int64, __weight: Int64]`.
2189    #[cfg(feature = "delta-lake")]
2190    fn zset_changelog(rows: &[(&str, i64, i64)]) -> RecordBatch {
2191        let schema = Arc::new(Schema::new(vec![
2192            Field::new("region", DataType::Utf8, false),
2193            Field::new("total", DataType::Int64, false),
2194            Field::new(
2195                laminar_core::changelog::WEIGHT_COLUMN,
2196                DataType::Int64,
2197                false,
2198            ),
2199        ]));
2200        RecordBatch::try_new(
2201            schema,
2202            vec![
2203                Arc::new(StringArray::from(
2204                    rows.iter().map(|r| r.0).collect::<Vec<_>>(),
2205                )),
2206                Arc::new(Int64Array::from(
2207                    rows.iter().map(|r| r.1).collect::<Vec<_>>(),
2208                )),
2209                Arc::new(Int64Array::from(
2210                    rows.iter().map(|r| r.2).collect::<Vec<_>>(),
2211                )),
2212            ],
2213        )
2214        .unwrap()
2215    }
2216
    /// Drive one full epoch through the exactly-once sink lifecycle.
    ///
    /// Runs `begin_epoch` → `write_batch` → `pre_commit` → `commit_epoch` for
    /// `epoch` in that order, panicking (via `unwrap`) if any step fails.
    #[cfg(feature = "delta-lake")]
    async fn run_epoch(sink: &mut DeltaLakeSink, epoch: u64, batch: &RecordBatch) {
        sink.begin_epoch(epoch).await.unwrap();
        sink.write_batch(batch).await.unwrap();
        sink.pre_commit(epoch).await.unwrap();
        sink.commit_epoch(epoch).await.unwrap();
    }
2225
2226    /// Read the table back and return its current `(region, total)` rows, sorted.
2227    #[cfg(feature = "delta-lake")]
2228    async fn read_regions(path: &str) -> Vec<(String, i64)> {
2229        let ctx = datafusion::prelude::SessionContext::new();
2230        crate::lakehouse::delta_table_provider::register_delta_table(
2231            &ctx,
2232            "t",
2233            path,
2234            std::collections::HashMap::new(),
2235        )
2236        .await
2237        .unwrap();
2238        let batches = ctx
2239            .sql("SELECT region, total FROM t")
2240            .await
2241            .unwrap()
2242            .collect()
2243            .await
2244            .unwrap();
2245        let mut out = Vec::new();
2246        for b in &batches {
2247            // DataFusion may return strings as Utf8View; cast to the concrete
2248            // types we downcast to.
2249            let region_arr = arrow_cast::cast(
2250                b.column(b.schema().index_of("region").unwrap()),
2251                &DataType::Utf8,
2252            )
2253            .unwrap();
2254            let total_arr = arrow_cast::cast(
2255                b.column(b.schema().index_of("total").unwrap()),
2256                &DataType::Int64,
2257            )
2258            .unwrap();
2259            let regions = region_arr.as_any().downcast_ref::<StringArray>().unwrap();
2260            let totals = total_arr.as_any().downcast_ref::<Int64Array>().unwrap();
2261            for i in 0..b.num_rows() {
2262                out.push((regions.value(i).to_string(), totals.value(i)));
2263            }
2264        }
2265        out.sort();
2266        out
2267    }
2268
2269    /// An aggregating MV emits a Z-set changelog; the upsert sink must collapse
2270    /// it into the table's current per-key state — surviving value updates,
2271    /// group disappearance, and multiple updates to one key within a single
2272    /// epoch (the case that triggers a delta-rs cardinality violation without
2273    /// collapse).
2274    #[cfg(feature = "delta-lake")]
2275    #[tokio::test]
2276    async fn upsert_collapses_aggregating_mv_to_current_state() {
2277        let dir = tempfile::tempdir().unwrap();
2278        let table_dir = dir.path().join("agg");
2279        // delta-rs' local object store requires the table directory to exist.
2280        std::fs::create_dir_all(&table_dir).unwrap();
2281        let path = table_dir.to_string_lossy().to_string();
2282
2283        let mut cfg = DeltaLakeSinkConfig::new(&path);
2284        cfg.write_mode = DeltaWriteMode::Upsert;
2285        cfg.merge_key_columns = vec!["region".to_string()];
2286        // Exactly-once buffers the whole epoch and flushes once at commit, so
2287        // each epoch's changelog is collapsed together (deterministic).
2288        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
2289
2290        // No explicit schema: the schema (and table) is derived from the first
2291        // batch, exactly like the production pipeline — exercising the
2292        // `target_schema` strip of `__weight`.
2293        let mut sink = DeltaLakeSink::new(cfg, None);
2294        sink.open(&ConnectorConfig::new("delta-lake"))
2295            .await
2296            .unwrap();
2297
2298        // Epoch 1: two brand-new groups.
2299        run_epoch(
2300            &mut sink,
2301            1,
2302            &zset_changelog(&[("east", 10, 1), ("west", 5, 1)]),
2303        )
2304        .await;
2305        assert_eq!(
2306            read_regions(&path).await,
2307            vec![("east".into(), 10), ("west".into(), 5)]
2308        );
2309
2310        // Epoch 2: update east (retract 10 + insert 30), drop west, add north.
2311        run_epoch(
2312            &mut sink,
2313            2,
2314            &zset_changelog(&[
2315                ("east", 10, -1),
2316                ("east", 30, 1),
2317                ("west", 5, -1),
2318                ("north", 7, 1),
2319            ]),
2320        )
2321        .await;
2322        assert_eq!(
2323            read_regions(&path).await,
2324            vec![("east".into(), 30), ("north".into(), 7)]
2325        );
2326
2327        // Epoch 3: two consecutive updates to "east" in one epoch. Without
2328        // collapse this is multiple source rows for one merge key → delta-rs
2329        // cardinality violation. With collapse it folds to the final value.
2330        run_epoch(
2331            &mut sink,
2332            3,
2333            &zset_changelog(&[
2334                ("east", 30, -1),
2335                ("east", 40, 1),
2336                ("east", 40, -1),
2337                ("east", 55, 1),
2338            ]),
2339        )
2340        .await;
2341        assert_eq!(
2342            read_regions(&path).await,
2343            vec![("east".into(), 55), ("north".into(), 7)]
2344        );
2345
2346        // The target table never carried the Z-set weight column.
2347        assert!(sink.schema.as_ref().unwrap().index_of("__weight").is_err());
2348
2349        // Collapse observability fired across the three epochs.
2350        let m = sink.sink_metrics();
2351        assert_eq!(m.collapse_rows_in.get(), 10);
2352        assert!(m.collapse_deletes_out.get() >= 1, "west was dropped");
2353        assert!(m.collapse_upserts_out.get() >= 4);
2354
2355        sink.close().await.unwrap();
2356    }
2357
2358    /// Exactly-once replay: a writer that recovers an already-committed epoch
2359    /// (from Delta `txn` metadata) and re-applies it must leave the table
2360    /// byte-for-byte unchanged — no new snapshot, same per-key state.
2361    #[cfg(feature = "delta-lake")]
2362    #[tokio::test]
2363    async fn upsert_replay_of_committed_epoch_is_idempotent() {
2364        let dir = tempfile::tempdir().unwrap();
2365        let table_dir = dir.path().join("agg_replay");
2366        std::fs::create_dir_all(&table_dir).unwrap();
2367        let path = table_dir.to_string_lossy().to_string();
2368
2369        let mk_cfg = || {
2370            let mut cfg = DeltaLakeSinkConfig::new(&path);
2371            cfg.write_mode = DeltaWriteMode::Upsert;
2372            cfg.merge_key_columns = vec!["region".to_string()];
2373            cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
2374            // Exactly-once recovery keys off a STABLE writer_id (default is a
2375            // random UUID); a restart must reuse it to find its prior epoch.
2376            cfg.writer_id = "replay-writer".to_string();
2377            cfg
2378        };
2379        let epoch2 = || {
2380            zset_changelog(&[
2381                ("east", 10, -1),
2382                ("east", 30, 1),
2383                ("west", 5, -1),
2384                ("north", 7, 1),
2385            ])
2386        };
2387
2388        // Writer A commits epochs 1 and 2, then "crashes" (close).
2389        let mut sink_a = DeltaLakeSink::new(mk_cfg(), None);
2390        sink_a
2391            .open(&ConnectorConfig::new("delta-lake"))
2392            .await
2393            .unwrap();
2394        run_epoch(
2395            &mut sink_a,
2396            1,
2397            &zset_changelog(&[("east", 10, 1), ("west", 5, 1)]),
2398        )
2399        .await;
2400        run_epoch(&mut sink_a, 2, &epoch2()).await;
2401        let expected = vec![("east".to_string(), 30), ("north".to_string(), 7)];
2402        assert_eq!(read_regions(&path).await, expected);
2403        let version_after_2 = sink_a.delta_version();
2404        sink_a.close().await.unwrap();
2405
2406        // Writer B recovers against the same table + writer_id.
2407        let mut sink_b = DeltaLakeSink::new(mk_cfg(), None);
2408        sink_b
2409            .open(&ConnectorConfig::new("delta-lake"))
2410            .await
2411            .unwrap();
2412        assert_eq!(
2413            sink_b.last_committed_epoch(),
2414            2,
2415            "epoch must be recovered from Delta txn metadata"
2416        );
2417
2418        // Replay the already-committed epoch 2: it must be skipped end-to-end.
2419        sink_b.begin_epoch(2).await.unwrap();
2420        sink_b.write_batch(&epoch2()).await.unwrap();
2421        sink_b.pre_commit(2).await.unwrap();
2422        sink_b.commit_epoch(2).await.unwrap();
2423
2424        assert_eq!(
2425            read_regions(&path).await,
2426            expected,
2427            "replay must not change state"
2428        );
2429        assert_eq!(
2430            sink_b.delta_version(),
2431            version_after_2,
2432            "replayed epoch must not create a new Delta version"
2433        );
2434
2435        // The recovered writer continues normally with a fresh epoch.
2436        run_epoch(
2437            &mut sink_b,
2438            3,
2439            &zset_changelog(&[("east", 30, -1), ("east", 55, 1)]),
2440        )
2441        .await;
2442        assert_eq!(
2443            read_regions(&path).await,
2444            vec![("east".to_string(), 55), ("north".to_string(), 7)]
2445        );
2446
2447        sink_b.close().await.unwrap();
2448    }
2449}