Skip to main content

laminar_connectors/files/
text_decoder.rs

//! Plain text line decoder implementing [`FormatDecoder`].
//!
//! Validates each raw record as UTF-8, splits it on line endings (`\n` or
//! `\r\n`), drops empty lines, and produces a single-column `RecordBatch`
//! with column `line: Utf8`.
5
6use std::sync::Arc;
7
8use arrow_array::{RecordBatch, StringArray};
9use arrow_schema::{DataType, Field, Schema, SchemaRef};
10
11use crate::schema::error::{SchemaError, SchemaResult};
12use crate::schema::traits::FormatDecoder;
13use crate::schema::types::RawRecord;
14
15/// Decodes raw bytes as newline-delimited text into a single `line` column.
16#[derive(Debug)]
17pub struct TextLineDecoder {
18    schema: SchemaRef,
19}
20
21impl TextLineDecoder {
22    /// Creates a new text line decoder.
23    #[must_use]
24    pub fn new() -> Self {
25        let schema = Arc::new(Schema::new(vec![Field::new("line", DataType::Utf8, false)]));
26        Self { schema }
27    }
28}
29
30impl Default for TextLineDecoder {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl FormatDecoder for TextLineDecoder {
37    fn output_schema(&self) -> SchemaRef {
38        self.schema.clone()
39    }
40
41    fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
42        if records.is_empty() {
43            return Ok(RecordBatch::new_empty(self.schema.clone()));
44        }
45
46        // Validate UTF-8 upfront, then collect &str slices to avoid per-line allocation.
47        let mut texts = Vec::with_capacity(records.len());
48        for record in records {
49            let text = std::str::from_utf8(&record.value).map_err(|e| {
50                SchemaError::DecodeError(format!("invalid UTF-8 in text file: {e}"))
51            })?;
52            texts.push(text);
53        }
54
55        let lines: Vec<&str> = texts
56            .iter()
57            .flat_map(|t| t.lines())
58            .filter(|l| !l.is_empty())
59            .collect();
60        let array = StringArray::from_iter_values(lines);
61        RecordBatch::try_new(self.schema.clone(), vec![Arc::new(array)])
62            .map_err(|e| SchemaError::DecodeError(format!("batch construction error: {e}")))
63    }
64
65    fn format_name(&self) -> &str {
66        "text"
67    }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::cast::AsArray;

    /// Decodes `records` with a fresh decoder and returns the `line` column
    /// as owned strings, so tests can compare content and order in one go.
    fn decoded_lines(records: &[RawRecord]) -> Vec<String> {
        let batch = TextLineDecoder::new().decode_batch(records).unwrap();
        let col = batch.column(0).as_string::<i32>();
        (0..batch.num_rows())
            .map(|i| col.value(i).to_owned())
            .collect()
    }

    #[test]
    fn test_empty_input() {
        let decoder = TextLineDecoder::new();
        let batch = decoder.decode_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema().fields().len(), 1);
        assert_eq!(batch.schema().field(0).name(), "line");
    }

    #[test]
    fn test_single_record_multiple_lines() {
        let record = RawRecord::new(b"hello\nworld\nfoo".to_vec());
        assert_eq!(decoded_lines(&[record]), ["hello", "world", "foo"]);
    }

    #[test]
    fn test_skips_empty_lines() {
        let record = RawRecord::new(b"a\n\nb\n".to_vec());
        // Check the surviving content, not just the count, so dropping the
        // wrong line would be caught.
        assert_eq!(decoded_lines(&[record]), ["a", "b"]);
    }

    #[test]
    fn test_only_empty_lines_yields_no_rows() {
        let decoder = TextLineDecoder::new();
        let record = RawRecord::new(b"\n\n\n".to_vec());
        let batch = decoder.decode_batch(&[record]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema().field(0).name(), "line");
    }

    #[test]
    fn test_crlf_line_endings() {
        let record = RawRecord::new(b"a\r\nb\r\n".to_vec());
        // `str::lines` strips the `\r` of a `\r\n` pair.
        assert_eq!(decoded_lines(&[record]), ["a", "b"]);
    }

    #[test]
    fn test_multiple_records() {
        let r1 = RawRecord::new(b"line1\nline2".to_vec());
        let r2 = RawRecord::new(b"line3".to_vec());
        // Rows must follow record order, then line order within a record.
        assert_eq!(decoded_lines(&[r1, r2]), ["line1", "line2", "line3"]);
    }

    #[test]
    fn test_invalid_utf8_is_an_error() {
        let decoder = TextLineDecoder::new();
        let good = RawRecord::new(b"ok".to_vec());
        // 0xFF can never appear in well-formed UTF-8.
        let bad = RawRecord::new(vec![0xff, 0xfe, 0xfd]);
        assert!(decoder.decode_batch(&[good, bad]).is_err());
    }

    #[test]
    fn test_format_name() {
        let decoder = TextLineDecoder::new();
        assert_eq!(decoder.format_name(), "text");
    }

    #[test]
    fn test_schema() {
        let decoder = TextLineDecoder::new();
        let schema = decoder.output_schema();
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0).name(), "line");
        assert_eq!(schema.field(0).data_type(), &DataType::Utf8);
        assert!(!schema.field(0).is_nullable());
    }
}