// laminar_connectors/lakehouse/delta.rs
//! Delta Lake sink connector implementation.
//!
//! [`DeltaLakeSink`] implements [`SinkConnector`], writing Arrow `RecordBatch`
//! data to Delta Lake tables with ACID transactions and at-least-once delivery
//! (exactly-once opt-in via `delivery.guarantee = 'exactly-once'`).
//!
//! # Write Strategies
//!
//! - **Append mode**: Arrow-to-Parquet zero-copy writes for immutable streams
//! - **Overwrite mode**: Replace partition contents for recomputation
//! - **Upsert mode**: CDC MERGE via Z-set changelog integration
//!
//! Exactly-once semantics use epoch-to-Delta-version mapping: each `LaminarDB`
//! epoch maps to exactly one Delta Lake transaction via `txn` application
//! transaction metadata in the Delta log.
//!
//! # Ring Architecture
//!
//! - **Ring 0**: No sink code. Data arrives via SPSC channel (~5ns push).
//! - **Ring 1**: Batch buffering, Parquet writes, Delta log commits.
//! - **Ring 2**: Schema management, configuration, health checks.
use std::sync::Arc;
use std::time::Instant;

use arrow_array::{Array, RecordBatch};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use tracing::{debug, info, warn};

#[cfg(feature = "delta-lake")]
use deltalake::protocol::SaveMode;
#[cfg(feature = "delta-lake")]
use deltalake::DeltaTable;

use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;

