laminar_connectors/schema/json/
/// One-byte type tags that prefix every encoded JSONB value.
pub mod tags {
    // --- Scalars encoded by the tag byte alone -----------------------------

    /// JSON `null`.
    pub const NULL: u8 = 0x00;
    /// JSON `false`.
    pub const BOOL_FALSE: u8 = 0x01;
    /// JSON `true`.
    pub const BOOL_TRUE: u8 = 0x02;

    // --- Scalars followed by a payload -------------------------------------

    /// Signed 64-bit integer; the tag is followed by 8 little-endian bytes.
    pub const INT64: u8 = 0x03;
    /// 64-bit float; the tag is followed by 8 little-endian bytes.
    pub const FLOAT64: u8 = 0x04;
    /// UTF-8 string; the tag is followed by a little-endian `u32` byte length
    /// and then the string bytes.
    pub const STRING: u8 = 0x05;

    // --- Containers --------------------------------------------------------

    /// Array; the tag is followed by a `u32` element count, an offset table
    /// and the encoded elements.
    pub const ARRAY: u8 = 0x06;
    /// Object; the tag is followed by a `u32` field count, an offset table
    /// and the encoded key/value entries.
    pub const OBJECT: u8 = 0x07;
}
39
40#[derive(Debug)]
45pub struct JsonbEncoder {
46 buf: Vec<u8>,
47}
48
49impl JsonbEncoder {
50 #[must_use]
52 pub fn new() -> Self {
53 Self {
54 buf: Vec::with_capacity(4096),
55 }
56 }
57
58 pub fn encode(&mut self, value: &serde_json::Value) -> Vec<u8> {
60 self.buf.clear();
61 self.encode_value(value);
62 self.buf.clone()
63 }
64
65 #[allow(clippy::cast_possible_truncation)]
66 fn encode_value(&mut self, value: &serde_json::Value) {
67 match value {
68 serde_json::Value::Null => self.buf.push(tags::NULL),
69 serde_json::Value::Bool(false) => self.buf.push(tags::BOOL_FALSE),
70 serde_json::Value::Bool(true) => self.buf.push(tags::BOOL_TRUE),
71 serde_json::Value::Number(n) => {
72 if let Some(i) = n.as_i64() {
73 self.buf.push(tags::INT64);
74 self.buf.extend_from_slice(&i.to_le_bytes());
75 } else if let Some(f) = n.as_f64() {
76 self.buf.push(tags::FLOAT64);
77 self.buf.extend_from_slice(&f.to_le_bytes());
78 }
79 }
80 serde_json::Value::String(s) => {
81 self.buf.push(tags::STRING);
82 self.buf.extend_from_slice(&(s.len() as u32).to_le_bytes());
83 self.buf.extend_from_slice(s.as_bytes());
84 }
85 serde_json::Value::Array(arr) => {
86 self.buf.push(tags::ARRAY);
87 self.buf
88 .extend_from_slice(&(arr.len() as u32).to_le_bytes());
89 let offset_table_pos = self.buf.len();
91 self.buf.resize(self.buf.len() + arr.len() * 4, 0);
92 let data_start = self.buf.len();
93 for (i, elem) in arr.iter().enumerate() {
94 let elem_offset = (self.buf.len() - data_start) as u32;
95 let entry_pos = offset_table_pos + i * 4;
96 self.buf[entry_pos..entry_pos + 4].copy_from_slice(&elem_offset.to_le_bytes());
97 self.encode_value(elem);
98 }
99 }
100 serde_json::Value::Object(obj) => {
101 self.buf.push(tags::OBJECT);
102 let mut keys: Vec<&String> = obj.keys().collect();
104 keys.sort();
105 self.buf
106 .extend_from_slice(&(keys.len() as u32).to_le_bytes());
107 let offset_table_pos = self.buf.len();
109 self.buf.resize(self.buf.len() + keys.len() * 8, 0);
110 let data_start = self.buf.len();
111
112 for (i, key) in keys.iter().enumerate() {
113 let key_offset = (self.buf.len() - data_start) as u32;
115 let entry_pos = offset_table_pos + i * 8;
116 self.buf[entry_pos..entry_pos + 4].copy_from_slice(&key_offset.to_le_bytes());
117 self.buf
119 .extend_from_slice(&(key.len() as u16).to_le_bytes());
120 self.buf.extend_from_slice(key.as_bytes());
121 let val_offset = (self.buf.len() - data_start) as u32;
123 self.buf[entry_pos + 4..entry_pos + 8]
124 .copy_from_slice(&val_offset.to_le_bytes());
125 self.encode_value(&obj[*key]);
127 }
128 }
129 }
130 }
131}
132
133impl Default for JsonbEncoder {
134 fn default() -> Self {
135 Self::new()
136 }
137}
138
139pub struct JsonbAccessor;
144
145impl JsonbAccessor {
146 #[inline]
153 #[must_use]
154 pub fn get_field<'a>(jsonb: &'a [u8], field_name: &str) -> Option<&'a [u8]> {
155 if jsonb.is_empty() || jsonb[0] != tags::OBJECT {
156 return None;
157 }
158
159 let field_count = u32::from_le_bytes(jsonb.get(1..5)?.try_into().ok()?) as usize;
160 if field_count == 0 {
161 return None;
162 }
163
164 let offset_table_start = 5;
165 let offset_table_end = offset_table_start + field_count * 8;
166 let data_start = offset_table_end;
167
168 let mut lo = 0usize;
170 let mut hi = field_count;
171 while lo < hi {
172 let mid = lo + (hi - lo) / 2;
173 let entry_offset = offset_table_start + mid * 8;
174 let key_off =
175 u32::from_le_bytes(jsonb.get(entry_offset..entry_offset + 4)?.try_into().ok()?)
176 as usize;
177
178 let key_abs = data_start + key_off;
179 let key_len =
180 u16::from_le_bytes(jsonb.get(key_abs..key_abs + 2)?.try_into().ok()?) as usize;
181 let key_bytes = jsonb.get(key_abs + 2..key_abs + 2 + key_len)?;
182 let key_str = std::str::from_utf8(key_bytes).ok()?;
183
184 match key_str.cmp(field_name) {
185 std::cmp::Ordering::Equal => {
186 let val_off = u32::from_le_bytes(
187 jsonb
188 .get(entry_offset + 4..entry_offset + 8)?
189 .try_into()
190 .ok()?,
191 ) as usize;
192 let val_abs = data_start + val_off;
193 return jsonb.get(val_abs..);
194 }
195 std::cmp::Ordering::Less => lo = mid + 1,
196 std::cmp::Ordering::Greater => hi = mid,
197 }
198 }
199 None
200 }
201
202 #[inline]
204 #[must_use]
205 pub fn is_null(jsonb_value: &[u8]) -> bool {
206 !jsonb_value.is_empty() && jsonb_value[0] == tags::NULL
207 }
208
209 #[inline]
211 #[must_use]
212 pub fn as_bool(jsonb_value: &[u8]) -> Option<bool> {
213 match *jsonb_value.first()? {
214 tags::BOOL_FALSE => Some(false),
215 tags::BOOL_TRUE => Some(true),
216 _ => None,
217 }
218 }
219
220 #[inline]
222 #[must_use]
223 pub fn as_i64(jsonb_value: &[u8]) -> Option<i64> {
224 if jsonb_value.first()? != &tags::INT64 {
225 return None;
226 }
227 Some(i64::from_le_bytes(jsonb_value.get(1..9)?.try_into().ok()?))
228 }
229
230 #[inline]
232 #[must_use]
233 pub fn as_f64(jsonb_value: &[u8]) -> Option<f64> {
234 if jsonb_value.first()? != &tags::FLOAT64 {
235 return None;
236 }
237 Some(f64::from_le_bytes(jsonb_value.get(1..9)?.try_into().ok()?))
238 }
239
240 #[inline]
242 #[must_use]
243 pub fn as_str(jsonb_value: &[u8]) -> Option<&str> {
244 if jsonb_value.first()? != &tags::STRING {
245 return None;
246 }
247 let len = u32::from_le_bytes(jsonb_value.get(1..5)?.try_into().ok()?) as usize;
248 std::str::from_utf8(jsonb_value.get(5..5 + len)?).ok()
249 }
250
251 #[inline]
253 #[must_use]
254 pub fn array_len(jsonb_value: &[u8]) -> Option<usize> {
255 if jsonb_value.first()? != &tags::ARRAY {
256 return None;
257 }
258 Some(u32::from_le_bytes(jsonb_value.get(1..5)?.try_into().ok()?) as usize)
259 }
260
261 #[inline]
263 #[must_use]
264 pub fn array_get(jsonb_value: &[u8], index: usize) -> Option<&[u8]> {
265 if jsonb_value.first()? != &tags::ARRAY {
266 return None;
267 }
268 let count = u32::from_le_bytes(jsonb_value.get(1..5)?.try_into().ok()?) as usize;
269 if index >= count {
270 return None;
271 }
272 let offset_table_start = 5;
273 let data_start = offset_table_start + count * 4;
274 let entry_pos = offset_table_start + index * 4;
275 let elem_off =
276 u32::from_le_bytes(jsonb_value.get(entry_pos..entry_pos + 4)?.try_into().ok()?)
277 as usize;
278 jsonb_value.get(data_start + elem_off..)
279 }
280
281 #[inline]
283 #[must_use]
284 pub fn object_len(jsonb_value: &[u8]) -> Option<usize> {
285 if jsonb_value.first()? != &tags::OBJECT {
286 return None;
287 }
288 Some(u32::from_le_bytes(jsonb_value.get(1..5)?.try_into().ok()?) as usize)
289 }
290}
291
#[cfg(test)]
mod tests {
    //! Round-trip tests: values are encoded with `JsonbEncoder` and read back
    //! with `JsonbAccessor`, plus a few checks on the raw byte layout.

