//! Arrow `RecordBatch` → WebSocket message serialization.
//!
//! Converts Arrow data into JSON or binary formats for delivery
//! to connected WebSocket clients.

use arrow_array::RecordBatch;
use arrow_cast::display::{ArrayFormatter, FormatOptions};

use crate::error::ConnectorError;

use super::sink_config::SinkFormat;
13/// Serializes Arrow `RecordBatch` data into WebSocket message payloads.
14pub struct BatchSerializer {
15    /// Output format.
16    format: SinkFormat,
17}
18
19impl BatchSerializer {
20    /// Creates a new serializer for the given format.
21    #[must_use]
22    pub fn new(format: SinkFormat) -> Self {
23        Self { format }
24    }
25
26    /// Returns the configured format.
27    #[must_use]
28    pub fn format(&self) -> &SinkFormat {
29        &self.format
30    }
31
32    /// Serializes a `RecordBatch` into a JSON value suitable for the subscription protocol.
33    ///
34    /// Returns a `serde_json::Value` array where each element is a JSON object
35    /// representing one row.
36    ///
37    /// # Errors
38    ///
39    /// Returns `ConnectorError` if serialization fails.
40    pub fn serialize_to_json(
41        &self,
42        batch: &RecordBatch,
43    ) -> Result<serde_json::Value, ConnectorError> {
44        let schema = batch.schema();
45        let num_rows = batch.num_rows();
46        let num_cols = batch.num_columns();
47
48        let formatters: Vec<ArrayFormatter<'_>> = (0..num_cols)
49            .map(|i| {
50                ArrayFormatter::try_new(batch.column(i), &FormatOptions::default()).map_err(|e| {
51                    ConnectorError::Internal(format!(
52                        "failed to create formatter for column {i}: {e}"
53                    ))
54                })
55            })
56            .collect::<Result<Vec<_>, _>>()?;
57
58        let mut rows = Vec::with_capacity(num_rows);
59
60        for row in 0..num_rows {
61            let mut obj = serde_json::Map::with_capacity(num_cols);
62            for (col, formatter) in formatters.iter().enumerate() {
63                let field = schema.field(col);
64                if batch.column(col).is_null(row) {
65                    obj.insert(field.name().clone(), serde_json::Value::Null);
66                } else {
67                    let value_str = formatter.value(row).to_string();
68                    // Try to parse as number or boolean for cleaner JSON
69                    let json_val = if let Ok(n) = value_str.parse::<i64>() {
70                        serde_json::Value::Number(n.into())
71                    } else if let Ok(n) = value_str.parse::<f64>() {
72                        serde_json::Number::from_f64(n).map_or_else(
73                            || serde_json::Value::String(value_str.clone()),
74                            serde_json::Value::Number,
75                        )
76                    } else if value_str == "true" {
77                        serde_json::Value::Bool(true)
78                    } else if value_str == "false" {
79                        serde_json::Value::Bool(false)
80                    } else {
81                        serde_json::Value::String(value_str)
82                    };
83                    obj.insert(field.name().clone(), json_val);
84                }
85            }
86            rows.push(serde_json::Value::Object(obj));
87        }
88
89        Ok(serde_json::Value::Array(rows))
90    }
91
92    /// Serializes a `RecordBatch` into per-row JSON strings.
93    ///
94    /// Each returned `String` is a complete JSON object for one row.
95    ///
96    /// # Errors
97    ///
98    /// Returns `ConnectorError` if serialization fails.
99    pub fn serialize_rows(&self, batch: &RecordBatch) -> Result<Vec<String>, ConnectorError> {
100        match self.format {
101            SinkFormat::Json | SinkFormat::JsonLines => {
102                let json_val = self.serialize_to_json(batch)?;
103                match json_val {
104                    serde_json::Value::Array(rows) => rows
105                        .into_iter()
106                        .map(|v| {
107                            serde_json::to_string(&v).map_err(|e| {
108                                ConnectorError::Serde(crate::error::SerdeError::Json(e.to_string()))
109                            })
110                        })
111                        .collect(),
112                    _ => Err(ConnectorError::Internal(
113                        "expected array from serialize_to_json".into(),
114                    )),
115                }
116            }
117            SinkFormat::ArrowIpc | SinkFormat::Binary => {
118                // For binary formats, serialize the entire batch as one message
119                let json_val = self.serialize_to_json(batch)?;
120                let s = serde_json::to_string(&json_val).map_err(|e| {
121                    ConnectorError::Serde(crate::error::SerdeError::Json(e.to_string()))
122                })?;
123                Ok(vec![s])
124            }
125        }
126    }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a 3-row batch with an `id` (Int64) and `name` (Utf8) column.
    fn test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ];
        let ids = Int64Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ids), Arc::new(names)],
        )
        .unwrap()
    }

    #[test]
    fn test_serialize_to_json() {
        let json = BatchSerializer::new(SinkFormat::Json)
            .serialize_to_json(&test_batch())
            .unwrap();

        let rows = json.as_array().unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0]["id"], 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[2]["name"], "Charlie");
    }

    #[test]
    fn test_serialize_rows() {
        let lines = BatchSerializer::new(SinkFormat::Json)
            .serialize_rows(&test_batch())
            .unwrap();

        assert_eq!(lines.len(), 3);
        let first: serde_json::Value = serde_json::from_str(&lines[0]).unwrap();
        assert_eq!(first["id"], 1);
        assert_eq!(first["name"], "Alice");
    }

    #[test]
    fn test_serialize_empty_batch() {
        let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
        let empty = RecordBatch::new_empty(Arc::new(schema));
        let json = BatchSerializer::new(SinkFormat::Json)
            .serialize_to_json(&empty)
            .unwrap();
        assert!(json.as_array().unwrap().is_empty());
    }

    #[test]
    fn test_serialize_with_nulls() {
        let fields = vec![
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ];
        let ids = Int64Array::from(vec![Some(1), None]);
        let names = StringArray::from(vec![Some("Alice"), None]);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ids), Arc::new(names)],
        )
        .unwrap();

        let json = BatchSerializer::new(SinkFormat::Json)
            .serialize_to_json(&batch)
            .unwrap();
        let rows = json.as_array().unwrap();
        assert!(rows[1]["id"].is_null());
        assert!(rows[1]["name"].is_null());
    }
}