laminar_connectors/files/sink.rs

use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use tracing::{debug, info};

use crate::config::ConnectorConfig;
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::schema::traits::FormatEncoder;

use super::config::{FileFormat, FileSinkConfig, SinkMode};

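/// A file sink that writes Arrow record batches to rolling segment files with
/// two-phase-commit semantics: each epoch's output goes to `*.tmp` files,
/// which `pre_commit` flushes and fsyncs and `commit_epoch` atomically renames
/// to their final names; `rollback_epoch` deletes them instead.
///
/// A minimal lifecycle sketch. The config keys mirror the tests at the bottom
/// of this file; the `batch` variable and the surrounding async context are
/// assumed, so the example is marked `ignore`:
///
/// ```ignore
/// let mut sink = FileSink::new();
/// let mut config = ConnectorConfig::new("files");
/// config.set("path", "/tmp/out");
/// config.set("format", "json");
///
/// sink.open(&config).await?;
/// sink.begin_epoch(1).await?;
/// sink.write_batch(&batch).await?; // any arrow RecordBatch
/// sink.pre_commit(1).await?;       // flush + fsync tmp segments
/// sink.commit_epoch(1).await?;     // rename *.tmp to final files
/// sink.close().await?;
/// ```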
pub struct FileSink {
    config: Option<FileSinkConfig>,
    schema: SchemaRef,
    encoder: Option<Box<dyn FormatEncoder>>,
    current_epoch: u64,
    /// Batches buffered for bulk formats until `pre_commit`.
    epoch_batches: Vec<RecordBatch>,
    /// Index of the current rolling segment within the epoch.
    current_segment: usize,
    /// Bytes written to the current segment (drives rotation).
    segment_bytes: u64,
    writer: Option<BufWriter<std::fs::File>>,
    /// Tmp files written this epoch; renamed on commit, deleted on rollback.
    active_tmp_files: Vec<PathBuf>,
    is_open: bool,
}

impl FileSink {
    /// Creates a sink with no metrics registry.
    #[must_use]
    pub fn new() -> Self {
        Self::with_registry(None)
    }

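    /// Same as [`FileSink::new`], but accepts a Prometheus registry. The
    /// registry is currently unused (note the `_registry` binding); the
    /// parameter presumably exists for signature parity with connectors that
    /// do export metrics.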
    #[must_use]
    pub fn with_registry(_registry: Option<&prometheus::Registry>) -> Self {
        Self {
            config: None,
            schema: Arc::new(arrow_schema::Schema::empty()),
            encoder: None,
            current_epoch: 0,
            epoch_batches: Vec::new(),
            current_segment: 0,
            segment_bytes: 0,
            writer: None,
            active_tmp_files: Vec::new(),
            is_open: false,
        }
    }

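    /// Opens the next tmp segment for writing. Names follow
    /// `{prefix}_{epoch:06}_{segment:03}.{extension}.tmp`; `truncate(true)`
    /// means a stale file with the same name is overwritten, not appended to.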
    fn open_segment(&mut self) -> Result<(), ConnectorError> {
        let config = self
            .config
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "configured".into(),
                actual: "unconfigured".into(),
            })?;

        let filename = format!(
            "{}_{:06}_{:03}.{}.tmp",
            config.prefix,
            self.current_epoch,
            self.current_segment,
            config.format.extension()
        );
        let path = Path::new(&config.path).join(&filename);

        let file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&path)
            .map_err(|e| {
                ConnectorError::WriteError(format!("cannot open '{}': {e}", path.display()))
            })?;

        self.writer = Some(BufWriter::new(file));
        self.active_tmp_files.push(path);
        self.segment_bytes = 0;
        Ok(())
    }

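    /// Lazily opens a segment on first write, and again after a size-based
    /// rotation has closed the previous writer.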
    fn ensure_writer(&mut self) -> Result<(), ConnectorError> {
        if self.writer.is_none() {
            self.open_segment()?;
        }
        Ok(())
    }

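    /// Flushes and drops the current writer, if any. The flush is moved onto
    /// a blocking thread so buffered file I/O never stalls the async runtime.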
    async fn close_writer_async(&mut self) -> Result<(), ConnectorError> {
        let Some(writer) = self.writer.take() else {
            return Ok(());
        };
        tokio::task::spawn_blocking(move || -> Result<(), ConnectorError> {
            let mut w = writer;
            w.flush()
                .map_err(|e| ConnectorError::WriteError(format!("flush error: {e}")))
        })
        .await
        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
    }
}

impl Default for FileSink {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for FileSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileSink")
            .field("is_open", &self.is_open)
            .field("current_epoch", &self.current_epoch)
            .field("epoch_batches", &self.epoch_batches.len())
            .field("active_tmp_files", &self.active_tmp_files.len())
            .finish()
    }
}

#[async_trait]
impl SinkConnector for FileSink {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        let sink_config = FileSinkConfig::from_connector_config(config)?;

        let out_dir = Path::new(&sink_config.path);
        let out_dir_owned = out_dir.to_path_buf();
        let prefix = sink_config.prefix.clone();
        tokio::task::spawn_blocking(move || {
            if !out_dir_owned.exists() {
                std::fs::create_dir_all(&out_dir_owned)?;
            }
            cleanup_tmp_files(&out_dir_owned, &prefix);
            Ok::<(), std::io::Error>(())
        })
        .await
        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
        .map_err(|e| ConnectorError::WriteError(format!("cannot init output dir: {e}")))?;

        let schema = config
            .arrow_schema()
            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()));

        let encoder = build_encoder(sink_config.format, &schema, &sink_config)?;

        self.config = Some(sink_config);
        self.schema = schema;
        self.encoder = Some(encoder);
        self.is_open = true;

        info!("file sink opened");
        Ok(())
    }

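    /// Writes one batch. Row-oriented formats are encoded and appended to the
    /// current segment immediately, one newline-terminated record per row;
    /// bulk formats (e.g. Parquet) are buffered in `epoch_batches` and only
    /// written out as a single file per epoch during `pre_commit`.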
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        let is_bulk = self
            .config
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "closed".into(),
            })?
            .format
            .is_bulk_format();
        let max_file_size = self.config.as_ref().and_then(|c| c.max_file_size);

        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let rows = batch.num_rows();

        if is_bulk {
            let max_epoch = self.config.as_ref().map_or(10_000, |c| c.max_epoch_batches);
            if self.epoch_batches.len() >= max_epoch {
                return Err(ConnectorError::WriteError(format!(
                    "file sink: epoch batch buffer full ({max_epoch} batches) — \
                     increase max_epoch_batches or flush more frequently"
                )));
            }
            self.epoch_batches.push(batch.clone());
            return Ok(WriteResult::new(rows, 0));
        }

        let encoded = self
            .encoder
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "encoder ready".into(),
                actual: "no encoder".into(),
            })?
            .encode_batch(batch)
            .map_err(|e| ConnectorError::WriteError(format!("encode error: {e}")))?;

        self.ensure_writer()?;
        let mut writer = self.writer.take().expect("ensure_writer just ran");
        let (writer, bytes_written) =
            tokio::task::spawn_blocking(move || -> Result<_, ConnectorError> {
                let mut total: u64 = 0;
                for record_bytes in &encoded {
                    writer
                        .write_all(record_bytes)
                        .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
                    writer
                        .write_all(b"\n")
                        .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
                    total += record_bytes.len() as u64 + 1;
                }
                Ok((writer, total))
            })
            .await
            .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))??;
        self.writer = Some(writer);
        self.segment_bytes += bytes_written;

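        // Size-based rotation: once the segment crosses max_file_size, close
        // it and bump the segment counter; the next write opens a new file.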
        if let Some(max_size) = max_file_size {
            if self.segment_bytes >= max_size as u64 {
                debug!("file sink: rotating at {} bytes", self.segment_bytes);
                self.close_writer_async().await?;
                self.current_segment += 1;
            }
        }

        Ok(WriteResult::new(rows, bytes_written))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

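    /// Resets per-epoch state. The previous epoch is assumed to have been
    /// committed or rolled back already: any tmp files still tracked here are
    /// dropped from `active_tmp_files` without being deleted or renamed.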
    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;
        self.epoch_batches.clear();
        self.current_segment = 0;
        self.segment_bytes = 0;
        self.close_writer_async().await?;
        self.active_tmp_files.clear();
        Ok(())
    }

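    /// Phase one of the two-phase commit: encode any buffered bulk batches
    /// into a single segment, flush the writer, and fsync every tmp file so
    /// the epoch's data is durable before `commit_epoch` publishes it.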
    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        let is_bulk = self
            .config
            .as_ref()
            .map_or(false, |c| c.format.is_bulk_format());

        if is_bulk && !self.epoch_batches.is_empty() {
            let encoder = self
                .encoder
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "encoder ready".into(),
                    actual: "no encoder".into(),
                })?;

            let combined = if self.epoch_batches.len() == 1 {
                self.epoch_batches[0].clone()
            } else {
                arrow_select::concat::concat_batches(&self.schema, &self.epoch_batches)
                    .map_err(|e| ConnectorError::WriteError(format!("batch concat error: {e}")))?
            };

            let encoded = encoder
                .encode_batch(&combined)
                .map_err(|e| ConnectorError::WriteError(format!("Parquet encode error: {e}")))?;

            if let Some(file_bytes) = encoded.into_iter().next() {
                self.open_segment()?;
                let writer = self.writer.take().expect("open_segment just ran");
                let writer = tokio::task::spawn_blocking(move || -> Result<_, ConnectorError> {
                    let mut w = writer;
                    w.write_all(&file_bytes)
                        .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
                    Ok(w)
                })
                .await
                .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))??;
                self.writer = Some(writer);
            }
            self.epoch_batches.clear();
        }

        self.close_writer_async().await?;

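        // Durability barrier: fsync every tmp segment before reporting
        // pre-commit success, so the rename in `commit_epoch` never publishes
        // a file whose contents are still only in the page cache.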
        let paths: Vec<PathBuf> = self.active_tmp_files.clone();
        tokio::task::spawn_blocking(move || {
            for path in &paths {
                if path.exists() {
                    let f = std::fs::OpenOptions::new()
                        .write(true)
                        .open(path)
                        .map_err(|e| {
                            ConnectorError::WriteError(format!(
                                "cannot open '{}' for fsync: {e}",
                                path.display()
                            ))
                        })?;
                    f.sync_all().map_err(|e| {
                        ConnectorError::WriteError(format!(
                            "fsync failed on '{}': {e}",
                            path.display()
                        ))
                    })?;
                }
            }
            Ok::<(), ConnectorError>(())
        })
        .await
        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
    }

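    /// Phase two: atomically rename each `*.tmp` segment to its final name,
    /// making the epoch's output visible. `SinkMode::Append` skips the rename
    /// and leaves `active_tmp_files` untouched.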
    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        let is_append = self
            .config
            .as_ref()
            .map_or(false, |c| c.mode == SinkMode::Append);

        if is_append {
            return Ok(());
        }

        for tmp_path in &self.active_tmp_files {
            if !tmp_path.exists() {
                continue;
            }
            let final_name = tmp_path
                .file_name()
                .and_then(|n| n.to_str())
                .and_then(|n| n.strip_suffix(".tmp"))
                .ok_or_else(|| {
                    ConnectorError::WriteError(format!(
                        "tmp file '{}' has no .tmp suffix — cannot commit",
                        tmp_path.display()
                    ))
                })?;
            let final_path = tmp_path.parent().unwrap_or(Path::new(".")).join(final_name);

            std::fs::rename(tmp_path, &final_path).map_err(|e| {
                ConnectorError::WriteError(format!(
                    "cannot rename '{}' -> '{}': {e}",
                    tmp_path.display(),
                    final_path.display()
                ))
            })?;
            debug!("file sink: committed {}", final_path.display());
        }

        self.active_tmp_files.clear();
        Ok(())
    }

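    /// Discards the epoch: closes the writer, deletes any tmp segments, and
    /// drops buffered bulk batches. Nothing becomes visible, since only
    /// `commit_epoch` renames files into place.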
    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        self.close_writer_async().await?;
        for tmp_path in &self.active_tmp_files {
            if tmp_path.exists() {
                let _ = std::fs::remove_file(tmp_path);
                debug!("file sink: rolled back {}", tmp_path.display());
            }
        }
        self.active_tmp_files.clear();
        self.epoch_batches.clear();
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        if self.is_open {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unknown
        }
    }

    fn capabilities(&self) -> SinkConnectorCapabilities {
        SinkConnectorCapabilities::new(Duration::from_secs(30))
            .with_exactly_once()
            .with_two_phase_commit()
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.close_writer_async().await?;
        for tmp_path in &self.active_tmp_files {
            if tmp_path.exists() {
                let _ = std::fs::remove_file(tmp_path);
            }
        }
        self.active_tmp_files.clear();
        self.is_open = false;
        info!("file sink closed");
        Ok(())
    }
}

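/// Picks a `FormatEncoder` for the configured format: CSV without a header
/// row, JSON (also used for `Text`), Parquet with the configured compression
/// codec (none/snappy/gzip/zstd/lz4), or Arrow IPC.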
fn build_encoder(
    format: FileFormat,
    schema: &SchemaRef,
    config: &FileSinkConfig,
) -> Result<Box<dyn FormatEncoder>, ConnectorError> {
    match format {
        FileFormat::Csv => {
            let csv_config = crate::schema::CsvEncoderConfig {
                delimiter: b',',
                has_header: false,
            };
            let encoder = crate::schema::CsvEncoder::with_config(schema.clone(), csv_config);
            Ok(Box::new(encoder))
        }
        FileFormat::Json | FileFormat::Text => {
            let encoder = crate::schema::JsonEncoder::new(schema.clone());
            Ok(Box::new(encoder))
        }
        FileFormat::Parquet => {
            use parquet::basic::Compression;
            let compression = match config.compression.to_lowercase().as_str() {
                "none" | "uncompressed" => Compression::UNCOMPRESSED,
                "snappy" => Compression::SNAPPY,
                "gzip" => Compression::GZIP(parquet::basic::GzipLevel::default()),
                "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
                "lz4" => Compression::LZ4,
                other => {
                    return Err(ConnectorError::ConfigurationError(format!(
                        "unknown Parquet compression: '{other}'"
                    )));
                }
            };
            let parquet_config = crate::schema::parquet::ParquetEncoderConfig::default()
                .with_compression(compression);
            let encoder =
                crate::schema::parquet::ParquetEncoder::with_config(schema.clone(), parquet_config);
            Ok(Box::new(encoder))
        }
        FileFormat::ArrowIpc => {
            let encoder = super::arrow_ipc_codec::ArrowIpcEncoder::new(schema.clone());
            Ok(Box::new(encoder))
        }
    }
}

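/// Best-effort removal of `{prefix}*.tmp` leftovers from a previous crashed
/// run. Errors are deliberately ignored; `open_segment` truncates on open, so
/// a stale file that survives here is still harmless if its name is reused.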
fn cleanup_tmp_files(dir: &Path, prefix: &str) {
    let entries = match std::fs::read_dir(dir) {
        Ok(e) => e,
        Err(_) => return,
    };
    for entry in entries.flatten() {
        let path = entry.path();
        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
            if name.starts_with(prefix) && name.ends_with(".tmp") {
                info!("file sink: removing orphaned tmp file: {}", path.display());
                let _ = std::fs::remove_file(&path);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    fn test_batch(schema: &SchemaRef) -> RecordBatch {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_sink_default() {
        let sink = FileSink::new();
        assert!(!sink.is_open);
        assert_eq!(sink.current_epoch, 0);
    }

    #[tokio::test]
    async fn test_sink_open_creates_dir() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        assert!(sink.is_open);
        assert!(out_path.exists());
        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rolling_json_lifecycle() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let schema = test_schema();
        let batch = test_batch(&schema);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 3);

        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        let files: Vec<_> = std::fs::read_dir(&out_path)
            .unwrap()
            .flatten()
            .filter(|e| {
                let name = e.file_name();
                let n = name.to_str().unwrap();
                !n.ends_with(".tmp")
            })
            .collect();
        assert!(!files.is_empty(), "expected committed file in output dir");

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rollback_deletes_tmp() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let schema = test_schema();
        let batch = test_batch(&schema);
        sink.write_batch(&batch).await.unwrap();

        sink.rollback_epoch(1).await.unwrap();

        let tmp_count = std::fs::read_dir(&out_path)
            .unwrap()
            .flatten()
            .filter(|e| e.file_name().to_str().unwrap().ends_with(".tmp"))
            .count();
        assert_eq!(
            tmp_count, 0,
            "tmp files should be cleaned up after rollback"
        );

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_cleanup_tmp_on_open() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        std::fs::create_dir_all(&out_path).unwrap();

        std::fs::write(out_path.join("part_000001_000.jsonl.tmp"), b"orphan").unwrap();
        std::fs::write(out_path.join("other_000001_000.jsonl.tmp"), b"keep").unwrap();

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();

        assert!(!out_path.join("part_000001_000.jsonl.tmp").exists());
        assert!(out_path.join("other_000001_000.jsonl.tmp").exists());

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_capabilities() {
        let sink = FileSink::new();
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.two_phase_commit);
    }
}
646}