    use super::*;
    use serde_json::json;

    /// Encodes `value` with a fresh encoder.
    fn encode(value: &serde_json::Value) -> Vec<u8> {
        JsonbEncoder::new().encode(value)
    }

    // ── Raw layout ──────────────────────────────────────────────────────

    #[test]
    fn test_encode_null() {
        assert_eq!(encode(&json!(null)), vec![tags::NULL]);
    }

    #[test]
    fn test_encode_bool() {
        // Reuse one encoder to check that `encode` resets its buffer.
        let mut enc = JsonbEncoder::new();
        assert_eq!(enc.encode(&json!(false)), vec![tags::BOOL_FALSE]);
        assert_eq!(enc.encode(&json!(true)), vec![tags::BOOL_TRUE]);
    }

    #[test]
    fn test_encode_int64() {
        let bytes = encode(&json!(42));
        assert_eq!(bytes.len(), 9);
        assert_eq!(bytes[0], tags::INT64);
        let val = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
        assert_eq!(val, 42);
    }

    #[test]
    fn test_encode_float64() {
        // 1.5 is exactly representable, so the round trip must be bit-exact.
        let bytes = encode(&json!(1.5));
        assert_eq!(bytes.len(), 9);
        assert_eq!(bytes[0], tags::FLOAT64);
        let val = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
        assert_eq!(val.to_bits(), 1.5f64.to_bits());
    }

