laminar_connectors/files/sink.rs
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use tracing::{debug, info};

use crate::config::ConnectorConfig;
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::schema::traits::FormatEncoder;

use super::config::{FileFormat, FileSinkConfig, SinkMode};

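/// File sink with two-phase commit semantics: each epoch's output is written
/// to `.tmp` segment files, fsynced in `pre_commit`, and atomically renamed to
/// final names in `commit_epoch` (or deleted on rollback). Bulk formats such
/// as Parquet are buffered per epoch and encoded once at pre-commit time.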
pub struct FileSink {
    /// Sink configuration, set by `open`.
    config: Option<FileSinkConfig>,
    /// Arrow schema of incoming batches.
    schema: SchemaRef,
    /// Format encoder, set by `open`.
    encoder: Option<Box<dyn FormatEncoder>>,
    /// Epoch currently being written.
    current_epoch: u64,
    /// Batches buffered for bulk formats until `pre_commit`.
    epoch_batches: Vec<RecordBatch>,
    /// Index of the current rolling segment within the epoch.
    current_segment: usize,
    /// Bytes written to the current segment (drives rotation).
    segment_bytes: u64,
    /// Writer over the currently open `.tmp` segment.
    writer: Option<BufWriter<std::fs::File>>,
    /// `.tmp` files produced this epoch; renamed on commit, removed on rollback.
    active_tmp_files: Vec<PathBuf>,
    /// Whether `open` completed successfully.
    is_open: bool,
}

impl FileSink {
    /// Creates an unconfigured sink; `open` must be called before use.
    #[must_use]
    pub fn new() -> Self {
        Self {
            config: None,
            schema: Arc::new(arrow_schema::Schema::empty()),
            encoder: None,
            current_epoch: 0,
            epoch_batches: Vec::new(),
            current_segment: 0,
            segment_bytes: 0,
            writer: None,
            active_tmp_files: Vec::new(),
            is_open: false,
        }
    }

    /// Opens a new `.tmp` segment file for the current epoch/segment pair.
    fn open_segment(&mut self) -> Result<(), ConnectorError> {
        let config = self
            .config
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "configured".into(),
                actual: "unconfigured".into(),
            })?;

        let filename = format!(
            "{}_{:06}_{:03}.{}.tmp",
            config.prefix,
            self.current_epoch,
            self.current_segment,
            config.format.extension()
        );
        let path = Path::new(&config.path).join(&filename);

        let file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&path)
            .map_err(|e| {
                ConnectorError::WriteError(format!("cannot open '{}': {e}", path.display()))
            })?;

        self.writer = Some(BufWriter::new(file));
        self.active_tmp_files.push(path);
        self.segment_bytes = 0;
        Ok(())
    }

    /// Opens a segment lazily if no writer is active.
    fn ensure_writer(&mut self) -> Result<(), ConnectorError> {
        if self.writer.is_none() {
            self.open_segment()?;
        }
        Ok(())
    }

    /// Flushes and drops the current writer, if any.
    fn close_writer(&mut self) -> Result<(), ConnectorError> {
        if let Some(mut w) = self.writer.take() {
            w.flush()
                .map_err(|e| ConnectorError::WriteError(format!("flush error: {e}")))?;
        }
        Ok(())
    }
}

impl Default for FileSink {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for FileSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileSink")
            .field("is_open", &self.is_open)
            .field("current_epoch", &self.current_epoch)
            .field("epoch_batches", &self.epoch_batches.len())
            .field("active_tmp_files", &self.active_tmp_files.len())
            .finish()
    }
}

#[async_trait]
impl SinkConnector for FileSink {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        let sink_config = FileSinkConfig::from_connector_config(config)?;

        // Create the output directory and sweep orphaned tmp files from a
        // previous run, off the async runtime.
        let out_dir = Path::new(&sink_config.path);
        let out_dir_owned = out_dir.to_path_buf();
        let prefix = sink_config.prefix.clone();
        tokio::task::spawn_blocking(move || {
            if !out_dir_owned.exists() {
                std::fs::create_dir_all(&out_dir_owned)?;
            }
            cleanup_tmp_files(&out_dir_owned, &prefix);
            Ok::<(), std::io::Error>(())
        })
        .await
        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
        .map_err(|e| ConnectorError::WriteError(format!("cannot init output dir: {e}")))?;

        let schema = config
            .arrow_schema()
            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()));

        let encoder = build_encoder(sink_config.format, &schema, &sink_config)?;

        self.config = Some(sink_config);
        self.schema = schema;
        self.encoder = Some(encoder);
        self.is_open = true;

        info!("file sink opened");
        Ok(())
    }

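    /// Writes one batch. Row formats are encoded and streamed to the current
    /// segment, rotating once `max_file_size` is exceeded; bulk formats are
    /// buffered in memory until `pre_commit`.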
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        let is_bulk = self
            .config
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "closed".into(),
            })?
            .format
            .is_bulk_format();
        let max_file_size = self.config.as_ref().and_then(|c| c.max_file_size);

        if batch.num_rows() == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        let rows = batch.num_rows();

        if is_bulk {
            // Bulk formats are encoded once per epoch, in pre_commit.
            self.epoch_batches.push(batch.clone());
            return Ok(WriteResult::new(rows, 0));
        }

        let encoder = self
            .encoder
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "encoder ready".into(),
                actual: "no encoder".into(),
            })?;

        let encoded = encoder
            .encode_batch(batch)
            .map_err(|e| ConnectorError::WriteError(format!("encode error: {e}")))?;

        self.ensure_writer()?;
        let writer = self.writer.as_mut().unwrap();

        let mut bytes_written: u64 = 0;
        for record_bytes in &encoded {
            writer
                .write_all(record_bytes)
                .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
            writer
                .write_all(b"\n")
                .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
            bytes_written += record_bytes.len() as u64 + 1;
        }

        self.segment_bytes += bytes_written;

        // Rotate to a new segment once the size threshold is crossed; the next
        // write reopens lazily via ensure_writer.
        if let Some(max_size) = max_file_size {
            if self.segment_bytes >= max_size as u64 {
                debug!("file sink: rotating at {} bytes", self.segment_bytes);
                self.close_writer()?;
                self.current_segment += 1;
            }
        }

        Ok(WriteResult::new(rows, bytes_written))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;
        self.epoch_batches.clear();
        self.current_segment = 0;
        self.segment_bytes = 0;
        self.close_writer()?;
        self.active_tmp_files.clear();
        Ok(())
    }

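    /// Phase one of two-phase commit: encodes any buffered bulk batches into a
    /// single segment, closes the writer, and fsyncs every `.tmp` file so the
    /// data is durable before the rename in `commit_epoch`.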
    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        let is_bulk = self
            .config
            .as_ref()
            .is_some_and(|c| c.format.is_bulk_format());

        if is_bulk && !self.epoch_batches.is_empty() {
            let encoder = self
                .encoder
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "encoder ready".into(),
                    actual: "no encoder".into(),
                })?;

            // Concatenate the epoch's batches so the bulk encoder emits one file.
            let combined = if self.epoch_batches.len() == 1 {
                self.epoch_batches[0].clone()
            } else {
                arrow_select::concat::concat_batches(&self.schema, &self.epoch_batches)
                    .map_err(|e| ConnectorError::WriteError(format!("batch concat error: {e}")))?
            };

            let encoded = encoder
                .encode_batch(&combined)
                .map_err(|e| ConnectorError::WriteError(format!("Parquet encode error: {e}")))?;

            if let Some(file_bytes) = encoded.first() {
                self.open_segment()?;
                let writer = self.writer.as_mut().unwrap();
                writer
                    .write_all(file_bytes)
                    .map_err(|e| ConnectorError::WriteError(format!("write error: {e}")))?;
            }
            self.epoch_batches.clear();
        }

        self.close_writer()?;

        // Durability barrier: fsync every tmp file before it becomes eligible
        // for the commit-time rename.
        let paths: Vec<PathBuf> = self.active_tmp_files.clone();
        tokio::task::spawn_blocking(move || {
            for path in &paths {
                if path.exists() {
                    let f = std::fs::OpenOptions::new()
                        .write(true)
                        .open(path)
                        .map_err(|e| {
                            ConnectorError::WriteError(format!(
                                "cannot open '{}' for fsync: {e}",
                                path.display()
                            ))
                        })?;
                    f.sync_all().map_err(|e| {
                        ConnectorError::WriteError(format!(
                            "fsync failed on '{}': {e}",
                            path.display()
                        ))
                    })?;
                }
            }
            Ok::<(), ConnectorError>(())
        })
        .await
        .map_err(|e| ConnectorError::WriteError(format!("spawn_blocking failed: {e}")))?
    }

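    /// Phase two of two-phase commit: atomically renames each `.tmp` segment
    /// to its final name. In `Append` mode the rename step is skipped.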
    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        let is_append = self
            .config
            .as_ref()
            .is_some_and(|c| c.mode == SinkMode::Append);

        if is_append {
            return Ok(());
        }

        // Atomic publish: strip the .tmp suffix to make each segment visible.
        for tmp_path in &self.active_tmp_files {
            if !tmp_path.exists() {
                continue;
            }
            let final_name = tmp_path
                .file_name()
                .and_then(|n| n.to_str())
                .and_then(|n| n.strip_suffix(".tmp"))
                .ok_or_else(|| {
                    ConnectorError::WriteError(format!(
                        "tmp file '{}' has no .tmp suffix; cannot commit",
                        tmp_path.display()
                    ))
                })?;
            let final_path = tmp_path.parent().unwrap_or(Path::new(".")).join(final_name);

            std::fs::rename(tmp_path, &final_path).map_err(|e| {
                ConnectorError::WriteError(format!(
                    "cannot rename '{}' -> '{}': {e}",
                    tmp_path.display(),
                    final_path.display()
                ))
            })?;
            debug!("file sink: committed {}", final_path.display());
        }

        self.active_tmp_files.clear();
        Ok(())
    }

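    /// Aborts the epoch: closes the writer, deletes any `.tmp` segments, and
    /// drops buffered batches.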
    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        self.close_writer()?;
        for tmp_path in &self.active_tmp_files {
            if tmp_path.exists() {
                let _ = std::fs::remove_file(tmp_path);
                debug!("file sink: rolled back {}", tmp_path.display());
            }
        }
        self.active_tmp_files.clear();
        self.epoch_batches.clear();
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        if self.is_open {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unknown
        }
    }

    fn capabilities(&self) -> SinkConnectorCapabilities {
        SinkConnectorCapabilities::default()
            .with_exactly_once()
            .with_two_phase_commit()
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.close_writer()?;
        for tmp_path in &self.active_tmp_files {
            if tmp_path.exists() {
                let _ = std::fs::remove_file(tmp_path);
            }
        }
        self.active_tmp_files.clear();
        self.is_open = false;
        info!("file sink closed");
        Ok(())
    }
}

/// Builds the format encoder for the configured file format.
fn build_encoder(
    format: FileFormat,
    schema: &SchemaRef,
    config: &FileSinkConfig,
) -> Result<Box<dyn FormatEncoder>, ConnectorError> {
    match format {
        FileFormat::Csv | FileFormat::Json | FileFormat::Text => {
            // Row-oriented formats currently share the JSON-lines encoder.
            let encoder = crate::schema::JsonEncoder::new(schema.clone());
            Ok(Box::new(encoder))
        }
        FileFormat::Parquet => {
            use parquet::basic::Compression;
            let compression = match config.compression.to_lowercase().as_str() {
                "none" | "uncompressed" => Compression::UNCOMPRESSED,
                "snappy" => Compression::SNAPPY,
                "gzip" => Compression::GZIP(parquet::basic::GzipLevel::default()),
                "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
                "lz4" => Compression::LZ4,
                other => {
                    return Err(ConnectorError::ConfigurationError(format!(
                        "unknown Parquet compression: '{other}'"
                    )));
                }
            };
            let parquet_config = crate::schema::parquet::ParquetEncoderConfig::default()
                .with_compression(compression);
            let encoder =
                crate::schema::parquet::ParquetEncoder::with_config(schema.clone(), parquet_config);
            Ok(Box::new(encoder))
        }
    }
}

/// Removes orphaned `.tmp` files left behind by a previous crashed run.
fn cleanup_tmp_files(dir: &Path, prefix: &str) {
    let entries = match std::fs::read_dir(dir) {
        Ok(e) => e,
        Err(_) => return,
    };
    for entry in entries.flatten() {
        let path = entry.path();
        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
            if name.starts_with(prefix) && name.ends_with(".tmp") {
                info!("file sink: removing orphaned tmp file: {}", path.display());
                let _ = std::fs::remove_file(&path);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    fn test_batch(schema: &SchemaRef) -> RecordBatch {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_sink_default() {
        let sink = FileSink::new();
        assert!(!sink.is_open);
        assert_eq!(sink.current_epoch, 0);
    }

    #[tokio::test]
    async fn test_sink_open_creates_dir() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        assert!(sink.is_open);
        assert!(out_path.exists());
        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rolling_json_lifecycle() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let schema = test_schema();
        let batch = test_batch(&schema);
        let result = sink.write_batch(&batch).await.unwrap();
        assert_eq!(result.records_written, 3);

        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();

        // Count only committed (non-tmp) outputs.
        let files: Vec<_> = std::fs::read_dir(&out_path)
            .unwrap()
            .flatten()
            .filter(|e| {
                let name = e.file_name();
                let n = name.to_str().unwrap();
                !n.ends_with(".tmp")
            })
            .collect();
        assert!(!files.is_empty(), "expected committed file in output dir");

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_sink_rollback_deletes_tmp() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let schema = test_schema();
        let batch = test_batch(&schema);
        sink.write_batch(&batch).await.unwrap();

        sink.rollback_epoch(1).await.unwrap();

        let tmp_count = std::fs::read_dir(&out_path)
            .unwrap()
            .flatten()
            .filter(|e| e.file_name().to_str().unwrap().ends_with(".tmp"))
            .count();
        assert_eq!(
            tmp_count, 0,
            "tmp files should be cleaned up after rollback"
        );

        sink.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_cleanup_tmp_on_open() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");
        std::fs::create_dir_all(&out_path).unwrap();

        // "part" matches the sink's default prefix; "other" should survive cleanup.
        std::fs::write(out_path.join("part_000001_000.jsonl.tmp"), b"orphan").unwrap();
        std::fs::write(out_path.join("other_000001_000.jsonl.tmp"), b"keep").unwrap();

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();

        assert!(!out_path.join("part_000001_000.jsonl.tmp").exists());
        assert!(out_path.join("other_000001_000.jsonl.tmp").exists());

        sink.close().await.unwrap();
    }
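
    // Added test sketching the zero-row fast path in write_batch: empty
    // batches are acknowledged without opening a segment. Uses only APIs
    // already exercised above plus arrow's RecordBatch::new_empty.
    #[tokio::test]
    async fn test_sink_empty_batch_is_noop() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");

        sink.open(&config).await.unwrap();
        sink.begin_epoch(1).await.unwrap();

        let empty = RecordBatch::new_empty(test_schema());
        let result = sink.write_batch(&empty).await.unwrap();
        assert_eq!(result.records_written, 0);

        sink.close().await.unwrap();
    }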

    #[tokio::test]
    async fn test_capabilities() {
        let sink = FileSink::new();
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.two_phase_commit);
    }
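
    // Added sketch of the health_check contract implied by the implementation:
    // Unknown before open, Healthy while open, Unknown again after close.
    // matches! avoids assuming HealthStatus implements PartialEq.
    #[tokio::test]
    async fn test_health_check_transitions() {
        let dir = tempfile::tempdir().unwrap();
        let out_path = dir.path().join("output");

        let mut sink = FileSink::new();
        assert!(matches!(sink.health_check(), HealthStatus::Unknown));

        let mut config = ConnectorConfig::new("files");
        config.set("path", out_path.to_str().unwrap());
        config.set("format", "json");
        sink.open(&config).await.unwrap();
        assert!(matches!(sink.health_check(), HealthStatus::Healthy));

        sink.close().await.unwrap();
        assert!(matches!(sink.health_check(), HealthStatus::Unknown));
    }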
}