Skip to main content

laminar_sql/datafusion/
json_types.rs

1//! JSONB type system for JSON UDF evaluation.
2//!
3//! Minimal JSONB binary access types used by the JSON scalar UDFs.
4//! These mirror `laminar-connectors` JSONB tags but live in
5//! `laminar-sql` to keep the SQL layer self-contained.
6
/// One-byte type tags that open every encoded JSONB value.
///
/// Must be kept in sync with `laminar_connectors::schema::json::jsonb::tags`.
pub mod tags {
    /// JSON `null`; the tag byte is the entire encoding.
    pub const NULL: u8 = 0x00;
    /// JSON `false`; the tag byte is the entire encoding.
    pub const BOOL_FALSE: u8 = 0x01;
    /// JSON `true`; the tag byte is the entire encoding.
    pub const BOOL_TRUE: u8 = 0x02;
    /// Signed 64-bit integer, followed by 8 little-endian bytes.
    pub const INT64: u8 = 0x03;
    /// IEEE 754 double, followed by 8 little-endian bytes.
    pub const FLOAT64: u8 = 0x04;
    /// String, followed by a 4-byte little-endian length and the UTF-8 bytes.
    pub const STRING: u8 = 0x05;
    /// Array, followed by a 4-byte count, an offset table and the elements.
    pub const ARRAY: u8 = 0x06;
    /// Object, followed by a 4-byte count, an offset table and key/value data.
    pub const OBJECT: u8 = 0x07;
}
28
29/// Returns the PostgreSQL-compatible type name for the outermost JSONB value.
30///
31/// Reads only the first byte (type tag) — O(1).
32#[must_use]
33pub fn jsonb_type_name(jsonb: &[u8]) -> Option<&'static str> {
34    Some(match *jsonb.first()? {
35        tags::NULL => "null",
36        tags::BOOL_FALSE | tags::BOOL_TRUE => "boolean",
37        tags::INT64 | tags::FLOAT64 => "number",
38        tags::STRING => "string",
39        tags::ARRAY => "array",
40        tags::OBJECT => "object",
41        _ => return None,
42    })
43}
44
45/// Access a field by name in a JSONB object.
46///
47/// Returns a byte slice pointing to the field's JSONB value,
48/// or `None` if the field does not exist or the value is not an object.
49///
50/// O(log n) binary search on sorted keys.
51#[must_use]
52pub fn jsonb_get_field<'a>(jsonb: &'a [u8], field_name: &str) -> Option<&'a [u8]> {
53    if jsonb.first()? != &tags::OBJECT {
54        return None;
55    }
56
57    let field_count = read_u32(jsonb, 1)? as usize;
58    if field_count == 0 {
59        return None;
60    }
61
62    let offset_table_start = 5;
63    let data_start = offset_table_start + field_count * 8;
64
65    let mut lo = 0usize;
66    let mut hi = field_count;
67    while lo < hi {
68        let mid = lo + (hi - lo) / 2;
69        let entry = offset_table_start + mid * 8;
70        let key_off = read_u32(jsonb, entry)? as usize;
71
72        let key_abs = data_start + key_off;
73        let key_len = read_u16(jsonb, key_abs)? as usize;
74        let key_bytes = jsonb.get(key_abs + 2..key_abs + 2 + key_len)?;
75        let key_str = std::str::from_utf8(key_bytes).ok()?;
76
77        match key_str.cmp(field_name) {
78            std::cmp::Ordering::Equal => {
79                let val_off = read_u32(jsonb, entry + 4)? as usize;
80                return jsonb.get(data_start + val_off..);
81            }
82            std::cmp::Ordering::Less => lo = mid + 1,
83            std::cmp::Ordering::Greater => hi = mid,
84        }
85    }
86    None
87}
88
89/// Get a JSONB array element by index.
90///
91/// Returns a byte slice pointing to the element's JSONB value,
92/// or `None` if the index is out of bounds or the value is not an array.
93#[must_use]
94pub fn jsonb_array_get(jsonb: &[u8], index: usize) -> Option<&[u8]> {
95    if jsonb.first()? != &tags::ARRAY {
96        return None;
97    }
98    let count = read_u32(jsonb, 1)? as usize;
99    if index >= count {
100        return None;
101    }
102    let offset_table_start = 5;
103    let data_start = offset_table_start + count * 4;
104    let entry_pos = offset_table_start + index * 4;
105    let elem_off = read_u32(jsonb, entry_pos)? as usize;
106    jsonb.get(data_start + elem_off..)
107}
108
109/// Check if a JSONB object contains a given key.
110#[must_use]
111pub fn jsonb_has_key(jsonb: &[u8], key: &str) -> bool {
112    jsonb_get_field(jsonb, key).is_some()
113}
114
115/// Convert a JSONB value slice to its text representation.
116///
117/// For strings, returns the unquoted string value.
118/// For other types, returns the JSON representation.
119#[must_use]
120pub fn jsonb_to_text(jsonb: &[u8]) -> Option<String> {
121    let tag = *jsonb.first()?;
122    match tag {
123        tags::BOOL_FALSE => Some("false".to_owned()),
124        tags::BOOL_TRUE => Some("true".to_owned()),
125        tags::INT64 => {
126            let v = i64::from_le_bytes(jsonb.get(1..9)?.try_into().ok()?);
127            Some(v.to_string())
128        }
129        tags::FLOAT64 => {
130            let v = f64::from_le_bytes(jsonb.get(1..9)?.try_into().ok()?);
131            Some(v.to_string())
132        }
133        tags::STRING => {
134            let len = read_u32(jsonb, 1)? as usize;
135            Some(std::str::from_utf8(jsonb.get(5..5 + len)?).ok()?.to_owned())
136        }
137        tags::ARRAY | tags::OBJECT => jsonb_to_json_string(jsonb),
138        // NULL and unknown tags return None (PostgreSQL returns NULL for null)
139        _ => None,
140    }
141}
142
143/// Convert a JSONB value to a JSON string representation.
144fn jsonb_to_json_string(jsonb: &[u8]) -> Option<String> {
145    let tag = *jsonb.first()?;
146    Some(match tag {
147        tags::NULL => "null".to_owned(),
148        tags::BOOL_FALSE => "false".to_owned(),
149        tags::BOOL_TRUE => "true".to_owned(),
150        tags::INT64 => {
151            let v = i64::from_le_bytes(jsonb.get(1..9)?.try_into().ok()?);
152            v.to_string()
153        }
154        tags::FLOAT64 => {
155            let v = f64::from_le_bytes(jsonb.get(1..9)?.try_into().ok()?);
156            v.to_string()
157        }
158        tags::STRING => {
159            let len = read_u32(jsonb, 1)? as usize;
160            let s = std::str::from_utf8(jsonb.get(5..5 + len)?).ok()?;
161            format!("\"{s}\"")
162        }
163        tags::ARRAY => {
164            let count = read_u32(jsonb, 1)? as usize;
165            let mut parts = Vec::with_capacity(count);
166            for i in 0..count {
167                let elem = jsonb_array_get(jsonb, i)?;
168                parts.push(jsonb_to_json_string(elem)?);
169            }
170            format!("[{}]", parts.join(","))
171        }
172        tags::OBJECT => {
173            let count = read_u32(jsonb, 1)? as usize;
174            let offset_table_start = 5;
175            let data_start = offset_table_start + count * 8;
176            let mut parts = Vec::with_capacity(count);
177            for i in 0..count {
178                let entry = offset_table_start + i * 8;
179                let key_off = read_u32(jsonb, entry)? as usize;
180                let key_abs = data_start + key_off;
181                let key_len = read_u16(jsonb, key_abs)? as usize;
182                let key =
183                    std::str::from_utf8(jsonb.get(key_abs + 2..key_abs + 2 + key_len)?).ok()?;
184                let val_off = read_u32(jsonb, entry + 4)? as usize;
185                let val_slice = jsonb.get(data_start + val_off..)?;
186                parts.push(format!("\"{}\":{}", key, jsonb_to_json_string(val_slice)?));
187            }
188            format!("{{{}}}", parts.join(","))
189        }
190        _ => return None,
191    })
192}
193
194/// Convert a JSONB binary value to a `serde_json::Value`.
195///
196/// Recursively decodes the JSONB binary format into the equivalent
197/// `serde_json::Value`, avoiding the text round-trip through JSON strings.
198///
199/// Returns `None` if the JSONB bytes are malformed.
200#[must_use]
201#[allow(clippy::cast_possible_truncation)]
202pub fn jsonb_to_value(jsonb: &[u8]) -> Option<serde_json::Value> {
203    let tag = *jsonb.first()?;
204    Some(match tag {
205        tags::NULL => serde_json::Value::Null,
206        tags::BOOL_FALSE => serde_json::Value::Bool(false),
207        tags::BOOL_TRUE => serde_json::Value::Bool(true),
208        tags::INT64 => {
209            let v = i64::from_le_bytes(jsonb.get(1..9)?.try_into().ok()?);
210            serde_json::Value::Number(v.into())
211        }
212        tags::FLOAT64 => {
213            let v = f64::from_le_bytes(jsonb.get(1..9)?.try_into().ok()?);
214            serde_json::Value::Number(serde_json::Number::from_f64(v)?)
215        }
216        tags::STRING => {
217            let len = read_u32(jsonb, 1)? as usize;
218            let s = std::str::from_utf8(jsonb.get(5..5 + len)?).ok()?;
219            serde_json::Value::String(s.to_owned())
220        }
221        tags::ARRAY => {
222            let count = read_u32(jsonb, 1)? as usize;
223            let mut arr = Vec::with_capacity(count);
224            for i in 0..count {
225                let elem = jsonb_array_get(jsonb, i)?;
226                arr.push(jsonb_to_value(elem)?);
227            }
228            serde_json::Value::Array(arr)
229        }
230        tags::OBJECT => {
231            let count = read_u32(jsonb, 1)? as usize;
232            let offset_table_start = 5;
233            let data_start = offset_table_start + count * 8;
234            let mut map = serde_json::Map::with_capacity(count);
235            for i in 0..count {
236                let entry = offset_table_start + i * 8;
237                let key_off = read_u32(jsonb, entry)? as usize;
238                let key_abs = data_start + key_off;
239                let key_len = read_u16(jsonb, key_abs)? as usize;
240                let key =
241                    std::str::from_utf8(jsonb.get(key_abs + 2..key_abs + 2 + key_len)?).ok()?;
242                let val_off = read_u32(jsonb, entry + 4)? as usize;
243                let val_slice = jsonb.get(data_start + val_off..)?;
244                map.insert(key.to_owned(), jsonb_to_value(val_slice)?);
245            }
246            serde_json::Value::Object(map)
247        }
248        _ => return None,
249    })
250}
251
252/// Check whether JSONB `left` contains `right` (PostgreSQL `@>` semantics).
253///
254/// An object contains another if every key in `right` exists in `left`
255/// with a matching value. An array contains another if it's a superset.
256/// Scalars match by equality.
257#[must_use]
258pub fn jsonb_contains(left: &[u8], right: &[u8]) -> Option<bool> {
259    let lt = *left.first()?;
260    let rt = *right.first()?;
261
262    if lt != rt {
263        return Some(false);
264    }
265
266    Some(match lt {
267        tags::NULL | tags::BOOL_FALSE | tags::BOOL_TRUE => true, // tags already matched
268        tags::INT64 | tags::FLOAT64 => left.get(1..9)? == right.get(1..9)?,
269        tags::STRING => {
270            let l_len = read_u32(left, 1)? as usize;
271            let r_len = read_u32(right, 1)? as usize;
272            l_len == r_len && left.get(5..5 + l_len)? == right.get(5..5 + r_len)?
273        }
274        tags::OBJECT => {
275            // Every key in right must exist in left with a contained value.
276            let r_count = read_u32(right, 1)? as usize;
277            let r_offset_table = 5;
278            let r_data_start = r_offset_table + r_count * 8;
279            for i in 0..r_count {
280                let entry = r_offset_table + i * 8;
281                let key_off = read_u32(right, entry)? as usize;
282                let key_abs = r_data_start + key_off;
283                let key_len = read_u16(right, key_abs)? as usize;
284                let key =
285                    std::str::from_utf8(right.get(key_abs + 2..key_abs + 2 + key_len)?).ok()?;
286
287                let val_off = read_u32(right, entry + 4)? as usize;
288                let r_val = right.get(r_data_start + val_off..)?;
289
290                match jsonb_get_field(left, key) {
291                    Some(l_val) => {
292                        if jsonb_contains(l_val, r_val) != Some(true) {
293                            return Some(false);
294                        }
295                    }
296                    None => return Some(false), // key not found
297                }
298            }
299            true
300        }
301        tags::ARRAY => {
302            // Every element in right must exist somewhere in left.
303            let r_count = read_u32(right, 1)? as usize;
304            let l_count = read_u32(left, 1)? as usize;
305            'outer: for ri in 0..r_count {
306                let r_elem = jsonb_array_get(right, ri)?;
307                for li in 0..l_count {
308                    let l_elem = jsonb_array_get(left, li)?;
309                    if jsonb_contains(l_elem, r_elem) == Some(true) {
310                        continue 'outer;
311                    }
312                }
313                return Some(false);
314            }
315            true
316        }
317        _ => false,
318    })
319}
320
321/// Encode a `serde_json::Value` into JSONB binary format.
322///
323/// Used by `json_build_object`, `json_build_array`, `to_jsonb` etc.
324#[must_use]
325#[allow(clippy::cast_possible_truncation)]
326pub fn encode_jsonb(value: &serde_json::Value) -> Vec<u8> {
327    let mut buf = Vec::with_capacity(256);
328    encode_jsonb_into(value, &mut buf);
329    buf
330}
331
332/// Encode a JSON value into the given buffer.
333#[allow(clippy::cast_possible_truncation)]
334pub fn encode_jsonb_into(value: &serde_json::Value, buf: &mut Vec<u8>) {
335    match value {
336        serde_json::Value::Null => buf.push(tags::NULL),
337        serde_json::Value::Bool(false) => buf.push(tags::BOOL_FALSE),
338        serde_json::Value::Bool(true) => buf.push(tags::BOOL_TRUE),
339        serde_json::Value::Number(n) => {
340            if let Some(i) = n.as_i64() {
341                buf.push(tags::INT64);
342                buf.extend_from_slice(&i.to_le_bytes());
343            } else if let Some(f) = n.as_f64() {
344                buf.push(tags::FLOAT64);
345                buf.extend_from_slice(&f.to_le_bytes());
346            }
347        }
348        serde_json::Value::String(s) => {
349            buf.push(tags::STRING);
350            buf.extend_from_slice(&(s.len() as u32).to_le_bytes());
351            buf.extend_from_slice(s.as_bytes());
352        }
353        serde_json::Value::Array(arr) => {
354            buf.push(tags::ARRAY);
355            buf.extend_from_slice(&(arr.len() as u32).to_le_bytes());
356            let offset_table_pos = buf.len();
357            buf.resize(buf.len() + arr.len() * 4, 0);
358            let data_start = buf.len();
359            for (i, elem) in arr.iter().enumerate() {
360                let elem_offset = (buf.len() - data_start) as u32;
361                let entry_pos = offset_table_pos + i * 4;
362                buf[entry_pos..entry_pos + 4].copy_from_slice(&elem_offset.to_le_bytes());
363                encode_jsonb_into(elem, buf);
364            }
365        }
366        serde_json::Value::Object(obj) => {
367            buf.push(tags::OBJECT);
368            let mut keys: Vec<&String> = obj.keys().collect();
369            keys.sort();
370            buf.extend_from_slice(&(keys.len() as u32).to_le_bytes());
371            let offset_table_pos = buf.len();
372            buf.resize(buf.len() + keys.len() * 8, 0);
373            let data_start = buf.len();
374            for (i, key) in keys.iter().enumerate() {
375                let key_offset = (buf.len() - data_start) as u32;
376                let entry_pos = offset_table_pos + i * 8;
377                buf[entry_pos..entry_pos + 4].copy_from_slice(&key_offset.to_le_bytes());
378                buf.extend_from_slice(&(key.len() as u16).to_le_bytes());
379                buf.extend_from_slice(key.as_bytes());
380                let val_offset = (buf.len() - data_start) as u32;
381                buf[entry_pos + 4..entry_pos + 8].copy_from_slice(&val_offset.to_le_bytes());
382                encode_jsonb_into(&obj[*key], buf);
383            }
384        }
385    }
386}
387
388// ── Helpers ──────────────────────────────────────────────────────
389
/// Reads a little-endian `u32` from `buf` starting at byte `offset`.
///
/// Returns `None` if fewer than four bytes are available at `offset`,
/// including when `offset + 4` would overflow `usize`. Offsets are derived
/// from untrusted input, so the addition is checked rather than allowed to
/// panic (debug) or wrap (release).
#[inline]
fn read_u32(buf: &[u8], offset: usize) -> Option<u32> {
    let end = offset.checked_add(4)?;
    match *buf.get(offset..end)? {
        [a, b, c, d] => Some(u32::from_le_bytes([a, b, c, d])),
        _ => None,
    }
}
396
/// Reads a little-endian `u16` from `buf` starting at byte `offset`.
///
/// Returns `None` if fewer than two bytes are available at `offset`,
/// including when `offset + 2` would overflow `usize`. Offsets are derived
/// from untrusted input, so the addition is checked rather than allowed to
/// panic (debug) or wrap (release).
#[inline]
fn read_u16(buf: &[u8], offset: usize) -> Option<u16> {
    let end = offset.checked_add(2)?;
    match *buf.get(offset..end)? {
        [a, b] => Some(u16::from_le_bytes([a, b])),
        _ => None,
    }
}
403
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Shorthand: encode a JSON value into JSONB bytes.
    fn enc(v: &serde_json::Value) -> Vec<u8> {
        encode_jsonb(v)
    }

    #[test]
    fn test_type_name() {
        assert_eq!(jsonb_type_name(&enc(&json!(null))), Some("null"));
        assert_eq!(jsonb_type_name(&enc(&json!(true))), Some("boolean"));
        assert_eq!(jsonb_type_name(&enc(&json!(false))), Some("boolean"));
        assert_eq!(jsonb_type_name(&enc(&json!(42))), Some("number"));
        assert_eq!(jsonb_type_name(&enc(&json!(3.125))), Some("number"));
        assert_eq!(jsonb_type_name(&enc(&json!("hi"))), Some("string"));
        assert_eq!(jsonb_type_name(&enc(&json!([1]))), Some("array"));
        assert_eq!(jsonb_type_name(&enc(&json!({"a": 1}))), Some("object"));
        assert_eq!(jsonb_type_name(&[]), None);
        assert_eq!(jsonb_type_name(&[0xFF]), None);
    }

    #[test]
    fn test_get_field() {
        let b = enc(&json!({"name": "Alice", "age": 30}));
        let name = jsonb_get_field(&b, "name").unwrap();
        assert_eq!(jsonb_to_text(name), Some("Alice".to_owned()));
        let age = jsonb_get_field(&b, "age").unwrap();
        assert_eq!(jsonb_to_text(age), Some("30".to_owned()));
        assert!(jsonb_get_field(&b, "missing").is_none());
    }

    #[test]
    fn test_get_field_rejects_non_objects() {
        assert!(jsonb_get_field(&enc(&json!([1])), "a").is_none());
        assert!(jsonb_get_field(&enc(&json!("a")), "a").is_none());
        assert!(jsonb_get_field(&enc(&json!({})), "a").is_none());
        assert!(jsonb_get_field(&[], "a").is_none());
    }

    #[test]
    fn test_array_get() {
        let b = enc(&json!([10, 20, 30]));
        let e1 = jsonb_array_get(&b, 1).unwrap();
        assert_eq!(jsonb_to_text(e1), Some("20".to_owned()));
        assert!(jsonb_array_get(&b, 5).is_none());
    }

    #[test]
    fn test_array_get_rejects_non_arrays() {
        assert!(jsonb_array_get(&enc(&json!({"a": 1})), 0).is_none());
        assert!(jsonb_array_get(&enc(&json!([])), 0).is_none());
        // Array tag with no element count following it.
        assert!(jsonb_array_get(&[tags::ARRAY], 0).is_none());
    }

    #[test]
    fn test_has_key() {
        let b = enc(&json!({"a": 1, "b": 2}));
        assert!(jsonb_has_key(&b, "a"));
        assert!(!jsonb_has_key(&b, "c"));
        assert!(!jsonb_has_key(&enc(&json!({})), "a"));
    }

    #[test]
    fn test_to_text_string() {
        let b = enc(&json!("hello"));
        assert_eq!(jsonb_to_text(&b), Some("hello".to_owned()));
    }

    #[test]
    fn test_to_text_null() {
        let b = enc(&json!(null));
        assert_eq!(jsonb_to_text(&b), None);
    }

    #[test]
    fn test_to_text_object() {
        let b = enc(&json!({"a": 1}));
        assert_eq!(jsonb_to_text(&b), Some("{\"a\":1}".to_owned()));
    }

    #[test]
    fn test_to_text_array() {
        let b = enc(&json!([1, "two"]));
        assert_eq!(jsonb_to_text(&b), Some("[1,\"two\"]".to_owned()));
    }

    #[test]
    fn test_contains_object() {
        let left = enc(&json!({"a": 1, "b": 2, "c": 3}));
        let right = enc(&json!({"a": 1, "c": 3}));
        assert_eq!(jsonb_contains(&left, &right), Some(true));
        // The empty object is contained by every object.
        assert_eq!(jsonb_contains(&left, &enc(&json!({}))), Some(true));
    }

    #[test]
    fn test_contains_object_false() {
        let left = enc(&json!({"a": 1}));
        let right = enc(&json!({"a": 1, "b": 2}));
        assert_eq!(jsonb_contains(&left, &right), Some(false));
    }

    #[test]
    fn test_contains_array() {
        let left = enc(&json!([1, 2, 3]));
        let right = enc(&json!([1, 3]));
        assert_eq!(jsonb_contains(&left, &right), Some(true));
        assert_eq!(jsonb_contains(&left, &enc(&json!([4]))), Some(false));
    }

    #[test]
    fn test_contains_scalar() {
        let a = enc(&json!(42));
        let b = enc(&json!(42));
        let c = enc(&json!(99));
        assert_eq!(jsonb_contains(&a, &b), Some(true));
        assert_eq!(jsonb_contains(&a, &c), Some(false));
    }

    #[test]
    fn test_contains_type_mismatch() {
        let a = enc(&json!(42));
        let b = enc(&json!("42"));
        assert_eq!(jsonb_contains(&a, &b), Some(false));
    }

    #[test]
    fn test_nested_get() {
        let b = enc(&json!({"user": {"address": {"city": "London"}}}));
        let user = jsonb_get_field(&b, "user").unwrap();
        let addr = jsonb_get_field(user, "address").unwrap();
        let city = jsonb_get_field(addr, "city").unwrap();
        assert_eq!(jsonb_to_text(city), Some("London".to_owned()));
    }

    #[test]
    fn test_encode_decode_roundtrip() {
        let vals = vec![
            json!(null),
            json!(true),
            json!(false),
            json!(42),
            json!(3.125),
            json!("hello"),
            json!([1, "two", null]),
            json!({"key": "value", "num": 42}),
        ];
        for v in vals {
            let b = enc(&v);

            // Text path: the rendered JSON must parse back to the same value.
            let text = jsonb_to_json_string(&b)
                .unwrap_or_else(|| panic!("Failed to round-trip: {v:?}"));
            let reparsed: serde_json::Value = serde_json::from_str(&text)
                .unwrap_or_else(|e| panic!("Invalid JSON {text:?} for {v:?}: {e}"));
            assert_eq!(reparsed, v, "text round-trip changed the value");

            // Binary path: direct decoding must agree as well.
            assert_eq!(
                jsonb_to_value(&b).as_ref(),
                Some(&v),
                "binary round-trip changed the value"
            );
        }
    }

    #[test]
    fn test_jsonb_to_value_scalars() {
        assert_eq!(jsonb_to_value(&enc(&json!(null))), Some(json!(null)));
        assert_eq!(jsonb_to_value(&enc(&json!(true))), Some(json!(true)));
        assert_eq!(jsonb_to_value(&enc(&json!(false))), Some(json!(false)));
        assert_eq!(jsonb_to_value(&enc(&json!(42))), Some(json!(42)));
        assert_eq!(jsonb_to_value(&enc(&json!(3.125))), Some(json!(3.125)));
        assert_eq!(jsonb_to_value(&enc(&json!("hello"))), Some(json!("hello")));
    }

    #[test]
    fn test_jsonb_to_value_complex() {
        let obj = json!({"a": 1, "b": [2, 3], "c": {"d": true}});
        let bytes = enc(&obj);
        assert_eq!(jsonb_to_value(&bytes), Some(obj));
    }

    #[test]
    fn test_jsonb_to_value_empty() {
        assert_eq!(jsonb_to_value(&[]), None);
        assert_eq!(jsonb_to_value(&[0xFF]), None);
    }

    #[test]
    fn test_jsonb_to_value_truncated() {
        // Int64 tag followed by fewer than eight payload bytes.
        assert_eq!(jsonb_to_value(&[tags::INT64, 1, 2]), None);
        // String claiming five bytes but carrying one.
        assert_eq!(jsonb_to_value(&[tags::STRING, 5, 0, 0, 0, b'h']), None);
    }
}
563}