    #[test]
    fn test_encode_u64_beyond_i64_falls_back_to_float() {
        // `as_i64` fails for values above `i64::MAX`; the encoder then uses
        // the `as_f64` representation.
        let bytes = encode(&json!(u64::MAX));
        assert_eq!(bytes[0], tags::FLOAT64);
        assert!(JsonbAccessor::as_i64(&bytes).is_none());
        assert!(JsonbAccessor::as_f64(&bytes).is_some());
    }

    #[test]
    fn test_encode_string() {
        let bytes = encode(&json!("hello"));
        assert_eq!(bytes[0], tags::STRING);
        let len = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
        assert_eq!(len, 5);
        assert_eq!(&bytes[5..10], b"hello");
        assert_eq!(bytes.len(), 10);
    }

    // ── Scalar accessors ────────────────────────────────────────────────

    #[test]
    fn test_accessor_null() {
        let bytes = encode(&json!(null));
        assert!(JsonbAccessor::is_null(&bytes));
        assert!(JsonbAccessor::as_bool(&bytes).is_none());
    }

    #[test]
    fn test_accessor_bool() {
        assert_eq!(JsonbAccessor::as_bool(&encode(&json!(true))), Some(true));
        assert_eq!(JsonbAccessor::as_bool(&encode(&json!(false))), Some(false));
    }

