laminar_connectors/files/
text_decoder.rs1use std::sync::Arc;
7
8use arrow_array::{RecordBatch, StringArray};
9use arrow_schema::{DataType, Field, Schema, SchemaRef};
10
11use crate::schema::error::{SchemaError, SchemaResult};
12use crate::schema::traits::FormatDecoder;
13use crate::schema::types::RawRecord;
14
15#[derive(Debug)]
17pub struct TextLineDecoder {
18 schema: SchemaRef,
19}
20
21impl TextLineDecoder {
22 #[must_use]
24 pub fn new() -> Self {
25 let schema = Arc::new(Schema::new(vec![Field::new("line", DataType::Utf8, false)]));
26 Self { schema }
27 }
28}
29
30impl Default for TextLineDecoder {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl FormatDecoder for TextLineDecoder {
37 fn output_schema(&self) -> SchemaRef {
38 self.schema.clone()
39 }
40
41 fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
42 if records.is_empty() {
43 return Ok(RecordBatch::new_empty(self.schema.clone()));
44 }
45
46 let mut texts = Vec::with_capacity(records.len());
48 for record in records {
49 let text = std::str::from_utf8(&record.value).map_err(|e| {
50 SchemaError::DecodeError(format!("invalid UTF-8 in text file: {e}"))
51 })?;
52 texts.push(text);
53 }
54
55 let lines: Vec<&str> = texts
56 .iter()
57 .flat_map(|t| t.lines())
58 .filter(|l| !l.is_empty())
59 .collect();
60 let array = StringArray::from_iter_values(lines);
61 RecordBatch::try_new(self.schema.clone(), vec![Arc::new(array)])
62 .map_err(|e| SchemaError::DecodeError(format!("batch construction error: {e}")))
63 }
64
65 fn format_name(&self) -> &str {
66 "text"
67 }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::cast::AsArray;

    /// Returns the values of the (non-nullable) `line` column of `batch`.
    fn column_values(batch: &RecordBatch) -> Vec<&str> {
        batch
            .column(0)
            .as_string::<i32>()
            .iter()
            .map(|v| v.expect("`line` column is non-nullable"))
            .collect()
    }

    #[test]
    fn test_empty_input() {
        let decoder = TextLineDecoder::new();
        let batch = decoder.decode_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema().fields().len(), 1);
        assert_eq!(batch.schema().field(0).name(), "line");
    }

    #[test]
    fn test_single_record_multiple_lines() {
        let decoder = TextLineDecoder::new();
        let record = RawRecord::new(b"hello\nworld\nfoo".to_vec());
        let batch = decoder.decode_batch(&[record]).unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(column_values(&batch), ["hello", "world", "foo"]);
    }

    #[test]
    fn test_skips_empty_lines() {
        let decoder = TextLineDecoder::new();
        let record = RawRecord::new(b"a\n\nb\n".to_vec());
        let batch = decoder.decode_batch(&[record]).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(column_values(&batch), ["a", "b"]);
    }

    #[test]
    fn test_only_blank_lines_yields_no_rows() {
        let decoder = TextLineDecoder::new();
        let record = RawRecord::new(b"\n\n\n".to_vec());
        let batch = decoder.decode_batch(&[record]).unwrap();
        assert_eq!(batch.num_rows(), 0);
    }

    #[test]
    fn test_crlf_line_endings() {
        let decoder = TextLineDecoder::new();
        let record = RawRecord::new(b"a\r\nb\r\n".to_vec());
        let batch = decoder.decode_batch(&[record]).unwrap();
        assert_eq!(column_values(&batch), ["a", "b"]);
    }

    #[test]
    fn test_multiple_records() {
        let decoder = TextLineDecoder::new();
        let r1 = RawRecord::new(b"line1\nline2".to_vec());
        let r2 = RawRecord::new(b"line3".to_vec());
        let batch = decoder.decode_batch(&[r1, r2]).unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(column_values(&batch), ["line1", "line2", "line3"]);
    }

    #[test]
    fn test_invalid_utf8_is_decode_error() {
        let decoder = TextLineDecoder::new();
        let record = RawRecord::new(vec![b'o', b'k', 0xff, b'\n']);
        let result = decoder.decode_batch(&[record]);
        assert!(matches!(result, Err(SchemaError::DecodeError(_))));
    }

    #[test]
    fn test_format_name() {
        let decoder = TextLineDecoder::new();
        assert_eq!(decoder.format_name(), "text");
    }

    #[test]
    fn test_schema() {
        let decoder = TextLineDecoder::new();
        let schema = decoder.output_schema();
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0).name(), "line");
        assert_eq!(schema.field(0).data_type(), &DataType::Utf8);
    }
}