use super::delta_config::{DeliveryGuarantee, DeltaLakeSinkConfig, DeltaWriteMode};
use super::delta_metrics::DeltaLakeSinkMetrics;
/// Delta Lake sink connector.
///
/// Writes Arrow `RecordBatch` to Delta Lake tables with ACID transactions,
/// at-least-once delivery (exactly-once opt-in), partitioning, and
/// background compaction.
///
/// # Lifecycle
///
/// ```text
/// new() -> open() -> [begin_epoch() -> write_batch()* -> commit_epoch()] -> close()
///                          |                                    |
///                          +--- rollback_epoch() (on failure) --+
/// ```
///
/// # Exactly-Once Semantics
///
/// Each `LaminarDB` epoch maps to exactly one Delta Lake transaction (version).
/// On recovery, the sink checks `_delta_log/` for the last committed epoch
/// via `txn` (application transaction) metadata. If an epoch was already
/// committed, it is skipped (idempotent commit).
///
/// # 2PC Protocol
///
/// `pre_commit()` coalesces the buffer but does NOT write to Delta.
/// `commit_epoch()` performs the actual Delta write+commit atomically.
/// `rollback_epoch()` discards the staged buffer without side effects.
/// This ensures that a rolled-back epoch never leaves data in Delta.
pub struct DeltaLakeSink {
    /// Sink configuration.
    config: DeltaLakeSinkConfig,
    /// Arrow schema for input batches (set on first write or from existing table).
    schema: Option<SchemaRef>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Current epoch being written.
    current_epoch: u64,
    /// Last successfully committed epoch (recovered from `txn` metadata in
    /// exactly-once mode; epochs at or below this are skipped).
    last_committed_epoch: u64,
    /// `RecordBatch` buffer for the current epoch.
    buffer: Vec<RecordBatch>,
    /// Total rows buffered in current epoch.
    buffered_rows: usize,
    /// Total bytes buffered (estimated) in current epoch.
    buffered_bytes: u64,
    /// Number of Parquet files pending commit in current epoch.
    pending_files: usize,
    /// Current Delta Lake table version.
    delta_version: u64,
    /// Time when the current buffer started accumulating.
    buffer_start_time: Option<Instant>,
    /// Sink metrics.
    metrics: DeltaLakeSinkMetrics,
    /// Delta Lake table handle (present when `delta-lake` feature is enabled).
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    /// Whether the current epoch was skipped (already committed).
    epoch_skipped: bool,
    /// Staged batches ready for commit (populated by `pre_commit()`, consumed
    /// by `commit_epoch()`). This separation ensures `rollback_epoch()` can
    /// discard prepared data without leaving orphan files in Delta.
    staged_batches: Vec<RecordBatch>,
    /// Rows staged for commit (mirrors `staged_batches`).
    staged_rows: usize,
    /// Estimated bytes staged for commit.
    staged_bytes: u64,
    /// Cancellation token for the background compaction task.
    #[cfg(feature = "delta-lake")]
    compaction_cancel: Option<tokio_util::sync::CancellationToken>,
    /// Handle for the background compaction task.
    #[cfg(feature = "delta-lake")]
    compaction_handle: Option<tokio::task::JoinHandle<()>>,
}
118
119impl DeltaLakeSink {
120    /// Creates a new Delta Lake sink with the given configuration.
121    #[must_use]
122    pub fn new(config: DeltaLakeSinkConfig) -> Self {
123        Self {
124            config,
125            schema: None,
126            state: ConnectorState::Created,
127            current_epoch: 0,
128            last_committed_epoch: 0,
129            buffer: Vec::with_capacity(16),
130            buffered_rows: 0,
131            buffered_bytes: 0,
132            pending_files: 0,
133            delta_version: 0,
134            buffer_start_time: None,
135            metrics: DeltaLakeSinkMetrics::new(),
136            epoch_skipped: false,
137            staged_batches: Vec::new(),
138            staged_rows: 0,
139            staged_bytes: 0,
140            #[cfg(feature = "delta-lake")]
141            table: None,
142            #[cfg(feature = "delta-lake")]
143            compaction_cancel: None,
144            #[cfg(feature = "delta-lake")]
145            compaction_handle: None,
146        }
147    }
148
149    /// Creates a new Delta Lake sink with an explicit schema.
150    #[must_use]
151    pub fn with_schema(config: DeltaLakeSinkConfig, schema: SchemaRef) -> Self {
152        let mut sink = Self::new(config);
153        sink.schema = Some(schema);
154        sink
155    }
156
    /// Returns the current connector state.
    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    /// Returns the current epoch.
    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Returns the last committed epoch.
    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    /// Returns the number of buffered rows pending flush.
    #[must_use]
    pub fn buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Returns the estimated buffered bytes (Arrow in-memory footprint, not
    /// on-disk Parquet size).
    #[must_use]
    pub fn buffered_bytes(&self) -> u64 {
        self.buffered_bytes
    }

    /// Returns the current Delta Lake table version.
    #[must_use]
    pub fn delta_version(&self) -> u64 {
        self.delta_version
    }

    /// Returns a reference to the sink metrics.
    #[must_use]
    pub fn sink_metrics(&self) -> &DeltaLakeSinkMetrics {
        &self.metrics
    }

    /// Returns the sink configuration.
    #[must_use]
    pub fn config(&self) -> &DeltaLakeSinkConfig {
        &self.config
    }
204
205    /// Checks if a buffer flush is needed based on size or time thresholds.
206    #[must_use]
207    pub fn should_flush(&self) -> bool {
208        if self.buffered_rows >= self.config.max_buffer_records {
209            return true;
210        }
211        if self.buffered_bytes >= self.config.target_file_size as u64 {
212            return true;
213        }
214        if let Some(start) = self.buffer_start_time {
215            if start.elapsed() >= self.config.max_buffer_duration {
216                return true;
217            }
218        }
219        false
220    }
221
222    /// Returns the target table schema, stripping metadata columns (`_op`, etc.)
223    /// in upsert mode so the Delta table isn't created with changelog columns.
224    /// CDC metadata columns stripped from the target Delta table schema.
225    const CDC_METADATA_COLUMNS: &'static [&'static str] = &["_op", "_ts_ms"];
226
227    fn target_schema(batch_schema: &SchemaRef, write_mode: DeltaWriteMode) -> SchemaRef {
228        if write_mode == DeltaWriteMode::Upsert {
229            let fields: Vec<_> = batch_schema
230                .fields()
231                .iter()
232                .filter(|f| !Self::CDC_METADATA_COLUMNS.contains(&f.name().as_str()))
233                .cloned()
234                .collect();
235            Arc::new(arrow_schema::Schema::new(fields))
236        } else {
237            batch_schema.clone()
238        }
239    }
240
241    /// Estimates the byte size of a `RecordBatch`.
242    #[must_use]
243    pub fn estimate_batch_size(batch: &RecordBatch) -> u64 {
244        batch
245            .columns()
246            .iter()
247            .map(|col| col.get_array_memory_size() as u64)
248            .sum()
249    }
250
    /// Writes all staged data to Delta Lake as a single atomic transaction.
    ///
    /// Called from `commit_epoch()` only — after the checkpoint manifest is
    /// persisted. This ensures `rollback_epoch()` can discard staged data
    /// without leaving orphaned files in Delta.
    ///
    /// # Failure behavior
    ///
    /// On a local failure before the Delta write (e.g. batch concat), the
    /// table handle is restored. If the Delta write itself fails, delta-rs
    /// has already consumed the handle (`self.table` stays `None`), but the
    /// staged batches and counters remain intact so the caller can retry
    /// (after re-opening) or roll back without data loss.
    #[cfg(feature = "delta-lake")]
    async fn flush_staged_to_delta(&mut self) -> Result<WriteResult, ConnectorError> {
        if self.staged_batches.is_empty() {
            return Ok(WriteResult::new(0, 0));
        }

        let total_rows = self.staged_rows;
        let estimated_bytes = self.staged_bytes;

        // delta-rs write/merge APIs consume DeltaTable by value and return
        // a new handle on success. We must take() ownership to call them,
        // but we must restore self.table on error so the sink stays usable.
        let table = self
            .table
            .take()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "table initialized".into(),
                actual: "table not initialized".into(),
            })?;

        // Clone batches for the write — staged_batches is only cleared on
        // success. RecordBatch::clone is Arc-bump only (~16-48ns per batch).
        let batches: Vec<RecordBatch> = self.staged_batches.clone();

        let write_result = if self.config.write_mode == DeltaWriteMode::Upsert {
            // ── Upsert/Merge path: concat staged batches, then MERGE ──
            let combined =
                match arrow_select::concat::concat_batches(&batches[0].schema(), &batches) {
                    Ok(c) => c,
                    Err(e) => {
                        // Concat is a local op — restore table and propagate.
                        self.table = Some(table);
                        return Err(ConnectorError::Internal(format!(
                            "failed to concat batches: {e}"
                        )));
                    }
                };

            super::delta_io::merge_changelog(
                table,
                combined,
                &self.config.merge_key_columns,
                &self.config.writer_id,
                self.current_epoch,
                self.config.schema_evolution,
            )
            .await
            .map(|(t, result)| {
                // Merge succeeded: record MERGE + any delete counts.
                self.metrics.record_merge();
                if result.rows_deleted > 0 {
                    self.metrics.record_deletes(result.rows_deleted as u64);
                }
                t
            })
        } else {
            // ── Append/Overwrite path ──
            let save_mode = match self.config.write_mode {
                DeltaWriteMode::Append => SaveMode::Append,
                DeltaWriteMode::Overwrite => SaveMode::Overwrite,
                DeltaWriteMode::Upsert => unreachable!("handled by the upsert branch above"),
            };

            let partition_cols = if self.config.partition_columns.is_empty() {
                None
            } else {
                Some(self.config.partition_columns.as_slice())
            };

            super::delta_io::write_batches(
                table,
                batches,
                &self.config.writer_id,
                self.current_epoch,
                save_mode,
                partition_cols,
                self.config.schema_evolution,
            )
            .await
            .map(|(t, _version)| t)
        };

        // On error: delta-rs consumed the old table handle but the write
        // failed. The table is unrecoverable from delta-rs's perspective,
        // but staged_batches and staged_rows/bytes are intact so the
        // caller can retry or rollback without data loss.
        let table = match write_result {
            Ok(t) => t,
            Err(e) => {
                // self.table is already None — the caller must handle this
                // (rollback_epoch clears staged state; a retry re-opens).
                return Err(e);
            }
        };

        // ── Success: commit state ──
        // Delta versions are i64 but always non-negative, so the cast is safe.
        #[allow(clippy::cast_sign_loss)]
        {
            self.delta_version = table.version().unwrap_or(0) as u64;
        }
        self.table = Some(table);
        self.pending_files = 0;

        // Clear staged state only after confirmed success.
        self.staged_batches.clear();
        self.staged_rows = 0;
        self.staged_bytes = 0;

        self.metrics
            .record_flush(total_rows as u64, estimated_bytes);
        self.metrics.record_commit(self.delta_version);

        debug!(
            rows = total_rows,
            bytes = estimated_bytes,
            delta_version = self.delta_version,
            "Delta Lake: committed staged data to Delta"
        );

        Ok(WriteResult::new(total_rows, estimated_bytes))
    }