    #[test]
    fn test_accessor_i64() {
        let bytes = encode(&json!(-99));
        assert_eq!(JsonbAccessor::as_i64(&bytes), Some(-99));
    }

    #[test]
    fn test_accessor_f64() {
        let bytes = encode(&json!(-2.25));
        let val = JsonbAccessor::as_f64(&bytes).unwrap();
        assert_eq!(val.to_bits(), (-2.25f64).to_bits());
    }

    #[test]
    fn test_accessor_str() {
        let bytes = encode(&json!("world"));
        assert_eq!(JsonbAccessor::as_str(&bytes), Some("world"));
    }

    // ── Objects ─────────────────────────────────────────────────────────

    #[test]
    fn test_object_field_access() {
        let bytes = encode(&json!({"name": "Alice", "age": 30, "active": true}));
        assert_eq!(JsonbAccessor::object_len(&bytes), Some(3));

        let name_val = JsonbAccessor::get_field(&bytes, "name").unwrap();
        assert_eq!(JsonbAccessor::as_str(name_val), Some("Alice"));

        let age_val = JsonbAccessor::get_field(&bytes, "age").unwrap();
        assert_eq!(JsonbAccessor::as_i64(age_val), Some(30));

        let active_val = JsonbAccessor::get_field(&bytes, "active").unwrap();
        assert_eq!(JsonbAccessor::as_bool(active_val), Some(true));

        assert!(JsonbAccessor::get_field(&bytes, "missing").is_none());
    }

    #[test]
    fn test_object_empty() {
        let bytes = encode(&json!({}));
        assert_eq!(JsonbAccessor::object_len(&bytes), Some(0));
        assert!(JsonbAccessor::get_field(&bytes, "any").is_none());
    }

    #[test]
    fn test_nested_object() {
        let bytes = encode(&json!({"outer": {"inner": 42}}));

        let outer = JsonbAccessor::get_field(&bytes, "outer").unwrap();
        let inner = JsonbAccessor::get_field(outer, "inner").unwrap();
        assert_eq!(JsonbAccessor::as_i64(inner), Some(42));
    }

    #[test]
    fn test_large_object() {
        let mut obj = serde_json::Map::new();
        for i in 0..100 {
            obj.insert(format!("field_{i:03}"), json!(i));
        }
        let bytes = encode(&serde_json::Value::Object(obj));
        assert_eq!(JsonbAccessor::object_len(&bytes), Some(100));

        // Every key must be reachable through the binary search.
        for i in 0..100 {
            let key = format!("field_{i:03}");
            let val = JsonbAccessor::get_field(&bytes, &key).unwrap();
            assert_eq!(JsonbAccessor::as_i64(val), Some(i));
        }
        assert!(JsonbAccessor::get_field(&bytes, "nonexistent").is_none());
    }

