// laminar_connectors/websocket/serializer.rs
use arrow_array::RecordBatch;
7use arrow_cast::display::{ArrayFormatter, FormatOptions};
8
9use crate::error::ConnectorError;
10
11use super::sink_config::SinkFormat;
12
13pub struct BatchSerializer {
15 format: SinkFormat,
17}
18
19impl BatchSerializer {
20 #[must_use]
22 pub fn new(format: SinkFormat) -> Self {
23 Self { format }
24 }
25
26 #[must_use]
28 pub fn format(&self) -> &SinkFormat {
29 &self.format
30 }
31
32 pub fn serialize_to_json(
41 &self,
42 batch: &RecordBatch,
43 ) -> Result<serde_json::Value, ConnectorError> {
44 let schema = batch.schema();
45 let num_rows = batch.num_rows();
46 let num_cols = batch.num_columns();
47
48 let formatters: Vec<ArrayFormatter<'_>> = (0..num_cols)
49 .map(|i| {
50 ArrayFormatter::try_new(batch.column(i), &FormatOptions::default()).map_err(|e| {
51 ConnectorError::Internal(format!(
52 "failed to create formatter for column {i}: {e}"
53 ))
54 })
55 })
56 .collect::<Result<Vec<_>, _>>()?;
57
58 let mut rows = Vec::with_capacity(num_rows);
59
60 for row in 0..num_rows {
61 let mut obj = serde_json::Map::with_capacity(num_cols);
62 for (col, formatter) in formatters.iter().enumerate() {
63 let field = schema.field(col);
64 if batch.column(col).is_null(row) {
65 obj.insert(field.name().clone(), serde_json::Value::Null);
66 } else {
67 let value_str = formatter.value(row).to_string();
68 let json_val = if let Ok(n) = value_str.parse::<i64>() {
70 serde_json::Value::Number(n.into())
71 } else if let Ok(n) = value_str.parse::<f64>() {
72 serde_json::Number::from_f64(n).map_or_else(
73 || serde_json::Value::String(value_str.clone()),
74 serde_json::Value::Number,
75 )
76 } else if value_str == "true" {
77 serde_json::Value::Bool(true)
78 } else if value_str == "false" {
79 serde_json::Value::Bool(false)
80 } else {
81 serde_json::Value::String(value_str)
82 };
83 obj.insert(field.name().clone(), json_val);
84 }
85 }
86 rows.push(serde_json::Value::Object(obj));
87 }
88
89 Ok(serde_json::Value::Array(rows))
90 }
91
92 pub fn serialize_rows(&self, batch: &RecordBatch) -> Result<Vec<String>, ConnectorError> {
100 match self.format {
101 SinkFormat::Json | SinkFormat::JsonLines => {
102 let json_val = self.serialize_to_json(batch)?;
103 match json_val {
104 serde_json::Value::Array(rows) => rows
105 .into_iter()
106 .map(|v| {
107 serde_json::to_string(&v).map_err(|e| {
108 ConnectorError::Serde(crate::error::SerdeError::Json(e.to_string()))
109 })
110 })
111 .collect(),
112 _ => Err(ConnectorError::Internal(
113 "expected array from serialize_to_json".into(),
114 )),
115 }
116 }
117 SinkFormat::ArrowIpc | SinkFormat::Binary => {
118 let json_val = self.serialize_to_json(batch)?;
120 let s = serde_json::to_string(&json_val).map_err(|e| {
121 ConnectorError::Serde(crate::error::SerdeError::Json(e.to_string()))
122 })?;
123 Ok(vec![s])
124 }
125 }
126 }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a three-row batch with non-nullable `id` (Int64) and `name` (Utf8) columns.
    fn test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ];
        let ids = Int64Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ids), Arc::new(names)],
        )
        .unwrap()
    }

    /// The JSON form is an array with one object per row, keyed by column name.
    #[test]
    fn test_serialize_to_json() {
        let json = BatchSerializer::new(SinkFormat::Json)
            .serialize_to_json(&test_batch())
            .unwrap();

        let objects = json.as_array().unwrap();
        assert_eq!(objects.len(), 3);
        assert_eq!(objects[0]["id"], 1);
        assert_eq!(objects[0]["name"], "Alice");
        assert_eq!(objects[2]["name"], "Charlie");
    }

    /// Row serialization yields one parseable JSON string per row.
    #[test]
    fn test_serialize_rows() {
        let lines = BatchSerializer::new(SinkFormat::Json)
            .serialize_rows(&test_batch())
            .unwrap();

        assert_eq!(lines.len(), 3);
        let first: serde_json::Value = serde_json::from_str(&lines[0]).unwrap();
        assert_eq!(first["id"], 1);
        assert_eq!(first["name"], "Alice");
    }

    /// A batch without rows serializes to an empty JSON array.
    #[test]
    fn test_serialize_empty_batch() {
        let fields = vec![Field::new("x", DataType::Int64, false)];
        let empty = RecordBatch::new_empty(Arc::new(Schema::new(fields)));

        let json = BatchSerializer::new(SinkFormat::Json)
            .serialize_to_json(&empty)
            .unwrap();
        assert_eq!(json.as_array().unwrap().len(), 0);
    }

    /// Null cells in nullable columns come out as JSON `null`.
    #[test]
    fn test_serialize_with_nulls() {
        let fields = vec![
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ];
        let ids = Int64Array::from(vec![Some(1), None]);
        let names = StringArray::from(vec![Some("Alice"), None]);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ids), Arc::new(names)],
        )
        .unwrap();

        let json = BatchSerializer::new(SinkFormat::Json)
            .serialize_to_json(&batch)
            .unwrap();
        let objects = json.as_array().unwrap();
        assert!(objects[1]["id"].is_null());
        assert!(objects[1]["name"].is_null());
    }
}