379
    /// Commits pending files as a Delta Lake transaction (updates metrics).
    /// Only used when the delta-lake feature is NOT enabled (local simulation).
    #[cfg(not(feature = "delta-lake"))]
    fn commit_local(&mut self, epoch: u64) {
        // Simulate a Delta commit: bump the version counter first so the
        // metrics below record the new version.
        self.delta_version += 1;
        self.pending_files = 0;

        // Record flush metrics for staged data.
        self.metrics
            .record_flush(self.staged_rows as u64, self.staged_bytes);
        self.metrics.record_commit(self.delta_version);

        debug!(
            epoch,
            delta_version = self.delta_version,
            "Delta Lake: committed transaction"
        );
    }
398
399    /// Splits a changelog `RecordBatch` into insert and delete batches.
400    ///
401    /// Uses the `_op` metadata column:
402    /// - `"I"` (insert), `"U"` (update-after), `"r"` (snapshot read) -> insert
403    /// - `"D"` (delete) -> delete
404    ///
405    /// The returned batches exclude metadata columns (those starting with `_`).
406    ///
407    /// # Errors
408    ///
409    /// Returns `ConnectorError::ConfigurationError` if the `_op` column is
410    /// missing or not a string type.
411    pub fn split_changelog_batch(
412        batch: &RecordBatch,
413    ) -> Result<(RecordBatch, RecordBatch), ConnectorError> {
414        let op_idx = batch.schema().index_of("_op").map_err(|_| {
415            ConnectorError::ConfigurationError(
416                "upsert mode requires '_op' column in input schema".into(),
417            )
418        })?;
419
420        let op_array = batch
421            .column(op_idx)
422            .as_any()
423            .downcast_ref::<arrow_array::StringArray>()
424            .ok_or_else(|| {
425                ConnectorError::ConfigurationError("'_op' column must be String (Utf8) type".into())
426            })?;
427
428        // Build boolean masks (compact bit-buffers, no per-element heap allocation).
429        let len = op_array.len();
430        let mut insert_mask = Vec::with_capacity(len);
431        let mut delete_mask = Vec::with_capacity(len);
432
433        for i in 0..len {
434            if op_array.is_null(i) {
435                insert_mask.push(false);
436                delete_mask.push(false);
437                continue;
438            }
439            match op_array.value(i) {
440                "I" | "U" | "r" => {
441                    insert_mask.push(true);
442                    delete_mask.push(false);
443                }
444                "D" => {
445                    insert_mask.push(false);
446                    delete_mask.push(true);
447                }
448                _ => {
449                    insert_mask.push(false);
450                    delete_mask.push(false);
451                }
452            }
453        }
454
455        // Compute user-column projection indices once (strip metadata columns).
456        let user_col_indices: Vec<usize> = batch
457            .schema()
458            .fields()
459            .iter()
460            .enumerate()
461            .filter(|(_, f)| !f.name().starts_with('_'))
462            .map(|(i, _)| i)
463            .collect();
464
465        let insert_batch = filter_and_project(batch, &insert_mask, &user_col_indices)?;
466        let delete_batch = filter_and_project(batch, &delete_mask, &user_col_indices)?;
467
468        Ok((insert_batch, delete_batch))
469    }
470}
471
/// Background compaction loop that periodically runs OPTIMIZE and VACUUM.
///
/// Opens its own `DeltaTable` handle per tick (no shared state with the
/// sink), so compaction failures never affect the write path. The loop exits
/// only when `cancel` fires.
#[cfg(feature = "delta-lake")]
async fn compaction_loop(
    table_path: String,
    storage_options: Arc<std::collections::HashMap<String, String>>,
    config: super::delta_config::CompactionConfig,
    vacuum_retention: std::time::Duration,
    cancel: tokio_util::sync::CancellationToken,
) {
    use super::delta_io;

    info!(
        table_path = %table_path,
        check_interval_secs = config.check_interval.as_secs(),
        "compaction background task started"
    );

    let mut interval = tokio::time::interval(config.check_interval);
    // Skip the first immediate tick (tokio intervals fire once right away).
    interval.tick().await;

    loop {
        tokio::select! {
            () = cancel.cancelled() => {
                info!("compaction background task cancelled");
                return;
            }
            _ = interval.tick() => {
                // Open a fresh table handle for compaction (no shared state).
                // Clone the HashMap only here (once per tick, not avoidable
                // since open_or_create_table takes an owned HashMap).
                let table = match delta_io::open_or_create_table(
                    &table_path,
                    (*storage_options).clone(),
                    None,
                )
                .await
                {
                    Ok(t) => t,
                    Err(e) => {
                        // Transient open failure: log and wait for next tick.
                        warn!(error = %e, "compaction: failed to open table, will retry");
                        continue;
                    }
                };

                // Skip compaction if not enough small files have accumulated.
                match table.snapshot() {
                    Ok(snapshot) => {
                        let file_count = snapshot.log_data().num_files();
                        if file_count < config.min_files_for_compaction {
                            debug!(
                                file_count,
                                min = config.min_files_for_compaction,
                                "compaction: skipping, not enough files"
                            );
                            continue;
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "compaction: snapshot failed, skipping tick");
                        continue;
                    }
                }

                // Run OPTIMIZE (delegated to delta_io, honoring Z-order cols).
                let target_size = config.target_file_size as u64;
                match delta_io::run_compaction(table, target_size, &config.z_order_columns).await {
                    Ok((table, result)) => {
                        debug!(
                            files_added = result.files_added,
                            files_removed = result.files_removed,
                            "compaction: OPTIMIZE complete"
                        );

                        // Run VACUUM after compaction to reclaim files older
                        // than the retention window; a failure is logged and
                        // retried on the next tick.
                        match delta_io::run_vacuum(table, vacuum_retention).await {
                            Ok((_table, files_deleted)) => {
                                debug!(files_deleted, "compaction: VACUUM complete");
                            }
                            Err(e) => {
                                warn!(error = %e, "compaction: VACUUM failed");
                            }
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "compaction: OPTIMIZE failed");
                    }
                }
            }
        }
    }
}
566
567#[async_trait]
568impl SinkConnector for DeltaLakeSink {
    /// Opens the sink: parses config, opens/creates the Delta table, recovers
    /// the exactly-once epoch watermark, and spawns background compaction.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // Re-parse config if properties were provided at open time; this lets
        // runtime-supplied properties override the constructor configuration.
        if !config.properties().is_empty() {
            self.config = DeltaLakeSinkConfig::from_config(config)?;
        }

        info!(
            table_path = %self.config.table_path,
            mode = %self.config.write_mode,
            guarantee = %self.config.delivery_guarantee,
            "opening Delta Lake sink connector"
        );

        // When delta-lake feature is enabled, open/create the actual table.
        #[cfg(feature = "delta-lake")]
        {
            use super::delta_io;

            // Merge catalog options (Unity/Glue) into storage options.
            let (resolved_path, merged_options) = delta_io::resolve_catalog_options(
                &self.config.catalog_type,
                self.config.catalog_database.as_deref(),
                self.config.catalog_name.as_deref(),
                self.config.catalog_schema.as_deref(),
                &self.config.table_path,
                &self.config.storage_options,
                &self.config.catalog_properties,
            )
            .await?;

            let table = delta_io::open_or_create_table(
                &resolved_path,
                merged_options.clone(),
                self.schema.as_ref(),
            )
            .await?;

            // Read schema from existing table if we don't have one.
            // Best-effort: a failed schema read falls back to inferring the
            // schema from the first written batch.
            if self.schema.is_none() {
                if let Ok(schema) = delta_io::get_table_schema(&table) {
                    self.schema = Some(schema);
                }
            }

            // Resolve last committed epoch for exactly-once recovery, so
            // replayed epochs can be skipped idempotently in begin_epoch().
            if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
                self.last_committed_epoch =
                    delta_io::get_last_committed_epoch(&table, &self.config.writer_id).await;
                if self.last_committed_epoch > 0 {
                    info!(
                        writer_id = %self.config.writer_id,
                        last_committed_epoch = self.last_committed_epoch,
                        "recovered last committed epoch from Delta Lake txn metadata"
                    );
                }
            }

            // Store the Delta version.
            // Note: Delta Lake uses i64 for version, but our version is u64.
            // Versions are always non-negative, so this is safe.
            #[allow(clippy::cast_sign_loss)]
            {
                self.delta_version = table.version().unwrap_or(0) as u64;
            }
            self.table = Some(table);

            // Spawn background compaction task if enabled. The task opens its
            // own table handle; the sink keeps only the cancel token and join
            // handle for shutdown.
            if self.config.compaction.enabled {
                let cancel = tokio_util::sync::CancellationToken::new();
                let handle = tokio::spawn(compaction_loop(
                    resolved_path.clone(),
                    Arc::new(merged_options),
                    self.config.compaction.clone(),
                    self.config.vacuum_retention,
                    cancel.clone(),
                ));
                self.compaction_cancel = Some(cancel);
                self.compaction_handle = Some(handle);
            }
        }

        // Without the delta-lake feature there is no real table to write to:
        // fail fast rather than silently simulating writes.
        #[cfg(not(feature = "delta-lake"))]
        {
            self.state = ConnectorState::Failed;
            return Err(ConnectorError::ConfigurationError(
                "Delta Lake sink requires the 'delta-lake' feature to be enabled. \
                 Build with: cargo build --features delta-lake"
                    .into(),
            ));
        }

        #[cfg(feature = "delta-lake")]
        {
            self.state = ConnectorState::Running;
            info!("Delta Lake sink connector opened successfully");
            Ok(())
        }
    }
