Skip to main content

laminar_connectors/lakehouse/
delta.rs

1//! Delta Lake sink connector implementation.
2//!
3//! [`DeltaLakeSink`] implements [`SinkConnector`], writing Arrow `RecordBatch`
4//! data to Delta Lake tables with ACID transactions and at-least-once delivery
5//! (exactly-once opt-in via `delivery.guarantee = 'exactly-once'`).
6//!
7//! # Write Strategies
8//!
9//! - **Append mode**: Arrow-to-Parquet zero-copy writes for immutable streams
10//! - **Overwrite mode**: Replace partition contents for recomputation
11//! - **Upsert mode**: CDC MERGE via Z-set changelog integration
12//!
13//! Exactly-once semantics use epoch-to-Delta-version mapping: each `LaminarDB`
14//! epoch maps to exactly one Delta Lake transaction via `txn` application
15//! transaction metadata in the Delta log.
16//!
17//! # Ring Architecture
18//!
19//! - **Ring 0**: No sink code. Data arrives via SPSC channel (~5ns push).
20//! - **Ring 1**: Batch buffering, Parquet writes, Delta log commits.
21//! - **Ring 2**: Schema management, configuration, health checks.
22
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use arrow_array::{Array, RecordBatch};
27use arrow_schema::SchemaRef;
28use async_trait::async_trait;
29use tracing::{debug, info, warn};
30
31#[cfg(feature = "delta-lake")]
32use deltalake::DeltaTable;
33
34#[cfg(feature = "delta-lake")]
35use deltalake::protocol::SaveMode;
36
37use crate::config::{ConnectorConfig, ConnectorState};
38use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
39use crate::error::ConnectorError;
40use crate::health::HealthStatus;
41use crate::metrics::ConnectorMetrics;
42
43use super::delta_config::{DeltaLakeSinkConfig, DeltaWriteMode};
44use super::delta_metrics::DeltaLakeSinkMetrics;
45use crate::connector::DeliveryGuarantee;
46
47/// Delta Lake sink connector.
48///
49/// Writes Arrow `RecordBatch` to Delta Lake tables with ACID transactions,
50/// at-least-once delivery (exactly-once opt-in), partitioning, and
51/// background compaction.
52///
53/// # Lifecycle
54///
55/// ```text
56/// new() -> open() -> [begin_epoch() -> write_batch()* -> commit_epoch()] -> close()
57///                          |                                    |
58///                          +--- rollback_epoch() (on failure) --+
59/// ```
60///
61/// # Exactly-Once Semantics
62///
63/// Each `LaminarDB` epoch maps to exactly one Delta Lake transaction (version).
64/// On recovery, the sink checks `_delta_log/` for the last committed epoch
65/// via `txn` (application transaction) metadata. If an epoch was already
66/// committed, it is skipped (idempotent commit).
67///
68/// # 2PC Protocol
69///
70/// `pre_commit()` coalesces the buffer but does NOT write to Delta.
71/// `commit_epoch()` performs the actual Delta write+commit atomically.
72/// `rollback_epoch()` discards the staged buffer without side effects.
73/// This ensures that a rolled-back epoch never leaves data in Delta.
74pub struct DeltaLakeSink {
75    /// Sink configuration.
76    config: DeltaLakeSinkConfig,
77    /// Arrow schema for input batches (set on first write or from existing table).
78    schema: Option<SchemaRef>,
79    /// Connector lifecycle state.
80    state: ConnectorState,
81    /// Current epoch being written.
82    current_epoch: u64,
83    /// Last successfully committed epoch.
84    last_committed_epoch: u64,
85    /// `RecordBatch` buffer for the current epoch.
86    buffer: Vec<RecordBatch>,
87    /// Total rows buffered in current epoch.
88    buffered_rows: usize,
89    /// Total bytes buffered (estimated) in current epoch.
90    buffered_bytes: u64,
91    /// Current Delta Lake table version.
92    delta_version: u64,
93    /// Time when the current buffer started accumulating.
94    buffer_start_time: Option<Instant>,
95    /// Sink metrics.
96    metrics: DeltaLakeSinkMetrics,
97    /// Delta Lake table handle (present when `delta-lake` feature is enabled).
98    #[cfg(feature = "delta-lake")]
99    table: Option<DeltaTable>,
100    /// Whether the current epoch was skipped (already committed).
101    epoch_skipped: bool,
102    /// Staged batches ready for commit (populated by `pre_commit()`, consumed
103    /// by `commit_epoch()`). This separation ensures `rollback_epoch()` can
104    /// discard prepared data without leaving orphan files in Delta.
105    staged_batches: Vec<RecordBatch>,
106    /// Rows staged for commit (mirrors `staged_batches`).
107    staged_rows: usize,
108    /// Estimated bytes staged for commit.
109    staged_bytes: u64,
110    /// Resolved table path after catalog lookup (may differ from `config.table_path`
111    /// when using Unity/Glue catalogs). Used by `reopen_table()` so retries
112    /// target the same resolved path that `open()` connected to.
113    #[cfg(feature = "delta-lake")]
114    resolved_table_path: String,
115    /// Resolved storage options after catalog lookup.
116    #[cfg(feature = "delta-lake")]
117    resolved_storage_options: std::collections::HashMap<String, String>,
118    /// Cancellation token for the background compaction task.
119    #[cfg(feature = "delta-lake")]
120    compaction_cancel: Option<tokio_util::sync::CancellationToken>,
121    /// Handle for the background compaction task.
122    #[cfg(feature = "delta-lake")]
123    compaction_handle: Option<tokio::task::JoinHandle<()>>,
124    /// When true, Delta table init is deferred until the first `write_batch()`
125    /// provides a schema. This happens when Unity Catalog auto-create is
126    /// configured but the pipeline schema is not yet available at `open()` time.
127    #[cfg(feature = "delta-lake")]
128    needs_deferred_delta_init: bool,
129    /// Background reopen kicked off after a checkpoint-boundary drop, so
130    /// the next flush doesn't pay the table-load cost on the commit path.
131    #[cfg(feature = "delta-lake")]
132    pending_reopen: Option<tokio::task::JoinHandle<Result<DeltaTable, ConnectorError>>>,
133    /// Pre-built Parquet writer properties for hot-path writes. Built once
134    /// in `init_delta_table()` from `config.parquet`; cloning this is far
135    /// cheaper than rebuilding (string parsing, bloom-filter column setup)
136    /// from scratch on every commit.
137    #[cfg(feature = "delta-lake")]
138    cached_writer_properties: Option<deltalake::parquet::file::properties::WriterProperties>,
139    /// Shared `DataFusion` session for upsert/merge operations. Creating a
140    /// fresh `SessionContext` per merge allocated a runtime env, memory
141    /// pool, and object-store registry each commit; reusing one flattens
142    /// allocator churn under steady-state upsert load.
143    #[cfg(feature = "delta-lake")]
144    merge_session: Option<datafusion::prelude::SessionContext>,
145}
146
147impl DeltaLakeSink {
148    /// Creates a new Delta Lake sink with the given configuration.
149    #[must_use]
150    pub fn new(config: DeltaLakeSinkConfig, registry: Option<&prometheus::Registry>) -> Self {
151        Self {
152            config,
153            schema: None,
154            state: ConnectorState::Created,
155            current_epoch: 0,
156            last_committed_epoch: 0,
157            buffer: Vec::with_capacity(16),
158            buffered_rows: 0,
159            buffered_bytes: 0,
160            delta_version: 0,
161            buffer_start_time: None,
162            metrics: DeltaLakeSinkMetrics::new(registry),
163            epoch_skipped: false,
164            staged_batches: Vec::new(),
165            staged_rows: 0,
166            staged_bytes: 0,
167            #[cfg(feature = "delta-lake")]
168            table: None,
169            #[cfg(feature = "delta-lake")]
170            resolved_table_path: String::new(),
171            #[cfg(feature = "delta-lake")]
172            resolved_storage_options: std::collections::HashMap::new(),
173            #[cfg(feature = "delta-lake")]
174            compaction_cancel: None,
175            #[cfg(feature = "delta-lake")]
176            compaction_handle: None,
177            #[cfg(feature = "delta-lake")]
178            needs_deferred_delta_init: false,
179            #[cfg(feature = "delta-lake")]
180            pending_reopen: None,
181            #[cfg(feature = "delta-lake")]
182            cached_writer_properties: None,
183            #[cfg(feature = "delta-lake")]
184            merge_session: None,
185        }
186    }
187
188    /// Creates a new Delta Lake sink with an explicit schema.
189    #[must_use]
190    pub fn with_schema(config: DeltaLakeSinkConfig, schema: SchemaRef) -> Self {
191        let mut sink = Self::new(config, None);
192        sink.schema = Some(schema);
193        sink
194    }
195
196    /// Initializes the Delta table: auto-creates in Unity Catalog if needed,
197    /// resolves the catalog path, opens or creates the Delta table, and spawns
198    /// the compaction loop. Called from `open()` or deferred to the first
199    /// `write_batch()` when the schema is not yet available at open time.
200    #[cfg(feature = "delta-lake")]
201    async fn init_delta_table(&mut self) -> Result<(), ConnectorError> {
202        use super::delta_io;
203
204        // For uc:// tables, pre-create in Unity Catalog if needed.
205        // Must run before resolve_catalog_options which calls GET on the table.
206        #[cfg(feature = "delta-lake-unity")]
207        ensure_uc_table_exists(&self.config, self.schema.as_ref()).await?;
208
209        // Resolve catalog path: for Unity this calls GET to get the
210        // storage_location, bypassing delta-rs credential vending.
211        let (resolved_path, mut merged_options) = delta_io::resolve_catalog_options(
212            &self.config.catalog_type,
213            self.config.catalog_database.as_deref(),
214            self.config.catalog_name.as_deref(),
215            self.config.catalog_schema.as_deref(),
216            &self.config.table_path,
217            &self.config.storage_options,
218        )
219        .await?;
220
221        // Inject default connection timeouts if not explicitly set.
222        // Azure load balancers close idle connections after ~4 minutes.
223        // Without these, a stale connection causes writes to hang forever.
224        merged_options
225            .entry("timeout".to_string())
226            .or_insert_with(|| "120s".to_string());
227        merged_options
228            .entry("connect_timeout".to_string())
229            .or_insert_with(|| "30s".to_string());
230        merged_options
231            .entry("pool_idle_timeout".to_string())
232            .or_insert_with(|| "60s".to_string());
233
234        // Persist resolved values for reopen_table() on conflict retry.
235        self.resolved_table_path.clone_from(&resolved_path);
236        self.resolved_storage_options.clone_from(&merged_options);
237
238        let init_timeout = self
239            .config
240            .write_timeout
241            .max(std::time::Duration::from_secs(120));
242        let table = tokio::time::timeout(
243            init_timeout,
244            delta_io::open_or_create_table(
245                &resolved_path,
246                merged_options.clone(),
247                self.schema.as_ref(),
248            ),
249        )
250        .await
251        .map_err(|_| {
252            ConnectorError::ConnectionFailed(format!(
253                "Delta table init timed out after {}s",
254                init_timeout.as_secs()
255            ))
256        })??;
257
258        // Read schema from existing table if we don't have one.
259        if self.schema.is_none() {
260            if let Ok(schema) = delta_io::get_table_schema(&table) {
261                self.schema = Some(schema);
262            }
263        }
264
265        // Resolve last committed epoch for exactly-once recovery.
266        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
267            self.last_committed_epoch =
268                delta_io::get_last_committed_epoch(&table, &self.config.writer_id).await;
269            if self.last_committed_epoch > 0 {
270                info!(
271                    writer_id = %self.config.writer_id,
272                    last_committed_epoch = self.last_committed_epoch,
273                    "recovered last committed epoch from Delta Lake txn metadata"
274                );
275            }
276        }
277
278        // Store the Delta version.
279        #[allow(clippy::cast_sign_loss)]
280        {
281            self.delta_version = table.version().unwrap_or(0) as u64;
282        }
283        self.table = Some(table);
284
285        // Pre-build caches used on every commit. Rebuilding WriterProperties
286        // from config strings per commit was pure churn (HashMap<ColumnPath>
287        // cloned for bloom filters, string lowercase allocs); a cached value
288        // clones cheaply. Similarly, a shared SessionContext avoids
289        // allocating a new RuntimeEnv + MemoryPool + ObjectStoreRegistry
290        // per merge — a significant source of allocator fragmentation on
291        // long-running upsert streams.
292        self.cached_writer_properties = self.config.parquet.to_writer_properties().ok();
293        if self.config.write_mode == DeltaWriteMode::Upsert {
294            self.merge_session = Some(datafusion::prelude::SessionContext::new());
295        }
296
297        // Spawn background compaction task if enabled. Pre-build the
298        // compaction writer properties once; the loop clones per tick
299        // instead of re-parsing config strings.
300        if self.config.compaction.enabled {
301            let cancel = tokio_util::sync::CancellationToken::new();
302            let compaction_props = self.config.parquet.compaction_writer_properties().ok();
303            let handle = tokio::spawn(compaction_loop(
304                resolved_path.clone(),
305                Arc::new(merged_options),
306                self.config.compaction.clone(),
307                self.config.vacuum_retention,
308                compaction_props,
309                cancel.clone(),
310            ));
311            self.compaction_cancel = Some(cancel);
312            self.compaction_handle = Some(handle);
313        }
314
315        Ok(())
316    }
317
318    /// Returns the current connector state.
319    #[must_use]
320    pub fn state(&self) -> ConnectorState {
321        self.state
322    }
323
324    /// Returns the current epoch.
325    #[must_use]
326    pub fn current_epoch(&self) -> u64 {
327        self.current_epoch
328    }
329
330    /// Returns the last committed epoch.
331    #[must_use]
332    pub fn last_committed_epoch(&self) -> u64 {
333        self.last_committed_epoch
334    }
335
336    /// Returns the number of buffered rows pending flush.
337    #[must_use]
338    pub fn buffered_rows(&self) -> usize {
339        self.buffered_rows
340    }
341
342    /// Returns the estimated buffered bytes.
343    #[must_use]
344    pub fn buffered_bytes(&self) -> u64 {
345        self.buffered_bytes
346    }
347
348    /// Returns the current Delta Lake table version.
349    #[must_use]
350    pub fn delta_version(&self) -> u64 {
351        self.delta_version
352    }
353
354    /// Returns a reference to the sink metrics.
355    #[must_use]
356    pub fn sink_metrics(&self) -> &DeltaLakeSinkMetrics {
357        &self.metrics
358    }
359
360    /// Returns the sink configuration.
361    #[must_use]
362    pub fn config(&self) -> &DeltaLakeSinkConfig {
363        &self.config
364    }
365
366    /// Checks if a buffer flush is needed based on size or time thresholds.
367    #[must_use]
368    pub fn should_flush(&self) -> bool {
369        if self.buffered_rows >= self.config.max_buffer_records {
370            return true;
371        }
372        if self.buffered_bytes >= self.config.target_file_size as u64 {
373            return true;
374        }
375        if let Some(start) = self.buffer_start_time {
376            if start.elapsed() >= self.config.max_buffer_duration {
377                return true;
378            }
379        }
380        false
381    }
382
383    /// Returns the target table schema, stripping metadata columns (`_op`, etc.)
384    /// in upsert mode so the Delta table isn't created with changelog columns.
385    /// CDC metadata columns stripped from the target Delta table schema.
386    const CDC_METADATA_COLUMNS: &'static [&'static str] = &["_op", "_ts_ms"];
387
388    fn target_schema(batch_schema: &SchemaRef, write_mode: DeltaWriteMode) -> SchemaRef {
389        if write_mode == DeltaWriteMode::Upsert {
390            let fields: Vec<_> = batch_schema
391                .fields()
392                .iter()
393                .filter(|f| !Self::CDC_METADATA_COLUMNS.contains(&f.name().as_str()))
394                .cloned()
395                .collect();
396            Arc::new(arrow_schema::Schema::new(fields))
397        } else {
398            batch_schema.clone()
399        }
400    }
401
402    /// Estimates the byte size of a `RecordBatch`.
403    #[must_use]
404    pub fn estimate_batch_size(batch: &RecordBatch) -> u64 {
405        batch
406            .columns()
407            .iter()
408            .map(|col| col.get_array_memory_size() as u64)
409            .sum()
410    }
411
412    /// Returns `true` if the error is a Delta Lake optimistic concurrency
413    /// conflict (retryable). Matches specific delta-rs conflict indicators
414    /// only — not generic "transaction" mentions.
415    #[cfg(feature = "delta-lake")]
416    fn is_conflict_error(err: &ConnectorError) -> bool {
417        let msg = err.to_string().to_lowercase();
418        msg.contains("conflicting commit")
419            || msg.contains("version already exists")
420            || msg.contains("concurrent")
421            || (msg.contains("conflict") && !msg.contains("log") && !msg.contains("corrupt"))
422    }
423
424    /// Re-opens the Delta Lake table after a conflict error destroys the handle.
425    /// Uses the resolved path/options from `open()`, not `config.*`, so that
426    /// catalog-resolved paths (Unity/Glue) are used correctly.
427    #[cfg(feature = "delta-lake")]
428    async fn reopen_table(&mut self) -> Result<(), ConnectorError> {
429        use super::delta_io;
430
431        let table = delta_io::open_or_create_table(
432            &self.resolved_table_path,
433            self.resolved_storage_options.clone(),
434            self.schema.as_ref(),
435        )
436        .await?;
437
438        #[allow(clippy::cast_sign_loss)]
439        {
440            self.delta_version = table.version().unwrap_or(0) as u64;
441        }
442        self.table = Some(table);
443        Ok(())
444    }
445
446    /// Spawn a background reopen of the Delta table so the next flush
447    /// doesn't pay the load cost on its hot path. An already-in-flight
448    /// reopen is aborted — the fresher one supersedes it.
449    #[cfg(feature = "delta-lake")]
450    fn schedule_background_reopen(&mut self) {
451        if let Some(prev) = self.pending_reopen.take() {
452            prev.abort();
453        }
454        let path = self.resolved_table_path.clone();
455        let opts = self.resolved_storage_options.clone();
456        let schema = self.schema.clone();
457        self.pending_reopen = Some(tokio::spawn(async move {
458            super::delta_io::open_or_create_table(&path, opts, schema.as_ref()).await
459        }));
460    }
461
462    /// Install a previously-scheduled background reopen. Returns `false`
463    /// on miss — no pending reopen, task failure, or timeout — so callers
464    /// fall through to the synchronous `reopen_table()` path.
465    #[cfg(feature = "delta-lake")]
466    async fn try_install_pending_reopen(&mut self, timeout: std::time::Duration) -> bool {
467        let Some(mut pending) = self.pending_reopen.take() else {
468            return false;
469        };
470        let table = match tokio::time::timeout(timeout, &mut pending).await {
471            Ok(Ok(Ok(t))) => t,
472            Ok(Ok(Err(e))) => {
473                warn!(error = %e, "Delta background reopen failed");
474                return false;
475            }
476            Ok(Err(e)) => {
477                warn!(error = %e, "Delta background reopen task ended unexpectedly");
478                return false;
479            }
480            Err(_) => {
481                warn!(
482                    timeout_secs = timeout.as_secs(),
483                    "Delta background reopen timed out"
484                );
485                pending.abort();
486                return false;
487            }
488        };
489        #[allow(clippy::cast_sign_loss)]
490        {
491            self.delta_version = table.version().unwrap_or(0) as u64;
492        }
493        self.table = Some(table);
494        true
495    }
496
497    /// Attempts a single Delta write/merge and returns the updated table on success.
498    #[cfg(feature = "delta-lake")]
499    async fn attempt_delta_write(
500        &mut self,
501        table: DeltaTable,
502    ) -> Result<DeltaTable, ConnectorError> {
503        // Clone batches for the write — staged_batches is only cleared on
504        // success. RecordBatch::clone is Arc-bump only (~16-48ns per batch).
505        let batches: Vec<RecordBatch> = self.staged_batches.clone();
506
507        if self.config.write_mode == DeltaWriteMode::Upsert {
508            // ── Upsert/Merge path ──
509            // flush_staged_to_delta pre-concats for upsert so retries don't
510            // pay a full O(rows × cols) copy each attempt. Handle len > 1
511            // defensively in case a future caller skips the pre-concat.
512            let combined = if batches.len() == 1 {
513                batches.into_iter().next().expect("len == 1 checked")
514            } else {
515                match arrow_select::concat::concat_batches(&batches[0].schema(), &batches) {
516                    Ok(c) => c,
517                    Err(e) => {
518                        // Concat is a local op — restore table and propagate.
519                        self.table = Some(table);
520                        return Err(ConnectorError::Internal(format!(
521                            "failed to concat batches: {e}"
522                        )));
523                    }
524                }
525            };
526
527            // Cached in init_delta_table — cloning is a small HashMap copy
528            // plus a handful of Arc bumps, far cheaper than rebuilding from
529            // config strings each attempt.
530            let writer_props = self.cached_writer_properties.clone();
531            let merge_session = self
532                .merge_session
533                .as_ref()
534                .expect("merge_session built in init_delta_table for Upsert mode");
535
536            super::delta_io::merge_changelog(
537                table,
538                combined,
539                &self.config.merge_key_columns,
540                &self.config.writer_id,
541                self.current_epoch,
542                self.config.schema_evolution,
543                writer_props,
544                merge_session,
545            )
546            .await
547            .map(|(t, result)| {
548                self.metrics.record_merge();
549                if result.rows_deleted > 0 {
550                    self.metrics.record_deletes(result.rows_deleted as u64);
551                }
552                t
553            })
554        } else {
555            // ── Append/Overwrite path ──
556            let save_mode = match self.config.write_mode {
557                DeltaWriteMode::Append => SaveMode::Append,
558                DeltaWriteMode::Overwrite => SaveMode::Overwrite,
559                DeltaWriteMode::Upsert => unreachable!("handled by the upsert branch above"),
560            };
561
562            let partition_cols = if self.config.partition_columns.is_empty() {
563                None
564            } else {
565                Some(self.config.partition_columns.as_slice())
566            };
567
568            // Create a Delta Lake checkpoint every N commits for read performance.
569            let should_checkpoint = self.config.checkpoint_interval > 0
570                && self.delta_version > 0
571                && (self.delta_version + 1).is_multiple_of(self.config.checkpoint_interval);
572
573            super::delta_io::write_batches(
574                table,
575                batches,
576                &self.config.writer_id,
577                self.current_epoch,
578                save_mode,
579                partition_cols,
580                self.config.schema_evolution,
581                Some(self.config.target_file_size),
582                should_checkpoint,
583                self.cached_writer_properties.clone(),
584            )
585            .await
586            .map(|(t, _version)| t)
587        }
588    }
589
590    /// Writes all staged data to Delta Lake as a single atomic transaction.
591    ///
592    /// Retries on optimistic concurrency conflicts with exponential backoff.
593    /// On non-conflict errors or exhausted retries, propagates the error.
594    #[cfg(feature = "delta-lake")]
595    #[allow(clippy::too_many_lines)]
596    async fn flush_staged_to_delta(&mut self) -> Result<WriteResult, ConnectorError> {
597        if self.staged_batches.is_empty() {
598            return Ok(WriteResult::new(0, 0));
599        }
600
601        // Pre-concat upsert batches so conflict retries don't redo the
602        // O(rows × cols) copy each attempt. Append/overwrite passes the Vec
603        // straight to delta-rs, which handles multi-batch internally.
604        if self.config.write_mode == DeltaWriteMode::Upsert && self.staged_batches.len() > 1 {
605            let schema = self.staged_batches[0].schema();
606            let combined = arrow_select::concat::concat_batches(&schema, &self.staged_batches)
607                .map_err(|e| {
608                    ConnectorError::Internal(format!("failed to concat staged batches: {e}"))
609                })?;
610            self.staged_batches.clear();
611            self.staged_batches.push(combined);
612        }
613
614        let total_rows = self.staged_rows;
615        let estimated_bytes = self.staged_bytes;
616        let flush_start = Instant::now();
617
618        // Retry loop with exponential backoff for optimistic concurrency conflicts.
619        let backoff_ms = [100u64, 500, 2000];
620        let max_attempts = (self.config.max_commit_retries as usize).saturating_add(1);
621        let mut last_error: Option<ConnectorError> = None;
622
623        for attempt in 0..max_attempts {
624            // delta-rs consumes DeltaTable by value. If self.table is None
625            // (prior failure or checkpoint-boundary drop) reinstall it —
626            // prefer a scheduled background reopen, fall back to sync.
627            if self.table.is_none() {
628                let reopen_timeout = self.config.write_timeout;
629                if !self.try_install_pending_reopen(reopen_timeout).await {
630                    tokio::time::timeout(reopen_timeout, self.reopen_table())
631                        .await
632                        .map_err(|_| {
633                            ConnectorError::ConnectionFailed(format!(
634                                "Delta table reopen timed out after {}s",
635                                reopen_timeout.as_secs()
636                            ))
637                        })??;
638                }
639            }
640
641            let table = self
642                .table
643                .take()
644                .ok_or_else(|| ConnectorError::InvalidState {
645                    expected: "table initialized".into(),
646                    actual: "table not initialized".into(),
647                })?;
648
649            // Timeout the write to prevent hanging on stale connections.
650            // Azure LB drops idle connections after ~4 min; without this,
651            // a dead connection blocks the sink task forever.
652            let write_timeout = self.config.write_timeout;
653            let write_result =
654                tokio::time::timeout(write_timeout, self.attempt_delta_write(table)).await;
655
656            // Convert timeout to a write error. The table handle was consumed
657            // by attempt_delta_write; the retry loop will reopen via reopen_table().
658            let write_result = match write_result {
659                Ok(inner) => inner,
660                Err(_elapsed) => Err(ConnectorError::WriteError(format!(
661                    "Delta write timed out after {}s",
662                    write_timeout.as_secs()
663                ))),
664            };
665
666            match write_result {
667                Ok(table) => {
668                    #[allow(clippy::cast_sign_loss)]
669                    {
670                        self.delta_version = table.version().unwrap_or(0) as u64;
671                    }
672
673                    // delta-rs' in-memory Snapshot grows per commit and is not
674                    // compacted in place; drop on checkpoint boundaries so the
675                    // next flush re-opens from the checkpoint file. Pre-open
676                    // in the background so the next commit doesn't block.
677                    let crossed_checkpoint = self.config.checkpoint_interval > 0
678                        && self.delta_version > 0
679                        && self
680                            .delta_version
681                            .is_multiple_of(self.config.checkpoint_interval);
682                    if crossed_checkpoint {
683                        self.table = None;
684                        self.schedule_background_reopen();
685                    } else {
686                        self.table = Some(table);
687                    }
688
689                    self.staged_batches.clear();
690                    self.staged_rows = 0;
691                    self.staged_bytes = 0;
692
693                    self.metrics
694                        .record_flush(total_rows as u64, estimated_bytes);
695                    self.metrics.record_commit(self.delta_version);
696                    self.metrics
697                        .observe_flush_duration(flush_start.elapsed().as_secs_f64());
698
699                    debug!(
700                        rows = total_rows,
701                        bytes = estimated_bytes,
702                        delta_version = self.delta_version,
703                        attempt = attempt + 1,
704                        reopened = crossed_checkpoint,
705                        "Delta Lake: committed staged data to Delta"
706                    );
707
708                    return Ok(WriteResult::new(total_rows, estimated_bytes));
709                }
710                Err(e) => {
711                    if Self::is_conflict_error(&e) && attempt + 1 < max_attempts {
712                        self.metrics.record_conflict();
713                        self.metrics.record_retry();
714                        // ±25% jitter breaks up lockstep retries from
715                        // concurrent writers colliding on the same version.
716                        let base = backoff_ms.get(attempt).copied().unwrap_or(2000);
717                        let delay_ms = jittered_backoff_ms(base);
718                        warn!(
719                            attempt = attempt + 1,
720                            max_attempts,
721                            delay_ms,
722                            error = %e,
723                            "Delta Lake: conflict error, retrying after backoff"
724                        );
725                        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
726                        last_error = Some(e);
727                        // self.table is already None; loop will re-open.
728                        continue;
729                    }
730                    // Non-conflict error or exhausted retries — propagate.
731                    self.metrics
732                        .observe_flush_duration(flush_start.elapsed().as_secs_f64());
733                    return Err(e);
734                }
735            }
736        }
737
738        // Should not reach here, but if it does, return the last error.
739        Err(last_error.unwrap_or_else(|| {
740            ConnectorError::Internal("flush_staged_to_delta: no attempts made".into())
741        }))
742    }
743
744    /// Splits a changelog `RecordBatch` into insert and delete batches.
745    ///
746    /// Uses the `_op` metadata column:
747    /// - `"I"` (insert), `"U"` (update-after), `"r"` (snapshot read) -> insert
748    /// - `"D"` (delete) -> delete
749    ///
750    /// The returned batches exclude metadata columns (those starting with `_`).
751    ///
752    /// # Errors
753    ///
754    /// Returns `ConnectorError::ConfigurationError` if the `_op` column is
755    /// missing or not a string type.
756    pub fn split_changelog_batch(
757        batch: &RecordBatch,
758    ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
759        let op_idx = batch.schema().index_of("_op").map_err(|_| {
760            ConnectorError::ConfigurationError(
761                "upsert mode requires '_op' column in input schema".into(),
762            )
763        })?;
764
765        let op_array = batch
766            .column(op_idx)
767            .as_any()
768            .downcast_ref::<arrow_array::StringArray>()
769            .ok_or_else(|| {
770                ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
771            })?;
772
773        // Build boolean masks for insert vs delete rows.
774        let len = op_array.len();
775        let mut insert_mask = Vec::with_capacity(len);
776        let mut delete_mask = Vec::with_capacity(len);
777
778        for i in 0..len {
779            if op_array.is_null(i) {
780                insert_mask.push(false);
781                delete_mask.push(false);
782                continue;
783            }
784            match op_array.value(i) {
785                "I" | "U" | "r" => {
786                    insert_mask.push(true);
787                    delete_mask.push(false);
788                }
789                "D" => {
790                    insert_mask.push(false);
791                    delete_mask.push(true);
792                }
793                _ => {
794                    insert_mask.push(false);
795                    delete_mask.push(false);
796                }
797            }
798        }
799
800        // Compute user-column projection indices once (strip metadata columns).
801        let user_col_indices: Vec<usize> = batch
802            .schema()
803            .fields()
804            .iter()
805            .enumerate()
806            .filter(|(_, f)| !f.name().starts_with('_'))
807            .map(|(i, _)| i)
808            .collect();
809
810        let insert_batch = filter_and_project(batch, insert_mask, &user_col_indices)?;
811        let delete_batch = filter_and_project(batch, delete_mask, &user_col_indices)?;
812
813        Ok((insert_batch, delete_batch))
814    }
815}
816
817/// Background compaction loop: OPTIMIZE + VACUUM with adaptive intervals.
818///
819/// Opens its own `DeltaTable` handle (no shared state with the sink).
820#[cfg(feature = "delta-lake")]
821#[allow(clippy::too_many_lines)]
822async fn compaction_loop(
823    table_path: String,
824    storage_options: Arc<std::collections::HashMap<String, String>>,
825    config: super::delta_config::CompactionConfig,
826    vacuum_retention: std::time::Duration,
827    compaction_props: Option<deltalake::parquet::file::properties::WriterProperties>,
828    cancel: tokio_util::sync::CancellationToken,
829) {
830    use super::delta_io;
831
832    /// Minimum adaptive compaction interval (floor).
833    const MIN_COMPACTION_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
834
835    let base_interval = config.check_interval;
836    let mut current_interval = base_interval;
837    let mut consecutive_skips: u32 = 0;
838
839    info!(
840        table_path = %table_path,
841        check_interval_secs = base_interval.as_secs(),
842        "compaction background task started (adaptive interval)"
843    );
844
845    // Initial delay before the first check.
846    tokio::select! {
847        () = cancel.cancelled() => {
848            info!("compaction background task cancelled");
849            return;
850        }
851        () = tokio::time::sleep(current_interval) => {}
852    }
853
854    loop {
855        // Open a fresh table handle for compaction (no shared state).
856        let table =
857            match delta_io::open_or_create_table(&table_path, (*storage_options).clone(), None)
858                .await
859            {
860                Ok(t) => t,
861                Err(e) => {
862                    warn!(error = %e, "compaction: failed to open table, will retry");
863                    tokio::select! {
864                        () = cancel.cancelled() => {
865                            info!("compaction background task cancelled");
866                            return;
867                        }
868                        () = tokio::time::sleep(current_interval) => {}
869                    }
870                    continue;
871                }
872            };
873
874        // Check if compaction is needed.
875        let should_compact = match table.snapshot() {
876            Ok(snapshot) => {
877                let file_count = snapshot.log_data().num_files();
878                if file_count < config.min_files_for_compaction {
879                    debug!(
880                        file_count,
881                        min = config.min_files_for_compaction,
882                        "compaction: skipping, not enough files"
883                    );
884                    false
885                } else {
886                    true
887                }
888            }
889            Err(e) => {
890                warn!(error = %e, "compaction: snapshot failed, skipping tick");
891                false
892            }
893        };
894
895        if should_compact {
896            consecutive_skips = 0;
897            // Speed up: halve the interval (floor at MIN_COMPACTION_INTERVAL).
898            current_interval = (current_interval / 2).max(MIN_COMPACTION_INTERVAL);
899
900            let target_size = config.target_file_size as u64;
901            // Clone the pre-built properties (cheap) instead of re-parsing
902            // from config strings each tick.
903            match delta_io::run_compaction(
904                table,
905                target_size,
906                &config.z_order_columns,
907                compaction_props.clone(),
908            )
909            .await
910            {
911                Ok((table, result)) => {
912                    debug!(
913                        files_added = result.files_added,
914                        files_removed = result.files_removed,
915                        interval_secs = current_interval.as_secs(),
916                        "compaction: OPTIMIZE complete"
917                    );
918
919                    // Run VACUUM after compaction.
920                    match delta_io::run_vacuum(table, vacuum_retention).await {
921                        Ok((_table, files_deleted)) => {
922                            debug!(files_deleted, "compaction: VACUUM complete");
923                        }
924                        Err(e) => {
925                            warn!(error = %e, "compaction: VACUUM failed");
926                        }
927                    }
928                }
929                Err(e) => {
930                    warn!(error = %e, "compaction: OPTIMIZE failed");
931                }
932            }
933        } else {
934            consecutive_skips = consecutive_skips.saturating_add(1);
935            // Slow down: double the interval after 2 consecutive idle ticks.
936            if consecutive_skips >= 2 {
937                current_interval = (current_interval * 2).min(base_interval);
938            }
939        }
940
941        // Adaptive sleep: wait current_interval or cancel.
942        tokio::select! {
943            () = cancel.cancelled() => {
944                info!("compaction background task cancelled");
945                return;
946            }
947            () = tokio::time::sleep(current_interval) => {}
948        }
949    }
950}
951
952/// Pre-creates a Unity Catalog external Delta table via the REST API if
953/// `catalog.storage.location` is configured and a schema is available.
954/// Idempotent: treats "already exists" (HTTP 409 / `ALREADY_EXISTS`) as success.
955#[cfg(all(feature = "delta-lake", feature = "delta-lake-unity"))]
956async fn ensure_uc_table_exists(
957    config: &DeltaLakeSinkConfig,
958    schema: Option<&SchemaRef>,
959) -> Result<(), ConnectorError> {
960    let super::delta_config::DeltaCatalogType::Unity {
961        ref workspace_url,
962        ref access_token,
963    } = config.catalog_type
964    else {
965        return Ok(());
966    };
967
968    let Some(ref storage_location) = config.catalog_storage_location else {
969        return Ok(());
970    };
971
972    let Some(arrow_schema) = schema else {
973        warn!(
974            "catalog.storage.location is set but no schema available — \
975             skipping Unity Catalog auto-create"
976        );
977        return Ok(());
978    };
979
980    let catalog = config.catalog_name.as_deref().unwrap_or_default();
981    let schema_name = config.catalog_schema.as_deref().unwrap_or_default();
982    let table_name = config
983        .table_path
984        .strip_prefix("uc://")
985        .and_then(|s| s.rsplit('.').next())
986        .unwrap_or(&config.table_path);
987
988    let columns = super::unity_catalog::arrow_to_uc_columns(arrow_schema);
989    super::unity_catalog::create_uc_table(
990        workspace_url,
991        access_token,
992        catalog,
993        schema_name,
994        table_name,
995        storage_location,
996        &columns,
997    )
998    .await
999}
1000
1001#[async_trait]
1002impl SinkConnector for DeltaLakeSink {
1003    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
1004        self.state = ConnectorState::Initializing;
1005
1006        // Re-parse config if properties provided.
1007        if !config.properties().is_empty() {
1008            self.config = DeltaLakeSinkConfig::from_config(config)?;
1009        }
1010
1011        info!(
1012            table_path = %self.config.table_path,
1013            mode = %self.config.write_mode,
1014            guarantee = %self.config.delivery_guarantee,
1015            "opening Delta Lake sink connector"
1016        );
1017
1018        // When delta-lake feature is enabled, open/create the actual table.
1019        // If Unity Catalog auto-create is configured but no schema is available
1020        // yet, defer initialization to the first write_batch() call.
1021        #[cfg(feature = "delta-lake")]
1022        {
1023            let should_defer = matches!(
1024                self.config.catalog_type,
1025                super::delta_config::DeltaCatalogType::Unity { .. }
1026            ) && self.config.catalog_storage_location.is_some()
1027                && self.schema.is_none();
1028
1029            if should_defer {
1030                info!(
1031                    "Unity Catalog auto-create configured but pipeline schema not yet \
1032                     available — deferring Delta table init to first begin_epoch"
1033                );
1034                self.needs_deferred_delta_init = true;
1035                self.state = ConnectorState::Initializing;
1036                return Ok(());
1037            }
1038
1039            self.init_delta_table().await?;
1040
1041            // If table still has no version after init (new table, no schema yet),
1042            // defer full creation to the first write_batch() when schema is available.
1043            if self.table.as_ref().is_some_and(|t| t.version().is_none()) && self.schema.is_none() {
1044                self.needs_deferred_delta_init = true;
1045                self.state = ConnectorState::Initializing;
1046                return Ok(());
1047            }
1048        }
1049
1050        #[cfg(not(feature = "delta-lake"))]
1051        {
1052            self.state = ConnectorState::Failed;
1053            return Err(ConnectorError::ConfigurationError(
1054                "Delta Lake sink requires the 'delta-lake' feature to be enabled. \
1055                 Build with: cargo build --features delta-lake"
1056                    .into(),
1057            ));
1058        }
1059
1060        #[cfg(feature = "delta-lake")]
1061        {
1062            self.state = ConnectorState::Running;
1063            info!("Delta Lake sink connector opened successfully");
1064            Ok(())
1065        }
1066    }
1067
1068    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
1069        // Accept both Running and Initializing (deferred init in progress).
1070        if self.state != ConnectorState::Running && self.state != ConnectorState::Initializing {
1071            return Err(ConnectorError::InvalidState {
1072                expected: "Running".into(),
1073                actual: self.state.to_string(),
1074            });
1075        }
1076
1077        if batch.num_rows() == 0 {
1078            return Ok(WriteResult::new(0, 0));
1079        }
1080
1081        if self.epoch_skipped {
1082            return Ok(WriteResult::new(0, 0));
1083        }
1084
1085        // Handle schema on first write. In upsert mode, strip metadata columns
1086        // (_op, _ts_ms) so the Delta table isn't created with changelog columns.
1087        if self.schema.is_none() {
1088            self.schema = Some(Self::target_schema(&batch.schema(), self.config.write_mode));
1089        }
1090
1091        // Fallback for deferred init: if begin_epoch() couldn't complete
1092        // init (schema was still None), complete it now that the first
1093        // batch provides a schema.
1094        #[cfg(feature = "delta-lake")]
1095        if self.needs_deferred_delta_init {
1096            info!("schema now available from first batch — completing deferred Delta table init");
1097            match self.init_delta_table().await {
1098                Ok(()) => {
1099                    self.needs_deferred_delta_init = false;
1100                    self.state = ConnectorState::Running;
1101                    info!("Delta Lake sink connector opened successfully (deferred)");
1102                }
1103                Err(e) => {
1104                    self.state = ConnectorState::Failed;
1105                    return Err(e);
1106                }
1107            }
1108        }
1109
1110        let num_rows = batch.num_rows();
1111        let estimated_bytes = Self::estimate_batch_size(batch);
1112
1113        // Hard cap on combined buffered + staged data. Applies to both
1114        // delivery guarantees:
1115        // - Exactly-once cannot flush opportunistically from write_batch
1116        //   (a mid-epoch flush would leak rows on rollback), so pending
1117        //   accumulates until the next checkpoint.
1118        // - At-least-once normally auto-flushes via `should_flush()`, but
1119        //   if flushes fail repeatedly, `staged_batches` holds data while
1120        //   `buffer` keeps growing — without this cap the sink can OOM.
1121        //
1122        // A single incoming batch may itself exceed the cap; we cannot
1123        // split or reject it (the rows would be lost), so we widen the
1124        // cap to `num_rows`/`estimated_bytes` when a batch alone is
1125        // larger. This still rejects the *next* batch if that one-shot
1126        // admission left us bloated.
1127        let pending_rows = self.buffered_rows + self.staged_rows + num_rows;
1128        let pending_bytes = self.buffered_bytes + self.staged_bytes + estimated_bytes;
1129        let row_cap = self
1130            .config
1131            .max_buffer_records
1132            .saturating_mul(4)
1133            .max(num_rows);
1134        let byte_cap = (self.config.target_file_size as u64)
1135            .saturating_mul(4)
1136            .max(estimated_bytes);
1137        if pending_rows > row_cap || pending_bytes > byte_cap {
1138            return Err(ConnectorError::WriteError(format!(
1139                "delta sink buffer full ({pending_rows} rows, \
1140                 {pending_bytes} bytes pending; cap {row_cap} rows, \
1141                 {byte_cap} bytes) — backpressure until next flush/commit"
1142            )));
1143        }
1144
1145        // Buffer the batch.
1146        if self.buffer_start_time.is_none() {
1147            self.buffer_start_time = Some(Instant::now());
1148        }
1149        self.buffer.push(batch.clone());
1150        self.buffered_rows += num_rows;
1151        self.buffered_bytes += estimated_bytes;
1152
1153        #[cfg(feature = "delta-lake")]
1154        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce && self.should_flush() {
1155            if !self.staged_batches.is_empty() {
1156                self.flush_staged_to_delta().await?;
1157            }
1158            self.staged_batches = std::mem::take(&mut self.buffer);
1159            self.staged_rows = self.buffered_rows;
1160            self.staged_bytes = self.buffered_bytes;
1161            self.buffered_rows = 0;
1162            self.buffered_bytes = 0;
1163            self.buffer_start_time = None;
1164            self.flush_staged_to_delta().await?;
1165        }
1166
1167        Ok(WriteResult::new(0, 0))
1168    }
1169
1170    fn schema(&self) -> SchemaRef {
1171        self.schema
1172            .clone()
1173            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
1174    }
1175
1176    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1177        // Complete deferred Delta table init on the first epoch.
1178        // This must run BEFORE the epoch skip check so that
1179        // last_committed_epoch is resolved from the Delta log.
1180        // Without this, exactly-once recovery would miss already-committed
1181        // epochs and produce duplicates.
1182        #[cfg(feature = "delta-lake")]
1183        if self.needs_deferred_delta_init {
1184            // Schema may not be available yet on the very first epoch.
1185            // If so, buffer the epoch — the write_batch will provide it.
1186            // But if the pipeline provided a schema via with_schema() or a
1187            // previous epoch's write_batch set it, complete init now.
1188            if self.schema.is_some() {
1189                info!("schema available — completing deferred Delta table init");
1190                match self.init_delta_table().await {
1191                    Ok(()) => {
1192                        self.needs_deferred_delta_init = false;
1193                        self.state = ConnectorState::Running;
1194                        info!("Delta Lake sink connector opened successfully (deferred)");
1195                    }
1196                    Err(e) => {
1197                        self.state = ConnectorState::Failed;
1198                        return Err(e);
1199                    }
1200                }
1201            }
1202        }
1203
1204        // For exactly-once, skip epochs already committed.
1205        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
1206            && epoch <= self.last_committed_epoch
1207        {
1208            warn!(
1209                epoch,
1210                last_committed = self.last_committed_epoch,
1211                "Delta Lake: skipping already-committed epoch"
1212            );
1213            self.epoch_skipped = true;
1214            return Ok(());
1215        }
1216
1217        self.epoch_skipped = false;
1218        self.current_epoch = epoch;
1219        self.buffer.clear();
1220        self.buffered_rows = 0;
1221        self.buffered_bytes = 0;
1222        self.buffer_start_time = None;
1223
1224        debug!(epoch, "Delta Lake: began epoch");
1225        Ok(())
1226    }
1227
1228    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1229        // Skip if already committed (exactly-once idempotency).
1230        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
1231            && epoch <= self.last_committed_epoch
1232        {
1233            return Ok(());
1234        }
1235
1236        // Stage buffered data for commit. The actual Delta write happens in
1237        // commit_epoch() — this ensures rollback_epoch() can discard the data
1238        // without leaving orphan files in Delta.
1239        if !self.buffer.is_empty() {
1240            self.staged_batches = std::mem::take(&mut self.buffer);
1241            self.staged_rows = self.buffered_rows;
1242            self.staged_bytes = self.buffered_bytes;
1243            self.buffered_rows = 0;
1244            self.buffered_bytes = 0;
1245            self.buffer_start_time = None;
1246        }
1247
1248        debug!(epoch, "Delta Lake: pre-committed (batches staged)");
1249        Ok(())
1250    }
1251
1252    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1253        // Skip if already committed (exactly-once idempotency).
1254        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
1255            && epoch <= self.last_committed_epoch
1256        {
1257            return Ok(());
1258        }
1259
1260        // Write staged data to Delta as a single atomic transaction.
1261        #[cfg(feature = "delta-lake")]
1262        {
1263            if !self.staged_batches.is_empty() {
1264                self.flush_staged_to_delta().await?;
1265            }
1266        }
1267
1268        self.last_committed_epoch = epoch;
1269
1270        info!(
1271            epoch,
1272            delta_version = self.delta_version,
1273            "Delta Lake: committed epoch"
1274        );
1275
1276        Ok(())
1277    }
1278
1279    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
1280        // Discard both buffered and staged data. Because the actual Delta
1281        // write only happens in commit_epoch(), no orphan files are created.
1282        self.buffer.clear();
1283        self.buffered_rows = 0;
1284        self.buffered_bytes = 0;
1285        self.buffer_start_time = None;
1286        self.staged_batches.clear();
1287        self.staged_rows = 0;
1288        self.staged_bytes = 0;
1289
1290        self.epoch_skipped = false;
1291        self.metrics.record_rollback();
1292        warn!(epoch, "Delta Lake: rolled back epoch");
1293        Ok(())
1294    }
1295
1296    fn health_check(&self) -> HealthStatus {
1297        match self.state {
1298            ConnectorState::Running => HealthStatus::Healthy,
1299            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
1300            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
1301            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
1302            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
1303            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
1304        }
1305    }
1306
1307    fn metrics(&self) -> ConnectorMetrics {
1308        self.metrics.to_connector_metrics()
1309    }
1310
1311    fn capabilities(&self) -> SinkConnectorCapabilities {
1312        // Delta commits can run long under compaction or contention.
1313        let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(180)).with_idempotent();
1314
1315        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1316            caps = caps.with_exactly_once().with_two_phase_commit();
1317        }
1318        if self.config.write_mode == DeltaWriteMode::Upsert {
1319            caps = caps.with_upsert().with_changelog();
1320        }
1321        if self.config.schema_evolution {
1322            caps = caps.with_schema_evolution();
1323        }
1324        if !self.config.partition_columns.is_empty() {
1325            caps = caps.with_partitioned();
1326        }
1327
1328        caps
1329    }
1330
1331    async fn flush(&mut self) -> Result<(), ConnectorError> {
1332        // For at-least-once delivery, flush() is the only commit trigger
1333        // because the checkpoint coordinator skips begin_epoch/pre_commit/
1334        // commit_epoch for non-exactly-once sinks. Write directly to Delta.
1335        #[cfg(feature = "delta-lake")]
1336        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce {
1337            // Retry any orphaned staged data from a prior failed flush
1338            // before moving new data in, to prevent silent data loss.
1339            // flush_staged_to_delta handles table == None via reopen_table().
1340            if !self.staged_batches.is_empty() {
1341                self.flush_staged_to_delta().await?;
1342            }
1343
1344            // Stage new buffered data and flush to Delta.
1345            if !self.buffer.is_empty() {
1346                self.staged_batches = std::mem::take(&mut self.buffer);
1347                self.staged_rows = self.buffered_rows;
1348                self.staged_bytes = self.buffered_bytes;
1349                self.buffered_rows = 0;
1350                self.buffered_bytes = 0;
1351                self.buffer_start_time = None;
1352
1353                self.flush_staged_to_delta().await?;
1354            }
1355            return Ok(());
1356        }
1357
1358        if self.buffer.is_empty() {
1359            return Ok(());
1360        }
1361
1362        // For exactly-once, just coalesce in memory — the 2PC path
1363        // (pre_commit + commit_epoch) handles the actual Delta write.
1364        if self.buffer.len() > 1 {
1365            let schema = self.buffer[0].schema();
1366            let combined = arrow_select::concat::concat_batches(&schema, &self.buffer)
1367                .map_err(|e| ConnectorError::Internal(format!("concat failed: {e}")))?;
1368            self.buffer.clear();
1369            self.buffer.push(combined);
1370        }
1371        Ok(())
1372    }
1373
1374    async fn close(&mut self) -> Result<(), ConnectorError> {
1375        info!("closing Delta Lake sink connector");
1376
1377        // Commit any remaining data before closing.
1378        // For at-least-once, use flush() which handles both orphaned
1379        // staged data and buffered data. For exactly-once, use 2PC.
1380        #[cfg(feature = "delta-lake")]
1381        if self.config.delivery_guarantee != DeliveryGuarantee::ExactlyOnce {
1382            self.flush().await?;
1383        } else if !self.buffer.is_empty() {
1384            self.pre_commit(self.current_epoch).await?;
1385            self.commit_epoch(self.current_epoch).await?;
1386        }
1387
1388        // Cancel and join the background compaction task.
1389        #[cfg(feature = "delta-lake")]
1390        {
1391            if let Some(cancel) = self.compaction_cancel.take() {
1392                cancel.cancel();
1393            }
1394            if let Some(handle) = self.compaction_handle.take() {
1395                // Wait up to 5 seconds for the compaction task to finish.
1396                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
1397            }
1398        }
1399
1400        #[cfg(feature = "delta-lake")]
1401        if let Some(pending) = self.pending_reopen.take() {
1402            pending.abort();
1403        }
1404
1405        // Drop the table handle when closing.
1406        #[cfg(feature = "delta-lake")]
1407        {
1408            self.table = None;
1409        }
1410
1411        self.state = ConnectorState::Closed;
1412
1413        info!(
1414            table_path = %self.config.table_path,
1415            delta_version = self.delta_version,
1416            "Delta Lake sink connector closed"
1417        );
1418
1419        Ok(())
1420    }
1421}
1422
1423/// Safety-net: if the sink is dropped mid-lifecycle (panic, config error,
1424/// shutdown without calling `close()`), cancel any background work so we
1425/// don't leak a compaction loop or a stray table-reopen task.
1426#[cfg(feature = "delta-lake")]
1427impl Drop for DeltaLakeSink {
1428    fn drop(&mut self) {
1429        if let Some(cancel) = self.compaction_cancel.take() {
1430            cancel.cancel();
1431        }
1432        if let Some(handle) = self.compaction_handle.take() {
1433            handle.abort();
1434        }
1435        if let Some(handle) = self.pending_reopen.take() {
1436            handle.abort();
1437        }
1438    }
1439}
1440
1441impl std::fmt::Debug for DeltaLakeSink {
1442    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1443        f.debug_struct("DeltaLakeSink")
1444            .field("state", &self.state)
1445            .field("table_path", &self.config.table_path)
1446            .field("mode", &self.config.write_mode)
1447            .field("guarantee", &self.config.delivery_guarantee)
1448            .field("current_epoch", &self.current_epoch)
1449            .field("last_committed_epoch", &self.last_committed_epoch)
1450            .field("buffered_rows", &self.buffered_rows)
1451            .field("delta_version", &self.delta_version)
1452            .field("epoch_skipped", &self.epoch_skipped)
1453            .finish_non_exhaustive()
1454    }
1455}
1456
1457// ── Helper functions ────────────────────────────────────────────────
1458
1459/// Applies ±25% jitter to a backoff delay in milliseconds.
1460///
1461/// Without jitter, multiple writers colliding on the same Delta version
1462/// retry in lockstep and keep colliding — a thundering herd. The 0.75–1.25
1463/// range spreads retries across a 1s window (1.5–2.5s) for the default 2s max.
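///
/// A minimal usage sketch (illustrative only; `try_commit`, `is_version_conflict`,
/// and `max_retries` are assumptions for illustration, not items in this module):
///
/// ```ignore
/// let mut backoff_ms = 100;
/// for _attempt in 0..max_retries {
///     match try_commit().await {
///         Ok(_) => break,
///         Err(e) if is_version_conflict(&e) => {
///             // Sleep for the jittered delay, then double the base (capped at 2s).
///             let delay = jittered_backoff_ms(backoff_ms);
///             tokio::time::sleep(Duration::from_millis(delay)).await;
///             backoff_ms = (backoff_ms * 2).min(2_000);
///         }
///         Err(e) => return Err(e),
///     }
/// }
/// ```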
1464#[cfg(feature = "delta-lake")]
1465fn jittered_backoff_ms(base_ms: u64) -> u64 {
1466    use rand::Rng as _;
1467    let factor: f64 = rand::rng().random_range(0.75_f64..=1.25_f64);
1468    #[allow(
1469        clippy::cast_possible_truncation,
1470        clippy::cast_sign_loss,
1471        clippy::cast_precision_loss
1472    )]
1473    let jittered = (base_ms as f64 * factor) as u64;
1474    jittered.max(1)
1475}
1476
1477/// Filters a `RecordBatch` using a boolean mask and projects to the given column indices.
1478///
1479/// Takes `mask` by value to hand it straight to `BooleanArray::from` without
1480/// an intermediate `Vec<bool>` copy. Projects before filtering so the SIMD
1481/// kernel only walks user columns, not the dropped metadata columns.
1482fn filter_and_project(
1483    batch: &RecordBatch,
1484    mask: Vec<bool>,
1485    col_indices: &[usize],
1486) -> Result<RecordBatch, ConnectorError> {
1487    use arrow_array::BooleanArray;
1488    use arrow_select::filter::filter_record_batch;
1489
1490    let bool_array = BooleanArray::from(mask);
1491
1492    let projected = batch
1493        .project(col_indices)
1494        .map_err(|e| ConnectorError::Internal(format!("batch projection failed: {e}")))?;
1495
1496    filter_record_batch(&projected, &bool_array)
1497        .map_err(|e| ConnectorError::Internal(format!("arrow filter failed: {e}")))
1498}
1499
1500#[cfg(test)]
1501#[allow(clippy::cast_possible_wrap)]
1502#[allow(clippy::cast_precision_loss)]
1503#[allow(clippy::float_cmp)]
1504mod tests {
1505    use super::super::delta_config::DeltaCatalogType;
1506    use super::*;
1507    use arrow_array::{Float64Array, Int64Array, StringArray};
1508    use arrow_schema::{DataType, Field, Schema};
1509
1510    fn test_schema() -> SchemaRef {
1511        Arc::new(Schema::new(vec![
1512            Field::new("id", DataType::Int64, false),
1513            Field::new("name", DataType::Utf8, true),
1514            Field::new("value", DataType::Float64, true),
1515        ]))
1516    }
1517
1518    fn test_config() -> DeltaLakeSinkConfig {
1519        #[cfg(unix)]
1520        let path = "/tmp/delta_test_nonexistent_8f3a";
1521        #[cfg(windows)]
1522        let path = "C:\\delta_test_nonexistent_8f3a";
1523        DeltaLakeSinkConfig::new(path)
1524    }
1525
1526    fn upsert_config() -> DeltaLakeSinkConfig {
1527        let mut cfg = test_config();
1528        cfg.write_mode = DeltaWriteMode::Upsert;
1529        cfg.merge_key_columns = vec!["id".to_string()];
1530        cfg
1531    }
1532
1533    fn test_batch(n: usize) -> RecordBatch {
1534        let ids: Vec<i64> = (0..n as i64).collect();
1535        let names: Vec<&str> = (0..n).map(|_| "test").collect();
1536        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
1537
1538        RecordBatch::try_new(
1539            test_schema(),
1540            vec![
1541                Arc::new(Int64Array::from(ids)),
1542                Arc::new(StringArray::from(names)),
1543                Arc::new(Float64Array::from(values)),
1544            ],
1545        )
1546        .unwrap()
1547    }
1548
1549    // ── Constructor tests ──
1550
1551    #[test]
1552    fn test_new_defaults() {
1553        let sink = DeltaLakeSink::new(test_config(), None);
1554        assert_eq!(sink.state(), ConnectorState::Created);
1555        assert_eq!(sink.current_epoch(), 0);
1556        assert_eq!(sink.last_committed_epoch(), 0);
1557        assert_eq!(sink.buffered_rows(), 0);
1558        assert_eq!(sink.buffered_bytes(), 0);
1559        assert_eq!(sink.delta_version(), 0);
1560        assert!(sink.schema.is_none());
1561    }
1562
1563    #[test]
1564    fn test_with_schema() {
1565        let schema = test_schema();
1566        let sink = DeltaLakeSink::with_schema(test_config(), schema.clone());
1567        assert_eq!(sink.schema(), schema);
1568    }
1569
1570    #[test]
1571    fn test_schema_empty_when_none() {
1572        let sink = DeltaLakeSink::new(test_config(), None);
1573        let schema = sink.schema();
1574        assert_eq!(schema.fields().len(), 0);
1575    }
1576
1577    #[cfg(feature = "delta-lake")]
1578    #[test]
1579    fn test_deferred_init_flag_default_false() {
1580        let sink = DeltaLakeSink::new(test_config(), None);
1581        assert!(!sink.needs_deferred_delta_init);
1582    }
1583
1584    fn unity_config() -> DeltaLakeSinkConfig {
1585        let mut config = test_config();
1586        config.catalog_type = DeltaCatalogType::Unity {
1587            workspace_url: "https://test.azuredatabricks.net".to_string(),
1588            access_token: "dapi123".to_string(),
1589        };
1590        config.catalog_name = Some("main".to_string());
1591        config.catalog_schema = Some("default".to_string());
1592        config.catalog_storage_location = Some("abfss://c@acct.dfs.core.windows.net/t".to_string());
1593        config
1594    }
1595
1596    #[cfg(feature = "delta-lake")]
1597    #[tokio::test]
1598    async fn test_open_defers_init_for_unity_no_schema() {
1599        use crate::config::ConnectorConfig;
1600
1601        let config = unity_config();
1602        let mut sink = DeltaLakeSink::new(config, None);
1603
1604        // open() with empty ConnectorConfig (simulates factory path)
1605        let connector_config = ConnectorConfig::new("delta-lake");
1606        // With empty properties, open() skips re-parsing and keeps the
1607        // sink's existing Unity config (from_config would otherwise require
1608        // table.path). The table path stays the local test path, so the call
1609        // never reaches the Unity Catalog REST API.
1610        let result = sink.open(&connector_config).await;
1611        assert!(result.is_ok());
1612
1613        // Should be in Initializing state with deferred flag set.
1614        assert!(sink.needs_deferred_delta_init);
1615        assert_eq!(sink.state(), ConnectorState::Initializing);
1616        assert!(sink.schema.is_none());
1617    }
1618
1619    #[cfg(feature = "delta-lake")]
1620    #[tokio::test]
1621    async fn test_deferred_init_transitions_to_failed_on_error() {
1622        // When deferred init fails, the sink must transition to Failed
1623        // to prevent an unbounded retry storm.
1624        let mut sink = DeltaLakeSink::new(test_config(), None);
1625        sink.state = ConnectorState::Initializing;
1626        sink.needs_deferred_delta_init = true;
1627        sink.schema = Some(test_schema());
1628
1629        // begin_epoch will try init_delta_table(), which fails because no
1630        // Delta table exists at the nonexistent test path. The sink should
1631        // transition to Failed.
1632        let result = sink.begin_epoch(1).await;
1633        assert!(result.is_err());
1634        assert_eq!(sink.state(), ConnectorState::Failed);
1635        // Flag may still be set, but Failed state prevents further usage.
1636    }
1637
1638    #[cfg(feature = "delta-lake")]
1639    #[tokio::test]
1640    async fn test_write_batch_accepts_initializing_state() {
1641        // During deferred init, write_batch must accept Initializing state
1642        // so the first batch can provide the schema.
1643        let mut sink = DeltaLakeSink::new(test_config(), None);
1644        sink.state = ConnectorState::Initializing;
1645        sink.needs_deferred_delta_init = true;
1646
1647        let batch = test_batch(5);
1648        // write_batch sets schema, then tries init_delta_table which fails.
1649        // Sink transitions to Failed.
1650        let result = sink.write_batch(&batch).await;
1651        assert!(result.is_err());
1652        assert_eq!(sink.state(), ConnectorState::Failed);
1653        // Schema was set before init was attempted.
1654        assert!(sink.schema.is_some());
1655    }
1656
1657    #[test]
1658    fn test_no_deferred_init_without_catalog_storage_location() {
1659        // Unity catalog without catalog.storage.location should NOT defer.
1660        let mut config = unity_config();
1661        config.catalog_storage_location = None;
1662        let sink = DeltaLakeSink::new(config, None);
1663
1664        let should_defer = matches!(sink.config.catalog_type, DeltaCatalogType::Unity { .. })
1665            && sink.config.catalog_storage_location.is_some()
1666            && sink.schema.is_none();
1667        assert!(!should_defer);
1668    }
1669
1670    #[test]
1671    fn test_no_deferred_init_with_schema() {
1672        // Unity catalog with schema already set should NOT defer.
1673        let config = unity_config();
1674        let sink = DeltaLakeSink::with_schema(config, test_schema());
1675
1676        let should_defer = matches!(sink.config.catalog_type, DeltaCatalogType::Unity { .. })
1677            && sink.config.catalog_storage_location.is_some()
1678            && sink.schema.is_none();
1679        assert!(!should_defer);
1680    }
1681
1682    #[test]
1683    fn test_health_check_initializing_during_deferred() {
1684        let mut sink = DeltaLakeSink::new(test_config(), None);
1685        sink.state = ConnectorState::Initializing;
1686        // During deferred init, health should NOT report Healthy.
1687        assert!(matches!(sink.health_check(), HealthStatus::Unknown));
1688    }
1689
1690    // ── Batch size estimation ──
1691
1692    #[test]
1693    fn test_estimate_batch_size() {
1694        let batch = test_batch(100);
1695        let size = DeltaLakeSink::estimate_batch_size(&batch);
1696        assert!(size > 0);
1697    }
1698
1699    #[test]
1700    fn test_estimate_batch_size_empty() {
1701        let batch = RecordBatch::new_empty(test_schema());
1702        let size = DeltaLakeSink::estimate_batch_size(&batch);
1703        // Arrow arrays have baseline buffer allocation even with 0 rows,
1704        // so size may be small but not necessarily zero.
1705        assert!(size < 1024);
1706    }
1707
1708    // ── Should flush tests ──
1709
1710    #[test]
1711    fn test_should_flush_by_rows() {
1712        let mut config = test_config();
1713        config.max_buffer_records = 100;
1714        let mut sink = DeltaLakeSink::new(config, None);
1715        sink.buffered_rows = 99;
1716        assert!(!sink.should_flush());
1717        sink.buffered_rows = 100;
1718        assert!(sink.should_flush());
1719    }
1720
1721    #[test]
1722    fn test_should_flush_by_bytes() {
1723        let mut config = test_config();
1724        config.target_file_size = 1000;
1725        let mut sink = DeltaLakeSink::new(config, None);
1726        sink.buffered_bytes = 999;
1727        assert!(!sink.should_flush());
1728        sink.buffered_bytes = 1000;
1729        assert!(sink.should_flush());
1730    }
1731
1732    #[test]
1733    fn test_should_flush_empty() {
1734        let sink = DeltaLakeSink::new(test_config(), None);
1735        assert!(!sink.should_flush());
1736    }
1737
1738    #[tokio::test]
1739    async fn test_exactly_once_buffer_backpressure() {
1740        // Exactly-once mode rejects writes once the cumulative pending
1741        // buffer exceeds the hard cap (4× max_buffer_records). A single
1742        // incoming batch that by itself exceeds the cap is admitted (we
1743        // cannot split it without breaking exactly-once), but subsequent
1744        // batches hit backpressure.
1745        let mut config = test_config();
1746        config.delivery_guarantee = crate::connector::DeliveryGuarantee::ExactlyOnce;
1747        config.max_buffer_records = 10;
1748        let mut sink = DeltaLakeSink::new(config, None);
1749        sink.state = ConnectorState::Running;
1750
1751        // First batch of 50 rows exceeds 4× cap (40) but we admit it —
1752        // rejecting would wedge the pipeline with no way to split.
1753        let first = test_batch(50);
1754        sink.write_batch(&first)
1755            .await
1756            .expect("single oversized batch must be admitted");
1757        assert_eq!(sink.buffered_rows(), 50);
1758
1759        // A second normal batch must now be rejected: cumulative pending
1760        // (50 + 5 = 55) exceeds the effective cap (max(40, 5) = 40).
1761        let second = test_batch(5);
1762        let err = sink
1763            .write_batch(&second)
1764            .await
1765            .expect_err("should reject once cumulative buffer exceeds cap");
1766        let msg = err.to_string();
1767        assert!(
1768            msg.contains("buffer full"),
1769            "expected backpressure error, got: {msg}"
1770        );
1771        // Rejected batch must NOT have been buffered.
1772        assert_eq!(sink.buffered_rows(), 50);
1773    }
1774
1775    // ── Batch buffering tests ──
1776
1777    #[tokio::test]
1778    async fn test_write_batch_buffering() {
1779        let mut config = test_config();
1780        config.max_buffer_records = 100;
1781        let mut sink = DeltaLakeSink::new(config, None);
1782        sink.state = ConnectorState::Running;
1783
1784        let batch = test_batch(10);
1785        let result = sink.write_batch(&batch).await.unwrap();
1786
1787        // Should buffer, not flush (10 < 100)
1788        assert_eq!(result.records_written, 0);
1789        assert_eq!(sink.buffered_rows(), 10);
1790        assert!(sink.buffered_bytes() > 0);
1791    }
1792
1793    #[tokio::test]
1794    async fn test_write_batch_empty() {
1795        let mut sink = DeltaLakeSink::new(test_config(), None);
1796        sink.state = ConnectorState::Running;
1797
1798        let batch = test_batch(0);
1799        let result = sink.write_batch(&batch).await.unwrap();
1800        assert_eq!(result.records_written, 0);
1801        assert_eq!(sink.buffered_rows(), 0);
1802    }
1803
1804    #[tokio::test]
1805    async fn test_write_batch_not_running() {
1806        let mut sink = DeltaLakeSink::new(test_config(), None);
1807        // state is Created, not Running
1808
1809        let batch = test_batch(10);
1810        let result = sink.write_batch(&batch).await;
1811        assert!(result.is_err());
1812    }
1813
1814    #[tokio::test]
1815    async fn test_write_batch_sets_schema() {
1816        let mut sink = DeltaLakeSink::new(test_config(), None);
1817        sink.state = ConnectorState::Running;
1818        assert!(sink.schema.is_none());
1819
1820        let batch = test_batch(5);
1821        sink.write_batch(&batch).await.unwrap();
1822        assert!(sink.schema.is_some());
1823        assert_eq!(sink.schema.as_ref().unwrap().fields().len(), 3);
1824    }
1825
1826    #[tokio::test]
1827    async fn test_multiple_write_batches_accumulate() {
1828        let mut config = test_config();
1829        config.max_buffer_records = 100;
1830        let mut sink = DeltaLakeSink::new(config, None);
1831        sink.state = ConnectorState::Running;
1832
1833        let batch = test_batch(10);
1834        sink.write_batch(&batch).await.unwrap();
1835        sink.write_batch(&batch).await.unwrap();
1836        sink.write_batch(&batch).await.unwrap();
1837
1838        assert_eq!(sink.buffered_rows(), 30);
1839    }
1840
1841    // ── Epoch lifecycle tests ──
1842    // Note: Epoch lifecycle with real I/O is tested in delta_io.rs integration tests.
1843
1844    #[tokio::test]
1845    async fn test_rollback_clears_buffer() {
1846        let mut config = test_config();
1847        config.max_buffer_records = 1000;
1848        let mut sink = DeltaLakeSink::new(config, None);
1849        sink.state = ConnectorState::Running;
1850
1851        let batch = test_batch(50);
1852        sink.write_batch(&batch).await.unwrap();
1853        assert_eq!(sink.buffered_rows(), 50);
1854
1855        sink.rollback_epoch(0).await.unwrap();
1856        assert_eq!(sink.buffered_rows(), 0);
1857        assert_eq!(sink.buffered_bytes(), 0);
1858    }
1859
1860    /// D001: Rollback after `pre_commit` must discard staged data.
1861    /// `pre_commit` stages batches; rollback discards them without writing to Delta.
1862    #[tokio::test]
1863    async fn test_rollback_after_pre_commit_discards_staged() {
1864        let mut config = test_config();
1865        config.max_buffer_records = 1000;
1866        let mut sink = DeltaLakeSink::new(config, None);
1867        sink.state = ConnectorState::Running;
1868
1869        sink.begin_epoch(1).await.unwrap();
1870        let batch = test_batch(50);
1871        sink.write_batch(&batch).await.unwrap();
1872        assert_eq!(sink.buffered_rows(), 50);
1873
1874        // pre_commit stages the buffer
1875        sink.pre_commit(1).await.unwrap();
1876        assert_eq!(sink.buffered_rows(), 0);
1877        assert_eq!(sink.staged_rows, 50);
1878        assert!(!sink.staged_batches.is_empty());
1879
1880        // rollback discards both buffer and staged data
1881        sink.rollback_epoch(1).await.unwrap();
1882        assert_eq!(sink.buffered_rows(), 0);
1883        assert_eq!(sink.staged_rows, 0);
1884        assert_eq!(sink.staged_bytes, 0);
1885        assert!(sink.staged_batches.is_empty());
1886        assert_eq!(sink.delta_version(), 0); // no Delta write occurred
1887    }
1888
1889    /// Staged data is preserved from `pre_commit` until commit or rollback.
1890    /// This verifies that `pre_commit` does not destroy staged state, so a
1891    /// subsequent rollback can discard it cleanly.
1892    #[tokio::test]
1893    async fn test_staged_data_preserved_until_commit_or_rollback() {
1894        let mut config = test_config();
1895        config.max_buffer_records = 1000;
1896        let mut sink = DeltaLakeSink::new(config, None);
1897        sink.state = ConnectorState::Running;
1898
1899        sink.begin_epoch(1).await.unwrap();
1900        sink.write_batch(&test_batch(25)).await.unwrap();
1901        sink.write_batch(&test_batch(25)).await.unwrap();
1902
1903        // pre_commit moves buffer → staged
1904        sink.pre_commit(1).await.unwrap();
1905        assert_eq!(sink.staged_rows, 50);
1906        assert_eq!(sink.staged_batches.len(), 2);
1907        assert_eq!(sink.buffered_rows(), 0);
1908
1909        // On success, commit_epoch consumes the staged state (with the
1910        // delta-lake feature it writes to Delta; without it, commit_epoch
1911        // takes the local commit path via commit_local and succeeds).
1912        //
1913        // Here we exercise the rollback path instead:
1914
1915        // rollback_epoch must discard the staged data without writing anything.
1916        sink.rollback_epoch(1).await.unwrap();
1917        assert!(sink.staged_batches.is_empty());
1918        assert_eq!(sink.staged_rows, 0);
1919        assert_eq!(sink.staged_bytes, 0);
1920    }
1921
1922    #[tokio::test]
1923    async fn test_commit_empty_epoch() {
1924        let mut sink = DeltaLakeSink::new(test_config(), None);
1925        sink.state = ConnectorState::Running;
1926
1927        sink.begin_epoch(1).await.unwrap();
1928        // No writes
1929        sink.commit_epoch(1).await.unwrap();
1930        assert_eq!(sink.last_committed_epoch(), 1);
1931        assert_eq!(sink.delta_version(), 0); // No version bump (no files)
1932    }
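
    /// Illustrative addition: re-committing an epoch already covered by
    /// `last_committed_epoch` under exactly-once must be a no-op (idempotent
    /// replay), leaving the committed-epoch watermark untouched.
    #[tokio::test]
    async fn test_commit_epoch_idempotent_skip() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let mut sink = DeltaLakeSink::new(config, None);
        sink.state = ConnectorState::Running;
        sink.last_committed_epoch = 5;

        // Epoch 3 is already covered (3 <= 5), so commit_epoch returns Ok
        // without advancing or rewinding the watermark.
        sink.commit_epoch(3).await.unwrap();
        assert_eq!(sink.last_committed_epoch(), 5);
    }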
1933
1934    // ── Flush tests ──
1935    // Note: These tests bypass open() and test business logic only.
1936
1937    #[tokio::test]
1938    async fn test_flush_coalesces_buffer() {
1939        let mut config = test_config();
1940        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
1941        config.writer_id = "test-writer".to_string();
1942        let mut sink = DeltaLakeSink::new(config, None);
1943        sink.state = ConnectorState::Running;
1944
1945        let batch = test_batch(10);
1946        sink.write_batch(&batch).await.unwrap();
1947        sink.write_batch(&batch).await.unwrap();
1948        assert_eq!(sink.buffer.len(), 2);
1949
1950        // flush() coalesces batches but does not write to Delta.
1951        sink.flush().await.unwrap();
1952        assert_eq!(sink.buffer.len(), 1);
1953        assert_eq!(sink.buffered_rows(), 20);
1954    }
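
    /// Illustrative addition: flush() with nothing buffered or staged should
    /// be a harmless no-op on either delivery-guarantee branch (both return
    /// Ok for an empty buffer and leave connector state unchanged).
    #[tokio::test]
    async fn test_flush_empty_buffer_is_noop() {
        let mut sink = DeltaLakeSink::new(test_config(), None);
        sink.state = ConnectorState::Running;

        sink.flush().await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.state(), ConnectorState::Running);
    }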
1955
1956    // ── Open and close tests ──
1957    // Note: These tests use fake paths that don't exist.
1958    // With the delta-lake feature, open() tries to actually access the path.
1959    // See delta_io.rs for integration tests with real I/O.
1960
1961    #[tokio::test]
1962    async fn test_close() {
1963        let mut sink = DeltaLakeSink::new(test_config(), None);
1964        sink.state = ConnectorState::Running;
1965
1966        sink.close().await.unwrap();
1967        assert_eq!(sink.state(), ConnectorState::Closed);
1968    }
1969
1970    // ── Health check tests ──
1971
1972    #[test]
1973    fn test_health_check_created() {
1974        let sink = DeltaLakeSink::new(test_config(), None);
1975        assert_eq!(sink.health_check(), HealthStatus::Unknown);
1976    }
1977
1978    #[test]
1979    fn test_health_check_running() {
1980        let mut sink = DeltaLakeSink::new(test_config(), None);
1981        sink.state = ConnectorState::Running;
1982        assert_eq!(sink.health_check(), HealthStatus::Healthy);
1983    }
1984
1985    #[test]
1986    fn test_health_check_closed() {
1987        let mut sink = DeltaLakeSink::new(test_config(), None);
1988        sink.state = ConnectorState::Closed;
1989        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
1990    }
1991
1992    #[test]
1993    fn test_health_check_failed() {
1994        let mut sink = DeltaLakeSink::new(test_config(), None);
1995        sink.state = ConnectorState::Failed;
1996        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
1997    }
1998
1999    #[test]
2000    fn test_health_check_paused() {
2001        let mut sink = DeltaLakeSink::new(test_config(), None);
2002        sink.state = ConnectorState::Paused;
2003        assert!(matches!(sink.health_check(), HealthStatus::Degraded(_)));
2004    }
2005
2006    // ── Capabilities tests ──
2007
2008    #[test]
2009    fn test_capabilities_append_exactly_once() {
2010        let mut config = test_config();
2011        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
2012        let sink = DeltaLakeSink::new(config, None);
2013        let caps = sink.capabilities();
2014        assert!(caps.exactly_once);
2015        assert!(caps.idempotent);
2016        assert!(!caps.upsert);
2017        assert!(!caps.changelog);
2018        assert!(!caps.schema_evolution);
2019        assert!(!caps.partitioned);
2020    }
2021
2022    #[test]
2023    fn test_capabilities_upsert() {
2024        let sink = DeltaLakeSink::new(upsert_config(), None);
2025        let caps = sink.capabilities();
2026        assert!(caps.upsert);
2027        assert!(caps.changelog);
2028        assert!(caps.idempotent);
2029    }
2030
2031    #[test]
2032    fn test_capabilities_schema_evolution() {
2033        let mut config = test_config();
2034        config.schema_evolution = true;
2035        let sink = DeltaLakeSink::new(config, None);
2036        let caps = sink.capabilities();
2037        assert!(caps.schema_evolution);
2038    }
2039
2040    #[test]
2041    fn test_capabilities_partitioned() {
2042        let mut config = test_config();
2043        config.partition_columns = vec!["trade_date".to_string()];
2044        let sink = DeltaLakeSink::new(config, None);
2045        let caps = sink.capabilities();
2046        assert!(caps.partitioned);
2047    }
2048
2049    #[test]
2050    fn test_capabilities_at_least_once() {
2051        let mut config = test_config();
2052        config.delivery_guarantee = DeliveryGuarantee::AtLeastOnce;
2053        let sink = DeltaLakeSink::new(config, None);
2054        let caps = sink.capabilities();
2055        assert!(!caps.exactly_once);
2056        assert!(caps.idempotent);
2057    }
2058
2059    // ── Metrics tests ──
2060
2061    #[test]
2062    fn test_metrics_initial() {
2063        let sink = DeltaLakeSink::new(test_config(), None);
2064        let m = sink.metrics();
2065        assert_eq!(m.records_total, 0);
2066        assert_eq!(m.bytes_total, 0);
2067        assert_eq!(m.errors_total, 0);
2068    }
2069
2070    // ── Changelog splitting tests ──
2071
2072    fn changelog_schema() -> SchemaRef {
2073        Arc::new(Schema::new(vec![
2074            Field::new("id", DataType::Int64, false),
2075            Field::new("name", DataType::Utf8, true),
2076            Field::new("_op", DataType::Utf8, false),
2077            Field::new("_ts_ms", DataType::Int64, false),
2078        ]))
2079    }
2080
2081    fn changelog_batch() -> RecordBatch {
2082        RecordBatch::try_new(
2083            changelog_schema(),
2084            vec![
2085                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
2086                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
2087                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
2088                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
2089            ],
2090        )
2091        .unwrap()
2092    }
2093
2094    #[test]
2095    fn test_split_changelog_batch() {
2096        let batch = changelog_batch();
2097        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
2098
2099        // Inserts: rows 0 (I), 1 (U), 3 (I) = 3 rows
2100        assert_eq!(inserts.num_rows(), 3);
2101        // Deletes: rows 2 (D), 4 (D) = 2 rows
2102        assert_eq!(deletes.num_rows(), 2);
2103
2104        // Metadata columns should be stripped
2105        assert_eq!(inserts.num_columns(), 2); // id, name only
2106        assert_eq!(deletes.num_columns(), 2);
2107
2108        // Verify insert values
2109        let insert_ids = inserts
2110            .column(0)
2111            .as_any()
2112            .downcast_ref::<Int64Array>()
2113            .unwrap();
2114        assert_eq!(insert_ids.value(0), 1);
2115        assert_eq!(insert_ids.value(1), 2);
2116        assert_eq!(insert_ids.value(2), 4);
2117
2118        // Verify delete values
2119        let delete_ids = deletes
2120            .column(0)
2121            .as_any()
2122            .downcast_ref::<Int64Array>()
2123            .unwrap();
2124        assert_eq!(delete_ids.value(0), 3);
2125        assert_eq!(delete_ids.value(1), 5);
2126    }
2127
2128    #[test]
2129    fn test_split_changelog_all_inserts() {
2130        let schema = changelog_schema();
2131        let batch = RecordBatch::try_new(
2132            schema,
2133            vec![
2134                Arc::new(Int64Array::from(vec![1, 2])),
2135                Arc::new(StringArray::from(vec!["a", "b"])),
2136                Arc::new(StringArray::from(vec!["I", "I"])),
2137                Arc::new(Int64Array::from(vec![100, 200])),
2138            ],
2139        )
2140        .unwrap();
2141
2142        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
2143        assert_eq!(inserts.num_rows(), 2);
2144        assert_eq!(deletes.num_rows(), 0);
2145    }
2146
2147    #[test]
2148    fn test_split_changelog_all_deletes() {
2149        let schema = changelog_schema();
2150        let batch = RecordBatch::try_new(
2151            schema,
2152            vec![
2153                Arc::new(Int64Array::from(vec![1, 2])),
2154                Arc::new(StringArray::from(vec!["a", "b"])),
2155                Arc::new(StringArray::from(vec!["D", "D"])),
2156                Arc::new(Int64Array::from(vec![100, 200])),
2157            ],
2158        )
2159        .unwrap();
2160
2161        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
2162        assert_eq!(inserts.num_rows(), 0);
2163        assert_eq!(deletes.num_rows(), 2);
2164    }
2165
2166    #[test]
2167    fn test_split_changelog_missing_op_column() {
2168        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
2169        let batch =
2170            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
2171
2172        let result = DeltaLakeSink::split_changelog_batch(&batch);
2173        assert!(result.is_err());
2174    }
2175
2176    #[test]
2177    fn test_split_changelog_snapshot_read() {
2178        let schema = changelog_schema();
2179        let batch = RecordBatch::try_new(
2180            schema,
2181            vec![
2182                Arc::new(Int64Array::from(vec![1])),
2183                Arc::new(StringArray::from(vec!["a"])),
2184                Arc::new(StringArray::from(vec!["r"])), // snapshot read
2185                Arc::new(Int64Array::from(vec![100])),
2186            ],
2187        )
2188        .unwrap();
2189
2190        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
2191        assert_eq!(inserts.num_rows(), 1);
2192        assert_eq!(deletes.num_rows(), 0);
2193    }
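
    /// Illustrative addition: exercise the filter_and_project helper directly.
    /// Keep rows 0 and 2 of a 3-row batch and project to the first two columns,
    /// dropping the value column.
    #[test]
    fn test_filter_and_project_mask_and_projection() {
        let batch = test_batch(3);
        let out = filter_and_project(&batch, vec![true, false, true], &[0, 1]).unwrap();

        assert_eq!(out.num_rows(), 2);
        assert_eq!(out.num_columns(), 2); // id, name only

        let ids = out.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(ids.value(0), 0);
        assert_eq!(ids.value(1), 2);
    }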
2194
2195    // ── Debug output test ──
2196
2197    #[test]
2198    fn test_debug_output() {
2199        let sink = DeltaLakeSink::new(test_config(), None);
2200        let debug = format!("{sink:?}");
2201        assert!(debug.contains("DeltaLakeSink"));
2202        assert!(debug.contains("delta_test_nonexistent_8f3a"));
2203    }
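
    // ── Backoff jitter test ──

    /// Illustrative addition: sanity-check the jitter bounds. For a 2000ms
    /// base, ±25% jitter must stay within 1500–2500ms, and the result is
    /// never zero (a zero base still yields the 1ms floor).
    #[cfg(feature = "delta-lake")]
    #[test]
    fn test_jittered_backoff_within_bounds() {
        for _ in 0..100 {
            let ms = jittered_backoff_ms(2000);
            assert!((1500..=2500).contains(&ms), "out of range: {ms}");
        }
        assert_eq!(jittered_backoff_ms(0), 1);
    }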
2204}