Skip to main content

laminar_sql/datafusion/
json_path.rs

1//! SQL/JSON path query functions (F-SCHEMA-012).
2//!
3//! Implements a basic JSON path compiler and two scalar UDFs:
4//!
5//! - `jsonb_path_exists(jsonb, path_text) → boolean`
6//! - `jsonb_path_match(jsonb, path_text) → boolean`
7//!
8//! The path language supports a PostgreSQL SQL/JSON subset:
9//! - `$` — root element
10//! - `.key` — object member access
11//! - `[n]` — array element by index
12//! - `[*]` — wildcard array elements
13
14use std::any::Any;
15use std::hash::{Hash, Hasher};
16use std::sync::Arc;
17
18use arrow::datatypes::DataType;
19use arrow_array::{Array, BooleanArray, LargeBinaryArray};
20use datafusion_common::Result;
21use datafusion_expr::{
22    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
23};
24use parking_lot::Mutex;
25
26use super::json_types;
27
/// One-slot cache for compiled JSON paths.
///
/// Since a given UDF instance almost always evaluates the same constant path
/// string, a single-entry cache avoids recompilation on every batch.
///
/// The slot pairs the text of the most recently compiled path with its
/// compiled form; `None` means no path has been compiled yet.
type PathCache = Mutex<Option<(String, CompiledJsonPath)>>;
33
34// ── JSON Path Compiler ──────────────────────────────────────────────
35
/// One step of a compiled JSON path.
///
/// Steps are stored in the order they appear in the path text and are
/// applied one after another during evaluation.
#[derive(Clone, Debug, PartialEq)]
pub enum JsonPathStep {
    /// The root element, written `$`.
    Root,
    /// Lookup of an object member by name, written `.key` or `["key"]`.
    Member(String),
    /// Selection of a single array element by position, written `[n]`.
    ArrayIndex(i64),
    /// Selection of every element of an array, written `[*]`.
    ArrayWildcard,
}
48
49/// A compiled SQL/JSON path expression.
50///
51/// Path expressions are compiled from string form (e.g., `$.items[*].price`)
52/// into a sequence of steps for evaluation against JSONB binary data.
53#[derive(Debug, Clone, PartialEq)]
54pub struct CompiledJsonPath {
55    /// Sequence of path steps.
56    pub steps: Vec<JsonPathStep>,
57}
58
59impl CompiledJsonPath {
60    /// Compiles a SQL/JSON path string.
61    ///
62    /// # Supported syntax
63    ///
64    /// - `$` — root element (must appear first)
65    /// - `.key` — object member access
66    /// - `["key"]` — quoted member access
67    /// - `[n]` — array index (non-negative)
68    /// - `[*]` — wildcard array element
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the path syntax is invalid.
73    pub fn compile(path_str: &str) -> std::result::Result<Self, String> {
74        let path_str = path_str.trim();
75        if path_str.is_empty() {
76            return Err("empty path expression".into());
77        }
78
79        let mut steps = Vec::new();
80        let chars: Vec<char> = path_str.chars().collect();
81        let mut pos = 0;
82
83        // Must start with '$'
84        if chars.first() != Some(&'$') {
85            return Err(format!(
86                "path must start with '$', got '{}'",
87                chars.first().unwrap_or(&' ')
88            ));
89        }
90        steps.push(JsonPathStep::Root);
91        pos += 1;
92
93        while pos < chars.len() {
94            match chars[pos] {
95                '.' => {
96                    pos += 1;
97                    // Read member name
98                    let start = pos;
99                    while pos < chars.len()
100                        && chars[pos] != '.'
101                        && chars[pos] != '['
102                        && !chars[pos].is_whitespace()
103                    {
104                        pos += 1;
105                    }
106                    if pos == start {
107                        return Err(format!("empty member name at position {start}"));
108                    }
109                    let name: String = chars[start..pos].iter().collect();
110                    steps.push(JsonPathStep::Member(name));
111                }
112                '[' => {
113                    pos += 1;
114                    // Skip whitespace
115                    while pos < chars.len() && chars[pos].is_whitespace() {
116                        pos += 1;
117                    }
118                    if pos >= chars.len() {
119                        return Err("unclosed bracket".into());
120                    }
121                    if chars[pos] == '*' {
122                        steps.push(JsonPathStep::ArrayWildcard);
123                        pos += 1;
124                    } else if chars[pos] == '"' || chars[pos] == '\'' {
125                        // Quoted member access: ["key"] or ['key']
126                        let quote = chars[pos];
127                        pos += 1;
128                        let start = pos;
129                        while pos < chars.len() && chars[pos] != quote {
130                            pos += 1;
131                        }
132                        if pos >= chars.len() {
133                            return Err("unclosed quoted member".into());
134                        }
135                        let name: String = chars[start..pos].iter().collect();
136                        steps.push(JsonPathStep::Member(name));
137                        pos += 1; // skip closing quote
138                    } else {
139                        // Array index
140                        let start = pos;
141                        let mut negative = false;
142                        if pos < chars.len() && chars[pos] == '-' {
143                            negative = true;
144                            pos += 1;
145                        }
146                        while pos < chars.len() && chars[pos].is_ascii_digit() {
147                            pos += 1;
148                        }
149                        if pos == start || (negative && pos == start + 1) {
150                            return Err(format!("expected array index or '*' at position {start}"));
151                        }
152                        let idx_str: String = chars[start..pos].iter().collect();
153                        let idx: i64 = idx_str
154                            .parse()
155                            .map_err(|_| format!("invalid array index: '{idx_str}'"))?;
156                        steps.push(JsonPathStep::ArrayIndex(idx));
157                    }
158                    // Skip whitespace and expect ']'
159                    while pos < chars.len() && chars[pos].is_whitespace() {
160                        pos += 1;
161                    }
162                    if pos >= chars.len() || chars[pos] != ']' {
163                        return Err(format!("expected ']' at position {pos}"));
164                    }
165                    pos += 1;
166                }
167                c if c.is_whitespace() => {
168                    pos += 1;
169                }
170                c => {
171                    return Err(format!("unexpected character '{c}' at position {pos}"));
172                }
173            }
174        }
175
176        Ok(Self { steps })
177    }
178
179    /// Evaluates the compiled path against JSONB binary data.
180    ///
181    /// Returns all matched sub-values as byte slices into the original data.
182    #[must_use]
183    pub fn evaluate<'a>(&self, jsonb: &'a [u8]) -> Vec<&'a [u8]> {
184        if jsonb.is_empty() {
185            return Vec::new();
186        }
187        let mut current = vec![jsonb];
188
189        for step in &self.steps {
190            match step {
191                JsonPathStep::Root => {
192                    // Already at root
193                }
194                JsonPathStep::Member(name) => {
195                    let mut next = Vec::new();
196                    for data in &current {
197                        if let Some(val) = json_types::jsonb_get_field(data, name) {
198                            next.push(val);
199                        }
200                    }
201                    current = next;
202                }
203                JsonPathStep::ArrayIndex(idx) => {
204                    let mut next = Vec::new();
205                    for data in &current {
206                        if *idx >= 0 {
207                            if let Ok(i) = usize::try_from(*idx) {
208                                if let Some(val) = json_types::jsonb_array_get(data, i) {
209                                    next.push(val);
210                                }
211                            }
212                        }
213                    }
214                    current = next;
215                }
216                JsonPathStep::ArrayWildcard => {
217                    let mut next = Vec::new();
218                    for data in &current {
219                        if !data.is_empty() && data[0] == 0x06 {
220                            // ARRAY tag
221                            if data.len() >= 5 {
222                                let count = u32::from_le_bytes([data[1], data[2], data[3], data[4]])
223                                    as usize;
224                                for i in 0..count {
225                                    if let Some(val) = json_types::jsonb_array_get(data, i) {
226                                        next.push(val);
227                                    }
228                                }
229                            }
230                        }
231                    }
232                    current = next;
233                }
234            }
235        }
236
237        current
238    }
239
240    /// Returns `true` if evaluating this path against the JSONB data
241    /// produces at least one match.
242    #[must_use]
243    pub fn exists(&self, jsonb: &[u8]) -> bool {
244        !self.evaluate(jsonb).is_empty()
245    }
246}
247
248// ── jsonb_path_exists UDF ───────────────────────────────────────────
249
250/// `jsonb_path_exists(jsonb, path_text) → boolean`
251///
252/// Looks up or compiles a JSON path, caching the result.
253fn compile_cached(
254    cache: &PathCache,
255    path_str: &str,
256) -> std::result::Result<CompiledJsonPath, String> {
257    let mut guard = cache.lock();
258    if let Some((cached_str, cached_path)) = guard.as_ref() {
259        if cached_str == path_str {
260            return Ok(cached_path.clone());
261        }
262    }
263    let compiled = CompiledJsonPath::compile(path_str)?;
264    *guard = Some((path_str.to_owned(), compiled.clone()));
265    Ok(compiled)
266}
267
/// `jsonb_path_exists(jsonb, path_text) → boolean`
///
/// Returns `true` if the JSON path matches any element in the JSONB value.
#[derive(Debug)]
pub struct JsonbPathExistsUdf {
    /// Argument types and volatility this function declares.
    signature: Signature,
    /// Most recently compiled path, reused across invocations.
    path_cache: PathCache,
}
274
275impl Default for JsonbPathExistsUdf {
276    fn default() -> Self {
277        Self::new()
278    }
279}
280
281impl JsonbPathExistsUdf {
282    /// Creates a new `jsonb_path_exists` UDF.
283    #[must_use]
284    pub fn new() -> Self {
285        Self {
286            signature: Signature::new(
287                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
288                Volatility::Immutable,
289            ),
290            path_cache: Mutex::new(None),
291        }
292    }
293}
294
295impl PartialEq for JsonbPathExistsUdf {
296    fn eq(&self, _other: &Self) -> bool {
297        true
298    }
299}
300
301impl Eq for JsonbPathExistsUdf {}
302
303impl Hash for JsonbPathExistsUdf {
304    fn hash<H: Hasher>(&self, state: &mut H) {
305        "jsonb_path_exists".hash(state);
306    }
307}
308
309impl ScalarUDFImpl for JsonbPathExistsUdf {
310    fn as_any(&self) -> &dyn Any {
311        self
312    }
313
314    fn name(&self) -> &'static str {
315        "jsonb_path_exists"
316    }
317
318    fn signature(&self) -> &Signature {
319        &self.signature
320    }
321
322    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
323        Ok(DataType::Boolean)
324    }
325
326    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
327        let first = &args.args[0];
328        let second = &args.args[1];
329
330        match (first, second) {
331            (
332                ColumnarValue::Array(bin_arr),
333                ColumnarValue::Scalar(datafusion_common::ScalarValue::Utf8(Some(path_str))),
334            ) => {
335                let compiled = compile_cached(&self.path_cache, path_str)
336                    .map_err(datafusion_common::DataFusionError::Execution)?;
337                let binary = bin_arr
338                    .as_any()
339                    .downcast_ref::<LargeBinaryArray>()
340                    .ok_or_else(|| {
341                        datafusion_common::DataFusionError::Execution(
342                            "expected LargeBinary array".into(),
343                        )
344                    })?;
345                let results: BooleanArray = (0..binary.len())
346                    .map(|i| {
347                        if binary.is_null(i) {
348                            None
349                        } else {
350                            Some(compiled.exists(binary.value(i)))
351                        }
352                    })
353                    .collect();
354                Ok(ColumnarValue::Array(Arc::new(results)))
355            }
356            (
357                ColumnarValue::Scalar(datafusion_common::ScalarValue::LargeBinary(Some(bytes))),
358                ColumnarValue::Scalar(datafusion_common::ScalarValue::Utf8(Some(path_str))),
359            ) => {
360                let compiled = compile_cached(&self.path_cache, path_str)
361                    .map_err(datafusion_common::DataFusionError::Execution)?;
362                let result = compiled.exists(bytes);
363                Ok(ColumnarValue::Scalar(
364                    datafusion_common::ScalarValue::Boolean(Some(result)),
365                ))
366            }
367            _ => Ok(ColumnarValue::Scalar(
368                datafusion_common::ScalarValue::Boolean(None),
369            )),
370        }
371    }
372}
373
374// ── jsonb_path_match UDF ────────────────────────────────────────────
375
/// `jsonb_path_match(jsonb, path_text) → boolean`
///
/// Returns the boolean result of evaluating a JSON path. The path must
/// yield exactly one boolean value. Returns NULL if the path matches
/// no values, matches more than one value, or the matched value is not
/// boolean.
#[derive(Debug)]
pub struct JsonbPathMatchUdf {
    /// Argument types and volatility this function declares.
    signature: Signature,
    /// Most recently compiled path, reused across invocations.
    path_cache: PathCache,
}
386
387impl Default for JsonbPathMatchUdf {
388    fn default() -> Self {
389        Self::new()
390    }
391}
392
393impl JsonbPathMatchUdf {
394    /// Creates a new `jsonb_path_match` UDF.
395    #[must_use]
396    pub fn new() -> Self {
397        Self {
398            signature: Signature::new(
399                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
400                Volatility::Immutable,
401            ),
402            path_cache: Mutex::new(None),
403        }
404    }
405}
406
407impl PartialEq for JsonbPathMatchUdf {
408    fn eq(&self, _other: &Self) -> bool {
409        true
410    }
411}
412
413impl Eq for JsonbPathMatchUdf {}
414
415impl Hash for JsonbPathMatchUdf {
416    fn hash<H: Hasher>(&self, state: &mut H) {
417        "jsonb_path_match".hash(state);
418    }
419}
420
/// Decodes a JSONB value as a boolean by inspecting its leading tag byte.
///
/// Returns `Some(true)` / `Some(false)` for the two boolean tags and `None`
/// when the data is empty or starts with any other tag.
fn jsonb_is_true(data: &[u8]) -> Option<bool> {
    const BOOL_FALSE: u8 = 0x01;
    const BOOL_TRUE: u8 = 0x02;

    match *data.first()? {
        BOOL_FALSE => Some(false),
        BOOL_TRUE => Some(true),
        _ => None, // not a boolean
    }
}
432
433impl ScalarUDFImpl for JsonbPathMatchUdf {
434    fn as_any(&self) -> &dyn Any {
435        self
436    }
437
438    fn name(&self) -> &'static str {
439        "jsonb_path_match"
440    }
441
442    fn signature(&self) -> &Signature {
443        &self.signature
444    }
445
446    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
447        Ok(DataType::Boolean)
448    }
449
450    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
451        let first = &args.args[0];
452        let second = &args.args[1];
453
454        match (first, second) {
455            (
456                ColumnarValue::Array(bin_arr),
457                ColumnarValue::Scalar(datafusion_common::ScalarValue::Utf8(Some(path_str))),
458            ) => {
459                let compiled = compile_cached(&self.path_cache, path_str)
460                    .map_err(datafusion_common::DataFusionError::Execution)?;
461                let binary = bin_arr
462                    .as_any()
463                    .downcast_ref::<LargeBinaryArray>()
464                    .ok_or_else(|| {
465                        datafusion_common::DataFusionError::Execution(
466                            "expected LargeBinary array".into(),
467                        )
468                    })?;
469                let results: BooleanArray = (0..binary.len())
470                    .map(|i| {
471                        if binary.is_null(i) {
472                            None
473                        } else {
474                            let matched = compiled.evaluate(binary.value(i));
475                            if matched.len() == 1 {
476                                jsonb_is_true(matched[0])
477                            } else {
478                                None
479                            }
480                        }
481                    })
482                    .collect();
483                Ok(ColumnarValue::Array(Arc::new(results)))
484            }
485            (
486                ColumnarValue::Scalar(datafusion_common::ScalarValue::LargeBinary(Some(bytes))),
487                ColumnarValue::Scalar(datafusion_common::ScalarValue::Utf8(Some(path_str))),
488            ) => {
489                let compiled = compile_cached(&self.path_cache, path_str)
490                    .map_err(datafusion_common::DataFusionError::Execution)?;
491                let matched = compiled.evaluate(bytes);
492                let result = if matched.len() == 1 {
493                    jsonb_is_true(matched[0])
494                } else {
495                    None
496                };
497                Ok(ColumnarValue::Scalar(
498                    datafusion_common::ScalarValue::Boolean(result),
499                ))
500            }
501            _ => Ok(ColumnarValue::Scalar(
502                datafusion_common::ScalarValue::Boolean(None),
503            )),
504        }
505    }
506}
507
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles `path` and returns its steps, panicking on a compile error.
    fn steps_of(path: &str) -> Vec<JsonPathStep> {
        CompiledJsonPath::compile(path).unwrap().steps
    }

    /// Shorthand for a member step.
    fn member(name: &str) -> JsonPathStep {
        JsonPathStep::Member(name.into())
    }

    /// Encodes `json` as JSONB, compiles `path`, and hands both to `check`.
    fn with_case<R>(json: &str, path: &str, check: impl FnOnce(&CompiledJsonPath, &[u8]) -> R) -> R {
        let value: serde_json::Value = serde_json::from_str(json).unwrap();
        let encoded = json_types::encode_jsonb(&value);
        let compiled = CompiledJsonPath::compile(path).unwrap();
        check(&compiled, &encoded)
    }

    // ── Path compiler tests ──

    #[test]
    fn test_compile_root_only() {
        assert_eq!(steps_of("$"), vec![JsonPathStep::Root]);
    }

    #[test]
    fn test_compile_member_access() {
        assert_eq!(steps_of("$.name"), vec![JsonPathStep::Root, member("name")]);
    }

    #[test]
    fn test_compile_nested_members() {
        assert_eq!(
            steps_of("$.user.address.city"),
            vec![
                JsonPathStep::Root,
                member("user"),
                member("address"),
                member("city"),
            ]
        );
    }

    #[test]
    fn test_compile_array_index() {
        assert_eq!(
            steps_of("$.items[0]"),
            vec![
                JsonPathStep::Root,
                member("items"),
                JsonPathStep::ArrayIndex(0),
            ]
        );
    }

    #[test]
    fn test_compile_wildcard() {
        assert_eq!(
            steps_of("$.items[*].price"),
            vec![
                JsonPathStep::Root,
                member("items"),
                JsonPathStep::ArrayWildcard,
                member("price"),
            ]
        );
    }

    #[test]
    fn test_compile_quoted_member() {
        assert_eq!(
            steps_of("$[\"spaced key\"]"),
            vec![JsonPathStep::Root, member("spaced key")]
        );
    }

    #[test]
    fn test_compile_empty_path_error() {
        assert!(CompiledJsonPath::compile("").is_err());
    }

    #[test]
    fn test_compile_no_root_error() {
        assert!(CompiledJsonPath::compile("name").is_err());
    }

    // ── Path evaluation tests ──

    #[test]
    fn test_evaluate_root() {
        with_case(r#"{"a": 1}"#, "$", |path, data| {
            assert_eq!(path.evaluate(data).len(), 1);
        });
    }

    #[test]
    fn test_evaluate_member() {
        with_case(r#"{"name": "Alice"}"#, "$.name", |path, data| {
            let results = path.evaluate(data);
            assert_eq!(results.len(), 1);
            // The match is the string "Alice" in JSONB format.
            assert_eq!(
                json_types::jsonb_to_text(results[0]),
                Some("Alice".to_string())
            );
        });
    }

    #[test]
    fn test_evaluate_missing_member() {
        with_case(r#"{"name": "Alice"}"#, "$.age", |path, data| {
            assert!(path.evaluate(data).is_empty());
        });
    }

    #[test]
    fn test_evaluate_array_index() {
        with_case(r#"{"items": [10, 20, 30]}"#, "$.items[1]", |path, data| {
            let results = path.evaluate(data);
            assert_eq!(results.len(), 1);
            assert_eq!(
                json_types::jsonb_to_text(results[0]),
                Some("20".to_string())
            );
        });
    }

    #[test]
    fn test_evaluate_wildcard() {
        with_case(
            r#"{"items": [{"price": 10}, {"price": 20}]}"#,
            "$.items[*].price",
            |path, data| {
                assert_eq!(path.evaluate(data).len(), 2);
            },
        );
    }

    #[test]
    fn test_exists_true() {
        with_case(r#"{"users": [{"age": 30}]}"#, "$.users[0].age", |path, data| {
            assert!(path.exists(data));
        });
    }

    #[test]
    fn test_exists_false() {
        with_case(
            r#"{"users": [{"age": 30}]}"#,
            "$.users[0].email",
            |path, data| {
                assert!(!path.exists(data));
            },
        );
    }

    // ── UDF tests ──

    #[test]
    fn test_path_exists_udf_scalar() {
        let udf = JsonbPathExistsUdf::new();
        assert_eq!(udf.name(), "jsonb_path_exists");
        assert_eq!(udf.return_type(&[]).unwrap(), DataType::Boolean);
    }

    #[test]
    fn test_path_match_udf_scalar() {
        let udf = JsonbPathMatchUdf::new();
        assert_eq!(udf.name(), "jsonb_path_match");
        assert_eq!(udf.return_type(&[]).unwrap(), DataType::Boolean);
    }
}