Skip to main content

laminar_connectors/files/
sink.rs

1//! File sink connector implementing [`SinkConnector`].
2//!
3//! Supports two modes:
4//! - **Append**: writes directly to the output file, flushing on pre-commit.
5//! - **Rolling**: creates a new `.tmp` file per epoch, atomically renamed on commit.
6//!
7//! For bulk formats (Parquet), mid-epoch rotation is disabled — files are
8//! written as a single unit on `pre_commit()`.
9
10use std::io::{BufWriter, Write};
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::time::Duration;
14
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17use async_trait::async_trait;
18use tracing::{debug, info};
19
20use crate::config::ConnectorConfig;
21use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
22use crate::error::ConnectorError;
23use crate::schema::traits::FormatEncoder;
24
25use super::config::{FileFormat, FileSinkConfig, SinkMode};
26
27/// File sink connector with append or rolling mode.
28pub struct FileSink {
29    /// Parsed configuration.
30    config: Option<FileSinkConfig>,
31    /// Output schema.
32    schema: SchemaRef,
33    /// Format encoder.
34    encoder: Option<Box<dyn FormatEncoder>>,
35    /// Current epoch.
36    current_epoch: u64,
37    /// Buffered batches for the current epoch (Parquet bulk writes only).
38    epoch_batches: Vec<RecordBatch>,
39    /// Current segment index within epoch (for mid-epoch rotation).
40    current_segment: usize,
41    /// Bytes written in current segment.
42    segment_bytes: u64,
43    /// Buffered file writer for the current segment (row formats only).
44    writer: Option<BufWriter<std::fs::File>>,
45    /// Active .tmp file paths in the current epoch.
46    active_tmp_files: Vec<PathBuf>,
47    /// Whether the sink is open.
48    is_open: bool,
49}
50
51impl FileSink {
52    /// Creates a new file sink with a placeholder schema.
53    #[must_use]
54    pub fn new() -> Self {
55        Self::with_registry(None)
56    }
57
58    /// Creates a new file sink with an optional Prometheus registry.
59    #[must_use]
60    pub fn with_registry(_registry: Option<&prometheus::Registry>) -> Self {
61        Self {
62            config: None,
63            schema: Arc::new(arrow_schema::Schema::empty()),
64            encoder: None,
65            current_epoch: 0,
66            epoch_batches: Vec::new(),
67            current_segment: 0,
68            segment_bytes: 0,
69            writer: None,
70            active_tmp_files: Vec::new(),
71            is_open: false,
72        }
73    }
74
75    /// Opens (or rotates to) a new `.tmp` segment file.
76    fn open_segment(&mut self) -> Result<(), ConnectorError> {
77        let config = self
78            .config
79            .as_ref()
80            .ok_or_else(|| ConnectorError::InvalidState {
81                expected: "configured".into(),
82                actual: "unconfigured".into(),
83            })?;
84
85        let filename = format!(
86            "{}_{:06}_{:03}.{}.tmp",
87            config.prefix,
88            self.current_epoch,
89            self.current_segment,
90            config.format.extension()
91        );
92        let path = Path::new(&config.path).join(&filename);
93
94        let file = std::fs::OpenOptions::new()
95            .create(true)
96            .write(true)
97            .truncate(true)
98            .open(&path)
99            .map_err(|e| {
100                ConnectorError::WriteError(format!("cannot open '{}': {e}", path.display()))
101            })?;
102
103        self.writer = Some(BufWriter::new(file));
104        self.active_tmp_files.push(path);
105        self.segment_bytes = 0;
106        Ok(())
107    }
108
109    /// Ensures a writer is open for the current segment.
110    fn ensure_writer(&mut self) -> Result<(), ConnectorError> {
111        if self.writer.is_none() {
112            self.open_segment()?;
113        }
114        Ok(())
115    }
116
117    /// Closes the current writer (flushes `BufWriter`) on the blocking
118    /// thread pool so the async runtime is not stalled.
119    async fn close_writer_async(&mut self) -> Result<(), ConnectorError> {
120        let Some(writer) = self.writer.take() else {
121            return Ok(());
122        };
123        tokio::task::spawn_blocking(move || -> Result<(), ConnectorError> {
124            let mut w = writer;
125            w.flush()
126                .map_err(|e| ConnectorError::WriteError(format!("flush error: {e}")))
127        })
128        .await
129        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
130    }
131}
132
133impl Default for FileSink {
134    fn default() -> Self {
135        Self::new()
136    }
137}
138
139impl std::fmt::Debug for FileSink {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct("FileSink")
142            .field("is_open", &self.is_open)
143            .field("current_epoch", &self.current_epoch)
144            .field("epoch_batches", &self.epoch_batches.len())
145            .field("active_tmp_files", &self.active_tmp_files.len())
146            .finish()
147    }
148}
149
150#[async_trait]
151impl SinkConnector for FileSink {
152    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
153        let sink_config = FileSinkConfig::from_connector_config(config)?;
154
155        // Ensure output directory exists.
156        let out_dir = Path::new(&sink_config.path);
157        let out_dir_owned = out_dir.to_path_buf();
158        let prefix = sink_config.prefix.clone();
159        tokio::task::spawn_blocking(move || {
160            if !out_dir_owned.exists() {
161                std::fs::create_dir_all(&out_dir_owned)?;
162            }
163            cleanup_tmp_files(&out_dir_owned, &prefix);
164            Ok::<(), std::io::Error>(())
165        })
166        .await
167        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
168        .map_err(|e| ConnectorError::WriteError(format!("cannot init output dir: {e}")))?;
169
170        // Resolve schema from connector config.
171        let schema = config
172            .arrow_schema()
173            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()));
174
175        let encoder = build_encoder(sink_config.format, &schema, &sink_config)?;
176
177        self.config = Some(sink_config);
178        self.schema = schema;
179        self.encoder = Some(encoder);
180        self.is_open = true;
181
182        info!("file sink opened");
183        Ok(())
184    }
185
186    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
187        let is_bulk = self
188            .config
189            .as_ref()
190            .ok_or_else(|| ConnectorError::InvalidState {
191                expected: "open".into(),
192                actual: "closed".into(),
193            })?
194            .format
195            .is_bulk_format();
196        let max_file_size = self.config.as_ref().and_then(|c| c.max_file_size);
197
198        if batch.num_rows() == 0 {
199            return Ok(WriteResult::new(0, 0));
200        }
201
202        let rows = batch.num_rows();
203
204        if is_bulk {
205            let max_epoch = self.config.as_ref().map_or(10_000, |c| c.max_epoch_batches);
206            if self.epoch_batches.len() >= max_epoch {
207                return Err(ConnectorError::WriteError(format!(
208                    "file sink: epoch batch buffer full ({max_epoch} batches) — \
209                     increase max_epoch_batches or flush more frequently"
210                )));
211            }
212            // Buffer for bulk write on pre_commit (Parquet).
213            self.epoch_batches.push(batch.clone());
214            return Ok(WriteResult::new(rows, 0));
215        }
216
217        // Row format: encode in-memory, then run the blocking write
218        // loop on the blocking pool so it can't stall the runtime.
219        let encoded = self
220            .encoder
221            .as_ref()
222            .ok_or_else(|| ConnectorError::InvalidState {
223                expected: "encoder ready".into(),
224                actual: "no encoder".into(),
225            })?
226            .encode_batch(batch)
227            .map_err(|e| ConnectorError::WriteError(format!("encode error: {e}")))?;
228
229        self.ensure_writer()?;
230        let mut writer = self.writer.take().expect("ensure_writer just ran");
231        let (writer, bytes_written) =
232            tokio::task::spawn_blocking(move || -> Result<_, ConnectorError> {
233                let mut total: u64 = 0;
234                for record_bytes in &encoded {
235                    writer
236                        .write_all(record_bytes)
237                        .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
238                    writer
239                        .write_all(b"\n")
240                        .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
241                    total += record_bytes.len() as u64 + 1;
242                }
243                Ok((writer, total))
244            })
245            .await
246            .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))??;
247        self.writer = Some(writer);
248        self.segment_bytes += bytes_written;
249
250        // Mid-epoch rotation for row formats.
251        if let Some(max_size) = max_file_size {
252            if self.segment_bytes >= max_size as u64 {
253                debug!("file sink: rotating at {} bytes", self.segment_bytes);
254                self.close_writer_async().await?;
255                self.current_segment += 1;
256            }
257        }
258
259        Ok(WriteResult::new(rows, bytes_written))
260    }
261
262    fn schema(&self) -> SchemaRef {
263        self.schema.clone()
264    }
265
266    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
267        self.current_epoch = epoch;
268        self.epoch_batches.clear();
269        self.current_segment = 0;
270        self.segment_bytes = 0;
271        self.close_writer_async().await?;
272        self.active_tmp_files.clear();
273        Ok(())
274    }
275
276    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
277        let is_bulk = self
278            .config
279            .as_ref()
280            .map_or(false, |c| c.format.is_bulk_format());
281
282        if is_bulk && !self.epoch_batches.is_empty() {
283            // Flush all buffered batches as a single Parquet file.
284            let encoder = self
285                .encoder
286                .as_ref()
287                .ok_or_else(|| ConnectorError::InvalidState {
288                    expected: "encoder ready".into(),
289                    actual: "no encoder".into(),
290                })?;
291
292            let combined = if self.epoch_batches.len() == 1 {
293                self.epoch_batches[0].clone()
294            } else {
295                arrow_select::concat::concat_batches(&self.schema, &self.epoch_batches)
296                    .map_err(|e| ConnectorError::WriteError(format!("batch concat error: {e}")))?
297            };
298
299            let encoded = encoder
300                .encode_batch(&combined)
301                .map_err(|e| ConnectorError::WriteError(format!("Parquet encode error: {e}")))?;
302
303            // Parquet encoder returns exactly one file blob. Run the
304            // file write on the blocking pool.
305            if let Some(file_bytes) = encoded.into_iter().next() {
306                self.open_segment()?;
307                let writer = self.writer.take().expect("open_segment just ran");
308                let writer = tokio::task::spawn_blocking(move || -> Result<_, ConnectorError> {
309                    let mut w = writer;
310                    w.write_all(&file_bytes)
311                        .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
312                    Ok(w)
313                })
314                .await
315                .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))??;
316                self.writer = Some(writer);
317            }
318            self.epoch_batches.clear();
319        }
320
321        // Flush and close the buffered writer (on the blocking pool).
322        self.close_writer_async().await?;
323
324        // Fsync all tmp files. Propagate errors — fsync failure means data
325        // is not durable and we must not let the checkpoint proceed.
326        let paths: Vec<PathBuf> = self.active_tmp_files.clone();
327        tokio::task::spawn_blocking(move || {
328            for path in &paths {
329                if path.exists() {
330                    let f = std::fs::OpenOptions::new()
331                        .write(true)
332                        .open(path)
333                        .map_err(|e| {
334                            ConnectorError::WriteError(format!(
335                                "cannot open '{}' for fsync: {e}",
336                                path.display()
337                            ))
338                        })?;
339                    f.sync_all().map_err(|e| {
340                        ConnectorError::WriteError(format!(
341                            "fsync failed on '{}': {e}",
342                            path.display()
343                        ))
344                    })?;
345                }
346            }
347            Ok::<(), ConnectorError>(())
348        })
349        .await
350        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
351    }
352
353    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
354        let is_append = self
355            .config
356            .as_ref()
357            .map_or(false, |c| c.mode == SinkMode::Append);
358
359        if is_append {
360            return Ok(());
361        }
362
363        // Rolling mode: rename .tmp → final.
364        for tmp_path in &self.active_tmp_files {
365            if !tmp_path.exists() {
366                continue;
367            }
368            let final_name = tmp_path
369                .file_name()
370                .and_then(|n| n.to_str())
371                .and_then(|n| n.strip_suffix(".tmp"))
372                .ok_or_else(|| {
373                    ConnectorError::WriteError(format!(
374                        "tmp file '{}' has no .tmp suffix — cannot commit",
375                        tmp_path.display()
376                    ))
377                })?;
378            let final_path = tmp_path.parent().unwrap_or(Path::new(".")).join(final_name);
379
380            std::fs::rename(tmp_path, &final_path).map_err(|e| {
381                ConnectorError::WriteError(format!(
382                    "cannot rename '{}' -> '{}': {e}",
383                    tmp_path.display(),
384                    final_path.display()
385                ))
386            })?;
387            debug!("file sink: committed {}", final_path.display());
388        }
389
390        self.active_tmp_files.clear();
391        Ok(())
392    }
393
394    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
395        self.close_writer_async().await?;
396        for tmp_path in &self.active_tmp_files {
397            if tmp_path.exists() {
398                let _ = std::fs::remove_file(tmp_path);
399                debug!("file sink: rolled back {}", tmp_path.display());
400            }
401        }
402        self.active_tmp_files.clear();
403        self.epoch_batches.clear();
404        Ok(())
405    }
406
407    fn capabilities(&self) -> SinkConnectorCapabilities {
408        SinkConnectorCapabilities::new(Duration::from_secs(30))
409            .with_exactly_once()
410            .with_two_phase_commit()
411    }
412
413    async fn close(&mut self) -> Result<(), ConnectorError> {
414        self.close_writer_async().await?;
415        for tmp_path in &self.active_tmp_files {
416            if tmp_path.exists() {
417                let _ = std::fs::remove_file(tmp_path);
418            }
419        }
420        self.active_tmp_files.clear();
421        self.is_open = false;
422        info!("file sink closed");
423        Ok(())
424    }
425}
426
427// ── Helpers ──────────────────────────────────────────────────────────
428
429fn build_encoder(
430    format: FileFormat,
431    schema: &SchemaRef,
432    config: &FileSinkConfig,
433) -> Result<Box<dyn FormatEncoder>, ConnectorError> {
434    match format {
435        FileFormat::Csv => {
436            let csv_config = crate::schema::CsvEncoderConfig {
437                delimiter: b',',
438                has_header: false,
439            };
440            let encoder = crate::schema::CsvEncoder::with_config(schema.clone(), csv_config);
441            Ok(Box::new(encoder))
442        }
443        FileFormat::Json | FileFormat::Text => {
444            let encoder = crate::schema::JsonEncoder::new(schema.clone());
445            Ok(Box::new(encoder))
446        }
447        FileFormat::Parquet => {
448            use parquet::basic::Compression;
449            let compression = match config.compression.to_lowercase().as_str() {
450                "none" | "uncompressed" => Compression::UNCOMPRESSED,
451                "snappy" => Compression::SNAPPY,
452                "gzip" => Compression::GZIP(parquet::basic::GzipLevel::default()),
453                "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
454                "lz4" => Compression::LZ4,
455                other => {
456                    return Err(ConnectorError::ConfigurationError(format!(
457                        "unknown Parquet compression: '{other}'"
458                    )));
459                }
460            };
461            let parquet_config = crate::schema::parquet::ParquetEncoderConfig::default()
462                .with_compression(compression);
463            let encoder =
464                crate::schema::parquet::ParquetEncoder::with_config(schema.clone(), parquet_config);
465            Ok(Box::new(encoder))
466        }
467        FileFormat::ArrowIpc => {
468            let encoder = super::arrow_ipc_codec::ArrowIpcEncoder::new(schema.clone());
469            Ok(Box::new(encoder))
470        }
471    }
472}
473
474/// Removes orphaned `.tmp` files matching the given prefix from a previous crash.
475fn cleanup_tmp_files(dir: &Path, prefix: &str) {
476    let entries = match std::fs::read_dir(dir) {
477        Ok(e) => e,
478        Err(_) => return,
479    };
480    for entry in entries.flatten() {
481        let path = entry.path();
482        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
483            if name.starts_with(prefix) && name.ends_with(".tmp") {
484                info!("file sink: removing orphaned tmp file: {}", path.display());
485                let _ = std::fs::remove_file(&path);
486            }
487        }
488    }
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Shared test schema: non-null `id` (Int64) and nullable `name` (Utf8).
    fn test_schema() -> SchemaRef {
        let id = Field::new("id", DataType::Int64, false);
        let name = Field::new("name", DataType::Utf8, true);
        Arc::new(Schema::new(vec![id, name]))
    }

    /// Three-row batch conforming to `test_schema()`.
    fn test_batch(schema: &SchemaRef) -> RecordBatch {
        let ids = Int64Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["a", "b", "c"]);
        RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(ids), Arc::new(names)]).unwrap()
    }

    /// Connector config for a JSON file sink writing into `out_path`.
    fn json_config(out_path: &Path) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");
        config
    }

    /// Counts entries in `dir` whose name ends in `.tmp` (`tmp == true`) or
    /// does not (`tmp == false`).
    fn count_entries(dir: &Path, tmp: bool) -> usize {
        std::fs::read_dir(dir)
            .unwrap()
            .flatten()
            .filter(|entry| entry.file_name().to_str().unwrap().ends_with(".tmp") == tmp)
            .count()
    }

    #[test]
    fn test_sink_default() {
        let sink = FileSink::new();
        assert!(!sink.is_open);
        assert_eq!(sink.current_epoch, 0);
    }

    #[tokio::test]
    async fn test_sink_open_creates_dir() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        sink.open(&json_config(&out_path)).await.unwrap();

        assert!(sink.is_open);
        assert!(out_path.exists());
        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rolling_json_lifecycle() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        let batch = test_batch(&test_schema());

        let mut sink = FileSink::new();
        sink.open(&json_config(&out_path)).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 3);

        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        // A committed (non-.tmp) file must now be present.
        let committed = count_entries(&out_path, false);
        assert!(committed > 0, "expected committed file in output dir");

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rollback_deletes_tmp() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        let batch = test_batch(&test_schema());

        let mut sink = FileSink::new();
        sink.open(&json_config(&out_path)).await.unwrap();
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&batch).await.unwrap();

        sink.rollback_epoch(1).await.unwrap();

        let tmp_count = count_entries(&out_path, true);
        assert_eq!(
            tmp_count, 0,
            "tmp files should be cleaned up after rollback"
        );

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_cleanup_tmp_on_open() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        std::fs::create_dir_all(&out_path).unwrap();

        // Orphans: only the one carrying our prefix may be removed.
        let ours = out_path.join("part_000001_000.jsonl.tmp");
        let theirs = out_path.join("other_000001_000.jsonl.tmp");
        std::fs::write(&ours, b"orphan").unwrap();
        std::fs::write(&theirs, b"keep").unwrap();

        let mut sink = FileSink::new();
        sink.open(&json_config(&out_path)).await.unwrap();

        assert!(!ours.exists());
        assert!(theirs.exists());

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_capabilities() {
        let caps = FileSink::new().capabilities();
        assert!(caps.exactly_once);
        assert!(caps.two_phase_commit);
    }
}