// laminar_connectors/files/sink.rs
//! File sink connector implementing [`SinkConnector`].
//!
//! Supports two modes:
//! - **Append**: writes directly to the output file, flushing on pre-commit.
//! - **Rolling**: creates a new `.tmp` file per epoch, atomically renamed on commit.
//!
//! For bulk formats (Parquet), mid-epoch rotation is disabled — files are
//! written as a single unit on `pre_commit()`.

use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use tracing::{debug, info};

use crate::config::ConnectorConfig;
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::schema::traits::FormatEncoder;

use super::config::{FileFormat, FileSinkConfig, SinkMode};

/// File sink connector with append or rolling mode.
///
/// State follows the two-phase-commit lifecycle driven by [`SinkConnector`]:
/// `begin_epoch` → `write_batch`* → `pre_commit` → `commit_epoch` /
/// `rollback_epoch`.
pub struct FileSink {
    /// Parsed configuration; `None` until `open()` succeeds.
    config: Option<FileSinkConfig>,
    /// Output schema (empty placeholder schema before `open()`).
    schema: SchemaRef,
    /// Format encoder; `None` until `open()` succeeds.
    encoder: Option<Box<dyn FormatEncoder>>,
    /// Current epoch, set by `begin_epoch()`.
    current_epoch: u64,
    /// Buffered batches for the current epoch (Parquet bulk writes only).
    epoch_batches: Vec<RecordBatch>,
    /// Current segment index within epoch (for mid-epoch rotation).
    current_segment: usize,
    /// Bytes written in current segment; drives `max_file_size` rotation.
    segment_bytes: u64,
    /// Buffered file writer for the current segment (row formats only).
    writer: Option<BufWriter<std::fs::File>>,
    /// Active .tmp file paths in the current epoch; renamed on commit,
    /// deleted on rollback/close.
    active_tmp_files: Vec<PathBuf>,
    /// Whether the sink is open.
    is_open: bool,
}

51impl FileSink {
52    /// Creates a new file sink with a placeholder schema.
53    #[must_use]
54    pub fn new() -> Self {
55        Self {
56            config: None,
57            schema: Arc::new(arrow_schema::Schema::empty()),
58            encoder: None,
59            current_epoch: 0,
60            epoch_batches: Vec::new(),
61            current_segment: 0,
62            segment_bytes: 0,
63            writer: None,
64            active_tmp_files: Vec::new(),
65            is_open: false,
66        }
67    }
68
69    /// Opens (or rotates to) a new `.tmp` segment file.
70    fn open_segment(&mut self) -> Result<(), ConnectorError> {
71        let config = self
72            .config
73            .as_ref()
74            .ok_or_else(|| ConnectorError::InvalidState {
75                expected: "configured".into(),
76                actual: "unconfigured".into(),
77            })?;
78
79        let filename = format!(
80            "{}_{:06}_{:03}.{}.tmp",
81            config.prefix,
82            self.current_epoch,
83            self.current_segment,
84            config.format.extension()
85        );
86        let path = Path::new(&config.path).join(&filename);
87
88        let file = std::fs::OpenOptions::new()
89            .create(true)
90            .write(true)
91            .truncate(true)
92            .open(&path)
93            .map_err(|e| {
94                ConnectorError::WriteError(format!("cannot open '{}': {e}", path.display()))
95            })?;
96
97        self.writer = Some(BufWriter::new(file));
98        self.active_tmp_files.push(path);
99        self.segment_bytes = 0;
100        Ok(())
101    }
102
103    /// Ensures a writer is open for the current segment.
104    fn ensure_writer(&mut self) -> Result<(), ConnectorError> {
105        if self.writer.is_none() {
106            self.open_segment()?;
107        }
108        Ok(())
109    }
110
111    /// Closes the current writer (flushes `BufWriter`).
112    fn close_writer(&mut self) -> Result<(), ConnectorError> {
113        if let Some(mut w) = self.writer.take() {
114            w.flush()
115                .map_err(|e| ConnectorError::WriteError(format!("flush error: {e}")))?;
116        }
117        Ok(())
118    }
119}
120
121impl Default for FileSink {
122    fn default() -> Self {
123        Self::new()
124    }
125}
126
127impl std::fmt::Debug for FileSink {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        f.debug_struct("FileSink")
130            .field("is_open", &self.is_open)
131            .field("current_epoch", &self.current_epoch)
132            .field("epoch_batches", &self.epoch_batches.len())
133            .field("active_tmp_files", &self.active_tmp_files.len())
134            .finish()
135    }
136}
137
#[async_trait]
impl SinkConnector for FileSink {
    /// Parses config, prepares the output directory, and builds the encoder.
    ///
    /// Directory creation and orphaned-`.tmp` cleanup run on the blocking
    /// thread pool so the async executor is not stalled by filesystem I/O.
    ///
    /// # Errors
    /// Returns `ConnectorError` if the config is invalid, the output
    /// directory cannot be created, or the encoder cannot be built.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        let sink_config = FileSinkConfig::from_connector_config(config)?;