    #[test]
    fn test_unicode_keys() {
        let bytes = encode(&json!({"名前": "太郎", "年齢": 25}));

        let name = JsonbAccessor::get_field(&bytes, "名前").unwrap();
        assert_eq!(JsonbAccessor::as_str(name), Some("太郎"));

        let age = JsonbAccessor::get_field(&bytes, "年齢").unwrap();
        assert_eq!(JsonbAccessor::as_i64(age), Some(25));
    }

    // ── Arrays ──────────────────────────────────────────────────────────

    #[test]
    fn test_array_access() {
        let bytes = encode(&json!([10, 20, 30]));

        assert_eq!(JsonbAccessor::array_len(&bytes), Some(3));

        let elem0 = JsonbAccessor::array_get(&bytes, 0).unwrap();
        assert_eq!(JsonbAccessor::as_i64(elem0), Some(10));

        let elem2 = JsonbAccessor::array_get(&bytes, 2).unwrap();
        assert_eq!(JsonbAccessor::as_i64(elem2), Some(30));

        assert!(JsonbAccessor::array_get(&bytes, 5).is_none());
    }

    #[test]
    fn test_array_empty() {
        let bytes = encode(&json!([]));
        assert_eq!(JsonbAccessor::array_len(&bytes), Some(0));
        assert!(JsonbAccessor::array_get(&bytes, 0).is_none());
    }

    #[test]
    fn test_nested_array_in_object() {
        let bytes = encode(&json!({"items": [1, 2, 3]}));

        let items = JsonbAccessor::get_field(&bytes, "items").unwrap();
        assert_eq!(JsonbAccessor::array_len(items), Some(3));
        let elem1 = JsonbAccessor::array_get(items, 1).unwrap();
        assert_eq!(JsonbAccessor::as_i64(elem1), Some(2));
    }

    // ── Mismatched and malformed input ──────────────────────────────────

    #[test]
    fn test_type_mismatch_returns_none() {
        // An integer must not be readable through any other typed accessor.
        let bytes = encode(&json!(42));
        assert!(JsonbAccessor::as_str(&bytes).is_none());
        assert!(JsonbAccessor::as_bool(&bytes).is_none());
        assert!(JsonbAccessor::as_f64(&bytes).is_none());
        assert!(!JsonbAccessor::is_null(&bytes));
        assert!(JsonbAccessor::array_len(&bytes).is_none());
        assert!(JsonbAccessor::object_len(&bytes).is_none());
    }

    #[test]
    fn test_empty_slice() {
        assert!(!JsonbAccessor::is_null(&[]));
        assert!(JsonbAccessor::as_bool(&[]).is_none());
        assert!(JsonbAccessor::as_i64(&[]).is_none());
        assert!(JsonbAccessor::as_f64(&[]).is_none());
        assert!(JsonbAccessor::as_str(&[]).is_none());
        assert!(JsonbAccessor::get_field(&[], "x").is_none());
    }

    #[test]
    fn test_truncated_buffers_return_none() {
        // Headers that promise more data than the slice holds.
        assert!(JsonbAccessor::as_i64(&[tags::INT64, 1]).is_none());
        assert!(JsonbAccessor::as_f64(&[tags::FLOAT64]).is_none());
        assert!(JsonbAccessor::as_str(&[tags::STRING, 4, 0, 0, 0, b'a']).is_none());
        assert!(JsonbAccessor::array_get(&[tags::ARRAY, 2, 0, 0, 0], 1).is_none());
        assert!(JsonbAccessor::get_field(&[tags::OBJECT, 3, 0, 0, 0], "x").is_none());

        // Cutting a valid object short must not panic.
        let bytes = encode(&json!({"k": "value"}));
        for cut in 0..bytes.len() {
            let _ = JsonbAccessor::get_field(&bytes[..cut], "k");
        }
    }
}