// laminar_connectors/files/sink.rs
1//! File sink connector implementing [`SinkConnector`].
2//!
3//! Supports two modes:
4//! - **Append**: writes directly to the output file, flushing on pre-commit.
5//! - **Rolling**: creates a new `.tmp` file per epoch, atomically renamed on commit.
6//!
7//! For bulk formats (Parquet), mid-epoch rotation is disabled — files are
8//! written as a single unit on `pre_commit()`.
9
10use std::io::{BufWriter, Write};
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::time::Duration;
14
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17use async_trait::async_trait;
18use tracing::{debug, info};
19
20use crate::config::ConnectorConfig;
21use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
22use crate::error::ConnectorError;
23use crate::health::HealthStatus;
24use crate::schema::traits::FormatEncoder;
25
26use super::config::{FileFormat, FileSinkConfig, SinkMode};
27
/// File sink connector with append or rolling mode.
///
/// Lifecycle: `open` → (`begin_epoch` → `write_batch`* → `pre_commit` →
/// `commit_epoch` | `rollback_epoch`)* → `close`.
pub struct FileSink {
    /// Parsed configuration; `None` until `open()` succeeds.
    config: Option<FileSinkConfig>,
    /// Output schema (placeholder empty schema until `open()` resolves it).
    schema: SchemaRef,
    /// Format encoder; `None` until `open()` succeeds.
    encoder: Option<Box<dyn FormatEncoder>>,
    /// Current epoch, set by `begin_epoch`.
    current_epoch: u64,
    /// Buffered batches for the current epoch (Parquet bulk writes only).
    epoch_batches: Vec<RecordBatch>,
    /// Current segment index within epoch (for mid-epoch rotation).
    current_segment: usize,
    /// Bytes written in current segment; reset when a new segment opens.
    segment_bytes: u64,
    /// Buffered file writer for the current segment (row formats only).
    writer: Option<BufWriter<std::fs::File>>,
    /// Active .tmp file paths in the current epoch (renamed on commit,
    /// deleted on rollback/close).
    active_tmp_files: Vec<PathBuf>,
    /// Whether the sink is open.
    is_open: bool,
}
51
52impl FileSink {
53    /// Creates a new file sink with a placeholder schema.
54    #[must_use]
55    pub fn new() -> Self {
56        Self::with_registry(None)
57    }
58
59    /// Creates a new file sink with an optional Prometheus registry.
60    #[must_use]
61    pub fn with_registry(_registry: Option<&prometheus::Registry>) -> Self {
62        Self {
63            config: None,
64            schema: Arc::new(arrow_schema::Schema::empty()),
65            encoder: None,
66            current_epoch: 0,
67            epoch_batches: Vec::new(),
68            current_segment: 0,
69            segment_bytes: 0,
70            writer: None,
71            active_tmp_files: Vec::new(),
72            is_open: false,
73        }
74    }
75
76    /// Opens (or rotates to) a new `.tmp` segment file.
77    fn open_segment(&mut self) -> Result<(), ConnectorError> {
78        let config = self
79            .config
80            .as_ref()
81            .ok_or_else(|| ConnectorError::InvalidState {
82                expected: "configured".into(),
83                actual: "unconfigured".into(),
84            })?;
85
86        let filename = format!(
87            "{}_{:06}_{:03}.{}.tmp",
88            config.prefix,
89            self.current_epoch,
90            self.current_segment,
91            config.format.extension()
92        );
93        let path = Path::new(&config.path).join(&filename);
94
95        let file = std::fs::OpenOptions::new()
96            .create(true)
97            .write(true)
98            .truncate(true)
99            .open(&path)
100            .map_err(|e| {
101                ConnectorError::WriteError(format!("cannot open '{}': {e}", path.display()))
102            })?;
103
104        self.writer = Some(BufWriter::new(file));
105        self.active_tmp_files.push(path);
106        self.segment_bytes = 0;
107        Ok(())
108    }
109
110    /// Ensures a writer is open for the current segment.
111    fn ensure_writer(&mut self) -> Result<(), ConnectorError> {
112        if self.writer.is_none() {
113            self.open_segment()?;
114        }
115        Ok(())
116    }
117
118    /// Closes the current writer (flushes `BufWriter`) on the blocking
119    /// thread pool so the async runtime is not stalled.
120    async fn close_writer_async(&mut self) -> Result<(), ConnectorError> {
121        let Some(writer) = self.writer.take() else {
122            return Ok(());
123        };
124        tokio::task::spawn_blocking(move || -> Result<(), ConnectorError> {
125            let mut w = writer;
126            w.flush()
127                .map_err(|e| ConnectorError::WriteError(format!("flush error: {e}")))
128        })
129        .await
130        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
131    }
132}
133
134impl Default for FileSink {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
140impl std::fmt::Debug for FileSink {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct("FileSink")
143            .field("is_open", &self.is_open)
144            .field("current_epoch", &self.current_epoch)
145            .field("epoch_batches", &self.epoch_batches.len())
146            .field("active_tmp_files", &self.active_tmp_files.len())
147            .finish()
148    }
149}
150
151#[async_trait]
152impl SinkConnector for FileSink {
153    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
154        let sink_config = FileSinkConfig::from_connector_config(config)?;
155
156        // Ensure output directory exists.
157        let out_dir = Path::new(&sink_config.path);
158        let out_dir_owned = out_dir.to_path_buf();
159        let prefix = sink_config.prefix.clone();
160        tokio::task::spawn_blocking(move || {
161            if !out_dir_owned.exists() {
162                std::fs::create_dir_all(&out_dir_owned)?;
163            }
164            cleanup_tmp_files(&out_dir_owned, &prefix);
165            Ok::<(), std::io::Error>(())
166        })
167        .await
168        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
169        .map_err(|e| ConnectorError::WriteError(format!("cannot init output dir: {e}")))?;
170
171        // Resolve schema from connector config.
172        let schema = config
173            .arrow_schema()
174            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()));
175
176        let encoder = build_encoder(sink_config.format, &schema, &sink_config)?;
177
178        self.config = Some(sink_config);
179        self.schema = schema;
180        self.encoder = Some(encoder);
181        self.is_open = true;
182
183        info!("file sink opened");
184        Ok(())
185    }
186
187    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
188        let is_bulk = self
189            .config
190            .as_ref()
191            .ok_or_else(|| ConnectorError::InvalidState {
192                expected: "open".into(),
193                actual: "closed".into(),
194            })?
195            .format
196            .is_bulk_format();
197        let max_file_size = self.config.as_ref().and_then(|c| c.max_file_size);
198
199        if batch.num_rows() == 0 {
200            return Ok(WriteResult::new(0, 0));
201        }
202
203        let rows = batch.num_rows();
204
205        if is_bulk {
206            let max_epoch = self.config.as_ref().map_or(10_000, |c| c.max_epoch_batches);
207            if self.epoch_batches.len() >= max_epoch {
208                return Err(ConnectorError::WriteError(format!(
209                    "file sink: epoch batch buffer full ({max_epoch} batches) — \
210                     increase max_epoch_batches or flush more frequently"
211                )));
212            }
213            // Buffer for bulk write on pre_commit (Parquet).
214            self.epoch_batches.push(batch.clone());
215            return Ok(WriteResult::new(rows, 0));
216        }
217
218        // Row format: encode in-memory, then run the blocking write
219        // loop on the blocking pool so it can't stall the runtime.
220        let encoded = self
221            .encoder
222            .as_ref()
223            .ok_or_else(|| ConnectorError::InvalidState {
224                expected: "encoder ready".into(),
225                actual: "no encoder".into(),
226            })?
227            .encode_batch(batch)
228            .map_err(|e| ConnectorError::WriteError(format!("encode error: {e}")))?;
229
230        self.ensure_writer()?;
231        let mut writer = self.writer.take().expect("ensure_writer just ran");
232        let (writer, bytes_written) =
233            tokio::task::spawn_blocking(move || -> Result<_, ConnectorError> {
234                let mut total: u64 = 0;
235                for record_bytes in &encoded {
236                    writer
237                        .write_all(record_bytes)
238                        .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
239                    writer
240                        .write_all(b"\n")
241                        .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
242                    total += record_bytes.len() as u64 + 1;
243                }
244                Ok((writer, total))
245            })
246            .await
247            .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))??;
248        self.writer = Some(writer);
249        self.segment_bytes += bytes_written;
250
251        // Mid-epoch rotation for row formats.
252        if let Some(max_size) = max_file_size {
253            if self.segment_bytes >= max_size as u64 {
254                debug!("file sink: rotating at {} bytes", self.segment_bytes);
255                self.close_writer_async().await?;
256                self.current_segment += 1;
257            }
258        }
259
260        Ok(WriteResult::new(rows, bytes_written))
261    }
262
263    fn schema(&self) -> SchemaRef {
264        self.schema.clone()
265    }
266
267    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
268        self.current_epoch = epoch;
269        self.epoch_batches.clear();
270        self.current_segment = 0;
271        self.segment_bytes = 0;
272        self.close_writer_async().await?;
273        self.active_tmp_files.clear();
274        Ok(())
275    }
276
277    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
278        let is_bulk = self
279            .config
280            .as_ref()
281            .map_or(false, |c| c.format.is_bulk_format());
282
283        if is_bulk && !self.epoch_batches.is_empty() {
284            // Flush all buffered batches as a single Parquet file.
285            let encoder = self
286                .encoder
287                .as_ref()
288                .ok_or_else(|| ConnectorError::InvalidState {
289                    expected: "encoder ready".into(),
290                    actual: "no encoder".into(),
291                })?;
292
293            let combined = if self.epoch_batches.len() == 1 {
294                self.epoch_batches[0].clone()
295            } else {
296                arrow_select::concat::concat_batches(&self.schema, &self.epoch_batches)
297                    .map_err(|e| ConnectorError::WriteError(format!("batch concat error: {e}")))?
298            };
299
300            let encoded = encoder
301                .encode_batch(&combined)
302                .map_err(|e| ConnectorError::WriteError(format!("Parquet encode error: {e}")))?;
303
304            // Parquet encoder returns exactly one file blob. Run the
305            // file write on the blocking pool.
306            if let Some(file_bytes) = encoded.into_iter().next() {
307                self.open_segment()?;
308                let writer = self.writer.take().expect("open_segment just ran");
309                let writer = tokio::task::spawn_blocking(move || -> Result<_, ConnectorError> {
310                    let mut w = writer;
311                    w.write_all(&file_bytes)
312                        .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
313                    Ok(w)
314                })
315                .await
316                .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))??;
317                self.writer = Some(writer);
318            }
319            self.epoch_batches.clear();
320        }
321
322        // Flush and close the buffered writer (on the blocking pool).
323        self.close_writer_async().await?;
324
325        // Fsync all tmp files. Propagate errors — fsync failure means data
326        // is not durable and we must not let the checkpoint proceed.
327        let paths: Vec<PathBuf> = self.active_tmp_files.clone();
328        tokio::task::spawn_blocking(move || {
329            for path in &paths {
330                if path.exists() {
331                    let f = std::fs::OpenOptions::new()
332                        .write(true)
333                        .open(path)
334                        .map_err(|e| {
335                            ConnectorError::WriteError(format!(
336                                "cannot open '{}' for fsync: {e}",
337                                path.display()
338                            ))
339                        })?;
340                    f.sync_all().map_err(|e| {
341                        ConnectorError::WriteError(format!(
342                            "fsync failed on '{}': {e}",
343                            path.display()
344                        ))
345                    })?;
346                }
347            }
348            Ok::<(), ConnectorError>(())
349        })
350        .await
351        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
352    }
353
354    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
355        let is_append = self
356            .config
357            .as_ref()
358            .map_or(false, |c| c.mode == SinkMode::Append);
359
360        if is_append {
361            return Ok(());
362        }
363
364        // Rolling mode: rename .tmp → final.
365        for tmp_path in &self.active_tmp_files {
366            if !tmp_path.exists() {
367                continue;
368            }
369            let final_name = tmp_path
370                .file_name()
371                .and_then(|n| n.to_str())
372                .and_then(|n| n.strip_suffix(".tmp"))
373                .ok_or_else(|| {
374                    ConnectorError::WriteError(format!(
375                        "tmp file '{}' has no .tmp suffix — cannot commit",
376                        tmp_path.display()
377                    ))
378                })?;
379            let final_path = tmp_path.parent().unwrap_or(Path::new(".")).join(final_name);
380
381            std::fs::rename(tmp_path, &final_path).map_err(|e| {
382                ConnectorError::WriteError(format!(
383                    "cannot rename '{}' -> '{}': {e}",
384                    tmp_path.display(),
385                    final_path.display()
386                ))
387            })?;
388            debug!("file sink: committed {}", final_path.display());
389        }
390
391        self.active_tmp_files.clear();
392        Ok(())
393    }
394
395    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
396        self.close_writer_async().await?;
397        for tmp_path in &self.active_tmp_files {
398            if tmp_path.exists() {
399                let _ = std::fs::remove_file(tmp_path);
400                debug!("file sink: rolled back {}", tmp_path.display());
401            }
402        }
403        self.active_tmp_files.clear();
404        self.epoch_batches.clear();
405        Ok(())
406    }
407
408    fn health_check(&self) -> HealthStatus {
409        if self.is_open {
410            HealthStatus::Healthy
411        } else {
412            HealthStatus::Unknown
413        }
414    }
415
416    fn capabilities(&self) -> SinkConnectorCapabilities {
417        SinkConnectorCapabilities::new(Duration::from_secs(30))
418            .with_exactly_once()
419            .with_two_phase_commit()
420    }
421
422    async fn close(&mut self) -> Result<(), ConnectorError> {
423        self.close_writer_async().await?;
424        for tmp_path in &self.active_tmp_files {
425            if tmp_path.exists() {
426                let _ = std::fs::remove_file(tmp_path);
427            }
428        }
429        self.active_tmp_files.clear();
430        self.is_open = false;
431        info!("file sink closed");
432        Ok(())
433    }
434}
435
436// ── Helpers ──────────────────────────────────────────────────────────
437
438fn build_encoder(
439    format: FileFormat,
440    schema: &SchemaRef,
441    config: &FileSinkConfig,
442) -> Result<Box<dyn FormatEncoder>, ConnectorError> {
443    match format {
444        FileFormat::Csv => {
445            let csv_config = crate::schema::CsvEncoderConfig {
446                delimiter: b',',
447                has_header: false,
448            };
449            let encoder = crate::schema::CsvEncoder::with_config(schema.clone(), csv_config);
450            Ok(Box::new(encoder))
451        }
452        FileFormat::Json | FileFormat::Text => {
453            let encoder = crate::schema::JsonEncoder::new(schema.clone());
454            Ok(Box::new(encoder))
455        }
456        FileFormat::Parquet => {
457            use parquet::basic::Compression;
458            let compression = match config.compression.to_lowercase().as_str() {
459                "none" | "uncompressed" => Compression::UNCOMPRESSED,
460                "snappy" => Compression::SNAPPY,
461                "gzip" => Compression::GZIP(parquet::basic::GzipLevel::default()),
462                "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
463                "lz4" => Compression::LZ4,
464                other => {
465                    return Err(ConnectorError::ConfigurationError(format!(
466                        "unknown Parquet compression: '{other}'"
467                    )));
468                }
469            };
470            let parquet_config = crate::schema::parquet::ParquetEncoderConfig::default()
471                .with_compression(compression);
472            let encoder =
473                crate::schema::parquet::ParquetEncoder::with_config(schema.clone(), parquet_config);
474            Ok(Box::new(encoder))
475        }
476        FileFormat::ArrowIpc => {
477            let encoder = super::arrow_ipc_codec::ArrowIpcEncoder::new(schema.clone());
478            Ok(Box::new(encoder))
479        }
480    }
481}
482
483/// Removes orphaned `.tmp` files matching the given prefix from a previous crash.
484fn cleanup_tmp_files(dir: &Path, prefix: &str) {
485    let entries = match std::fs::read_dir(dir) {
486        Ok(e) => e,
487        Err(_) => return,
488    };
489    for entry in entries.flatten() {
490        let path = entry.path();
491        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
492            if name.starts_with(prefix) && name.ends_with(".tmp") {
493                info!("file sink: removing orphaned tmp file: {}", path.display());
494                let _ = std::fs::remove_file(&path);
495            }
496        }
497    }
498}
499
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Two-column test schema: `id: i64` (non-null), `name: utf8` (nullable).
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Three-row batch matching [`test_schema`].
    fn test_batch(schema: &SchemaRef) -> RecordBatch {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap()
    }

    /// JSON-format connector config writing to `path`.
    fn json_config(path: &Path) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("files");
        config.set("path", path.to_str().unwrap());
        config.set("format", "json");
        config
    }

    #[test]
    fn test_sink_default() {
        let sink = FileSink::new();
        assert!(!sink.is_open);
        assert_eq!(sink.current_epoch, 0);
    }

    #[tokio::test]
    async fn test_sink_open_creates_dir() {
        let tmpdir = tempfile::tempdir().unwrap();
        let out = tmpdir.path().join("output");

        let mut sink = FileSink::new();
        sink.open(&json_config(&out)).await.unwrap();

        assert!(sink.is_open);
        assert!(out.exists());
        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rolling_json_lifecycle() {
        let tmpdir = tempfile::tempdir().unwrap();
        let out = tmpdir.path().join("output");

        let mut sink = FileSink::new();
        sink.open(&json_config(&out)).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let schema = test_schema();
        let result = sink.write_batch(&test_batch(&schema)).await.unwrap();
        assert_eq!(result.records_written, 3);

        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        // A committed (non-.tmp) file must exist after the epoch commits.
        let committed = std::fs::read_dir(&out)
            .unwrap()
            .flatten()
            .filter(|entry| !entry.file_name().to_string_lossy().ends_with(".tmp"))
            .count();
        assert!(committed > 0, "expected committed file in output dir");

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rollback_deletes_tmp() {
        let tmpdir = tempfile::tempdir().unwrap();
        let out = tmpdir.path().join("output");

        let mut sink = FileSink::new();
        sink.open(&json_config(&out)).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let schema = test_schema();
        sink.write_batch(&test_batch(&schema)).await.unwrap();
        sink.rollback_epoch(1).await.unwrap();

        let leftover = std::fs::read_dir(&out)
            .unwrap()
            .flatten()
            .filter(|entry| entry.file_name().to_string_lossy().ends_with(".tmp"))
            .count();
        assert_eq!(
            leftover, 0,
            "tmp files should be cleaned up after rollback"
        );

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_cleanup_tmp_on_open() {
        let tmpdir = tempfile::tempdir().unwrap();
        let out = tmpdir.path().join("output");
        std::fs::create_dir_all(&out).unwrap();

        // Create orphaned .tmp files — only the ones matching our prefix should be removed.
        std::fs::write(out.join("part_000001_000.jsonl.tmp"), b"orphan").unwrap();
        std::fs::write(out.join("other_000001_000.jsonl.tmp"), b"keep").unwrap();

        let mut sink = FileSink::new();
        sink.open(&json_config(&out)).await.unwrap();

        // Only "part_*" orphaned tmp should be removed.
        assert!(!out.join("part_000001_000.jsonl.tmp").exists());
        assert!(out.join("other_000001_000.jsonl.tmp").exists());

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_capabilities() {
        let caps = FileSink::new().capabilities();
        assert!(caps.exactly_once);
        assert!(caps.two_phase_commit);
    }
}