        // Ensure output directory exists.
        let out_dir = Path::new(&sink_config.path);
        let out_dir_owned = out_dir.to_path_buf();
        let prefix = sink_config.prefix.clone();
        tokio::task::spawn_blocking(move || {
            if !out_dir_owned.exists() {
                std::fs::create_dir_all(&out_dir_owned)?;
            }
            // Best-effort removal of leftover .tmp files from a prior crash.
            cleanup_tmp_files(&out_dir_owned, &prefix);
            Ok::<(), std::io::Error>(())
        })
        .await
        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
        .map_err(|e| ConnectorError::WriteError(format!("cannot init output dir: {e}")))?;

        // Resolve schema from connector config. Falls back to an empty schema
        // when none is provided — NOTE(review): confirm downstream encoders
        // accept an empty schema; otherwise this should be a config error.
        let schema = config
            .arrow_schema()
            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()));

        let encoder = build_encoder(sink_config.format, &schema, &sink_config)?;

        self.config = Some(sink_config);
        self.schema = schema;
        self.encoder = Some(encoder);
        self.is_open = true;

        info!("file sink opened");
        Ok(())
    }

    /// Writes one batch.
    ///
    /// Bulk formats (Parquet) only buffer the batch here; encoding and file
    /// I/O happen in `pre_commit()`. Row formats encode immediately and
    /// append newline-delimited records to the current `.tmp` segment,
    /// rotating segments when `max_file_size` is exceeded.
    ///
    /// # Errors
    /// Returns `ConnectorError::InvalidState` if the sink is not open, or
    /// `WriteError` on encode/I/O failure.
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        let is_bulk = self
            .config
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "closed".into(),
            })?
            .format
            .is_bulk_format();
        let max_file_size = self.config.as_ref().and_then(|c| c.max_file_size);

        // Empty batches are a no-op; avoids opening a segment for zero rows.
        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let rows = batch.num_rows();

        if is_bulk {
            // Buffer for bulk write on pre_commit (Parquet).
            // Bytes-written is reported as 0 here; actual I/O is deferred.
            self.epoch_batches.push(batch.clone());
            return Ok(WriteResult::new(rows, 0));
        }

        // Row format: encode and write immediately via buffered writer.
        let encoder = self
            .encoder
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "encoder ready".into(),
                actual: "no encoder".into(),
            })?;

        let encoded = encoder
            .encode_batch(batch)
            .map_err(|e| ConnectorError::WriteError(format!("encode error: {e}")))?;

        // ensure_writer() guarantees `writer` is Some, so the unwrap below
        // cannot panic.
        self.ensure_writer()?;
        let writer = self.writer.as_mut().unwrap();

        // One encoded record per line (newline-delimited output).
        let mut bytes_written: u64 = 0;
        for record_bytes in &encoded {
            writer
                .write_all(record_bytes)
                .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
            writer
                .write_all(b"\n")
                .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
            // +1 accounts for the newline separator.
            bytes_written += record_bytes.len() as u64 + 1;
        }

        self.segment_bytes += bytes_written;

        // Mid-epoch rotation for row formats (if max_file_size exceeded).
        // Rotation is size-triggered *after* the write, so a segment can
        // exceed max_size by up to one batch.
        if let Some(max_size) = max_file_size {
            if self.segment_bytes >= max_size as u64 {
                debug!("file sink: rotating at {} bytes", self.segment_bytes);
                self.close_writer()?;
                self.current_segment += 1;
                // Next write_batch will open a new segment via ensure_writer().
            }
        }

        Ok(WriteResult::new(rows, bytes_written))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Resets all per-epoch state for `epoch`.
    ///
    /// NOTE(review): `active_tmp_files` from the previous epoch is cleared
    /// without renaming — correct only if the runtime always calls
    /// `commit_epoch`/`rollback_epoch` before the next `begin_epoch`; confirm
    /// that contract with the checkpoint coordinator.
    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;
        self.epoch_batches.clear();
        self.current_segment = 0;
        self.segment_bytes = 0;
        self.close_writer()?;
        self.active_tmp_files.clear();
        Ok(())
    }

    /// Phase 1 of two-phase commit: make this epoch's data durable.
    ///
    /// Bulk formats encode all buffered batches into a single file here;
    /// then every `.tmp` file is flushed and fsynced. fsync errors are
    /// propagated so a failed sync vetoes the checkpoint.
    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        let is_bulk = self
            .config
            .as_ref()
            .map_or(false, |c| c.format.is_bulk_format());

        if is_bulk && !self.epoch_batches.is_empty() {
            // Flush all buffered batches as a single Parquet file.
            let encoder = self
                .encoder
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "encoder ready".into(),
                    actual: "no encoder".into(),
                })?;

            // Avoid a concat copy when there is exactly one batch.
            let combined = if self.epoch_batches.len() == 1 {
                self.epoch_batches[0].clone()
            } else {
                arrow_select::concat::concat_batches(&self.schema, &self.epoch_batches)
                    .map_err(|e| ConnectorError::WriteError(format!("batch concat error: {e}")))?
            };

            let encoded = encoder
                .encode_batch(&combined)
                .map_err(|e| ConnectorError::WriteError(format!("Parquet encode error: {e}")))?;

            // Parquet encoder returns exactly one file blob.
            if let Some(file_bytes) = encoded.first() {
                self.open_segment()?;
                let writer = self.writer.as_mut().unwrap();
                writer
                    .write_all(file_bytes)
                    .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
            }
            self.epoch_batches.clear();
        }

        // Flush and close the buffered writer.
        self.close_writer()?;

        // Fsync all tmp files. Propagate errors — fsync failure means data
        // is not durable and we must not let the checkpoint proceed.
        // Runs on the blocking pool since sync_all() can take milliseconds.
        let paths: Vec<PathBuf> = self.active_tmp_files.clone();
        tokio::task::spawn_blocking(move || {
            for path in &paths {
                if path.exists() {
                    let f = std::fs::OpenOptions::new()
                        .write(true)
                        .open(path)
                        .map_err(|e| {
                            ConnectorError::WriteError(format!(
                                "cannot open '{}' for fsync: {e}",
                                path.display()
                            ))
                        })?;
                    f.sync_all().map_err(|e| {
                        ConnectorError::WriteError(format!(
                            "fsync failed on '{}': {e}",
                            path.display()
                        ))
                    })?;
                }
            }
            Ok::<(), ConnectorError>(())
        })
        .await
        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
    }

    /// Phase 2 of two-phase commit: atomically publish this epoch's files
    /// by renaming `.tmp` → final (rolling mode only).
    ///
    /// NOTE(review): in Append mode this is a no-op, yet `open_segment()`
    /// always writes `.tmp` files and `close()` deletes any remaining ones —
    /// confirm Append mode writes through a different path, otherwise its
    /// output is never finalized.
    ///
    /// NOTE(review): `std::fs::rename` here is blocking inside an async fn
    /// (unlike open/pre_commit which use spawn_blocking), and the parent
    /// directory is not fsynced after the renames — the renames may not be
    /// durable across power loss. Confirm whether that matters for the
    /// exactly-once guarantee advertised in `capabilities()`.
    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        let is_append = self
            .config
            .as_ref()
            .map_or(false, |c| c.mode == SinkMode::Append);

        if is_append {
            return Ok(());
        }

        // Rolling mode: rename .tmp → final.
        for tmp_path in &self.active_tmp_files {
            if !tmp_path.exists() {
                continue;
            }
            let final_name = tmp_path
                .file_name()
                .and_then(|n| n.to_str())
                .and_then(|n| n.strip_suffix(".tmp"))
                .ok_or_else(|| {
                    ConnectorError::WriteError(format!(
                        "tmp file '{}' has no .tmp suffix — cannot commit",
                        tmp_path.display()
                    ))
                })?;
            let final_path = tmp_path.parent().unwrap_or(Path::new(".")).join(final_name);

            std::fs::rename(tmp_path, &final_path).map_err(|e| {
                ConnectorError::WriteError(format!(
                    "cannot rename '{}' -> '{}': {e}",
                    tmp_path.display(),
                    final_path.display()
                ))
            })?;
            debug!("file sink: committed {}", final_path.display());
        }

        self.active_tmp_files.clear();
        Ok(())
    }

    /// Aborts the epoch: closes the writer and deletes all `.tmp` segments.
    ///
    /// Deletion errors are deliberately ignored (best-effort cleanup);
    /// `open()` removes any survivors on the next start.
    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        self.close_writer()?;
        for tmp_path in &self.active_tmp_files {
            if tmp_path.exists() {
                let _ = std::fs::remove_file(tmp_path);
                debug!("file sink: rolled back {}", tmp_path.display());
            }
        }
        self.active_tmp_files.clear();
        self.epoch_batches.clear();
        Ok(())
    }

    /// Reports `Healthy` once opened, `Unknown` otherwise — no deeper
    /// filesystem probing is performed.
    fn health_check(&self) -> HealthStatus {
        if self.is_open {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unknown
        }
    }

    /// Exactly-once via two-phase commit (tmp-file + atomic rename).
    fn capabilities(&self) -> SinkConnectorCapabilities {
        SinkConnectorCapabilities::default()
            .with_exactly_once()
            .with_two_phase_commit()
    }

    /// Flushes and discards any uncommitted `.tmp` files, then marks the
    /// sink closed. Uncommitted data is dropped by design — the epoch was
    /// never committed.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.close_writer()?;
        for tmp_path in &self.active_tmp_files {
            if tmp_path.exists() {
                let _ = std::fs::remove_file(tmp_path);
            }
        }
        self.active_tmp_files.clear();
        self.is_open = false;
        info!("file sink closed");
        Ok(())
    }
}