669
670    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
671        if self.state != ConnectorState::Running {
672            return Err(ConnectorError::InvalidState {
673                expected: "Running".into(),
674                actual: self.state.to_string(),
675            });
676        }
677
678        if batch.num_rows() == 0 {
679            return Ok(WriteResult::new(0, 0));
680        }
681
682        if self.epoch_skipped {
683            return Ok(WriteResult::new(0, 0));
684        }
685
686        // Handle schema on first write. In upsert mode, strip metadata columns
687        // (_op, _ts_ms) so the Delta table isn't created with changelog columns.
688        if self.schema.is_none() {
689            self.schema = Some(Self::target_schema(&batch.schema(), self.config.write_mode));
690        }
691
692        let num_rows = batch.num_rows();
693        let estimated_bytes = Self::estimate_batch_size(batch);
694
695        // Buffer the batch.
696        if self.buffer_start_time.is_none() {
697            self.buffer_start_time = Some(Instant::now());
698        }
699        self.buffer.push(batch.clone());
700        self.buffered_rows += num_rows;
701        self.buffered_bytes += estimated_bytes;
702
703        // Data stays in buffer until pre_commit(). No mid-epoch Delta writes.
704
705        Ok(WriteResult::new(0, 0))
706    }
707
708    fn schema(&self) -> SchemaRef {
709        self.schema
710            .clone()
711            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
712    }
713
714    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
715        // For exactly-once, skip epochs already committed.
716        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
717            && epoch <= self.last_committed_epoch
718        {
719            warn!(
720                epoch,
721                last_committed = self.last_committed_epoch,
722                "Delta Lake: skipping already-committed epoch"
723            );
724            self.epoch_skipped = true;
725            return Ok(());
726        }
727
728        self.epoch_skipped = false;
729        self.current_epoch = epoch;
730        self.buffer.clear();
731        self.buffered_rows = 0;
732        self.buffered_bytes = 0;
733        self.pending_files = 0;
734        self.buffer_start_time = None;
735
736        debug!(epoch, "Delta Lake: began epoch");
737        Ok(())
738    }
739
    /// Phase 1 of 2PC: moves the epoch buffer into the staging area.
    ///
    /// No Delta I/O happens here — the write is deferred to `commit_epoch()`
    /// so that `rollback_epoch()` can discard staged data without ever
    /// creating orphan files in the table.
    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        // Skip if already committed (exactly-once idempotency).
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && epoch <= self.last_committed_epoch
        {
            return Ok(());
        }

        // Stage buffered data for commit. mem::take moves the batches without
        // cloning and leaves an empty buffer for the next epoch.
        if !self.buffer.is_empty() {
            self.staged_batches = std::mem::take(&mut self.buffer);
            self.staged_rows = self.buffered_rows;
            self.staged_bytes = self.buffered_bytes;
            self.buffered_rows = 0;
            self.buffered_bytes = 0;
            self.buffer_start_time = None;
        }

        // Local simulation (feature disabled): count a pending "file" so
        // commit_epoch() knows there is staged work.
        #[cfg(not(feature = "delta-lake"))]
        {
            self.pending_files += 1;
        }

        debug!(epoch, "Delta Lake: pre-committed (batches staged)");
        Ok(())
    }
