1use std::any::Any;
13#[allow(clippy::disallowed_types)] use std::collections::HashSet;
15use std::hash::{Hash, Hasher};
16use std::sync::Arc;
17
18use arrow::datatypes::DataType;
19use arrow_array::{
20 builder::{LargeBinaryBuilder, StringBuilder},
21 Array, LargeBinaryArray, ListArray, MapArray, StringArray,
22};
23use datafusion_common::Result;
24use datafusion_expr::{
25 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
26};
27
28use super::json_types;
29use super::json_udf::expand_args;
30
31const MAX_DEPTH: usize = 64;
33
34#[derive(Debug)]
43pub struct JsonbMerge {
44 signature: Signature,
45}
46
47impl JsonbMerge {
48 #[must_use]
50 pub fn new() -> Self {
51 Self {
52 signature: Signature::new(
53 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
54 Volatility::Immutable,
55 ),
56 }
57 }
58}
59
60impl Default for JsonbMerge {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66impl PartialEq for JsonbMerge {
67 fn eq(&self, _other: &Self) -> bool {
68 true
69 }
70}
71
72impl Eq for JsonbMerge {}
73
74impl Hash for JsonbMerge {
75 fn hash<H: Hasher>(&self, state: &mut H) {
76 "jsonb_merge".hash(state);
77 }
78}
79
80impl ScalarUDFImpl for JsonbMerge {
81 fn as_any(&self) -> &dyn Any {
82 self
83 }
84
85 fn name(&self) -> &'static str {
86 "jsonb_merge"
87 }
88
89 fn signature(&self) -> &Signature {
90 &self.signature
91 }
92
93 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
94 Ok(DataType::LargeBinary)
95 }
96
97 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
98 let expanded = expand_args(&args.args)?;
99 let left_arr = expanded[0]
100 .as_any()
101 .downcast_ref::<LargeBinaryArray>()
102 .ok_or_else(|| {
103 datafusion_common::DataFusionError::Internal(
104 "jsonb_merge: first arg must be LargeBinary".into(),
105 )
106 })?;
107 let right_arr = expanded[1]
108 .as_any()
109 .downcast_ref::<LargeBinaryArray>()
110 .ok_or_else(|| {
111 datafusion_common::DataFusionError::Internal(
112 "jsonb_merge: second arg must be LargeBinary".into(),
113 )
114 })?;
115
116 let mut builder = LargeBinaryBuilder::with_capacity(left_arr.len(), 256);
117 for i in 0..left_arr.len() {
118 if left_arr.is_null(i) || right_arr.is_null(i) {
119 builder.append_null();
120 } else {
121 let left_val = json_types::jsonb_to_value(left_arr.value(i));
122 let right_val = json_types::jsonb_to_value(right_arr.value(i));
123 match (left_val, right_val) {
124 (
125 Some(serde_json::Value::Object(mut l)),
126 Some(serde_json::Value::Object(r)),
127 ) => {
128 for (k, v) in r {
129 l.insert(k, v);
130 }
131 builder
132 .append_value(json_types::encode_jsonb(&serde_json::Value::Object(l)));
133 }
134 (_, Some(r)) => {
135 builder.append_value(json_types::encode_jsonb(&r));
136 }
137 _ => builder.append_null(),
138 }
139 }
140 }
141 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
142 }
143}
144
145#[derive(Debug)]
154pub struct JsonbDeepMerge {
155 signature: Signature,
156}
157
158impl JsonbDeepMerge {
159 #[must_use]
161 pub fn new() -> Self {
162 Self {
163 signature: Signature::new(
164 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
165 Volatility::Immutable,
166 ),
167 }
168 }
169}
170
171impl Default for JsonbDeepMerge {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177impl PartialEq for JsonbDeepMerge {
178 fn eq(&self, _other: &Self) -> bool {
179 true
180 }
181}
182
183impl Eq for JsonbDeepMerge {}
184
185impl Hash for JsonbDeepMerge {
186 fn hash<H: Hasher>(&self, state: &mut H) {
187 "jsonb_deep_merge".hash(state);
188 }
189}
190
/// Recursively merges `right` into `left`, bounded by `MAX_DEPTH`.
///
/// Object/object pairs merge key-by-key, recursing on keys present on both
/// sides; for any other combination the right-hand value wins outright.
/// Returns an error string once nesting exceeds `MAX_DEPTH`, guarding
/// against stack overflow on pathological inputs.
///
/// NOTE(review): the `remove` + `insert` dance relies on the map's key
/// ordering semantics. With serde_json's default (BTreeMap, sorted keys)
/// it is order-stable, but with the `preserve_order` feature a merged key
/// would move to the end — confirm which feature set this crate uses.
fn deep_merge(
    left: serde_json::Value,
    right: serde_json::Value,
    depth: usize,
) -> std::result::Result<serde_json::Value, String> {
    if depth > MAX_DEPTH {
        return Err("jsonb_deep_merge: max depth exceeded".into());
    }
    match (left, right) {
        (serde_json::Value::Object(mut l), serde_json::Value::Object(r)) => {
            for (k, rv) in r {
                // Shared key: recurse one level deeper; new key: take rv as-is.
                let merged = if let Some(lv) = l.remove(&k) {
                    deep_merge(lv, rv, depth + 1)?
                } else {
                    rv
                };
                l.insert(k, merged);
            }
            Ok(serde_json::Value::Object(l))
        }
        // Any non-object on either side: right replaces left.
        (_, r) => Ok(r),
    }
}
214
215impl ScalarUDFImpl for JsonbDeepMerge {
216 fn as_any(&self) -> &dyn Any {
217 self
218 }
219
220 fn name(&self) -> &'static str {
221 "jsonb_deep_merge"
222 }
223
224 fn signature(&self) -> &Signature {
225 &self.signature
226 }
227
228 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
229 Ok(DataType::LargeBinary)
230 }
231
232 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
233 let expanded = expand_args(&args.args)?;
234 let left_arr = expanded[0]
235 .as_any()
236 .downcast_ref::<LargeBinaryArray>()
237 .ok_or_else(|| {
238 datafusion_common::DataFusionError::Internal(
239 "jsonb_deep_merge: first arg must be LargeBinary".into(),
240 )
241 })?;
242 let right_arr = expanded[1]
243 .as_any()
244 .downcast_ref::<LargeBinaryArray>()
245 .ok_or_else(|| {
246 datafusion_common::DataFusionError::Internal(
247 "jsonb_deep_merge: second arg must be LargeBinary".into(),
248 )
249 })?;
250
251 let mut builder = LargeBinaryBuilder::with_capacity(left_arr.len(), 256);
252 for i in 0..left_arr.len() {
253 if left_arr.is_null(i) || right_arr.is_null(i) {
254 builder.append_null();
255 } else {
256 let left_val = json_types::jsonb_to_value(left_arr.value(i));
257 let right_val = json_types::jsonb_to_value(right_arr.value(i));
258 match (left_val, right_val) {
259 (Some(l), Some(r)) => {
260 let merged = deep_merge(l, r, 0)
261 .map_err(datafusion_common::DataFusionError::Execution)?;
262 builder.append_value(json_types::encode_jsonb(&merged));
263 }
264 _ => builder.append_null(),
265 }
266 }
267 }
268 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
269 }
270}
271
272#[derive(Debug)]
281pub struct JsonbStripNulls {
282 signature: Signature,
283}
284
285impl JsonbStripNulls {
286 #[must_use]
288 pub fn new() -> Self {
289 Self {
290 signature: Signature::new(
291 TypeSignature::Exact(vec![DataType::LargeBinary]),
292 Volatility::Immutable,
293 ),
294 }
295 }
296}
297
298impl Default for JsonbStripNulls {
299 fn default() -> Self {
300 Self::new()
301 }
302}
303
304impl PartialEq for JsonbStripNulls {
305 fn eq(&self, _other: &Self) -> bool {
306 true
307 }
308}
309
310impl Eq for JsonbStripNulls {}
311
312impl Hash for JsonbStripNulls {
313 fn hash<H: Hasher>(&self, state: &mut H) {
314 "jsonb_strip_nulls".hash(state);
315 }
316}
317
318fn strip_nulls(val: serde_json::Value) -> serde_json::Value {
319 match val {
320 serde_json::Value::Object(obj) => {
321 let filtered: serde_json::Map<String, serde_json::Value> = obj
322 .into_iter()
323 .filter(|(_, v)| !v.is_null())
324 .map(|(k, v)| (k, strip_nulls(v)))
325 .collect();
326 serde_json::Value::Object(filtered)
327 }
328 serde_json::Value::Array(arr) => {
329 serde_json::Value::Array(arr.into_iter().map(strip_nulls).collect())
330 }
331 other => other,
332 }
333}
334
335impl ScalarUDFImpl for JsonbStripNulls {
336 fn as_any(&self) -> &dyn Any {
337 self
338 }
339
340 fn name(&self) -> &'static str {
341 "jsonb_strip_nulls"
342 }
343
344 fn signature(&self) -> &Signature {
345 &self.signature
346 }
347
348 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
349 Ok(DataType::LargeBinary)
350 }
351
352 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
353 let expanded = expand_args(&args.args)?;
354 let jsonb_arr = expanded[0]
355 .as_any()
356 .downcast_ref::<LargeBinaryArray>()
357 .ok_or_else(|| {
358 datafusion_common::DataFusionError::Internal(
359 "jsonb_strip_nulls: arg must be LargeBinary".into(),
360 )
361 })?;
362
363 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
364 for i in 0..jsonb_arr.len() {
365 if jsonb_arr.is_null(i) {
366 builder.append_null();
367 } else {
368 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
369 Some(val) => {
370 builder.append_value(json_types::encode_jsonb(&strip_nulls(val)));
371 }
372 None => builder.append_null(),
373 }
374 }
375 }
376 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
377 }
378}
379
380#[derive(Debug)]
389pub struct JsonbRenameKeys {
390 signature: Signature,
391}
392
393impl JsonbRenameKeys {
394 #[must_use]
396 pub fn new() -> Self {
397 Self {
398 signature: Signature::new(
399 TypeSignature::Exact(vec![
400 DataType::LargeBinary,
401 DataType::Map(
402 Arc::new(arrow_schema::Field::new(
403 "entries",
404 DataType::Struct(
405 vec![
406 arrow_schema::Field::new("key", DataType::Utf8, false),
407 arrow_schema::Field::new("value", DataType::Utf8, true),
408 ]
409 .into(),
410 ),
411 false,
412 )),
413 false,
414 ),
415 ]),
416 Volatility::Immutable,
417 ),
418 }
419 }
420}
421
422impl Default for JsonbRenameKeys {
423 fn default() -> Self {
424 Self::new()
425 }
426}
427
428impl PartialEq for JsonbRenameKeys {
429 fn eq(&self, _other: &Self) -> bool {
430 true
431 }
432}
433
434impl Eq for JsonbRenameKeys {}
435
436impl Hash for JsonbRenameKeys {
437 fn hash<H: Hasher>(&self, state: &mut H) {
438 "jsonb_rename_keys".hash(state);
439 }
440}
441
442impl ScalarUDFImpl for JsonbRenameKeys {
443 fn as_any(&self) -> &dyn Any {
444 self
445 }
446
447 fn name(&self) -> &'static str {
448 "jsonb_rename_keys"
449 }
450
451 fn signature(&self) -> &Signature {
452 &self.signature
453 }
454
455 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
456 Ok(DataType::LargeBinary)
457 }
458
459 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
460 let expanded = expand_args(&args.args)?;
461 let jsonb_arr = expanded[0]
462 .as_any()
463 .downcast_ref::<LargeBinaryArray>()
464 .ok_or_else(|| {
465 datafusion_common::DataFusionError::Internal(
466 "jsonb_rename_keys: first arg must be LargeBinary".into(),
467 )
468 })?;
469 let map_arr = expanded[1]
470 .as_any()
471 .downcast_ref::<MapArray>()
472 .ok_or_else(|| {
473 datafusion_common::DataFusionError::Internal(
474 "jsonb_rename_keys: second arg must be Map<Utf8,Utf8>".into(),
475 )
476 })?;
477
478 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
479 for i in 0..jsonb_arr.len() {
480 if jsonb_arr.is_null(i) || map_arr.is_null(i) {
481 builder.append_null();
482 } else {
483 let rename_map = extract_string_map(map_arr, i);
484 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
485 Some(serde_json::Value::Object(obj)) => {
486 let renamed: serde_json::Map<String, serde_json::Value> = obj
487 .into_iter()
488 .map(|(k, v)| {
489 let new_key = rename_map.get(k.as_str()).cloned().unwrap_or(k);
490 (new_key, v)
491 })
492 .collect();
493 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
494 renamed,
495 )));
496 }
497 Some(other) => {
498 builder.append_value(json_types::encode_jsonb(&other));
499 }
500 None => builder.append_null(),
501 }
502 }
503 }
504 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
505 }
506}
507
508#[allow(clippy::disallowed_types)] fn extract_string_map(map_arr: &MapArray, row: usize) -> std::collections::HashMap<String, String> {
511 let mut result = std::collections::HashMap::new();
512 let entries = map_arr.value(row);
513 let struct_arr = entries
514 .as_any()
515 .downcast_ref::<arrow_array::StructArray>()
516 .unwrap();
517 let keys = struct_arr
518 .column(0)
519 .as_any()
520 .downcast_ref::<StringArray>()
521 .unwrap();
522 let vals = struct_arr
523 .column(1)
524 .as_any()
525 .downcast_ref::<StringArray>()
526 .unwrap();
527 for j in 0..keys.len() {
528 if !keys.is_null(j) && !vals.is_null(j) {
529 result.insert(keys.value(j).to_owned(), vals.value(j).to_owned());
530 }
531 }
532 result
533}
534
535#[derive(Debug)]
543pub struct JsonbPick {
544 signature: Signature,
545}
546
547impl JsonbPick {
548 #[must_use]
550 pub fn new() -> Self {
551 Self {
552 signature: Signature::new(
553 TypeSignature::Exact(vec![
554 DataType::LargeBinary,
555 DataType::List(Arc::new(arrow_schema::Field::new(
556 "item",
557 DataType::Utf8,
558 true,
559 ))),
560 ]),
561 Volatility::Immutable,
562 ),
563 }
564 }
565}
566
567impl Default for JsonbPick {
568 fn default() -> Self {
569 Self::new()
570 }
571}
572
573impl PartialEq for JsonbPick {
574 fn eq(&self, _other: &Self) -> bool {
575 true
576 }
577}
578
579impl Eq for JsonbPick {}
580
581impl Hash for JsonbPick {
582 fn hash<H: Hasher>(&self, state: &mut H) {
583 "jsonb_pick".hash(state);
584 }
585}
586
587impl ScalarUDFImpl for JsonbPick {
588 fn as_any(&self) -> &dyn Any {
589 self
590 }
591
592 fn name(&self) -> &'static str {
593 "jsonb_pick"
594 }
595
596 fn signature(&self) -> &Signature {
597 &self.signature
598 }
599
600 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
601 Ok(DataType::LargeBinary)
602 }
603
604 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
605 let expanded = expand_args(&args.args)?;
606 let jsonb_arr = expanded[0]
607 .as_any()
608 .downcast_ref::<LargeBinaryArray>()
609 .ok_or_else(|| {
610 datafusion_common::DataFusionError::Internal(
611 "jsonb_pick: first arg must be LargeBinary".into(),
612 )
613 })?;
614 let keys_arr = expanded[1]
615 .as_any()
616 .downcast_ref::<ListArray>()
617 .ok_or_else(|| {
618 datafusion_common::DataFusionError::Internal(
619 "jsonb_pick: second arg must be List<Utf8>".into(),
620 )
621 })?;
622
623 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
624 for i in 0..jsonb_arr.len() {
625 if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
626 builder.append_null();
627 } else {
628 let key_set = extract_string_set(keys_arr, i);
629 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
630 Some(serde_json::Value::Object(obj)) => {
631 let picked: serde_json::Map<String, serde_json::Value> = obj
632 .into_iter()
633 .filter(|(k, _)| key_set.contains(k.as_str()))
634 .collect();
635 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
636 picked,
637 )));
638 }
639 Some(other) => {
640 builder.append_value(json_types::encode_jsonb(&other));
641 }
642 None => builder.append_null(),
643 }
644 }
645 }
646 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
647 }
648}
649
650fn extract_string_set(list_arr: &ListArray, row: usize) -> HashSet<String> {
652 let values = list_arr.value(row);
653 let str_arr = values.as_any().downcast_ref::<StringArray>();
654 let mut result = HashSet::new();
655 if let Some(arr) = str_arr {
656 for j in 0..arr.len() {
657 if !arr.is_null(j) {
658 result.insert(arr.value(j).to_owned());
659 }
660 }
661 }
662 result
663}
664
665#[derive(Debug)]
673pub struct JsonbExcept {
674 signature: Signature,
675}
676
677impl JsonbExcept {
678 #[must_use]
680 pub fn new() -> Self {
681 Self {
682 signature: Signature::new(
683 TypeSignature::Exact(vec![
684 DataType::LargeBinary,
685 DataType::List(Arc::new(arrow_schema::Field::new(
686 "item",
687 DataType::Utf8,
688 true,
689 ))),
690 ]),
691 Volatility::Immutable,
692 ),
693 }
694 }
695}
696
697impl Default for JsonbExcept {
698 fn default() -> Self {
699 Self::new()
700 }
701}
702
703impl PartialEq for JsonbExcept {
704 fn eq(&self, _other: &Self) -> bool {
705 true
706 }
707}
708
709impl Eq for JsonbExcept {}
710
711impl Hash for JsonbExcept {
712 fn hash<H: Hasher>(&self, state: &mut H) {
713 "jsonb_except".hash(state);
714 }
715}
716
717impl ScalarUDFImpl for JsonbExcept {
718 fn as_any(&self) -> &dyn Any {
719 self
720 }
721
722 fn name(&self) -> &'static str {
723 "jsonb_except"
724 }
725
726 fn signature(&self) -> &Signature {
727 &self.signature
728 }
729
730 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
731 Ok(DataType::LargeBinary)
732 }
733
734 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
735 let expanded = expand_args(&args.args)?;
736 let jsonb_arr = expanded[0]
737 .as_any()
738 .downcast_ref::<LargeBinaryArray>()
739 .ok_or_else(|| {
740 datafusion_common::DataFusionError::Internal(
741 "jsonb_except: first arg must be LargeBinary".into(),
742 )
743 })?;
744 let keys_arr = expanded[1]
745 .as_any()
746 .downcast_ref::<ListArray>()
747 .ok_or_else(|| {
748 datafusion_common::DataFusionError::Internal(
749 "jsonb_except: second arg must be List<Utf8>".into(),
750 )
751 })?;
752
753 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
754 for i in 0..jsonb_arr.len() {
755 if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
756 builder.append_null();
757 } else {
758 let exclude_set = extract_string_set(keys_arr, i);
759 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
760 Some(serde_json::Value::Object(obj)) => {
761 let filtered: serde_json::Map<String, serde_json::Value> = obj
762 .into_iter()
763 .filter(|(k, _)| !exclude_set.contains(k.as_str()))
764 .collect();
765 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
766 filtered,
767 )));
768 }
769 Some(other) => {
770 builder.append_value(json_types::encode_jsonb(&other));
771 }
772 None => builder.append_null(),
773 }
774 }
775 }
776 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
777 }
778}
779
780#[derive(Debug)]
789pub struct JsonbFlatten {
790 signature: Signature,
791}
792
793impl JsonbFlatten {
794 #[must_use]
796 pub fn new() -> Self {
797 Self {
798 signature: Signature::new(
799 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
800 Volatility::Immutable,
801 ),
802 }
803 }
804}
805
806impl Default for JsonbFlatten {
807 fn default() -> Self {
808 Self::new()
809 }
810}
811
812impl PartialEq for JsonbFlatten {
813 fn eq(&self, _other: &Self) -> bool {
814 true
815 }
816}
817
818impl Eq for JsonbFlatten {}
819
820impl Hash for JsonbFlatten {
821 fn hash<H: Hasher>(&self, state: &mut H) {
822 "jsonb_flatten".hash(state);
823 }
824}
825
/// Flattens `val` into `out`, joining nested path segments with `sep`.
///
/// Object keys and array indices extend `prefix`; only leaf scalars
/// (including JSON nulls) are inserted into `out`. Returns an error string
/// once nesting exceeds `MAX_DEPTH`, guarding against stack overflow.
///
/// NOTE(review): a scalar at the top level is recorded under the
/// empty-string key (since `prefix` starts as "") — confirm that is the
/// intended output shape for non-container inputs.
fn flatten_value(
    val: &serde_json::Value,
    prefix: &str,
    sep: &str,
    out: &mut serde_json::Map<String, serde_json::Value>,
    depth: usize,
) -> std::result::Result<(), String> {
    if depth > MAX_DEPTH {
        return Err("jsonb_flatten: max depth exceeded".into());
    }
    match val {
        serde_json::Value::Object(obj) => {
            for (k, v) in obj {
                // Root-level keys carry no separator prefix.
                let new_key = if prefix.is_empty() {
                    k.clone()
                } else {
                    format!("{prefix}{sep}{k}")
                };
                flatten_value(v, &new_key, sep, out, depth + 1)?;
            }
        }
        serde_json::Value::Array(arr) => {
            for (idx, v) in arr.iter().enumerate() {
                // Array elements are addressed by numeric index.
                let new_key = if prefix.is_empty() {
                    idx.to_string()
                } else {
                    format!("{prefix}{sep}{idx}")
                };
                flatten_value(v, &new_key, sep, out, depth + 1)?;
            }
        }
        _ => {
            // Leaf: record the scalar at its accumulated path.
            out.insert(prefix.to_owned(), val.clone());
        }
    }
    Ok(())
}
863
864impl ScalarUDFImpl for JsonbFlatten {
865 fn as_any(&self) -> &dyn Any {
866 self
867 }
868
869 fn name(&self) -> &'static str {
870 "jsonb_flatten"
871 }
872
873 fn signature(&self) -> &Signature {
874 &self.signature
875 }
876
877 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
878 Ok(DataType::LargeBinary)
879 }
880
881 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
882 let expanded = expand_args(&args.args)?;
883 let jsonb_arr = expanded[0]
884 .as_any()
885 .downcast_ref::<LargeBinaryArray>()
886 .ok_or_else(|| {
887 datafusion_common::DataFusionError::Internal(
888 "jsonb_flatten: first arg must be LargeBinary".into(),
889 )
890 })?;
891 let sep_arr = expanded[1]
892 .as_any()
893 .downcast_ref::<StringArray>()
894 .ok_or_else(|| {
895 datafusion_common::DataFusionError::Internal(
896 "jsonb_flatten: second arg must be Utf8".into(),
897 )
898 })?;
899
900 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
901 for i in 0..jsonb_arr.len() {
902 if jsonb_arr.is_null(i) || sep_arr.is_null(i) {
903 builder.append_null();
904 } else {
905 let sep = sep_arr.value(i);
906 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
907 Some(val) => {
908 let mut flat = serde_json::Map::new();
909 flatten_value(&val, "", sep, &mut flat, 0)
910 .map_err(datafusion_common::DataFusionError::Execution)?;
911 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
912 flat,
913 )));
914 }
915 None => builder.append_null(),
916 }
917 }
918 }
919 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
920 }
921}
922
923#[derive(Debug)]
933pub struct JsonbUnflatten {
934 signature: Signature,
935}
936
937impl JsonbUnflatten {
938 #[must_use]
940 pub fn new() -> Self {
941 Self {
942 signature: Signature::new(
943 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
944 Volatility::Immutable,
945 ),
946 }
947 }
948}
949
950impl Default for JsonbUnflatten {
951 fn default() -> Self {
952 Self::new()
953 }
954}
955
956impl PartialEq for JsonbUnflatten {
957 fn eq(&self, _other: &Self) -> bool {
958 true
959 }
960}
961
962impl Eq for JsonbUnflatten {}
963
964impl Hash for JsonbUnflatten {
965 fn hash<H: Hasher>(&self, state: &mut H) {
966 "jsonb_unflatten".hash(state);
967 }
968}
969
970fn unflatten_insert(root: &mut serde_json::Value, parts: &[&str], value: serde_json::Value) {
971 if parts.is_empty() {
972 return;
973 }
974 if parts.len() == 1 {
975 if let serde_json::Value::Object(obj) = root {
976 obj.insert(parts[0].to_owned(), value);
977 }
978 return;
979 }
980 if let serde_json::Value::Object(obj) = root {
981 let child = obj
982 .entry(parts[0].to_owned())
983 .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
984 unflatten_insert(child, &parts[1..], value);
985 }
986}
987
988impl ScalarUDFImpl for JsonbUnflatten {
989 fn as_any(&self) -> &dyn Any {
990 self
991 }
992
993 fn name(&self) -> &'static str {
994 "jsonb_unflatten"
995 }
996
997 fn signature(&self) -> &Signature {
998 &self.signature
999 }
1000
1001 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1002 Ok(DataType::LargeBinary)
1003 }
1004
1005 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1006 let expanded = expand_args(&args.args)?;
1007 let jsonb_arr = expanded[0]
1008 .as_any()
1009 .downcast_ref::<LargeBinaryArray>()
1010 .ok_or_else(|| {
1011 datafusion_common::DataFusionError::Internal(
1012 "jsonb_unflatten: first arg must be LargeBinary".into(),
1013 )
1014 })?;
1015 let sep_arr = expanded[1]
1016 .as_any()
1017 .downcast_ref::<StringArray>()
1018 .ok_or_else(|| {
1019 datafusion_common::DataFusionError::Internal(
1020 "jsonb_unflatten: second arg must be Utf8".into(),
1021 )
1022 })?;
1023
1024 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
1025 for i in 0..jsonb_arr.len() {
1026 if jsonb_arr.is_null(i) || sep_arr.is_null(i) {
1027 builder.append_null();
1028 } else {
1029 let sep = sep_arr.value(i);
1030 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1031 Some(serde_json::Value::Object(flat)) => {
1032 let mut root = serde_json::Value::Object(serde_json::Map::new());
1033 for (key, val) in flat {
1034 let parts: Vec<&str> = key.split(sep).collect();
1035 unflatten_insert(&mut root, &parts, val);
1036 }
1037 builder.append_value(json_types::encode_jsonb(&root));
1038 }
1039 Some(other) => {
1040 builder.append_value(json_types::encode_jsonb(&other));
1041 }
1042 None => builder.append_null(),
1043 }
1044 }
1045 }
1046 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1047 }
1048}
1049
1050#[derive(Debug)]
1061pub struct JsonToColumns {
1062 signature: Signature,
1063}
1064
1065impl JsonToColumns {
1066 #[must_use]
1068 pub fn new() -> Self {
1069 Self {
1070 signature: Signature::new(
1071 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
1072 Volatility::Immutable,
1073 ),
1074 }
1075 }
1076}
1077
1078impl Default for JsonToColumns {
1079 fn default() -> Self {
1080 Self::new()
1081 }
1082}
1083
1084impl PartialEq for JsonToColumns {
1085 fn eq(&self, _other: &Self) -> bool {
1086 true
1087 }
1088}
1089
1090impl Eq for JsonToColumns {}
1091
1092impl Hash for JsonToColumns {
1093 fn hash<H: Hasher>(&self, state: &mut H) {
1094 "json_to_columns".hash(state);
1095 }
1096}
1097
/// Extracts the field names from a comma-separated `name TYPE` spec, e.g.
/// `"a INT, b VARCHAR"` -> `["a", "b"]`.
///
/// BUG FIX: the previous implementation split naively on every `,`, so a
/// nested type such as `"a STRUCT<x INT, y INT>, b INT"` produced the bogus
/// field list `["a", "y", "b"]`. Commas inside `<…>`, `(…)`, or `[…]` are
/// now treated as part of the type, not as field separators.
fn parse_type_spec_fields(spec: &str) -> Vec<String> {
    let mut fields = Vec::new();
    let mut depth: usize = 0;
    let mut start = 0;
    for (idx, ch) in spec.char_indices() {
        match ch {
            '<' | '(' | '[' => depth += 1,
            // saturating_sub tolerates unbalanced closers in malformed specs.
            '>' | ')' | ']' => depth = depth.saturating_sub(1),
            ',' if depth == 0 => {
                push_field_name(&spec[start..idx], &mut fields);
                start = idx + 1;
            }
            _ => {}
        }
    }
    push_field_name(&spec[start..], &mut fields);
    fields
}