// ── Helpers ──────────────────────────────────────────────────────────

408fn build_encoder(
409    format: FileFormat,
410    schema: &SchemaRef,
411    config: &FileSinkConfig,
412) -> Result<Box<dyn FormatEncoder>, ConnectorError> {
413    match format {
414        FileFormat::Csv | FileFormat::Json | FileFormat::Text => {
415            // JSON encoder produces one JSON object per row — suitable for
416            // JSON Lines output. CSV and text also use this as a baseline;
417            // a dedicated CSV encoder can be added later via the serde module.
418            let encoder = crate::schema::JsonEncoder::new(schema.clone());
419            Ok(Box::new(encoder))
420        }
421        FileFormat::Parquet => {
422            use parquet::basic::Compression;
423            let compression = match config.compression.to_lowercase().as_str() {
424                "none" | "uncompressed" => Compression::UNCOMPRESSED,
425                "snappy" => Compression::SNAPPY,
426                "gzip" => Compression::GZIP(parquet::basic::GzipLevel::default()),
427                "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
428                "lz4" => Compression::LZ4,
429                other => {
430                    return Err(ConnectorError::ConfigurationError(format!(
431                        "unknown Parquet compression: '{other}'"
432                    )));
433                }
434            };
435            let parquet_config = crate::schema::parquet::ParquetEncoderConfig::default()
436                .with_compression(compression);
437            let encoder =
438                crate::schema::parquet::ParquetEncoder::with_config(schema.clone(), parquet_config);
439            Ok(Box::new(encoder))
440        }
441    }
442}
443
444/// Removes orphaned `.tmp` files matching the given prefix from a previous crash.
445fn cleanup_tmp_files(dir: &Path, prefix: &str) {
446    let entries = match std::fs::read_dir(dir) {
447        Ok(e) => e,
448        Err(_) => return,
449    };
450    for entry in entries.flatten() {
451        let path = entry.path();
452        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
453            if name.starts_with(prefix) && name.ends_with(".tmp") {
454                info!("file sink: removing orphaned tmp file: {}", path.display());
455                let _ = std::fs::remove_file(&path);
456            }
457        }
458    }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Two-column schema (non-null id, nullable name) used by all tests.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Three-row fixture batch matching [`test_schema`].
    fn test_batch(schema: &SchemaRef) -> RecordBatch {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_sink_default() {
        let sink = FileSink::new();
        assert!(!sink.is_open);
        assert_eq!(sink.current_epoch, 0);
    }

    // open() must create a missing output directory.
    #[tokio::test]
    async fn test_sink_open_creates_dir() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        assert!(sink.is_open);
        assert!(out_path.exists());
        sink.close().await.unwrap();
    }

    // Full happy-path lifecycle: open → begin → write → pre_commit → commit.
    #[tokio::test]
    async fn test_sink_rolling_json_lifecycle() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let schema = test_schema();
        let batch = test_batch(&schema);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 3);

        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        // Check that a final file exists (not .tmp).
        let files: Vec<_> = std::fs::read_dir(&out_path)
            .unwrap()
            .flatten()
            .filter(|e| {
                let name = e.file_name();
                let n = name.to_str().unwrap();
                !n.ends_with(".tmp")
            })
            .collect();
        assert!(!files.is_empty(), "expected committed file in output dir");

        sink.close().await.unwrap();
    }

    // rollback_epoch() must delete uncommitted .tmp segments.
    #[tokio::test]
    async fn test_sink_rollback_deletes_tmp() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let schema = test_schema();
        let batch = test_batch(&schema);
        sink.write_batch(&batch).await.unwrap();

        sink.rollback_epoch(1).await.unwrap();

        let tmp_count = std::fs::read_dir(&out_path)
            .unwrap()
            .flatten()
            .filter(|e| e.file_name().to_str().unwrap().ends_with(".tmp"))
            .count();
        assert_eq!(
            tmp_count, 0,
            "tmp files should be cleaned up after rollback"
        );

        sink.close().await.unwrap();
    }

    // open() removes only orphaned .tmp files matching this sink's prefix.
    #[tokio::test]
    async fn test_cleanup_tmp_on_open() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        std::fs::create_dir_all(&out_path).unwrap();

        // Create orphaned .tmp files — only the ones matching our prefix should be removed.
        std::fs::write(out_path.join("part_000001_000.jsonl.tmp"), b"orphan").unwrap();
        std::fs::write(out_path.join("other_000001_000.jsonl.tmp"), b"keep").unwrap();

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();

        // Only "part_*" orphaned tmp should be removed.
        assert!(!out_path.join("part_000001_000.jsonl.tmp").exists());
        assert!(out_path.join("other_000001_000.jsonl.tmp").exists());

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_capabilities() {
        let sink = FileSink::new();
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.two_phase_commit);
    }
}