768
    /// Phase 2 of 2PC: writes staged data to Delta and advances the epoch
    /// watermark. This is the only place data reaches the table.
    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        // Skip if already committed (exactly-once idempotency): a replayed
        // epoch must not produce a second Delta transaction.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && epoch <= self.last_committed_epoch
        {
            return Ok(());
        }

        // Write staged data to Delta as a single atomic transaction.
        #[cfg(feature = "delta-lake")]
        {
            if !self.staged_batches.is_empty() {
                self.flush_staged_to_delta().await?;
            }
        }
        // Local simulation path (feature disabled): bump counters only.
        #[cfg(not(feature = "delta-lake"))]
        {
            if self.pending_files > 0 || !self.staged_batches.is_empty() {
                self.commit_local(epoch);
                self.staged_batches.clear();
                self.staged_rows = 0;
                self.staged_bytes = 0;
            }
        }

        // Advance the watermark only after a successful write (the `?` above
        // returns early on failure).
        self.last_committed_epoch = epoch;

        info!(
            epoch,
            delta_version = self.delta_version,
            "Delta Lake: committed epoch"
        );

        Ok(())
    }
804
805    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
806        // Discard both buffered and staged data. Because the actual Delta
807        // write only happens in commit_epoch(), no orphan files are created.
808        self.buffer.clear();
809        self.buffered_rows = 0;
810        self.buffered_bytes = 0;
811        self.pending_files = 0;
812        self.buffer_start_time = None;
813        self.staged_batches.clear();
814        self.staged_rows = 0;
815        self.staged_bytes = 0;
816
817        self.epoch_skipped = false;
818        self.metrics.record_rollback();
819        warn!(epoch, "Delta Lake: rolled back epoch");
820        Ok(())
821    }
822
823    fn health_check(&self) -> HealthStatus {
824        match self.state {
825            ConnectorState::Running => HealthStatus::Healthy,
826            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
827            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
828            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
829            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
830            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
831        }
832    }
833
    /// Returns a point-in-time snapshot of the sink's metrics converted to
    /// the generic [`ConnectorMetrics`] format.
    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }
