laminar_connectors/files/
sink.rs1use std::io::{BufWriter, Write};
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::time::Duration;
14
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17use async_trait::async_trait;
18use tracing::{debug, info};
19
20use crate::config::ConnectorConfig;
21use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
22use crate::error::ConnectorError;
23use crate::schema::traits::FormatEncoder;
24
25use super::config::{FileFormat, FileSinkConfig, SinkMode};
26
27pub struct FileSink {
29 config: Option<FileSinkConfig>,
31 schema: SchemaRef,
33 encoder: Option<Box<dyn FormatEncoder>>,
35 current_epoch: u64,
37 epoch_batches: Vec<RecordBatch>,
39 current_segment: usize,
41 segment_bytes: u64,
43 writer: Option<BufWriter<std::fs::File>>,
45 active_tmp_files: Vec<PathBuf>,
47 is_open: bool,
49}
50
51impl FileSink {
52 #[must_use]
54 pub fn new() -> Self {
55 Self::with_registry(None)
56 }
57
58 #[must_use]
60 pub fn with_registry(_registry: Option<&prometheus::Registry>) -> Self {
61 Self {
62 config: None,
63 schema: Arc::new(arrow_schema::Schema::empty()),
64 encoder: None,
65 current_epoch: 0,
66 epoch_batches: Vec::new(),
67 current_segment: 0,
68 segment_bytes: 0,
69 writer: None,
70 active_tmp_files: Vec::new(),
71 is_open: false,
72 }
73 }
74
75 fn open_segment(&mut self) -> Result<(), ConnectorError> {
77 let config = self
78 .config
79 .as_ref()
80 .ok_or_else(|| ConnectorError::InvalidState {
81 expected: "configured".into(),
82 actual: "unconfigured".into(),
83 })?;
84
85 let filename = format!(
86 "{}_{:06}_{:03}.{}.tmp",
87 config.prefix,
88 self.current_epoch,
89 self.current_segment,
90 config.format.extension()
91 );
92 let path = Path::new(&config.path).join(&filename);
93
94 let file = std::fs::OpenOptions::new()
95 .create(true)
96 .write(true)
97 .truncate(true)
98 .open(&path)
99 .map_err(|e| {
100 ConnectorError::WriteError(format!("cannot open '{}': {e}", path.display()))
101 })?;
102
103 self.writer = Some(BufWriter::new(file));
104 self.active_tmp_files.push(path);
105 self.segment_bytes = 0;
106 Ok(())
107 }
108
109 fn ensure_writer(&mut self) -> Result<(), ConnectorError> {
111 if self.writer.is_none() {
112 self.open_segment()?;
113 }
114 Ok(())
115 }
116
117 async fn close_writer_async(&mut self) -> Result<(), ConnectorError> {
120 let Some(writer) = self.writer.take() else {
121 return Ok(());
122 };
123 tokio::task::spawn_blocking(move || -> Result<(), ConnectorError> {
124 let mut w = writer;
125 w.flush()
126 .map_err(|e| ConnectorError::WriteError(format!("flush error: {e}")))
127 })
128 .await
129 .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
130 }
131}
132
133impl Default for FileSink {
134 fn default() -> Self {
135 Self::new()
136 }
137}
138
139impl std::fmt::Debug for FileSink {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("FileSink")
142 .field("is_open", &self.is_open)
143 .field("current_epoch", &self.current_epoch)
144 .field("epoch_batches", &self.epoch_batches.len())
145 .field("active_tmp_files", &self.active_tmp_files.len())
146 .finish()
147 }
148}
149
150#[async_trait]
151impl SinkConnector for FileSink {
152 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
153 let sink_config = FileSinkConfig::from_connector_config(config)?;
154
155 let out_dir = Path::new(&sink_config.path);
157 let out_dir_owned = out_dir.to_path_buf();
158 let prefix = sink_config.prefix.clone();
159 tokio::task::spawn_blocking(move || {
160 if !out_dir_owned.exists() {
161 std::fs::create_dir_all(&out_dir_owned)?;
162 }
163 cleanup_tmp_files(&out_dir_owned, &prefix);
164 Ok::<(), std::io::Error>(())
165 })
166 .await
167 .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
168 .map_err(|e| ConnectorError::WriteError(format!("cannot init output dir: {e}")))?;
169
170 let schema = config
172 .arrow_schema()
173 .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()));
174
175 let encoder = build_encoder(sink_config.format, &schema, &sink_config)?;
176
177 self.config = Some(sink_config);
178 self.schema = schema;
179 self.encoder = Some(encoder);
180 self.is_open = true;
181
182 info!("file sink opened");
183 Ok(())
184 }
185
186 async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
187 let is_bulk = self
188 .config
189 .as_ref()
190 .ok_or_else(|| ConnectorError::InvalidState {
191 expected: "open".into(),
192 actual: "closed".into(),
193 })?
194 .format
195 .is_bulk_format();
196 let max_file_size = self.config.as_ref().and_then(|c| c.max_file_size);
197
198 if batch.num_rows() == 0 {
199 return Ok(WriteResult::new(0, 0));
200 }
201
202 let rows = batch.num_rows();
203
204 if is_bulk {
205 let max_epoch = self.config.as_ref().map_or(10_000, |c| c.max_epoch_batches);
206 if self.epoch_batches.len() >= max_epoch {
207 return Err(ConnectorError::WriteError(format!(
208 "file sink: epoch batch buffer full ({max_epoch} batches) — \
209 increase max_epoch_batches or flush more frequently"
210 )));
211 }
212 self.epoch_batches.push(batch.clone());
214 return Ok(WriteResult::new(rows, 0));
215 }
216
217 let encoded = self
220 .encoder
221 .as_ref()
222 .ok_or_else(|| ConnectorError::InvalidState {
223 expected: "encoder ready".into(),
224 actual: "no encoder".into(),
225 })?
226 .encode_batch(batch)
227 .map_err(|e| ConnectorError::WriteError(format!("encode error: {e}")))?;
228
229 self.ensure_writer()?;
230 let mut writer = self.writer.take().expect("ensure_writer just ran");
231 let (writer, bytes_written) =
232 tokio::task::spawn_blocking(move || -> Result<_, ConnectorError> {
233 let mut total: u64 = 0;
234 for record_bytes in &encoded {
235 writer
236 .write_all(record_bytes)
237 .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
238 writer
239 .write_all(b"\n")
240 .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
241 total += record_bytes.len() as u64 + 1;
242 }
243 Ok((writer, total))
244 })
245 .await
246 .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))??;
247 self.writer = Some(writer);
248 self.segment_bytes += bytes_written;
249
250 if let Some(max_size) = max_file_size {
252 if self.segment_bytes >= max_size as u64 {
253 debug!("file sink: rotating at {} bytes", self.segment_bytes);
254 self.close_writer_async().await?;
255 self.current_segment += 1;
256 }
257 }
258
259 Ok(WriteResult::new(rows, bytes_written))
260 }
261
262 fn schema(&self) -> SchemaRef {
263 self.schema.clone()
264 }
265
266 async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
267 self.current_epoch = epoch;
268 self.epoch_batches.clear();
269 self.current_segment = 0;
270 self.segment_bytes = 0;
271 self.close_writer_async().await?;
272 self.active_tmp_files.clear();
273 Ok(())
274 }
275
276 async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
277 let is_bulk = self
278 .config
279 .as_ref()
280 .map_or(false, |c| c.format.is_bulk_format());
281
282 if is_bulk && !self.epoch_batches.is_empty() {
283 let encoder = self
285 .encoder
286 .as_ref()
287 .ok_or_else(|| ConnectorError::InvalidState {
288 expected: "encoder ready".into(),
289 actual: "no encoder".into(),
290 })?;
291
292 let combined = if self.epoch_batches.len() == 1 {
293 self.epoch_batches[0].clone()
294 } else {
295 arrow_select::concat::concat_batches(&self.schema, &self.epoch_batches)
296 .map_err(|e| ConnectorError::WriteError(format!("batch concat error: {e}")))?
297 };
298
299 let encoded = encoder
300 .encode_batch(&combined)
301 .map_err(|e| ConnectorError::WriteError(format!("Parquet encode error: {e}")))?;
302
303 if let Some(file_bytes) = encoded.into_iter().next() {
306 self.open_segment()?;
307 let writer = self.writer.take().expect("open_segment just ran");
308 let writer = tokio::task::spawn_blocking(move || -> Result<_, ConnectorError> {
309 let mut w = writer;
310 w.write_all(&file_bytes)
311 .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
312 Ok(w)
313 })
314 .await
315 .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))??;
316 self.writer = Some(writer);
317 }
318 self.epoch_batches.clear();
319 }
320
321 self.close_writer_async().await?;
323
324 let paths: Vec<PathBuf> = self.active_tmp_files.clone();
327 tokio::task::spawn_blocking(move || {
328 for path in &paths {
329 if path.exists() {
330 let f = std::fs::OpenOptions::new()
331 .write(true)
332 .open(path)
333 .map_err(|e| {
334 ConnectorError::WriteError(format!(
335 "cannot open '{}' for fsync: {e}",
336 path.display()
337 ))
338 })?;
339 f.sync_all().map_err(|e| {
340 ConnectorError::WriteError(format!(
341 "fsync failed on '{}': {e}",
342 path.display()
343 ))
344 })?;
345 }
346 }
347 Ok::<(), ConnectorError>(())
348 })
349 .await
350 .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
351 }
352
353 async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
354 let is_append = self
355 .config
356 .as_ref()
357 .map_or(false, |c| c.mode == SinkMode::Append);
358
359 if is_append {
360 return Ok(());
361 }
362
363 for tmp_path in &self.active_tmp_files {
365 if !tmp_path.exists() {
366 continue;
367 }
368 let final_name = tmp_path
369 .file_name()
370 .and_then(|n| n.to_str())
371 .and_then(|n| n.strip_suffix(".tmp"))
372 .ok_or_else(|| {
373 ConnectorError::WriteError(format!(
374 "tmp file '{}' has no .tmp suffix — cannot commit",
375 tmp_path.display()
376 ))
377 })?;
378 let final_path = tmp_path.parent().unwrap_or(Path::new(".")).join(final_name);
379
380 std::fs::rename(tmp_path, &final_path).map_err(|e| {
381 ConnectorError::WriteError(format!(
382 "cannot rename '{}' -> '{}': {e}",
383 tmp_path.display(),
384 final_path.display()
385 ))
386 })?;
387 debug!("file sink: committed {}", final_path.display());
388 }
389
390 self.active_tmp_files.clear();
391 Ok(())
392 }
393
394 async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
395 self.close_writer_async().await?;
396 for tmp_path in &self.active_tmp_files {
397 if tmp_path.exists() {
398 let _ = std::fs::remove_file(tmp_path);
399 debug!("file sink: rolled back {}", tmp_path.display());
400 }
401 }
402 self.active_tmp_files.clear();
403 self.epoch_batches.clear();
404 Ok(())
405 }
406
407 fn capabilities(&self) -> SinkConnectorCapabilities {
408 SinkConnectorCapabilities::new(Duration::from_secs(30))
409 .with_exactly_once()
410 .with_two_phase_commit()
411 }
412
413 async fn close(&mut self) -> Result<(), ConnectorError> {
414 self.close_writer_async().await?;
415 for tmp_path in &self.active_tmp_files {
416 if tmp_path.exists() {
417 let _ = std::fs::remove_file(tmp_path);
418 }
419 }
420 self.active_tmp_files.clear();
421 self.is_open = false;
422 info!("file sink closed");
423 Ok(())
424 }
425}
426
427fn build_encoder(
430 format: FileFormat,
431 schema: &SchemaRef,
432 config: &FileSinkConfig,
433) -> Result<Box<dyn FormatEncoder>, ConnectorError> {
434 match format {
435 FileFormat::Csv => {
436 let csv_config = crate::schema::CsvEncoderConfig {
437 delimiter: b',',
438 has_header: false,
439 };
440 let encoder = crate::schema::CsvEncoder::with_config(schema.clone(), csv_config);
441 Ok(Box::new(encoder))
442 }
443 FileFormat::Json | FileFormat::Text => {
444 let encoder = crate::schema::JsonEncoder::new(schema.clone());
445 Ok(Box::new(encoder))
446 }
447 FileFormat::Parquet => {
448 use parquet::basic::Compression;
449 let compression = match config.compression.to_lowercase().as_str() {
450 "none" | "uncompressed" => Compression::UNCOMPRESSED,
451 "snappy" => Compression::SNAPPY,
452 "gzip" => Compression::GZIP(parquet::basic::GzipLevel::default()),
453 "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
454 "lz4" => Compression::LZ4,
455 other => {
456 return Err(ConnectorError::ConfigurationError(format!(
457 "unknown Parquet compression: '{other}'"
458 )));
459 }
460 };
461 let parquet_config = crate::schema::parquet::ParquetEncoderConfig::default()
462 .with_compression(compression);
463 let encoder =
464 crate::schema::parquet::ParquetEncoder::with_config(schema.clone(), parquet_config);
465 Ok(Box::new(encoder))
466 }
467 FileFormat::ArrowIpc => {
468 let encoder = super::arrow_ipc_codec::ArrowIpcEncoder::new(schema.clone());
469 Ok(Box::new(encoder))
470 }
471 }
472}
473
474fn cleanup_tmp_files(dir: &Path, prefix: &str) {
476 let entries = match std::fs::read_dir(dir) {
477 Ok(e) => e,
478 Err(_) => return,
479 };
480 for entry in entries.flatten() {
481 let path = entry.path();
482 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
483 if name.starts_with(prefix) && name.ends_with(".tmp") {
484 info!("file sink: removing orphaned tmp file: {}", path.display());
485 let _ = std::fs::remove_file(&path);
486 }
487 }
488 }
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Two-column schema: non-null `id` (Int64) and nullable `name` (Utf8).
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Three-row batch matching `test_schema`.
    fn test_batch(schema: &SchemaRef) -> RecordBatch {
        let ids = Int64Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["a", "b", "c"]);
        RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(names)]).unwrap()
    }

    /// Connector config for a JSON file sink writing into `out_path`.
    fn json_config(out_path: &std::path::Path) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");
        config
    }

    /// Counts directory entries whose `.tmp`-suffix status equals `want_tmp`.
    fn count_entries(out_path: &std::path::Path, want_tmp: bool) -> usize {
        std::fs::read_dir(out_path)
            .unwrap()
            .flatten()
            .filter(|e| e.file_name().to_str().unwrap().ends_with(".tmp") == want_tmp)
            .count()
    }

    #[test]
    fn test_sink_default() {
        let sink = FileSink::new();
        assert!(!sink.is_open);
        assert_eq!(sink.current_epoch, 0);
    }

    #[tokio::test]
    async fn test_sink_open_creates_dir() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        let config = json_config(&out_path);

        let mut sink = FileSink::new();
        sink.open(&config).await.unwrap();

        assert!(sink.is_open);
        assert!(out_path.exists());
        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rolling_json_lifecycle() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        let config = json_config(&out_path);

        let mut sink = FileSink::new();
        sink.open(&config).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let batch = test_batch(&test_schema());
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 3);

        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        // At least one entry must have lost its `.tmp` suffix.
        let committed = count_entries(&out_path, false);
        assert!(committed > 0, "expected committed file in output dir");

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rollback_deletes_tmp() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        let config = json_config(&out_path);

        let mut sink = FileSink::new();
        sink.open(&config).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let batch = test_batch(&test_schema());
        sink.write_batch(&batch).await.unwrap();

        sink.rollback_epoch(1).await.unwrap();

        let tmp_count = count_entries(&out_path, true);
        assert_eq!(
            tmp_count, 0,
            "tmp files should be cleaned up after rollback"
        );

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_cleanup_tmp_on_open() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        std::fs::create_dir_all(&out_path).unwrap();

        // One orphan carrying the sink's prefix, one belonging to someone else.
        let orphan = out_path.join("part_000001_000.jsonl.tmp");
        let foreign = out_path.join("other_000001_000.jsonl.tmp");
        std::fs::write(&orphan, b"orphan").unwrap();
        std::fs::write(&foreign, b"keep").unwrap();

        let config = json_config(&out_path);
        let mut sink = FileSink::new();
        sink.open(&config).await.unwrap();

        assert!(!orphan.exists());
        assert!(foreign.exists());

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_capabilities() {
        let caps = FileSink::new().capabilities();
        assert!(caps.exactly_once);
        assert!(caps.two_phase_commit);
    }
}