/// Pushes the first whitespace-delimited token of `part` (the field name);
/// empty segments contribute nothing.
fn push_field_name(part: &str, fields: &mut Vec<String>) {
    if let Some(name) = part.trim().split_whitespace().next() {
        fields.push(name.to_owned());
    }
}
1108
1109impl ScalarUDFImpl for JsonToColumns {
1110 fn as_any(&self) -> &dyn Any {
1111 self
1112 }
1113
1114 fn name(&self) -> &'static str {
1115 "json_to_columns"
1116 }
1117
1118 fn signature(&self) -> &Signature {
1119 &self.signature
1120 }
1121
1122 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1123 Ok(DataType::LargeBinary)
1124 }
1125
1126 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1127 let expanded = expand_args(&args.args)?;
1128 let jsonb_arr = expanded[0]
1129 .as_any()
1130 .downcast_ref::<LargeBinaryArray>()
1131 .ok_or_else(|| {
1132 datafusion_common::DataFusionError::Internal(
1133 "json_to_columns: first arg must be LargeBinary".into(),
1134 )
1135 })?;
1136 let spec_arr = expanded[1]
1137 .as_any()
1138 .downcast_ref::<StringArray>()
1139 .ok_or_else(|| {
1140 datafusion_common::DataFusionError::Internal(
1141 "json_to_columns: second arg must be Utf8".into(),
1142 )
1143 })?;
1144
1145 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
1146 for i in 0..jsonb_arr.len() {
1147 if jsonb_arr.is_null(i) || spec_arr.is_null(i) {
1148 builder.append_null();
1149 } else {
1150 let fields = parse_type_spec_fields(spec_arr.value(i));
1151 let field_set: HashSet<&str> = fields.iter().map(String::as_str).collect();
1152 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1153 Some(serde_json::Value::Object(obj)) => {
1154 let picked: serde_json::Map<String, serde_json::Value> = obj
1155 .into_iter()
1156 .filter(|(k, _)| field_set.contains(k.as_str()))
1157 .collect();
1158 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
1159 picked,
1160 )));
1161 }
1162 Some(other) => {
1163 builder.append_value(json_types::encode_jsonb(&other));
1164 }
1165 None => builder.append_null(),
1166 }
1167 }
1168 }
1169 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1170 }
1171}
1172
1173#[derive(Debug)]
1182pub struct JsonInferSchema {
1183 signature: Signature,
1184}
1185
1186impl JsonInferSchema {
1187 #[must_use]
1189 pub fn new() -> Self {
1190 Self {
1191 signature: Signature::new(
1192 TypeSignature::Exact(vec![DataType::LargeBinary]),
1193 Volatility::Immutable,
1194 ),
1195 }
1196 }
1197}
1198
1199impl Default for JsonInferSchema {
1200 fn default() -> Self {
1201 Self::new()
1202 }
1203}
1204
1205impl PartialEq for JsonInferSchema {
1206 fn eq(&self, _other: &Self) -> bool {
1207 true
1208 }
1209}
1210
1211impl Eq for JsonInferSchema {}
1212
1213impl Hash for JsonInferSchema {
1214 fn hash<H: Hasher>(&self, state: &mut H) {
1215 "json_infer_schema".hash(state);
1216 }
1217}
1218
1219fn infer_type(val: &serde_json::Value) -> String {
1220 match val {
1221 serde_json::Value::Null => "NULL".to_owned(),
1222 serde_json::Value::Bool(_) => "BOOLEAN".to_owned(),
1223 serde_json::Value::Number(n) => {
1224 if n.is_i64() || n.is_u64() {
1225 "BIGINT".to_owned()
1226 } else {
1227 "DOUBLE".to_owned()
1228 }
1229 }
1230 serde_json::Value::String(_) => "VARCHAR".to_owned(),
1231 serde_json::Value::Array(arr) => {
1232 let inner = arr.first().map_or("NULL".to_owned(), infer_type);
1233 format!("ARRAY<{inner}>")
1234 }
1235 serde_json::Value::Object(obj) => {
1236 let fields: Vec<String> = obj
1237 .iter()
1238 .map(|(k, v)| format!("{k} {}", infer_type(v)))
1239 .collect();
1240 format!("STRUCT({})", fields.join(", "))
1241 }
1242 }
1243}
1244
1245impl ScalarUDFImpl for JsonInferSchema {
1246 fn as_any(&self) -> &dyn Any {
1247 self
1248 }
1249
1250 fn name(&self) -> &'static str {
1251 "json_infer_schema"
1252 }
1253
1254 fn signature(&self) -> &Signature {
1255 &self.signature
1256 }
1257
1258 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1259 Ok(DataType::Utf8)
1260 }
1261
1262 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1263 let expanded = expand_args(&args.args)?;
1264 let jsonb_arr = expanded[0]
1265 .as_any()
1266 .downcast_ref::<LargeBinaryArray>()
1267 .ok_or_else(|| {
1268 datafusion_common::DataFusionError::Internal(
1269 "json_infer_schema: arg must be LargeBinary".into(),
1270 )
1271 })?;
1272
1273 let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
1274 for i in 0..jsonb_arr.len() {
1275 if jsonb_arr.is_null(i) {
1276 builder.append_null();
1277 } else {
1278 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1279 Some(serde_json::Value::Object(obj)) => {
1280 let schema: serde_json::Map<String, serde_json::Value> = obj
1281 .iter()
1282 .map(|(k, v)| (k.clone(), serde_json::Value::String(infer_type(v))))
1283 .collect();
1284 builder.append_value(
1285 serde_json::to_string(&serde_json::Value::Object(schema))
1286 .unwrap_or_default(),
1287 );
1288 }
1289 Some(val) => {
1290 builder.append_value(infer_type(&val));
1291 }
1292 None => builder.append_null(),
1293 }
1294 }
1295 }
1296 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1297 }
1298}
1299
1300pub fn register_json_extensions(ctx: &datafusion::prelude::SessionContext) {
1306 use datafusion_expr::ScalarUDF;
1307
1308 ctx.register_udf(ScalarUDF::new_from_impl(JsonbMerge::new()));
1309 ctx.register_udf(ScalarUDF::new_from_impl(JsonbDeepMerge::new()));
1310 ctx.register_udf(ScalarUDF::new_from_impl(JsonbStripNulls::new()));
1311 ctx.register_udf(ScalarUDF::new_from_impl(JsonbRenameKeys::new()));
1312 ctx.register_udf(ScalarUDF::new_from_impl(JsonbPick::new()));
1313 ctx.register_udf(ScalarUDF::new_from_impl(JsonbExcept::new()));
1314 ctx.register_udf(ScalarUDF::new_from_impl(JsonbFlatten::new()));
1315 ctx.register_udf(ScalarUDF::new_from_impl(JsonbUnflatten::new()));
1316 ctx.register_udf(ScalarUDF::new_from_impl(JsonToColumns::new()));
1317 ctx.register_udf(ScalarUDF::new_from_impl(JsonInferSchema::new()));
1318}
1319
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{MapBuilder, StringBuilder as MapSB};
    use arrow_array::ArrayRef;
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use serde_json::json;

    /// Encode one `serde_json::Value` into its JSONB byte representation.
    fn enc(v: &serde_json::Value) -> Vec<u8> {
        json_types::encode_jsonb(v)
    }

    /// Build a `LargeBinaryArray` with one JSONB-encoded entry per input value.
    fn make_jsonb_array(vals: &[serde_json::Value]) -> LargeBinaryArray {
        let buffers: Vec<Vec<u8>> = vals.iter().map(enc).collect();
        LargeBinaryArray::from_iter_values(buffers.iter().map(Vec::as_slice))
    }

    /// Assemble `ScalarFunctionArgs` around already-wrapped columnar inputs,
    /// with a `LargeBinary` return field (the common case for these UDFs).
    fn build_args(cols: Vec<ColumnarValue>) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: cols,
            arg_fields: vec![],
            number_rows: 0,
            return_field: Arc::new(Field::new("output", DataType::LargeBinary, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    fn make_args_2(a: ArrayRef, b: ArrayRef) -> ScalarFunctionArgs {
        build_args(vec![ColumnarValue::Array(a), ColumnarValue::Array(b)])
    }

    fn make_args_1(a: ArrayRef) -> ScalarFunctionArgs {
        build_args(vec![ColumnarValue::Array(a)])
    }

    /// Like `make_args_1`, but for UDFs that return text (`Utf8`) instead of JSONB.
    fn make_utf8_args(a: ArrayRef) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(a)],
            arg_fields: vec![],
            number_rows: 0,
            return_field: Arc::new(Field::new("output", DataType::Utf8, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Decode row `row` of a JSONB result column back into a `serde_json::Value`.
    fn decode_jsonb_result(result: ColumnarValue, row: usize) -> serde_json::Value {
        let arr = match result {
            ColumnarValue::Array(a) => a,
            _ => panic!("expected array"),
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(!bin.is_null(row), "unexpected null at row {row}");
        json_types::jsonb_to_value(bin.value(row)).expect("invalid jsonb")
    }

    /// Extract row `row` of a `Utf8` result column as an owned `String`.
    fn decode_text_result(result: ColumnarValue, row: usize) -> String {
        let arr = match result {
            ColumnarValue::Array(a) => a,
            _ => panic!("expected array"),
        };
        arr.as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
            .value(row)
            .to_owned()
    }

    #[test]
    fn test_json_ext_merge_objects() {
        let left = make_jsonb_array(&[json!({"a": 1, "b": 2})]);
        let right = make_jsonb_array(&[json!({"b": 99, "c": 3})]);
        let out = JsonbMerge::new()
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        // Shallow merge: right-hand keys win on conflict.
        assert_eq!(
            decode_jsonb_result(out, 0),
            json!({"a": 1, "b": 99, "c": 3})
        );
    }

    #[test]
    fn test_json_ext_merge_non_object() {
        let left = make_jsonb_array(&[json!(42)]);
        let right = make_jsonb_array(&[json!("hello")]);
        let out = JsonbMerge::new()
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        // Merging scalars simply yields the right-hand operand.
        assert_eq!(decode_jsonb_result(out, 0), json!("hello"));
    }

    #[test]
    fn test_json_ext_deep_merge() {
        let left = make_jsonb_array(&[json!({"a": {"x": 1, "y": 2}, "b": 10})]);
        let right = make_jsonb_array(&[json!({"a": {"y": 99, "z": 3}, "c": 20})]);
        let out = JsonbDeepMerge::new()
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        // Nested objects are merged recursively rather than replaced.
        assert_eq!(
            decode_jsonb_result(out, 0),
            json!({"a": {"x": 1, "y": 99, "z": 3}, "b": 10, "c": 20})
        );
    }

    #[test]
    fn test_json_ext_deep_merge_non_object_override() {
        let left = make_jsonb_array(&[json!({"a": {"x": 1}})]);
        let right = make_jsonb_array(&[json!({"a": 42})]);
        let out = JsonbDeepMerge::new()
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        // A non-object on the right replaces the whole subtree.
        assert_eq!(decode_jsonb_result(out, 0), json!({"a": 42}));
    }

    #[test]
    fn test_json_ext_strip_nulls() {
        let input = make_jsonb_array(&[json!({"a": 1, "b": null, "c": {"d": null, "e": 2}})]);
        let out = JsonbStripNulls::new()
            .invoke_with_args(make_args_1(Arc::new(input)))
            .unwrap();
        // Null-valued object fields are removed at every nesting level.
        assert_eq!(decode_jsonb_result(out, 0), json!({"a": 1, "c": {"e": 2}}));
    }

    #[test]
    fn test_json_ext_strip_nulls_array_preserved() {
        let input = make_jsonb_array(&[json!({"arr": [1, null, 3]})]);
        let out = JsonbStripNulls::new()
            .invoke_with_args(make_args_1(Arc::new(input)))
            .unwrap();
        // Nulls inside arrays are positional and must survive stripping.
        assert_eq!(decode_jsonb_result(out, 0), json!({"arr": [1, null, 3]}));
    }

    #[test]
    fn test_json_ext_rename_keys() {
        let input = make_jsonb_array(&[json!({"old_name": 1, "keep": 2})]);

        // Build a single-row map {"old_name" -> "new_name"}.
        let mut map_builder = MapBuilder::new(None, MapSB::new(), MapSB::new());
        map_builder.keys().append_value("old_name");
        map_builder.values().append_value("new_name");
        map_builder.append(true).unwrap();
        let mapping = map_builder.finish();

        let out = JsonbRenameKeys::new()
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(mapping)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(out, 0),
            json!({"keep": 2, "new_name": 1})
        );
    }

    #[test]
    fn test_json_ext_pick() {
        let input = make_jsonb_array(&[json!({"a": 1, "b": 2, "c": 3})]);
        let keys = make_string_list(&[&["a", "c"]]);
        let out = JsonbPick::new()
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(keys)))
            .unwrap();
        // Only the listed keys are retained.
        assert_eq!(decode_jsonb_result(out, 0), json!({"a": 1, "c": 3}));
    }

    #[test]
    fn test_json_ext_except() {
        let input = make_jsonb_array(&[json!({"a": 1, "b": 2, "c": 3})]);
        let keys = make_string_list(&[&["b"]]);
        let out = JsonbExcept::new()
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(keys)))
            .unwrap();
        // Listed keys are dropped; everything else passes through.
        assert_eq!(decode_jsonb_result(out, 0), json!({"a": 1, "c": 3}));
    }

    #[test]
    fn test_json_ext_flatten() {
        let input = make_jsonb_array(&[json!({"a": {"b": 1, "c": [2, 3]}})]);
        let sep = StringArray::from(vec!["."]);
        let out = JsonbFlatten::new()
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        // Array elements are flattened by index; objects by key path.
        assert_eq!(
            decode_jsonb_result(out, 0),
            json!({"a.b": 1, "a.c.0": 2, "a.c.1": 3})
        );
    }

    #[test]
    fn test_json_ext_flatten_custom_sep() {
        let input = make_jsonb_array(&[json!({"x": {"y": 42}})]);
        let sep = StringArray::from(vec!["/"]);
        let out = JsonbFlatten::new()
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(decode_jsonb_result(out, 0), json!({"x/y": 42}));
    }

    #[test]
    fn test_json_ext_unflatten() {
        let input = make_jsonb_array(&[json!({"a.b": 1, "a.c": 2, "d": 3})]);
        let sep = StringArray::from(vec!["."]);
        let out = JsonbUnflatten::new()
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        // Dotted keys are re-nested into objects.
        assert_eq!(
            decode_jsonb_result(out, 0),
            json!({"a": {"b": 1, "c": 2}, "d": 3})
        );
    }

    #[test]
    fn test_json_ext_to_columns() {
        let input = make_jsonb_array(&[json!({"name": "Alice", "age": 30, "active": true})]);
        let spec = StringArray::from(vec!["name VARCHAR, age BIGINT"]);
        let out = JsonToColumns::new()
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(spec)))
            .unwrap();
        // Fields absent from the spec ("active") are dropped.
        assert_eq!(
            decode_jsonb_result(out, 0),
            json!({"age": 30, "name": "Alice"})
        );
    }

    #[test]
    fn test_json_ext_infer_schema_object() {
        let input = make_jsonb_array(&[json!({"name": "Alice", "age": 30, "active": true})]);
        let out = JsonInferSchema::new()
            .invoke_with_args(make_utf8_args(Arc::new(input)))
            .unwrap();
        let parsed: serde_json::Value =
            serde_json::from_str(&decode_text_result(out, 0)).unwrap();
        assert_eq!(parsed["name"], "VARCHAR");
        assert_eq!(parsed["age"], "BIGINT");
        assert_eq!(parsed["active"], "BOOLEAN");
    }

    #[test]
    fn test_json_ext_infer_schema_nested() {
        let input = make_jsonb_array(&[json!({"tags": [1, 2], "meta": {"x": 1.5}})]);
        let out = JsonInferSchema::new()
            .invoke_with_args(make_utf8_args(Arc::new(input)))
            .unwrap();
        let parsed: serde_json::Value =
            serde_json::from_str(&decode_text_result(out, 0)).unwrap();
        assert_eq!(parsed["tags"], "ARRAY<BIGINT>");
        assert_eq!(parsed["meta"], "STRUCT(x DOUBLE)");
    }

    #[test]
    fn test_json_ext_infer_schema_scalar() {
        let input = make_jsonb_array(&[json!(42)]);
        let out = JsonInferSchema::new()
            .invoke_with_args(make_utf8_args(Arc::new(input)))
            .unwrap();
        // A bare scalar infers to a single type name, not a JSON object.
        assert_eq!(decode_text_result(out, 0), "BIGINT");
    }

    #[test]
    fn test_json_ext_all_udfs_register() {
        use datafusion_expr::ScalarUDF;

        // Wrapping each impl in a ScalarUDF exercises the same path as registration.
        fn udf_name(udf: ScalarUDF) -> String {
            udf.name().to_owned()
        }

        let names: Vec<String> = vec![
            udf_name(ScalarUDF::new_from_impl(JsonbMerge::new())),
            udf_name(ScalarUDF::new_from_impl(JsonbDeepMerge::new())),
            udf_name(ScalarUDF::new_from_impl(JsonbStripNulls::new())),
            udf_name(ScalarUDF::new_from_impl(JsonbRenameKeys::new())),
            udf_name(ScalarUDF::new_from_impl(JsonbPick::new())),
            udf_name(ScalarUDF::new_from_impl(JsonbExcept::new())),
            udf_name(ScalarUDF::new_from_impl(JsonbFlatten::new())),
            udf_name(ScalarUDF::new_from_impl(JsonbUnflatten::new())),
            udf_name(ScalarUDF::new_from_impl(JsonToColumns::new())),
            udf_name(ScalarUDF::new_from_impl(JsonInferSchema::new())),
        ];
        for name in &names {
            assert!(!name.is_empty(), "UDF has empty name");
        }
        assert_eq!(names.len(), 10);
    }

    #[test]
    fn test_json_ext_merge_null_input() {
        let left = LargeBinaryArray::new_null(1);
        let right = make_jsonb_array(&[json!({"a": 1})]);
        let out = JsonbMerge::new()
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        let arr = match out {
            ColumnarValue::Array(a) => a,
            _ => panic!("expected array"),
        };
        // A null operand propagates: the output row is null too.
        assert!(arr
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .unwrap()
            .is_null(0));
    }

    /// Build a `ListArray` of UTF-8 strings with one list entry per input row.
    fn make_string_list(rows: &[&[&str]]) -> ListArray {
        use arrow_array::builder::{ListBuilder, StringBuilder};

        let mut builder = ListBuilder::new(StringBuilder::new());
        for row in rows {
            row.iter().for_each(|s| builder.values().append_value(s));
            builder.append(true);
        }
        builder.finish()
    }
}