837
838    fn capabilities(&self) -> SinkConnectorCapabilities {
839        let mut caps = SinkConnectorCapabilities::default().with_idempotent();
840
841        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
842            caps = caps.with_exactly_once().with_two_phase_commit();
843        }
844        if self.config.write_mode == DeltaWriteMode::Upsert {
845            caps = caps.with_upsert().with_changelog();
846        }
847        if self.config.schema_evolution {
848            caps = caps.with_schema_evolution();
849        }
850        if !self.config.partition_columns.is_empty() {
851            caps = caps.with_partitioned();
852        }
853
854        caps
855    }
856
857    async fn flush(&mut self) -> Result<(), ConnectorError> {
858        // Coalesce buffered batches to reduce memory fragmentation.
859        // Actual Delta write is deferred to commit_epoch().
860        if self.buffer.len() > 1 {
861            let schema = self.buffer[0].schema();
862            let combined = arrow_select::concat::concat_batches(&schema, &self.buffer)
863                .map_err(|e| ConnectorError::Internal(format!("concat failed: {e}")))?;
864            self.buffer.clear();
865            self.buffer.push(combined);
866        }
867        Ok(())
868    }
869
870    async fn close(&mut self) -> Result<(), ConnectorError> {
871        info!("closing Delta Lake sink connector");
872
873        // Commit any remaining buffered data before closing.
874        if !self.buffer.is_empty() {
875            self.pre_commit(self.current_epoch).await?;
876            self.commit_epoch(self.current_epoch).await?;
877        }
878
879        // Cancel and join the background compaction task.
880        #[cfg(feature = "delta-lake")]
881        {
882            if let Some(cancel) = self.compaction_cancel.take() {
883                cancel.cancel();
884            }
885            if let Some(handle) = self.compaction_handle.take() {
886                // Wait up to 5 seconds for the compaction task to finish.
887                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
888            }
889        }
890
891        // Drop the table handle when closing.
892        #[cfg(feature = "delta-lake")]
893        {
894            self.table = None;
895        }
896
897        self.state = ConnectorState::Closed;
898
899        info!(
900            table_path = %self.config.table_path,
901            delta_version = self.delta_version,
902            "Delta Lake sink connector closed"
903        );
904
905        Ok(())
906    }
907}
908
909impl std::fmt::Debug for DeltaLakeSink {
910    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
911        f.debug_struct("DeltaLakeSink")
912            .field("state", &self.state)
913            .field("table_path", &self.config.table_path)
914            .field("mode", &self.config.write_mode)
915            .field("guarantee", &self.config.delivery_guarantee)
916            .field("current_epoch", &self.current_epoch)
917            .field("last_committed_epoch", &self.last_committed_epoch)
918            .field("buffered_rows", &self.buffered_rows)
919            .field("delta_version", &self.delta_version)
920            .field("epoch_skipped", &self.epoch_skipped)
921            .finish_non_exhaustive()
922    }
923}
924
925// ── Helper functions ────────────────────────────────────────────────
926
927/// Filters a `RecordBatch` using a boolean mask and projects to the given column indices.
928///
929/// Uses Arrow's SIMD-optimized `filter` kernel instead of index-gather (`take`).
930fn filter_and_project(
931    batch: &RecordBatch,
932    mask: &[bool],
933    col_indices: &[usize],
934) -> Result<RecordBatch, ConnectorError> {
935    use arrow_array::BooleanArray;
936    use arrow_select::filter::filter_record_batch;
937
938    let bool_array = BooleanArray::from(mask.to_vec());
939
940    // Filter the full batch first (SIMD-optimized), then project columns.
941    let filtered = filter_record_batch(batch, &bool_array)
942        .map_err(|e| ConnectorError::Internal(format!("arrow filter failed: {e}")))?;
943
944    filtered
945        .project(col_indices)
946        .map_err(|e| ConnectorError::Internal(format!("batch projection failed: {e}")))
947}
948
/// Unit tests for [`DeltaLakeSink`] business logic.
///
/// These tests bypass `open()` and drive the sink state machine directly;
/// the configured table path never exists on disk. Anything that would do
/// real Delta I/O under the `delta-lake` feature is gated out here and
/// covered by the integration tests in `delta_io.rs`.
#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Schema shared by most tests: (id: i64 non-null, name: utf8?, value: f64?).
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Minimal append-mode config pointing at a dummy path (never opened).
    fn test_config() -> DeltaLakeSinkConfig {
        DeltaLakeSinkConfig::new("/tmp/delta_test")
    }

    /// Variant of [`test_config`] using Upsert mode keyed on `id`.
    fn upsert_config() -> DeltaLakeSinkConfig {
        let mut cfg = test_config();
        cfg.write_mode = DeltaWriteMode::Upsert;
        cfg.merge_key_columns = vec!["id".to_string()];
        cfg
    }

    /// Builds an `n`-row batch matching [`test_schema`].
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    // ── Constructor tests ──

    #[test]
    fn test_new_defaults() {
        let sink = DeltaLakeSink::new(test_config());
        assert_eq!(sink.state(), ConnectorState::Created);
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.buffered_bytes(), 0);
        assert_eq!(sink.delta_version(), 0);
        assert!(sink.schema.is_none());
    }

    #[test]
    fn test_with_schema() {
        let schema = test_schema();
        let sink = DeltaLakeSink::with_schema(test_config(), schema.clone());
        assert_eq!(sink.schema(), schema);
    }

    #[test]
    fn test_schema_empty_when_none() {
        let sink = DeltaLakeSink::new(test_config());
        let schema = sink.schema();
        assert_eq!(schema.fields().len(), 0);
    }

    // ── Batch size estimation ──

    #[test]
    fn test_estimate_batch_size() {
        let batch = test_batch(100);
        let size = DeltaLakeSink::estimate_batch_size(&batch);
        assert!(size > 0);
    }

    #[test]
    fn test_estimate_batch_size_empty() {
        let batch = RecordBatch::new_empty(test_schema());
        let size = DeltaLakeSink::estimate_batch_size(&batch);
        // Arrow arrays have baseline buffer allocation even with 0 rows,
        // so size may be small but not necessarily zero.
        assert!(size < 1024);
    }

    // ── Should flush tests ──

    #[test]
    fn test_should_flush_by_rows() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config);
        sink.buffered_rows = 99;
        assert!(!sink.should_flush());
        sink.buffered_rows = 100;
        assert!(sink.should_flush());
    }

    #[test]
    fn test_should_flush_by_bytes() {
        let mut config = test_config();
        config.target_file_size = 1000;
        let mut sink = DeltaLakeSink::new(config);
        sink.buffered_bytes = 999;
        assert!(!sink.should_flush());
        sink.buffered_bytes = 1000;
        assert!(sink.should_flush());
    }

    #[test]
    fn test_should_flush_empty() {
        let sink = DeltaLakeSink::new(test_config());
        assert!(!sink.should_flush());
    }

    // ── Batch buffering tests ──

    #[tokio::test]
    async fn test_write_batch_buffering() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await.unwrap();

        // Should buffer, not flush (10 < 100)
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 10);
        assert!(sink.buffered_bytes() > 0);
    }

    #[tokio::test]
    async fn test_write_batch_buffers_without_commit() {
        let mut config = test_config();
        config.max_buffer_records = 10;
        let mut sink = DeltaLakeSink::new(config);
        sink.state = ConnectorState::Running;

        // Write batches that exceed the threshold — no mid-epoch commit.
        let batch = test_batch(6);
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();

        assert_eq!(sink.buffered_rows(), 12);
        assert_eq!(sink.buffer.len(), 2);
        assert_eq!(sink.pending_files, 0);
    }

    #[tokio::test]
    async fn test_write_batch_empty() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Running;

        let batch = test_batch(0);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 0);
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[tokio::test]
    async fn test_write_batch_not_running() {
        let mut sink = DeltaLakeSink::new(test_config());
        // state is Created, not Running

        let batch = test_batch(10);
        let result = sink.write_batch(&batch).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_write_batch_sets_schema() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Running;
        assert!(sink.schema.is_none());

        let batch = test_batch(5);
        sink.write_batch(&batch).await.unwrap();
        assert!(sink.schema.is_some());
        assert_eq!(sink.schema.unwrap().fields().len(), 3);
    }

    #[tokio::test]
    async fn test_multiple_write_batches_accumulate() {
        let mut config = test_config();
        config.max_buffer_records = 100;
        let mut sink = DeltaLakeSink::new(config);
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();

        assert_eq!(sink.buffered_rows(), 30);
    }

    // ── Epoch lifecycle tests ──
    // Note: These tests bypass open() and test business logic only.
    // With the delta-lake feature, commit_epoch() does real I/O which fails without a table.
    // See delta_io.rs for integration tests with real I/O.

    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_epoch_lifecycle() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Running;

        // Begin epoch
        sink.begin_epoch(1).await.unwrap();
        assert_eq!(sink.current_epoch(), 1);

        // Write some data
        let batch = test_batch(10);
        sink.write_batch(&batch).await.unwrap();

        // Two-phase commit: pre_commit flushes buffer, commit_epoch commits
        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        sink.commit_epoch(1).await.unwrap();
        assert_eq!(sink.last_committed_epoch(), 1);
        assert_eq!(sink.delta_version(), 1);

        // Metrics should show 1 commit
        let m = sink.metrics();
        let commits = m.custom.iter().find(|(k, _)| k == "delta.commits");
        assert_eq!(commits.unwrap().1, 1.0);
    }

    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_epoch_skip_already_committed() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let mut sink = DeltaLakeSink::new(config);
        sink.state = ConnectorState::Running;

        // Commit epoch 1
        sink.begin_epoch(1).await.unwrap();
        let batch = test_batch(5);
        sink.write_batch(&batch).await.unwrap();
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        assert_eq!(sink.last_committed_epoch(), 1);

        // Try to begin epoch 1 again (should skip)
        sink.begin_epoch(1).await.unwrap();
        // Should not have cleared the state for a new epoch
        // (skipped because already committed)

        // Commit epoch 1 again (should be no-op due to idempotency)
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
        assert_eq!(sink.last_committed_epoch(), 1);
        assert_eq!(sink.delta_version(), 1); // No new version
    }

    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_epoch_at_least_once_no_skip() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::AtLeastOnce;
        let mut sink = DeltaLakeSink::new(config);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        let batch = test_batch(5);
        sink.write_batch(&batch).await.unwrap();
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        // Begin epoch 1 again (at-least-once doesn't skip)
        sink.begin_epoch(1).await.unwrap();
        assert_eq!(sink.current_epoch(), 1);
        assert_eq!(sink.buffered_rows(), 0); // Buffer was cleared
    }

    #[tokio::test]
    async fn test_rollback_clears_buffer() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config);
        sink.state = ConnectorState::Running;

        let batch = test_batch(50);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 50);

        sink.rollback_epoch(0).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.buffered_bytes(), 0);
        assert_eq!(sink.pending_files, 0);
    }

    /// D001: Rollback after `pre_commit` must discard staged data.
    /// `pre_commit` stages batches; rollback discards them without writing to Delta.
    #[tokio::test]
    async fn test_rollback_after_pre_commit_discards_staged() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        let batch = test_batch(50);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 50);

        // pre_commit stages the buffer
        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.staged_rows, 50);
        assert!(!sink.staged_batches.is_empty());

        // rollback discards both buffer and staged data
        sink.rollback_epoch(1).await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);
        assert_eq!(sink.staged_rows, 0);
        assert_eq!(sink.staged_bytes, 0);
        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.delta_version(), 0); // no Delta write occurred
    }

    /// Staged data is preserved across `pre_commit` → failed commit → rollback.
    /// This verifies that `pre_commit` does not destroy staged state, so a
    /// subsequent rollback can discard it cleanly.
    #[tokio::test]
    async fn test_staged_data_preserved_until_commit_or_rollback() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config);
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(25)).await.unwrap();
        sink.write_batch(&test_batch(25)).await.unwrap();

        // pre_commit moves buffer → staged
        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.staged_rows, 50);
        assert_eq!(sink.staged_batches.len(), 2);
        assert_eq!(sink.buffered_rows(), 0);

        // Simulate: commit_epoch would write to Delta, but without the
        // feature we exercise the local path. Verify staged state is
        // consumed only on success.
        // (Without delta-lake feature, commit_epoch calls commit_local
        // which succeeds.)

        // Instead, test rollback: staged data should be discarded.
        sink.rollback_epoch(1).await.unwrap();
        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.staged_rows, 0);
        assert_eq!(sink.staged_bytes, 0);
    }

    #[tokio::test]
    async fn test_commit_empty_epoch() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Running;

        sink.begin_epoch(1).await.unwrap();
        // No writes
        sink.commit_epoch(1).await.unwrap();
        assert_eq!(sink.last_committed_epoch(), 1);
        assert_eq!(sink.delta_version(), 0); // No version bump (no files)
    }

    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_sequential_epochs() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Running;

        for epoch in 1..=5 {
            sink.begin_epoch(epoch).await.unwrap();
            let batch = test_batch(10);
            sink.write_batch(&batch).await.unwrap();
            sink.pre_commit(epoch).await.unwrap();
            sink.commit_epoch(epoch).await.unwrap();
        }

        assert_eq!(sink.last_committed_epoch(), 5);
        assert_eq!(sink.delta_version(), 5);
    }

    // ── Flush tests ──
    // Note: These tests bypass open() and test business logic only.

    #[tokio::test]
    async fn test_flush_coalesces_buffer() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        sink.write_batch(&batch).await.unwrap();
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffer.len(), 2);

        // flush() coalesces batches but does not write to Delta.
        sink.flush().await.unwrap();
        assert_eq!(sink.buffer.len(), 1);
        assert_eq!(sink.buffered_rows(), 20);
    }

    // ── Open and close tests ──
    // Note: These tests use fake paths that don't exist.
    // With the delta-lake feature, open() tries to actually access the path.
    // See delta_io.rs for integration tests with real I/O.

    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_open_requires_feature() {
        let mut sink = DeltaLakeSink::new(test_config());

        let connector_config = ConnectorConfig::new("delta-lake");
        let result = sink.open(&connector_config).await;

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("delta-lake"), "error: {err}");
    }

    #[tokio::test]
    async fn test_close() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Running;

        sink.close().await.unwrap();
        assert_eq!(sink.state(), ConnectorState::Closed);
    }

    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_close_flushes_remaining() {
        let mut config = test_config();
        config.max_buffer_records = 1000;
        let mut sink = DeltaLakeSink::new(config);
        sink.state = ConnectorState::Running;

        let batch = test_batch(30);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 30);

        sink.close().await.unwrap();
        assert_eq!(sink.buffered_rows(), 0);

        let m = sink.metrics();
        assert_eq!(m.records_total, 30);
    }

    // ── Health check tests ──

    #[test]
    fn test_health_check_created() {
        let sink = DeltaLakeSink::new(test_config());
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_check_running() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Running;
        assert_eq!(sink.health_check(), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_check_closed() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Closed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_health_check_failed() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Failed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_health_check_paused() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Paused;
        assert!(matches!(sink.health_check(), HealthStatus::Degraded(_)));
    }

    // ── Capabilities tests ──

    #[test]
    fn test_capabilities_append_exactly_once() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = DeltaLakeSink::new(config);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.schema_evolution);
        assert!(!caps.partitioned);
    }

    #[test]
    fn test_capabilities_upsert() {
        let sink = DeltaLakeSink::new(upsert_config());
        let caps = sink.capabilities();
        assert!(caps.upsert);
        assert!(caps.changelog);
        assert!(caps.idempotent);
    }

    #[test]
    fn test_capabilities_schema_evolution() {
        let mut config = test_config();
        config.schema_evolution = true;
        let sink = DeltaLakeSink::new(config);
        let caps = sink.capabilities();
        assert!(caps.schema_evolution);
    }

    #[test]
    fn test_capabilities_partitioned() {
        let mut config = test_config();
        config.partition_columns = vec!["trade_date".to_string()];
        let sink = DeltaLakeSink::new(config);
        let caps = sink.capabilities();
        assert!(caps.partitioned);
    }

    #[test]
    fn test_capabilities_at_least_once() {
        let mut config = test_config();
        config.delivery_guarantee = DeliveryGuarantee::AtLeastOnce;
        let sink = DeltaLakeSink::new(config);
        let caps = sink.capabilities();
        assert!(!caps.exactly_once);
        assert!(caps.idempotent);
    }

    // ── Metrics tests ──

    #[test]
    fn test_metrics_initial() {
        let sink = DeltaLakeSink::new(test_config());
        let m = sink.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    // When delta-lake feature is enabled, this triggers auto-flush which
    // needs a real table. See delta_io::tests for integration coverage.
    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_metrics_after_commit() {
        let mut sink = DeltaLakeSink::new(test_config());
        sink.state = ConnectorState::Running;

        let batch = test_batch(10);
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(sink.buffered_rows(), 10);

        // Metrics are recorded on commit_epoch, not on pre_commit.
        sink.pre_commit(0).await.unwrap();
        sink.commit_epoch(0).await.unwrap();
        let m = sink.metrics();
        assert_eq!(m.records_total, 10);
        assert!(m.bytes_total > 0);
    }

    // ── Changelog splitting tests ──

    /// CDC-style schema: user columns plus `_op` / `_ts_ms` metadata columns.
    fn changelog_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("_op", DataType::Utf8, false),
            Field::new("_ts_ms", DataType::Int64, false),
        ]))
    }

    /// Five-row changelog batch with ops I, U, D, I, D.
    fn changelog_batch() -> RecordBatch {
        RecordBatch::try_new(
            changelog_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
                Arc::new(StringArray::from(vec!["I", "U", "D", "I", "D"])),
                Arc::new(Int64Array::from(vec![100, 200, 300, 400, 500])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_split_changelog_batch() {
        let batch = changelog_batch();
        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();

        // Inserts: rows 0 (I), 1 (U), 3 (I) = 3 rows
        assert_eq!(inserts.num_rows(), 3);
        // Deletes: rows 2 (D), 4 (D) = 2 rows
        assert_eq!(deletes.num_rows(), 2);

        // Metadata columns should be stripped
        assert_eq!(inserts.num_columns(), 2); // id, name only
        assert_eq!(deletes.num_columns(), 2);

        // Verify insert values
        let insert_ids = inserts
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(insert_ids.value(0), 1);
        assert_eq!(insert_ids.value(1), 2);
        assert_eq!(insert_ids.value(2), 4);

        // Verify delete values
        let delete_ids = deletes
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(delete_ids.value(0), 3);
        assert_eq!(delete_ids.value(1), 5);
    }

    #[test]
    fn test_split_changelog_all_inserts() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["I", "I"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .unwrap();

        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
        assert_eq!(inserts.num_rows(), 2);
        assert_eq!(deletes.num_rows(), 0);
    }

    #[test]
    fn test_split_changelog_all_deletes() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(StringArray::from(vec!["D", "D"])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .unwrap();

        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
        assert_eq!(inserts.num_rows(), 0);
        assert_eq!(deletes.num_rows(), 2);
    }

    #[test]
    fn test_split_changelog_missing_op_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();

        let result = DeltaLakeSink::split_changelog_batch(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_split_changelog_snapshot_read() {
        let schema = changelog_schema();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(StringArray::from(vec!["a"])),
                Arc::new(StringArray::from(vec!["r"])), // snapshot read
                Arc::new(Int64Array::from(vec![100])),
            ],
        )
        .unwrap();

        let (inserts, deletes) = DeltaLakeSink::split_changelog_batch(&batch).unwrap();
        assert_eq!(inserts.num_rows(), 1);
        assert_eq!(deletes.num_rows(), 0);
    }

    // ── Debug output test ──

    #[test]
    fn test_debug_output() {
        let sink = DeltaLakeSink::new(test_config());
        let debug = format!("{sink:?}");
        assert!(debug.contains("DeltaLakeSink"));
        assert!(debug.contains("/tmp/delta_test"));
    }
}