Skip to main content

laminar_sql/datafusion/
json_udf.rs

1//! PostgreSQL-compatible JSON scalar UDFs (F-SCHEMA-011).
2//!
3//! Implements:
4//!
5//! - **Extraction**: `jsonb_get`, `jsonb_get_idx`, `jsonb_get_text`,
6//!   `jsonb_get_text_idx`, `jsonb_get_path`, `jsonb_get_path_text`
7//! - **Existence**: `jsonb_exists`, `jsonb_exists_any`, `jsonb_exists_all`
8//! - **Containment**: `jsonb_contains`, `jsonb_contained_by`
9//! - **Interrogation**: `json_typeof`
10//! - **Construction**: `json_build_object`, `json_build_array`, `to_jsonb`
11
12use std::any::Any;
13use std::hash::{Hash, Hasher};
14use std::sync::Arc;
15
16use arrow::datatypes::DataType;
17use arrow_array::{
18    builder::{LargeBinaryBuilder, StringBuilder},
19    Array, ArrayRef, BooleanArray, LargeBinaryArray, ListArray, StringArray,
20};
21use datafusion_common::Result;
22use datafusion_expr::{
23    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
24};
25
26use super::json_types;
27
28// ── Helpers ──────────────────────────────────────────────────────
29
30/// Determine the output length from args (handling scalar/array combos).
31fn output_len(args: &[ColumnarValue]) -> usize {
32    for a in args {
33        if let ColumnarValue::Array(arr) = a {
34            return arr.len();
35        }
36    }
37    1
38}
39
40/// Expand all args to arrays of the same length.
41///
42/// # Errors
43///
44/// Returns a `DataFusionError` if a scalar value cannot be expanded to the target length.
45pub fn expand_args(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
46    let len = output_len(args);
47    args.iter()
48        .map(|a| match a {
49            ColumnarValue::Array(arr) => Ok(Arc::clone(arr)),
50            ColumnarValue::Scalar(s) => s.to_array_of_size(len),
51        })
52        .collect()
53}
54
55// ══════════════════════════════════════════════════════════════════
56// jsonb_get(jsonb, text) -> jsonb  (SQL: ->)
57// ══════════════════════════════════════════════════════════════════
58
59/// `jsonb_get(jsonb, text) -> jsonb`
60///
61/// Extracts a JSON object field by key, returning JSONB.
62/// Maps to the `->` operator with a text key.
63#[derive(Debug)]
64pub struct JsonbGet {
65    signature: Signature,
66}
67
68impl JsonbGet {
69    /// Creates a new `jsonb_get` UDF.
70    #[must_use]
71    pub fn new() -> Self {
72        Self {
73            signature: Signature::new(
74                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
75                Volatility::Immutable,
76            ),
77        }
78    }
79}
80
81impl Default for JsonbGet {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl PartialEq for JsonbGet {
88    fn eq(&self, _other: &Self) -> bool {
89        true
90    }
91}
92
93impl Eq for JsonbGet {}
94
95impl Hash for JsonbGet {
96    fn hash<H: Hasher>(&self, state: &mut H) {
97        "jsonb_get".hash(state);
98    }
99}
100
101impl ScalarUDFImpl for JsonbGet {
102    fn as_any(&self) -> &dyn Any {
103        self
104    }
105
106    fn name(&self) -> &'static str {
107        "jsonb_get"
108    }
109
110    fn signature(&self) -> &Signature {
111        &self.signature
112    }
113
114    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
115        Ok(DataType::LargeBinary)
116    }
117
118    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
119        let expanded = expand_args(&args.args)?;
120        let jsonb_arr = expanded[0]
121            .as_any()
122            .downcast_ref::<LargeBinaryArray>()
123            .ok_or_else(|| {
124                datafusion_common::DataFusionError::Internal(
125                    "jsonb_get: first arg must be LargeBinary".into(),
126                )
127            })?;
128        let key_arr = expanded[1]
129            .as_any()
130            .downcast_ref::<StringArray>()
131            .ok_or_else(|| {
132                datafusion_common::DataFusionError::Internal(
133                    "jsonb_get: second arg must be Utf8".into(),
134                )
135            })?;
136
137        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
138        for i in 0..jsonb_arr.len() {
139            if jsonb_arr.is_null(i) || key_arr.is_null(i) {
140                builder.append_null();
141            } else {
142                let jsonb = jsonb_arr.value(i);
143                let key = key_arr.value(i);
144                match json_types::jsonb_get_field(jsonb, key) {
145                    Some(val) => builder.append_value(val),
146                    None => builder.append_null(),
147                }
148            }
149        }
150        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
151    }
152}
153
154// ══════════════════════════════════════════════════════════════════
155// jsonb_get_idx(jsonb, int32) -> jsonb  (SQL: -> with int)
156// ══════════════════════════════════════════════════════════════════
157
158/// `jsonb_get_idx(jsonb, int32) -> jsonb`
159///
160/// Extracts a JSON array element by index, returning JSONB.
161/// Maps to the `->` operator with an integer index.
162#[derive(Debug)]
163pub struct JsonbGetIdx {
164    signature: Signature,
165}
166
167impl JsonbGetIdx {
168    /// Creates a new `jsonb_get_idx` UDF.
169    #[must_use]
170    pub fn new() -> Self {
171        Self {
172            signature: Signature::new(
173                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Int32]),
174                Volatility::Immutable,
175            ),
176        }
177    }
178}
179
180impl Default for JsonbGetIdx {
181    fn default() -> Self {
182        Self::new()
183    }
184}
185
186impl PartialEq for JsonbGetIdx {
187    fn eq(&self, _other: &Self) -> bool {
188        true
189    }
190}
191
192impl Eq for JsonbGetIdx {}
193
194impl Hash for JsonbGetIdx {
195    fn hash<H: Hasher>(&self, state: &mut H) {
196        "jsonb_get_idx".hash(state);
197    }
198}
199
200impl ScalarUDFImpl for JsonbGetIdx {
201    fn as_any(&self) -> &dyn Any {
202        self
203    }
204
205    fn name(&self) -> &'static str {
206        "jsonb_get_idx"
207    }
208
209    fn signature(&self) -> &Signature {
210        &self.signature
211    }
212
213    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
214        Ok(DataType::LargeBinary)
215    }
216
217    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
218        let expanded = expand_args(&args.args)?;
219        let jsonb_arr = expanded[0]
220            .as_any()
221            .downcast_ref::<LargeBinaryArray>()
222            .ok_or_else(|| {
223                datafusion_common::DataFusionError::Internal(
224                    "jsonb_get_idx: first arg must be LargeBinary".into(),
225                )
226            })?;
227        let idx_arr = expanded[1]
228            .as_any()
229            .downcast_ref::<arrow_array::Int32Array>()
230            .ok_or_else(|| {
231                datafusion_common::DataFusionError::Internal(
232                    "jsonb_get_idx: second arg must be Int32".into(),
233                )
234            })?;
235
236        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
237        for i in 0..jsonb_arr.len() {
238            if jsonb_arr.is_null(i) || idx_arr.is_null(i) {
239                builder.append_null();
240            } else {
241                let jsonb = jsonb_arr.value(i);
242                let idx = idx_arr.value(i);
243                match usize::try_from(idx)
244                    .ok()
245                    .and_then(|u| json_types::jsonb_array_get(jsonb, u))
246                {
247                    Some(val) => builder.append_value(val),
248                    None => builder.append_null(),
249                }
250            }
251        }
252        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
253    }
254}
255
256// ══════════════════════════════════════════════════════════════════
257// jsonb_get_text(jsonb, text) -> text  (SQL: ->>)
258// ══════════════════════════════════════════════════════════════════
259
260/// `jsonb_get_text(jsonb, text) -> text`
261///
262/// Extracts a JSON object field by key, returning TEXT.
263/// Maps to the `->>` operator with a text key.
264#[derive(Debug)]
265pub struct JsonbGetText {
266    signature: Signature,
267}
268
269impl JsonbGetText {
270    /// Creates a new `jsonb_get_text` UDF.
271    #[must_use]
272    pub fn new() -> Self {
273        Self {
274            signature: Signature::new(
275                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
276                Volatility::Immutable,
277            ),
278        }
279    }
280}
281
282impl Default for JsonbGetText {
283    fn default() -> Self {
284        Self::new()
285    }
286}
287
288impl PartialEq for JsonbGetText {
289    fn eq(&self, _other: &Self) -> bool {
290        true
291    }
292}
293
294impl Eq for JsonbGetText {}
295
296impl Hash for JsonbGetText {
297    fn hash<H: Hasher>(&self, state: &mut H) {
298        "jsonb_get_text".hash(state);
299    }
300}
301
302impl ScalarUDFImpl for JsonbGetText {
303    fn as_any(&self) -> &dyn Any {
304        self
305    }
306
307    fn name(&self) -> &'static str {
308        "jsonb_get_text"
309    }
310
311    fn signature(&self) -> &Signature {
312        &self.signature
313    }
314
315    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
316        Ok(DataType::Utf8)
317    }
318
319    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
320        let expanded = expand_args(&args.args)?;
321        let jsonb_arr = expanded[0]
322            .as_any()
323            .downcast_ref::<LargeBinaryArray>()
324            .ok_or_else(|| {
325                datafusion_common::DataFusionError::Internal(
326                    "jsonb_get_text: first arg must be LargeBinary".into(),
327                )
328            })?;
329        let key_arr = expanded[1]
330            .as_any()
331            .downcast_ref::<StringArray>()
332            .ok_or_else(|| {
333                datafusion_common::DataFusionError::Internal(
334                    "jsonb_get_text: second arg must be Utf8".into(),
335                )
336            })?;
337
338        let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
339        for i in 0..jsonb_arr.len() {
340            if jsonb_arr.is_null(i) || key_arr.is_null(i) {
341                builder.append_null();
342            } else {
343                let jsonb = jsonb_arr.value(i);
344                let key = key_arr.value(i);
345                match json_types::jsonb_get_field(jsonb, key) {
346                    Some(val) => match json_types::jsonb_to_text(val) {
347                        Some(text) => builder.append_value(&text),
348                        None => builder.append_null(),
349                    },
350                    None => builder.append_null(),
351                }
352            }
353        }
354        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
355    }
356}
357
358// ══════════════════════════════════════════════════════════════════
359// jsonb_get_text_idx(jsonb, int32) -> text  (SQL: ->> with int)
360// ══════════════════════════════════════════════════════════════════
361
362/// `jsonb_get_text_idx(jsonb, int32) -> text`
363///
364/// Extracts a JSON array element by index, returning TEXT.
365/// Maps to the `->>` operator with an integer index.
366#[derive(Debug)]
367pub struct JsonbGetTextIdx {
368    signature: Signature,
369}
370
371impl JsonbGetTextIdx {
372    /// Creates a new `jsonb_get_text_idx` UDF.
373    #[must_use]
374    pub fn new() -> Self {
375        Self {
376            signature: Signature::new(
377                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Int32]),
378                Volatility::Immutable,
379            ),
380        }
381    }
382}
383
384impl Default for JsonbGetTextIdx {
385    fn default() -> Self {
386        Self::new()
387    }
388}
389
390impl PartialEq for JsonbGetTextIdx {
391    fn eq(&self, _other: &Self) -> bool {
392        true
393    }
394}
395
396impl Eq for JsonbGetTextIdx {}
397
398impl Hash for JsonbGetTextIdx {
399    fn hash<H: Hasher>(&self, state: &mut H) {
400        "jsonb_get_text_idx".hash(state);
401    }
402}
403
404impl ScalarUDFImpl for JsonbGetTextIdx {
405    fn as_any(&self) -> &dyn Any {
406        self
407    }
408
409    fn name(&self) -> &'static str {
410        "jsonb_get_text_idx"
411    }
412
413    fn signature(&self) -> &Signature {
414        &self.signature
415    }
416
417    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
418        Ok(DataType::Utf8)
419    }
420
421    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
422        let expanded = expand_args(&args.args)?;
423        let jsonb_arr = expanded[0]
424            .as_any()
425            .downcast_ref::<LargeBinaryArray>()
426            .ok_or_else(|| {
427                datafusion_common::DataFusionError::Internal(
428                    "jsonb_get_text_idx: first arg must be LargeBinary".into(),
429                )
430            })?;
431        let idx_arr = expanded[1]
432            .as_any()
433            .downcast_ref::<arrow_array::Int32Array>()
434            .ok_or_else(|| {
435                datafusion_common::DataFusionError::Internal(
436                    "jsonb_get_text_idx: second arg must be Int32".into(),
437                )
438            })?;
439
440        let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
441        for i in 0..jsonb_arr.len() {
442            if jsonb_arr.is_null(i) || idx_arr.is_null(i) {
443                builder.append_null();
444            } else {
445                let jsonb = jsonb_arr.value(i);
446                let idx = idx_arr.value(i);
447                match usize::try_from(idx)
448                    .ok()
449                    .and_then(|u| json_types::jsonb_array_get(jsonb, u))
450                {
451                    Some(val) => match json_types::jsonb_to_text(val) {
452                        Some(text) => builder.append_value(&text),
453                        None => builder.append_null(),
454                    },
455                    None => builder.append_null(),
456                }
457            }
458        }
459        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
460    }
461}
462
463// ══════════════════════════════════════════════════════════════════
464// jsonb_get_path(jsonb, text[]) -> jsonb  (SQL: #>)
465// ══════════════════════════════════════════════════════════════════
466
467/// `jsonb_get_path(jsonb, text[]) -> jsonb`
468///
469/// Extracts a JSONB value at a nested path given as a text array.
470/// Maps to the `#>` operator.
471#[derive(Debug)]
472pub struct JsonbGetPath {
473    signature: Signature,
474}
475
476impl JsonbGetPath {
477    /// Creates a new `jsonb_get_path` UDF.
478    #[must_use]
479    pub fn new() -> Self {
480        Self {
481            signature: Signature::new(
482                TypeSignature::Exact(vec![
483                    DataType::LargeBinary,
484                    DataType::List(Arc::new(arrow_schema::Field::new(
485                        "item",
486                        DataType::Utf8,
487                        true,
488                    ))),
489                ]),
490                Volatility::Immutable,
491            ),
492        }
493    }
494}
495
496impl Default for JsonbGetPath {
497    fn default() -> Self {
498        Self::new()
499    }
500}
501
502impl PartialEq for JsonbGetPath {
503    fn eq(&self, _other: &Self) -> bool {
504        true
505    }
506}
507
508impl Eq for JsonbGetPath {}
509
510impl Hash for JsonbGetPath {
511    fn hash<H: Hasher>(&self, state: &mut H) {
512        "jsonb_get_path".hash(state);
513    }
514}
515
516impl ScalarUDFImpl for JsonbGetPath {
517    fn as_any(&self) -> &dyn Any {
518        self
519    }
520
521    fn name(&self) -> &'static str {
522        "jsonb_get_path"
523    }
524
525    fn signature(&self) -> &Signature {
526        &self.signature
527    }
528
529    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
530        Ok(DataType::LargeBinary)
531    }
532
533    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
534        let expanded = expand_args(&args.args)?;
535        let jsonb_arr = expanded[0]
536            .as_any()
537            .downcast_ref::<LargeBinaryArray>()
538            .ok_or_else(|| {
539                datafusion_common::DataFusionError::Internal(
540                    "jsonb_get_path: first arg must be LargeBinary".into(),
541                )
542            })?;
543        let path_arr = expanded[1]
544            .as_any()
545            .downcast_ref::<ListArray>()
546            .ok_or_else(|| {
547                datafusion_common::DataFusionError::Internal(
548                    "jsonb_get_path: second arg must be List<Utf8>".into(),
549                )
550            })?;
551
552        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
553        for i in 0..jsonb_arr.len() {
554            if jsonb_arr.is_null(i) || path_arr.is_null(i) {
555                builder.append_null();
556            } else {
557                let jsonb = jsonb_arr.value(i);
558                let path_list = path_arr.value(i);
559                let path_strings = path_list
560                    .as_any()
561                    .downcast_ref::<StringArray>()
562                    .ok_or_else(|| {
563                        datafusion_common::DataFusionError::Internal(
564                            "jsonb_get_path: path elements must be Utf8".into(),
565                        )
566                    })?;
567                match walk_path(jsonb, path_strings) {
568                    Some(val) => builder.append_value(val),
569                    None => builder.append_null(),
570                }
571            }
572        }
573        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
574    }
575}
576
577/// Walk a JSONB value through a sequence of string keys.
578fn walk_path<'a>(mut jsonb: &'a [u8], path: &StringArray) -> Option<&'a [u8]> {
579    for i in 0..path.len() {
580        if path.is_null(i) {
581            return None;
582        }
583        let key = path.value(i);
584        // Try object field access first
585        if let Some(next) = json_types::jsonb_get_field(jsonb, key) {
586            jsonb = next;
587        } else if let Ok(idx) = key.parse::<usize>() {
588            // Fall back to array index
589            jsonb = json_types::jsonb_array_get(jsonb, idx)?;
590        } else {
591            return None;
592        }
593    }
594    Some(jsonb)
595}
596
597// ══════════════════════════════════════════════════════════════════
598// jsonb_get_path_text(jsonb, text[]) -> text  (SQL: #>>)
599// ══════════════════════════════════════════════════════════════════
600
601/// `jsonb_get_path_text(jsonb, text[]) -> text`
602///
603/// Extracts a text value at a nested path given as a text array.
604/// Maps to the `#>>` operator.
605#[derive(Debug)]
606pub struct JsonbGetPathText {
607    signature: Signature,
608}
609
610impl JsonbGetPathText {
611    /// Creates a new `jsonb_get_path_text` UDF.
612    #[must_use]
613    pub fn new() -> Self {
614        Self {
615            signature: Signature::new(
616                TypeSignature::Exact(vec![
617                    DataType::LargeBinary,
618                    DataType::List(Arc::new(arrow_schema::Field::new(
619                        "item",
620                        DataType::Utf8,
621                        true,
622                    ))),
623                ]),
624                Volatility::Immutable,
625            ),
626        }
627    }
628}
629
630impl Default for JsonbGetPathText {
631    fn default() -> Self {
632        Self::new()
633    }
634}
635
636impl PartialEq for JsonbGetPathText {
637    fn eq(&self, _other: &Self) -> bool {
638        true
639    }
640}
641
642impl Eq for JsonbGetPathText {}
643
644impl Hash for JsonbGetPathText {
645    fn hash<H: Hasher>(&self, state: &mut H) {
646        "jsonb_get_path_text".hash(state);
647    }
648}
649
650impl ScalarUDFImpl for JsonbGetPathText {
651    fn as_any(&self) -> &dyn Any {
652        self
653    }
654
655    fn name(&self) -> &'static str {
656        "jsonb_get_path_text"
657    }
658
659    fn signature(&self) -> &Signature {
660        &self.signature
661    }
662
663    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
664        Ok(DataType::Utf8)
665    }
666
667    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
668        let expanded = expand_args(&args.args)?;
669        let jsonb_arr = expanded[0]
670            .as_any()
671            .downcast_ref::<LargeBinaryArray>()
672            .ok_or_else(|| {
673                datafusion_common::DataFusionError::Internal(
674                    "jsonb_get_path_text: first arg must be LargeBinary".into(),
675                )
676            })?;
677        let path_arr = expanded[1]
678            .as_any()
679            .downcast_ref::<ListArray>()
680            .ok_or_else(|| {
681                datafusion_common::DataFusionError::Internal(
682                    "jsonb_get_path_text: second arg must be List<Utf8>".into(),
683                )
684            })?;
685
686        let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
687        for i in 0..jsonb_arr.len() {
688            if jsonb_arr.is_null(i) || path_arr.is_null(i) {
689                builder.append_null();
690            } else {
691                let jsonb = jsonb_arr.value(i);
692                let path_list = path_arr.value(i);
693                let path_strings = path_list
694                    .as_any()
695                    .downcast_ref::<StringArray>()
696                    .ok_or_else(|| {
697                        datafusion_common::DataFusionError::Internal(
698                            "jsonb_get_path_text: path elements must be Utf8".into(),
699                        )
700                    })?;
701                match walk_path(jsonb, path_strings) {
702                    Some(val) => match json_types::jsonb_to_text(val) {
703                        Some(text) => builder.append_value(&text),
704                        None => builder.append_null(),
705                    },
706                    None => builder.append_null(),
707                }
708            }
709        }
710        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
711    }
712}
713
714// ══════════════════════════════════════════════════════════════════
715// jsonb_exists(jsonb, text) -> bool  (SQL: ?)
716// ══════════════════════════════════════════════════════════════════
717
718/// `jsonb_exists(jsonb, text) -> bool`
719///
720/// Returns true if the JSONB object contains the given key.
721/// Maps to the `?` operator.
722#[derive(Debug)]
723pub struct JsonbExists {
724    signature: Signature,
725}
726
727impl JsonbExists {
728    /// Creates a new `jsonb_exists` UDF.
729    #[must_use]
730    pub fn new() -> Self {
731        Self {
732            signature: Signature::new(
733                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
734                Volatility::Immutable,
735            ),
736        }
737    }
738}
739
740impl Default for JsonbExists {
741    fn default() -> Self {
742        Self::new()
743    }
744}
745
746impl PartialEq for JsonbExists {
747    fn eq(&self, _other: &Self) -> bool {
748        true
749    }
750}
751
752impl Eq for JsonbExists {}
753
754impl Hash for JsonbExists {
755    fn hash<H: Hasher>(&self, state: &mut H) {
756        "jsonb_exists".hash(state);
757    }
758}
759
760impl ScalarUDFImpl for JsonbExists {
761    fn as_any(&self) -> &dyn Any {
762        self
763    }
764
765    fn name(&self) -> &'static str {
766        "jsonb_exists"
767    }
768
769    fn signature(&self) -> &Signature {
770        &self.signature
771    }
772
773    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
774        Ok(DataType::Boolean)
775    }
776
777    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
778        let expanded = expand_args(&args.args)?;
779        let jsonb_arr = expanded[0]
780            .as_any()
781            .downcast_ref::<LargeBinaryArray>()
782            .ok_or_else(|| {
783                datafusion_common::DataFusionError::Internal(
784                    "jsonb_exists: first arg must be LargeBinary".into(),
785                )
786            })?;
787        let key_arr = expanded[1]
788            .as_any()
789            .downcast_ref::<StringArray>()
790            .ok_or_else(|| {
791                datafusion_common::DataFusionError::Internal(
792                    "jsonb_exists: second arg must be Utf8".into(),
793                )
794            })?;
795
796        let result: BooleanArray = (0..jsonb_arr.len())
797            .map(|i| {
798                if jsonb_arr.is_null(i) || key_arr.is_null(i) {
799                    None
800                } else {
801                    Some(json_types::jsonb_has_key(
802                        jsonb_arr.value(i),
803                        key_arr.value(i),
804                    ))
805                }
806            })
807            .collect();
808        Ok(ColumnarValue::Array(Arc::new(result)))
809    }
810}
811
812// ══════════════════════════════════════════════════════════════════
813// jsonb_exists_any(jsonb, text[]) -> bool  (SQL: ?|)
814// ══════════════════════════════════════════════════════════════════
815
816/// `jsonb_exists_any(jsonb, text[]) -> bool`
817///
818/// Returns true if the JSONB object contains any of the given keys.
819/// Maps to the `?|` operator.
820#[derive(Debug)]
821pub struct JsonbExistsAny {
822    signature: Signature,
823}
824
825impl JsonbExistsAny {
826    /// Creates a new `jsonb_exists_any` UDF.
827    #[must_use]
828    pub fn new() -> Self {
829        Self {
830            signature: Signature::new(
831                TypeSignature::Exact(vec![
832                    DataType::LargeBinary,
833                    DataType::List(Arc::new(arrow_schema::Field::new(
834                        "item",
835                        DataType::Utf8,
836                        true,
837                    ))),
838                ]),
839                Volatility::Immutable,
840            ),
841        }
842    }
843}
844
845impl Default for JsonbExistsAny {
846    fn default() -> Self {
847        Self::new()
848    }
849}
850
851impl PartialEq for JsonbExistsAny {
852    fn eq(&self, _other: &Self) -> bool {
853        true
854    }
855}
856
857impl Eq for JsonbExistsAny {}
858
859impl Hash for JsonbExistsAny {
860    fn hash<H: Hasher>(&self, state: &mut H) {
861        "jsonb_exists_any".hash(state);
862    }
863}
864
865impl ScalarUDFImpl for JsonbExistsAny {
866    fn as_any(&self) -> &dyn Any {
867        self
868    }
869
870    fn name(&self) -> &'static str {
871        "jsonb_exists_any"
872    }
873
874    fn signature(&self) -> &Signature {
875        &self.signature
876    }
877
878    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
879        Ok(DataType::Boolean)
880    }
881
882    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
883        let expanded = expand_args(&args.args)?;
884        let jsonb_arr = expanded[0]
885            .as_any()
886            .downcast_ref::<LargeBinaryArray>()
887            .ok_or_else(|| {
888                datafusion_common::DataFusionError::Internal(
889                    "jsonb_exists_any: first arg must be LargeBinary".into(),
890                )
891            })?;
892        let keys_arr = expanded[1]
893            .as_any()
894            .downcast_ref::<ListArray>()
895            .ok_or_else(|| {
896                datafusion_common::DataFusionError::Internal(
897                    "jsonb_exists_any: second arg must be List<Utf8>".into(),
898                )
899            })?;
900
901        let result: BooleanArray =
902            (0..jsonb_arr.len())
903                .map(|i| {
904                    if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
905                        return None;
906                    }
907                    let jsonb = jsonb_arr.value(i);
908                    let keys_list = keys_arr.value(i);
909                    let keys = keys_list.as_any().downcast_ref::<StringArray>()?;
910                    Some((0..keys.len()).any(|k| {
911                        !keys.is_null(k) && json_types::jsonb_has_key(jsonb, keys.value(k))
912                    }))
913                })
914                .collect();
915        Ok(ColumnarValue::Array(Arc::new(result)))
916    }
917}
918
919// ══════════════════════════════════════════════════════════════════
920// jsonb_exists_all(jsonb, text[]) -> bool  (SQL: ?&)
921// ══════════════════════════════════════════════════════════════════
922
923/// `jsonb_exists_all(jsonb, text[]) -> bool`
924///
925/// Returns true if the JSONB object contains all of the given keys.
926/// Maps to the `?&` operator.
927#[derive(Debug)]
928pub struct JsonbExistsAll {
929    signature: Signature,
930}
931
932impl JsonbExistsAll {
933    /// Creates a new `jsonb_exists_all` UDF.
934    #[must_use]
935    pub fn new() -> Self {
936        Self {
937            signature: Signature::new(
938                TypeSignature::Exact(vec![
939                    DataType::LargeBinary,
940                    DataType::List(Arc::new(arrow_schema::Field::new(
941                        "item",
942                        DataType::Utf8,
943                        true,
944                    ))),
945                ]),
946                Volatility::Immutable,
947            ),
948        }
949    }
950}
951
952impl Default for JsonbExistsAll {
953    fn default() -> Self {
954        Self::new()
955    }
956}
957
958impl PartialEq for JsonbExistsAll {
959    fn eq(&self, _other: &Self) -> bool {
960        true
961    }
962}
963
964impl Eq for JsonbExistsAll {}
965
966impl Hash for JsonbExistsAll {
967    fn hash<H: Hasher>(&self, state: &mut H) {
968        "jsonb_exists_all".hash(state);
969    }
970}
971
972impl ScalarUDFImpl for JsonbExistsAll {
973    fn as_any(&self) -> &dyn Any {
974        self
975    }
976
977    fn name(&self) -> &'static str {
978        "jsonb_exists_all"
979    }
980
981    fn signature(&self) -> &Signature {
982        &self.signature
983    }
984
985    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
986        Ok(DataType::Boolean)
987    }
988
989    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
990        let expanded = expand_args(&args.args)?;
991        let jsonb_arr = expanded[0]
992            .as_any()
993            .downcast_ref::<LargeBinaryArray>()
994            .ok_or_else(|| {
995                datafusion_common::DataFusionError::Internal(
996                    "jsonb_exists_all: first arg must be LargeBinary".into(),
997                )
998            })?;
999        let keys_arr = expanded[1]
1000            .as_any()
1001            .downcast_ref::<ListArray>()
1002            .ok_or_else(|| {
1003                datafusion_common::DataFusionError::Internal(
1004                    "jsonb_exists_all: second arg must be List<Utf8>".into(),
1005                )
1006            })?;
1007
1008        let result: BooleanArray =
1009            (0..jsonb_arr.len())
1010                .map(|i| {
1011                    if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
1012                        return None;
1013                    }
1014                    let jsonb = jsonb_arr.value(i);
1015                    let keys_list = keys_arr.value(i);
1016                    let keys = keys_list.as_any().downcast_ref::<StringArray>()?;
1017                    Some((0..keys.len()).all(|k| {
1018                        !keys.is_null(k) && json_types::jsonb_has_key(jsonb, keys.value(k))
1019                    }))
1020                })
1021                .collect();
1022        Ok(ColumnarValue::Array(Arc::new(result)))
1023    }
1024}
1025
1026// ══════════════════════════════════════════════════════════════════
1027// jsonb_contains(jsonb, jsonb) -> bool  (SQL: @>)
1028// ══════════════════════════════════════════════════════════════════
1029
1030/// `jsonb_contains(jsonb, jsonb) -> bool`
1031///
1032/// Returns true if the left JSONB value contains the right.
1033/// Maps to the `@>` operator.
1034#[derive(Debug)]
1035pub struct JsonbContains {
1036    signature: Signature,
1037}
1038
1039impl JsonbContains {
1040    /// Creates a new `jsonb_contains` UDF.
1041    #[must_use]
1042    pub fn new() -> Self {
1043        Self {
1044            signature: Signature::new(
1045                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
1046                Volatility::Immutable,
1047            ),
1048        }
1049    }
1050}
1051
1052impl Default for JsonbContains {
1053    fn default() -> Self {
1054        Self::new()
1055    }
1056}
1057
1058impl PartialEq for JsonbContains {
1059    fn eq(&self, _other: &Self) -> bool {
1060        true
1061    }
1062}
1063
1064impl Eq for JsonbContains {}
1065
1066impl Hash for JsonbContains {
1067    fn hash<H: Hasher>(&self, state: &mut H) {
1068        "jsonb_contains".hash(state);
1069    }
1070}
1071
1072impl ScalarUDFImpl for JsonbContains {
1073    fn as_any(&self) -> &dyn Any {
1074        self
1075    }
1076
1077    fn name(&self) -> &'static str {
1078        "jsonb_contains"
1079    }
1080
1081    fn signature(&self) -> &Signature {
1082        &self.signature
1083    }
1084
1085    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1086        Ok(DataType::Boolean)
1087    }
1088
1089    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1090        let expanded = expand_args(&args.args)?;
1091        let left_arr = expanded[0]
1092            .as_any()
1093            .downcast_ref::<LargeBinaryArray>()
1094            .ok_or_else(|| {
1095                datafusion_common::DataFusionError::Internal(
1096                    "jsonb_contains: first arg must be LargeBinary".into(),
1097                )
1098            })?;
1099        let right_arr = expanded[1]
1100            .as_any()
1101            .downcast_ref::<LargeBinaryArray>()
1102            .ok_or_else(|| {
1103                datafusion_common::DataFusionError::Internal(
1104                    "jsonb_contains: second arg must be LargeBinary".into(),
1105                )
1106            })?;
1107
1108        let result: BooleanArray = (0..left_arr.len())
1109            .map(|i| {
1110                if left_arr.is_null(i) || right_arr.is_null(i) {
1111                    None
1112                } else {
1113                    json_types::jsonb_contains(left_arr.value(i), right_arr.value(i))
1114                }
1115            })
1116            .collect();
1117        Ok(ColumnarValue::Array(Arc::new(result)))
1118    }
1119}
1120
1121// ══════════════════════════════════════════════════════════════════
1122// jsonb_contained_by(jsonb, jsonb) -> bool  (SQL: <@)
1123// ══════════════════════════════════════════════════════════════════
1124
1125/// `jsonb_contained_by(jsonb, jsonb) -> bool`
1126///
1127/// Returns true if the left JSONB value is contained by the right.
1128/// Maps to the `<@` operator.
1129#[derive(Debug)]
1130pub struct JsonbContainedBy {
1131    signature: Signature,
1132}
1133
1134impl JsonbContainedBy {
1135    /// Creates a new `jsonb_contained_by` UDF.
1136    #[must_use]
1137    pub fn new() -> Self {
1138        Self {
1139            signature: Signature::new(
1140                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
1141                Volatility::Immutable,
1142            ),
1143        }
1144    }
1145}
1146
1147impl Default for JsonbContainedBy {
1148    fn default() -> Self {
1149        Self::new()
1150    }
1151}
1152
1153impl PartialEq for JsonbContainedBy {
1154    fn eq(&self, _other: &Self) -> bool {
1155        true
1156    }
1157}
1158
1159impl Eq for JsonbContainedBy {}
1160
1161impl Hash for JsonbContainedBy {
1162    fn hash<H: Hasher>(&self, state: &mut H) {
1163        "jsonb_contained_by".hash(state);
1164    }
1165}
1166
1167impl ScalarUDFImpl for JsonbContainedBy {
1168    fn as_any(&self) -> &dyn Any {
1169        self
1170    }
1171
1172    fn name(&self) -> &'static str {
1173        "jsonb_contained_by"
1174    }
1175
1176    fn signature(&self) -> &Signature {
1177        &self.signature
1178    }
1179
1180    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1181        Ok(DataType::Boolean)
1182    }
1183
1184    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1185        let expanded = expand_args(&args.args)?;
1186        let left_arr = expanded[0]
1187            .as_any()
1188            .downcast_ref::<LargeBinaryArray>()
1189            .ok_or_else(|| {
1190                datafusion_common::DataFusionError::Internal(
1191                    "jsonb_contained_by: first arg must be LargeBinary".into(),
1192                )
1193            })?;
1194        let right_arr = expanded[1]
1195            .as_any()
1196            .downcast_ref::<LargeBinaryArray>()
1197            .ok_or_else(|| {
1198                datafusion_common::DataFusionError::Internal(
1199                    "jsonb_contained_by: second arg must be LargeBinary".into(),
1200                )
1201            })?;
1202
1203        // <@ is just @> with swapped args
1204        let result: BooleanArray = (0..left_arr.len())
1205            .map(|i| {
1206                if left_arr.is_null(i) || right_arr.is_null(i) {
1207                    None
1208                } else {
1209                    json_types::jsonb_contains(right_arr.value(i), left_arr.value(i))
1210                }
1211            })
1212            .collect();
1213        Ok(ColumnarValue::Array(Arc::new(result)))
1214    }
1215}
1216
1217// ══════════════════════════════════════════════════════════════════
1218// json_typeof(jsonb) -> text
1219// ══════════════════════════════════════════════════════════════════
1220
1221/// `json_typeof(jsonb) -> text`
1222///
1223/// Returns the type of the outermost JSON value as text:
1224/// "object", "array", "string", "number", "boolean", or "null".
1225///
1226/// Reads only the type tag byte — O(1).
1227#[derive(Debug)]
1228pub struct JsonTypeof {
1229    signature: Signature,
1230}
1231
1232impl JsonTypeof {
1233    /// Creates a new `json_typeof` UDF.
1234    #[must_use]
1235    pub fn new() -> Self {
1236        Self {
1237            signature: Signature::new(
1238                TypeSignature::Exact(vec![DataType::LargeBinary]),
1239                Volatility::Immutable,
1240            ),
1241        }
1242    }
1243}
1244
1245impl Default for JsonTypeof {
1246    fn default() -> Self {
1247        Self::new()
1248    }
1249}
1250
1251impl PartialEq for JsonTypeof {
1252    fn eq(&self, _other: &Self) -> bool {
1253        true
1254    }
1255}
1256
1257impl Eq for JsonTypeof {}
1258
1259impl Hash for JsonTypeof {
1260    fn hash<H: Hasher>(&self, state: &mut H) {
1261        "json_typeof".hash(state);
1262    }
1263}
1264
1265impl ScalarUDFImpl for JsonTypeof {
1266    fn as_any(&self) -> &dyn Any {
1267        self
1268    }
1269
1270    fn name(&self) -> &'static str {
1271        "json_typeof"
1272    }
1273
1274    fn signature(&self) -> &Signature {
1275        &self.signature
1276    }
1277
1278    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1279        Ok(DataType::Utf8)
1280    }
1281
1282    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1283        let expanded = expand_args(&args.args)?;
1284        let jsonb_arr = expanded[0]
1285            .as_any()
1286            .downcast_ref::<LargeBinaryArray>()
1287            .ok_or_else(|| {
1288                datafusion_common::DataFusionError::Internal(
1289                    "json_typeof: arg must be LargeBinary".into(),
1290                )
1291            })?;
1292
1293        let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), jsonb_arr.len() * 8);
1294        for i in 0..jsonb_arr.len() {
1295            if jsonb_arr.is_null(i) {
1296                builder.append_null();
1297            } else {
1298                match json_types::jsonb_type_name(jsonb_arr.value(i)) {
1299                    Some(name) => builder.append_value(name),
1300                    None => builder.append_null(),
1301                }
1302            }
1303        }
1304        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1305    }
1306}
1307
1308// ══════════════════════════════════════════════════════════════════
1309// json_build_object(k1, v1, k2, v2, ...) -> jsonb
1310// ══════════════════════════════════════════════════════════════════
1311
1312/// `json_build_object(key1, value1, key2, value2, ...) -> jsonb`
1313///
1314/// Constructs a JSONB object from alternating key-value pairs.
1315/// Executes in Ring 1 (allocates JSONB binary buffer).
1316#[derive(Debug)]
1317pub struct JsonBuildObject {
1318    signature: Signature,
1319}
1320
1321impl JsonBuildObject {
1322    /// Creates a new `json_build_object` UDF.
1323    #[must_use]
1324    pub fn new() -> Self {
1325        Self {
1326            signature: Signature::new(TypeSignature::VariadicAny, Volatility::Immutable),
1327        }
1328    }
1329}
1330
1331impl Default for JsonBuildObject {
1332    fn default() -> Self {
1333        Self::new()
1334    }
1335}
1336
1337impl PartialEq for JsonBuildObject {
1338    fn eq(&self, _other: &Self) -> bool {
1339        true
1340    }
1341}
1342
1343impl Eq for JsonBuildObject {}
1344
1345impl Hash for JsonBuildObject {
1346    fn hash<H: Hasher>(&self, state: &mut H) {
1347        "json_build_object".hash(state);
1348    }
1349}
1350
1351impl ScalarUDFImpl for JsonBuildObject {
1352    fn as_any(&self) -> &dyn Any {
1353        self
1354    }
1355
1356    fn name(&self) -> &'static str {
1357        "json_build_object"
1358    }
1359
1360    fn signature(&self) -> &Signature {
1361        &self.signature
1362    }
1363
1364    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1365        Ok(DataType::LargeBinary)
1366    }
1367
1368    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1369        if !args.args.len().is_multiple_of(2) {
1370            return Err(datafusion_common::DataFusionError::Execution(
1371                "json_build_object requires an even number of arguments".into(),
1372            ));
1373        }
1374
1375        let expanded = expand_args(&args.args)?;
1376        let len = expanded.first().map_or(1, Array::len);
1377
1378        let mut builder = LargeBinaryBuilder::with_capacity(len, 256);
1379        for row in 0..len {
1380            let mut obj = serde_json::Map::new();
1381            let mut is_null = false;
1382            for pair in expanded.chunks(2) {
1383                let key_arr = &pair[0];
1384                let val_arr = &pair[1];
1385                if key_arr.is_null(row) {
1386                    is_null = true;
1387                    break;
1388                }
1389                let key = scalar_to_json_key(key_arr, row)?;
1390                let val = scalar_to_json_value(val_arr, row);
1391                obj.insert(key, val);
1392            }
1393            if is_null {
1394                builder.append_null();
1395            } else {
1396                let jsonb = json_types::encode_jsonb(&serde_json::Value::Object(obj));
1397                builder.append_value(&jsonb);
1398            }
1399        }
1400        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1401    }
1402}
1403
1404// ══════════════════════════════════════════════════════════════════
1405// json_build_array(v1, v2, ...) -> jsonb
1406// ══════════════════════════════════════════════════════════════════
1407
1408/// `json_build_array(v1, v2, ...) -> jsonb`
1409///
1410/// Constructs a JSONB array from the given values.
1411/// Executes in Ring 1 (allocates JSONB binary buffer).
1412#[derive(Debug)]
1413pub struct JsonBuildArray {
1414    signature: Signature,
1415}
1416
1417impl JsonBuildArray {
1418    /// Creates a new `json_build_array` UDF.
1419    #[must_use]
1420    pub fn new() -> Self {
1421        Self {
1422            signature: Signature::new(TypeSignature::VariadicAny, Volatility::Immutable),
1423        }
1424    }
1425}
1426
1427impl Default for JsonBuildArray {
1428    fn default() -> Self {
1429        Self::new()
1430    }
1431}
1432
1433impl PartialEq for JsonBuildArray {
1434    fn eq(&self, _other: &Self) -> bool {
1435        true
1436    }
1437}
1438
1439impl Eq for JsonBuildArray {}
1440
1441impl Hash for JsonBuildArray {
1442    fn hash<H: Hasher>(&self, state: &mut H) {
1443        "json_build_array".hash(state);
1444    }
1445}
1446
1447impl ScalarUDFImpl for JsonBuildArray {
1448    fn as_any(&self) -> &dyn Any {
1449        self
1450    }
1451
1452    fn name(&self) -> &'static str {
1453        "json_build_array"
1454    }
1455
1456    fn signature(&self) -> &Signature {
1457        &self.signature
1458    }
1459
1460    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1461        Ok(DataType::LargeBinary)
1462    }
1463
1464    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1465        let expanded = expand_args(&args.args)?;
1466        let len = expanded.first().map_or(1, Array::len);
1467
1468        let mut builder = LargeBinaryBuilder::with_capacity(len, 256);
1469        for row in 0..len {
1470            let mut arr = Vec::with_capacity(expanded.len());
1471            for col in &expanded {
1472                arr.push(scalar_to_json_value(col, row));
1473            }
1474            let jsonb = json_types::encode_jsonb(&serde_json::Value::Array(arr));
1475            builder.append_value(&jsonb);
1476        }
1477        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1478    }
1479}
1480
1481// ══════════════════════════════════════════════════════════════════
1482// to_jsonb(any) -> jsonb
1483// ══════════════════════════════════════════════════════════════════
1484
1485/// `to_jsonb(value) -> jsonb`
1486///
1487/// Converts any SQL value to JSONB binary format.
1488#[derive(Debug)]
1489pub struct ToJsonb {
1490    signature: Signature,
1491}
1492
1493impl ToJsonb {
1494    /// Creates a new `to_jsonb` UDF.
1495    #[must_use]
1496    pub fn new() -> Self {
1497        Self {
1498            signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
1499        }
1500    }
1501}
1502
1503impl Default for ToJsonb {
1504    fn default() -> Self {
1505        Self::new()
1506    }
1507}
1508
1509impl PartialEq for ToJsonb {
1510    fn eq(&self, _other: &Self) -> bool {
1511        true
1512    }
1513}
1514
1515impl Eq for ToJsonb {}
1516
1517impl Hash for ToJsonb {
1518    fn hash<H: Hasher>(&self, state: &mut H) {
1519        "to_jsonb".hash(state);
1520    }
1521}
1522
1523impl ScalarUDFImpl for ToJsonb {
1524    fn as_any(&self) -> &dyn Any {
1525        self
1526    }
1527
1528    fn name(&self) -> &'static str {
1529        "to_jsonb"
1530    }
1531
1532    fn signature(&self) -> &Signature {
1533        &self.signature
1534    }
1535
1536    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1537        Ok(DataType::LargeBinary)
1538    }
1539
1540    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1541        let expanded = expand_args(&args.args)?;
1542        let arr = &expanded[0];
1543        let len = arr.len();
1544
1545        let mut builder = LargeBinaryBuilder::with_capacity(len, 64);
1546        for row in 0..len {
1547            if arr.is_null(row) {
1548                let jsonb = json_types::encode_jsonb(&serde_json::Value::Null);
1549                builder.append_value(&jsonb);
1550            } else {
1551                let val = scalar_to_json_value(arr, row);
1552                let jsonb = json_types::encode_jsonb(&val);
1553                builder.append_value(&jsonb);
1554            }
1555        }
1556        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1557    }
1558}
1559
1560// ── Scalar conversion helpers ────────────────────────────────────
1561
1562/// Convert an Arrow array value at `row` to a JSON key string.
1563fn scalar_to_json_key(arr: &ArrayRef, row: usize) -> Result<String> {
1564    if let Some(s) = arr.as_any().downcast_ref::<StringArray>() {
1565        return Ok(s.value(row).to_owned());
1566    }
1567    // Fallback: convert to string representation
1568    Err(datafusion_common::DataFusionError::Execution(
1569        "json_build_object keys must be text".into(),
1570    ))
1571}
1572
1573/// Convert an Arrow array value at `row` to a `serde_json::Value`.
1574fn scalar_to_json_value(arr: &ArrayRef, row: usize) -> serde_json::Value {
1575    if arr.is_null(row) {
1576        return serde_json::Value::Null;
1577    }
1578
1579    // Try common types
1580    if let Some(a) = arr.as_any().downcast_ref::<StringArray>() {
1581        return serde_json::Value::String(a.value(row).to_owned());
1582    }
1583    if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Int64Array>() {
1584        return serde_json::Value::Number(a.value(row).into());
1585    }
1586    if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Int32Array>() {
1587        return serde_json::Value::Number(i64::from(a.value(row)).into());
1588    }
1589    if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Float64Array>() {
1590        if let Some(n) = serde_json::Number::from_f64(a.value(row)) {
1591            return serde_json::Value::Number(n);
1592        }
1593        return serde_json::Value::Null;
1594    }
1595    if let Some(a) = arr.as_any().downcast_ref::<BooleanArray>() {
1596        return serde_json::Value::Bool(a.value(row));
1597    }
1598    if let Some(a) = arr.as_any().downcast_ref::<LargeBinaryArray>() {
1599        // Already JSONB — convert to JSON value for re-encoding
1600        let bytes = a.value(row);
1601        if let Some(text) = json_types::jsonb_to_text(bytes) {
1602            // Try to parse the text as JSON
1603            if let Ok(val) = serde_json::from_str::<serde_json::Value>(&text) {
1604                return val;
1605            }
1606            return serde_json::Value::String(text);
1607        }
1608        return serde_json::Value::Null;
1609    }
1610
1611    // Fallback: use display
1612    let scalar = datafusion_common::ScalarValue::try_from_array(arr, row).ok();
1613    match scalar {
1614        Some(s) => serde_json::Value::String(s.to_string()),
1615        None => serde_json::Value::Null,
1616    }
1617}
1618
1619// ══════════════════════════════════════════════════════════════════
1620// Tests
1621// ══════════════════════════════════════════════════════════════════
1622
1623#[cfg(test)]
1624mod tests {
1625    use super::*;
1626    use arrow_schema::Field;
1627    use datafusion_common::config::ConfigOptions;
1628    use datafusion_expr::ScalarUDF;
1629
1630    fn enc(v: &serde_json::Value) -> Vec<u8> {
1631        json_types::encode_jsonb(v)
1632    }
1633
1634    fn make_jsonb_array(vals: &[serde_json::Value]) -> LargeBinaryArray {
1635        let encoded: Vec<Vec<u8>> = vals.iter().map(enc).collect();
1636        let refs: Vec<&[u8]> = encoded.iter().map(Vec::as_slice).collect();
1637        LargeBinaryArray::from_iter_values(refs)
1638    }
1639
1640    fn make_string_array(vals: &[&str]) -> StringArray {
1641        StringArray::from(vals.to_vec())
1642    }
1643
1644    fn make_args_2(a: ArrayRef, b: ArrayRef) -> ScalarFunctionArgs {
1645        ScalarFunctionArgs {
1646            args: vec![ColumnarValue::Array(a), ColumnarValue::Array(b)],
1647            arg_fields: vec![],
1648            number_rows: 0,
1649            return_field: Arc::new(Field::new("output", DataType::LargeBinary, true)),
1650            config_options: Arc::new(ConfigOptions::default()),
1651        }
1652    }
1653
1654    fn make_args_1(a: ArrayRef) -> ScalarFunctionArgs {
1655        ScalarFunctionArgs {
1656            args: vec![ColumnarValue::Array(a)],
1657            arg_fields: vec![],
1658            number_rows: 0,
1659            return_field: Arc::new(Field::new("output", DataType::Utf8, true)),
1660            config_options: Arc::new(ConfigOptions::default()),
1661        }
1662    }
1663
1664    // ── jsonb_get tests ──────────────────────────────────────
1665
1666    #[test]
1667    fn test_jsonb_get_object_field() {
1668        let udf = JsonbGet::new();
1669        let jsonb = make_jsonb_array(&[serde_json::json!({"name": "Alice", "age": 30})]);
1670        let keys = make_string_array(&["name"]);
1671        let result = udf
1672            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(keys)))
1673            .unwrap();
1674
1675        let ColumnarValue::Array(arr) = result else {
1676            panic!("expected array")
1677        };
1678        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
1679        assert!(!bin.is_null(0));
1680        let val = bin.value(0);
1681        assert_eq!(json_types::jsonb_to_text(val), Some("Alice".to_owned()));
1682    }
1683
1684    #[test]
1685    fn test_jsonb_get_missing_key() {
1686        let udf = JsonbGet::new();
1687        let jsonb = make_jsonb_array(&[serde_json::json!({"a": 1})]);
1688        let keys = make_string_array(&["missing"]);
1689        let result = udf
1690            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(keys)))
1691            .unwrap();
1692        let ColumnarValue::Array(arr) = result else {
1693            panic!("expected array")
1694        };
1695        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
1696        assert!(bin.is_null(0));
1697    }
1698
1699    // ── jsonb_get_idx tests ──────────────────────────────────
1700
1701    #[test]
1702    fn test_jsonb_get_idx() {
1703        let udf = JsonbGetIdx::new();
1704        let jsonb = make_jsonb_array(&[serde_json::json!([10, 20, 30])]);
1705        let idxs = arrow_array::Int32Array::from(vec![1]);
1706        let result = udf
1707            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(idxs)))
1708            .unwrap();
1709        let ColumnarValue::Array(arr) = result else {
1710            panic!("expected array")
1711        };
1712        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
1713        let val = bin.value(0);
1714        assert_eq!(json_types::jsonb_to_text(val), Some("20".to_owned()));
1715    }
1716
1717    #[test]
1718    fn test_jsonb_get_idx_out_of_bounds() {
1719        let udf = JsonbGetIdx::new();
1720        let jsonb = make_jsonb_array(&[serde_json::json!([1, 2, 3])]);
1721        let idxs = arrow_array::Int32Array::from(vec![10]);
1722        let result = udf
1723            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(idxs)))
1724            .unwrap();
1725        let ColumnarValue::Array(arr) = result else {
1726            panic!("expected array")
1727        };
1728        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
1729        assert!(bin.is_null(0));
1730    }
1731
1732    // ── jsonb_get_text tests ─────────────────────────────────
1733
1734    #[test]
1735    fn test_jsonb_get_text_string() {
1736        let udf = JsonbGetText::new();
1737        let jsonb = make_jsonb_array(&[serde_json::json!({"name": "Alice"})]);
1738        let keys = make_string_array(&["name"]);
1739        let result = udf
1740            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(keys)))
1741            .unwrap();
1742        let ColumnarValue::Array(arr) = result else {
1743            panic!("expected array")
1744        };
1745        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1746        assert_eq!(str_arr.value(0), "Alice");
1747    }
1748
1749    #[test]
1750    fn test_jsonb_get_text_number() {
1751        let udf = JsonbGetText::new();
1752        let jsonb = make_jsonb_array(&[serde_json::json!({"age": 30})]);
1753        let keys = make_string_array(&["age"]);
1754        let result = udf
1755            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(keys)))
1756            .unwrap();
1757        let ColumnarValue::Array(arr) = result else {
1758            panic!("expected array")
1759        };
1760        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1761        assert_eq!(str_arr.value(0), "30");
1762    }
1763
1764    // ── jsonb_get_text_idx tests ─────────────────────────────
1765
1766    #[test]
1767    fn test_jsonb_get_text_idx() {
1768        let udf = JsonbGetTextIdx::new();
1769        let jsonb = make_jsonb_array(&[serde_json::json!([10, 20, 30])]);
1770        let idxs = arrow_array::Int32Array::from(vec![2]);
1771        let result = udf
1772            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(idxs)))
1773            .unwrap();
1774        let ColumnarValue::Array(arr) = result else {
1775            panic!("expected array")
1776        };
1777        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1778        assert_eq!(str_arr.value(0), "30");
1779    }
1780
1781    // ── jsonb_exists tests ───────────────────────────────────
1782
1783    #[test]
1784    fn test_jsonb_exists_true() {
1785        let udf = JsonbExists::new();
1786        let jsonb = make_jsonb_array(&[serde_json::json!({"name": "Alice", "age": 30})]);
1787        let keys = make_string_array(&["name"]);
1788        let result = udf
1789            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(keys)))
1790            .unwrap();
1791        let ColumnarValue::Array(arr) = result else {
1792            panic!("expected array")
1793        };
1794        let bool_arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
1795        assert!(bool_arr.value(0));
1796    }
1797
1798    #[test]
1799    fn test_jsonb_exists_false() {
1800        let udf = JsonbExists::new();
1801        let jsonb = make_jsonb_array(&[serde_json::json!({"name": "Alice"})]);
1802        let keys = make_string_array(&["missing"]);
1803        let result = udf
1804            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(keys)))
1805            .unwrap();
1806        let ColumnarValue::Array(arr) = result else {
1807            panic!("expected array")
1808        };
1809        let bool_arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
1810        assert!(!bool_arr.value(0));
1811    }
1812
1813    // ── jsonb_contains tests ─────────────────────────────────
1814
1815    #[test]
1816    fn test_jsonb_contains_true() {
1817        let udf = JsonbContains::new();
1818        let left = make_jsonb_array(&[serde_json::json!({"a": 1, "b": 2, "c": 3})]);
1819        let right = make_jsonb_array(&[serde_json::json!({"a": 1, "c": 3})]);
1820        let result = udf
1821            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
1822            .unwrap();
1823        let ColumnarValue::Array(arr) = result else {
1824            panic!("expected array")
1825        };
1826        let bool_arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
1827        assert!(bool_arr.value(0));
1828    }
1829
1830    #[test]
1831    fn test_jsonb_contains_false() {
1832        let udf = JsonbContains::new();
1833        let left = make_jsonb_array(&[serde_json::json!({"a": 1})]);
1834        let right = make_jsonb_array(&[serde_json::json!({"a": 1, "b": 2})]);
1835        let result = udf
1836            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
1837            .unwrap();
1838        let ColumnarValue::Array(arr) = result else {
1839            panic!("expected array")
1840        };
1841        let bool_arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
1842        assert!(!bool_arr.value(0));
1843    }
1844
1845    // ── jsonb_contained_by tests ─────────────────────────────
1846
1847    #[test]
1848    fn test_jsonb_contained_by() {
1849        let udf = JsonbContainedBy::new();
1850        let left = make_jsonb_array(&[serde_json::json!({"a": 1})]);
1851        let right = make_jsonb_array(&[serde_json::json!({"a": 1, "b": 2})]);
1852        let result = udf
1853            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
1854            .unwrap();
1855        let ColumnarValue::Array(arr) = result else {
1856            panic!("expected array")
1857        };
1858        let bool_arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
1859        assert!(bool_arr.value(0));
1860    }
1861
1862    // ── json_typeof tests ────────────────────────────────────
1863
1864    #[test]
1865    fn test_json_typeof_all_types() {
1866        let udf = JsonTypeof::new();
1867        let jsonb = make_jsonb_array(&[
1868            serde_json::json!({"a": 1}),
1869            serde_json::json!([1, 2]),
1870            serde_json::json!("hello"),
1871            serde_json::json!(42),
1872            serde_json::json!(true),
1873            serde_json::json!(null),
1874        ]);
1875        let result = udf.invoke_with_args(make_args_1(Arc::new(jsonb))).unwrap();
1876        let ColumnarValue::Array(arr) = result else {
1877            panic!("expected array")
1878        };
1879        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
1880        assert_eq!(str_arr.value(0), "object");
1881        assert_eq!(str_arr.value(1), "array");
1882        assert_eq!(str_arr.value(2), "string");
1883        assert_eq!(str_arr.value(3), "number");
1884        assert_eq!(str_arr.value(4), "boolean");
1885        assert_eq!(str_arr.value(5), "null");
1886    }
1887
1888    // ── json_build_object tests ──────────────────────────────
1889
1890    #[test]
1891    fn test_json_build_object() {
1892        let udf = JsonBuildObject::new();
1893        let keys = Arc::new(make_string_array(&["name"])) as ArrayRef;
1894        let vals = Arc::new(make_string_array(&["Alice"])) as ArrayRef;
1895        let args = ScalarFunctionArgs {
1896            args: vec![ColumnarValue::Array(keys), ColumnarValue::Array(vals)],
1897            arg_fields: vec![],
1898            number_rows: 0,
1899            return_field: Arc::new(Field::new("output", DataType::LargeBinary, true)),
1900            config_options: Arc::new(ConfigOptions::default()),
1901        };
1902        let result = udf.invoke_with_args(args).unwrap();
1903        let ColumnarValue::Array(arr) = result else {
1904            panic!("expected array")
1905        };
1906        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
1907        let val = bin.value(0);
1908        // Verify we can read back the field
1909        let name = json_types::jsonb_get_field(val, "name").unwrap();
1910        assert_eq!(json_types::jsonb_to_text(name), Some("Alice".to_owned()));
1911    }
1912
1913    #[test]
1914    fn test_json_build_object_odd_args() {
1915        let udf = JsonBuildObject::new();
1916        let a = Arc::new(make_string_array(&["key"])) as ArrayRef;
1917        let args = ScalarFunctionArgs {
1918            args: vec![ColumnarValue::Array(a)],
1919            arg_fields: vec![],
1920            number_rows: 0,
1921            return_field: Arc::new(Field::new("output", DataType::LargeBinary, true)),
1922            config_options: Arc::new(ConfigOptions::default()),
1923        };
1924        assert!(udf.invoke_with_args(args).is_err());
1925    }
1926
1927    // ── json_build_array tests ───────────────────────────────
1928
1929    #[test]
1930    fn test_json_build_array() {
1931        let udf = JsonBuildArray::new();
1932        let a = Arc::new(arrow_array::Int64Array::from(vec![1])) as ArrayRef;
1933        let b = Arc::new(make_string_array(&["two"])) as ArrayRef;
1934        let args = ScalarFunctionArgs {
1935            args: vec![ColumnarValue::Array(a), ColumnarValue::Array(b)],
1936            arg_fields: vec![],
1937            number_rows: 0,
1938            return_field: Arc::new(Field::new("output", DataType::LargeBinary, true)),
1939            config_options: Arc::new(ConfigOptions::default()),
1940        };
1941        let result = udf.invoke_with_args(args).unwrap();
1942        let ColumnarValue::Array(arr) = result else {
1943            panic!("expected array")
1944        };
1945        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
1946        let val = bin.value(0);
1947        assert_eq!(json_types::jsonb_type_name(val), Some("array"));
1948        let elem0 = json_types::jsonb_array_get(val, 0).unwrap();
1949        assert_eq!(json_types::jsonb_to_text(elem0), Some("1".to_owned()));
1950        let elem1 = json_types::jsonb_array_get(val, 1).unwrap();
1951        assert_eq!(json_types::jsonb_to_text(elem1), Some("two".to_owned()));
1952    }
1953
1954    // ── to_jsonb tests ───────────────────────────────────────
1955
1956    #[test]
1957    fn test_to_jsonb_int() {
1958        let udf = ToJsonb::new();
1959        let a = Arc::new(arrow_array::Int64Array::from(vec![42])) as ArrayRef;
1960        let result = udf.invoke_with_args(make_args_1(a)).unwrap();
1961        let ColumnarValue::Array(arr) = result else {
1962            panic!("expected array")
1963        };
1964        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
1965        let val = bin.value(0);
1966        assert_eq!(json_types::jsonb_to_text(val), Some("42".to_owned()));
1967    }
1968
1969    #[test]
1970    fn test_to_jsonb_string() {
1971        let udf = ToJsonb::new();
1972        let a = Arc::new(make_string_array(&["hello"])) as ArrayRef;
1973        let result = udf.invoke_with_args(make_args_1(a)).unwrap();
1974        let ColumnarValue::Array(arr) = result else {
1975            panic!("expected array")
1976        };
1977        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
1978        let val = bin.value(0);
1979        assert_eq!(json_types::jsonb_to_text(val), Some("hello".to_owned()));
1980    }
1981
1982    // ── Registration tests ───────────────────────────────────
1983
1984    #[test]
1985    fn test_all_udfs_register() {
1986        let names = [
1987            ScalarUDF::new_from_impl(JsonbGet::new()).name().to_owned(),
1988            ScalarUDF::new_from_impl(JsonbGetIdx::new())
1989                .name()
1990                .to_owned(),
1991            ScalarUDF::new_from_impl(JsonbGetText::new())
1992                .name()
1993                .to_owned(),
1994            ScalarUDF::new_from_impl(JsonbGetTextIdx::new())
1995                .name()
1996                .to_owned(),
1997            ScalarUDF::new_from_impl(JsonbExists::new())
1998                .name()
1999                .to_owned(),
2000            ScalarUDF::new_from_impl(JsonbContains::new())
2001                .name()
2002                .to_owned(),
2003            ScalarUDF::new_from_impl(JsonbContainedBy::new())
2004                .name()
2005                .to_owned(),
2006            ScalarUDF::new_from_impl(JsonTypeof::new())
2007                .name()
2008                .to_owned(),
2009            ScalarUDF::new_from_impl(JsonBuildObject::new())
2010                .name()
2011                .to_owned(),
2012            ScalarUDF::new_from_impl(JsonBuildArray::new())
2013                .name()
2014                .to_owned(),
2015            ScalarUDF::new_from_impl(ToJsonb::new()).name().to_owned(),
2016        ];
2017        for name in &names {
2018            assert!(!name.is_empty(), "UDF has empty name");
2019        }
2020        assert_eq!(names.len(), 11);
2021    }
2022
2023    // ── Nested extraction tests ──────────────────────────────
2024
2025    #[test]
2026    fn test_nested_extraction() {
2027        // payload -> 'user' -> 'address' ->> 'city'
2028        let data = serde_json::json!({
2029            "user": {"address": {"city": "London"}}
2030        });
2031        let jsonb_bytes = enc(&data);
2032
2033        // First: get 'user'
2034        let user = json_types::jsonb_get_field(&jsonb_bytes, "user").unwrap();
2035        // Then: get 'address'
2036        let addr = json_types::jsonb_get_field(user, "address").unwrap();
2037        // Then: get_text 'city'
2038        let city = json_types::jsonb_to_text(json_types::jsonb_get_field(addr, "city").unwrap());
2039        assert_eq!(city, Some("London".to_owned()));
2040    }
2041
2042    // ── Multiple rows tests ──────────────────────────────────
2043
2044    #[test]
2045    fn test_jsonb_get_multiple_rows() {
2046        let udf = JsonbGet::new();
2047        let jsonb = make_jsonb_array(&[
2048            serde_json::json!({"name": "Alice"}),
2049            serde_json::json!({"name": "Bob"}),
2050            serde_json::json!({"age": 30}),
2051        ]);
2052        let keys = make_string_array(&["name", "name", "name"]);
2053        let result = udf
2054            .invoke_with_args(make_args_2(Arc::new(jsonb), Arc::new(keys)))
2055            .unwrap();
2056        let ColumnarValue::Array(arr) = result else {
2057            panic!("expected array")
2058        };
2059        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
2060        assert!(!bin.is_null(0)); // Alice
2061        assert!(!bin.is_null(1)); // Bob
2062        assert!(bin.is_null(2)); // no "name" field
2063    }
2064}