1pub mod ai_udf;
5mod bridge;
6mod channel_source;
7#[cfg(feature = "cluster")]
10pub mod cluster_repartition;
11pub mod complex_type_lambda;
13pub mod complex_type_udf;
15#[cfg(feature = "cluster")]
18pub mod distributed_scan;
19mod exec;
20pub mod execute;
22pub mod format_bridge_udf;
24pub mod json_extensions;
26pub mod json_path;
28pub mod json_tvf;
30pub mod json_types;
32pub mod json_udaf;
34pub mod json_udf;
36pub mod live_source;
38pub mod lookup_join;
40pub mod lookup_join_exec;
42pub mod proctime_udf;
44mod source;
45mod table_provider;
46pub mod watermark_udf;
49pub mod window_udf;
51
52pub use ai_udf::{ai_function_markers, AiFunctionMarker};
53pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
54pub use channel_source::ChannelStreamSource;
55pub use complex_type_lambda::{
56 register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
57 MapTransformValues,
58};
59pub use complex_type_udf::{
60 register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
61 StructExtract, StructMerge, StructRename, StructSet,
62};
63#[cfg(feature = "cluster")]
64pub use distributed_scan::{DistributedScanExec, DistributedTableProvider};
65pub use exec::StreamingScanExec;
66pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
67pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
68pub use json_extensions::{
69 register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
70 JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
71};
72pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
73pub use json_tvf::{
74 register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
75 JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
76};
77pub use json_udaf::{JsonAgg, JsonObjectAgg};
78pub use json_udf::{
79 JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
80 JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
81 JsonbGetText, JsonbGetTextIdx, ToJsonb,
82};
83pub use live_source::{LiveSourceHandle, LiveSourceProvider};
84pub use lookup_join_exec::{
85 LookupJoinExec, LookupJoinExtensionPlanner, LookupSnapshot, LookupTableRegistry,
86 PartialLookupJoinExec, PartialLookupState, RegisteredLookup, VersionedLookupJoinExec,
87 VersionedLookupState,
88};
89pub use proctime_udf::ProcTimeUdf;
90pub use source::{SortColumn, StreamSource, StreamSourceRef};
91pub use table_provider::StreamingTableProvider;
92pub use watermark_udf::WatermarkUdf;
93pub use window_udf::{
94 CumulateWindowEnd, CumulateWindowStart, HopWindowEnd, HopWindowStart, SessionWindowStart,
95 TumbleWindowEnd, TumbleWindowStart,
96};
97
98use std::sync::atomic::AtomicI64;
99use std::sync::Arc;
100
101use datafusion::execution::SessionStateBuilder;
102use datafusion::prelude::*;
103use datafusion_expr::ScalarUDF;
104
105use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};
106
107#[must_use]
114pub fn base_session_config() -> SessionConfig {
115 let mut config = SessionConfig::new();
116 config.options_mut().sql_parser.enable_ident_normalization = false;
117 config = config.with_target_partitions(1);
121 config
122}
123
124#[must_use]
130pub fn create_session_context() -> SessionContext {
131 SessionContext::new_with_config(base_session_config())
132}
133
134#[must_use]
155pub fn create_streaming_context() -> SessionContext {
156 create_streaming_context_with_validator(StreamingValidatorMode::Reject)
157}
158
159#[must_use]
167pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
168 let config = base_session_config().with_batch_size(8192);
169
170 let ctx = if matches!(mode, StreamingValidatorMode::Off) {
171 SessionContext::new_with_config(config)
172 } else {
173 let default_state = SessionStateBuilder::new()
177 .with_config(config.clone())
178 .with_default_features()
179 .build();
180 let mut rules: Vec<
181 Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
182 > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
183 #[cfg(feature = "cluster")]
184 rules.push(Arc::new(cluster_repartition::DistributedJoinRule));
185 rules.extend(default_state.physical_optimizers().iter().cloned());
186
187 let state = SessionStateBuilder::new()
188 .with_config(config)
189 .with_default_features()
190 .with_physical_optimizer_rules(rules)
191 .build();
192 SessionContext::new_with_state(state)
193 };
194
195 register_streaming_functions(&ctx);
196 ctx
197}
198
199fn register_non_watermark_udfs(ctx: &SessionContext) {
204 ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
205 ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowEnd::new()));
206 ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
207 ctx.register_udf(ScalarUDF::new_from_impl(HopWindowEnd::new()));
208 ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
209 ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
210 ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowEnd::new()));
211 ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
212 for marker in ai_function_markers() {
213 ctx.register_udf(marker);
214 }
215 register_json_functions(ctx);
216 register_json_extensions(ctx);
217 register_complex_type_functions(ctx);
218 register_lambda_functions(ctx);
219}
220
221pub fn register_streaming_functions(ctx: &SessionContext) {
226 register_non_watermark_udfs(ctx);
227 ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
228}
229
230pub fn register_streaming_functions_with_watermark(
235 ctx: &SessionContext,
236 watermark_ms: Arc<AtomicI64>,
237) {
238 register_non_watermark_udfs(ctx);
239 ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
240}
241
242pub fn register_json_functions(ctx: &SessionContext) {
245 ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
247 ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
248 ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
249 ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
250 ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
251 ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));
252
253 ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
255 ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
256 ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));
257
258 ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
260 ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));
261
262 ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
264 ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
265 ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
266 ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));
267
268 ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
270 ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
271 JsonObjectAgg::new(),
272 ));
273
274 ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
276 ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
277 ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
278 ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));
279
280 ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
282 ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));
283
284 register_json_table_functions(ctx);
286}
287
288#[cfg(test)]
289mod tests {
290 use super::*;
291 use arrow_array::{Float64Array, Int64Array, RecordBatch};
292 use arrow_schema::{DataType, Field, Schema};
293 use datafusion::execution::FunctionRegistry;
294 use futures::StreamExt;
295 use std::sync::Arc;
296
297 fn test_schema() -> Arc<Schema> {
298 Arc::new(Schema::new(vec![
299 Field::new("id", DataType::Int64, false),
300 Field::new("value", DataType::Float64, true),
301 ]))
302 }
303
304 fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
306 source.take_sender().expect("sender already taken")
307 }
308
309 fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
310 RecordBatch::try_new(
311 Arc::clone(schema),
312 vec![
313 Arc::new(Int64Array::from(ids)),
314 Arc::new(Float64Array::from(values)),
315 ],
316 )
317 .unwrap()
318 }
319
320 #[test]
321 fn test_create_streaming_context() {
322 let ctx = create_streaming_context();
323 let state = ctx.state();
324 let config = state.config();
325
326 assert_eq!(config.batch_size(), 8192);
327 assert_eq!(config.target_partitions(), 1);
328 }
329
330 #[tokio::test]
331 async fn test_full_query_pipeline() {
332 let ctx = create_streaming_context();
333 let schema = test_schema();
334
335 let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
337 let sender = take_test_sender(&source);
338 let provider = StreamingTableProvider::new("events", source);
339 ctx.register_table("events", Arc::new(provider)).unwrap();
340
341 sender
343 .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
344 .await
345 .unwrap();
346 sender
347 .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
348 .await
349 .unwrap();
350 drop(sender); let df = ctx.sql("SELECT * FROM events").await.unwrap();
354 let batches = df.collect().await.unwrap();
355
356 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
358 assert_eq!(total_rows, 5);
359 }
360
361 #[tokio::test]
362 async fn test_query_with_projection() {
363 let ctx = create_streaming_context();
364 let schema = test_schema();
365
366 let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
367 let sender = take_test_sender(&source);
368 let provider = StreamingTableProvider::new("events", source);
369 ctx.register_table("events", Arc::new(provider)).unwrap();
370
371 sender
372 .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
373 .await
374 .unwrap();
375 drop(sender);
376
377 let df = ctx.sql("SELECT id FROM events").await.unwrap();
379 let batches = df.collect().await.unwrap();
380
381 assert_eq!(batches.len(), 1);
382 assert_eq!(batches[0].num_columns(), 1);
383 assert_eq!(batches[0].schema().field(0).name(), "id");
384 }
385
386 #[tokio::test]
387 async fn test_query_with_filter() {
388 let ctx = create_streaming_context();
389 let schema = test_schema();
390
391 let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
392 let sender = take_test_sender(&source);
393 let provider = StreamingTableProvider::new("events", source);
394 ctx.register_table("events", Arc::new(provider)).unwrap();
395
396 sender
397 .send(test_batch(
398 &schema,
399 vec![1, 2, 3, 4, 5],
400 vec![10.0, 20.0, 30.0, 40.0, 50.0],
401 ))
402 .await
403 .unwrap();
404 drop(sender);
405
406 let df = ctx
408 .sql("SELECT * FROM events WHERE value > 25")
409 .await
410 .unwrap();
411 let batches = df.collect().await.unwrap();
412
413 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
414 assert_eq!(total_rows, 3); }
416
417 #[tokio::test]
418 async fn test_unbounded_aggregation_rejected() {
419 let ctx = create_streaming_context();
422 let schema = test_schema();
423
424 let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
425 let sender = take_test_sender(&source);
426 let provider = StreamingTableProvider::new("events", source);
427 ctx.register_table("events", Arc::new(provider)).unwrap();
428
429 sender
430 .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
431 .await
432 .unwrap();
433 drop(sender);
434
435 let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();
437
438 let result = df.collect().await;
440 assert!(
441 result.is_err(),
442 "Aggregation on unbounded stream should fail"
443 );
444 }
445
446 #[tokio::test]
447 async fn test_query_with_order_by() {
448 let ctx = create_streaming_context();
449 let schema = test_schema();
450
451 let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
452 let sender = take_test_sender(&source);
453 let provider = StreamingTableProvider::new("events", source);
454 ctx.register_table("events", Arc::new(provider)).unwrap();
455
456 sender
457 .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
458 .await
459 .unwrap();
460 drop(sender);
461
462 let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
464 let batches = df.collect().await.unwrap();
465
466 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
468 assert_eq!(total_rows, 3);
469 }
470
471 #[tokio::test]
472 async fn test_bridge_throughput() {
473 let schema = test_schema();
475 let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
476 let sender = bridge.sender();
477 let mut stream = bridge.into_stream();
478
479 let batch_count = 1000;
480 let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);
481
482 let send_task = tokio::spawn(async move {
484 for _ in 0..batch_count {
485 sender.send(batch.clone()).await.unwrap();
486 }
487 });
488
489 let mut received = 0;
491 while let Some(result) = stream.next().await {
492 result.unwrap();
493 received += 1;
494 if received == batch_count {
495 break;
496 }
497 }
498
499 send_task.await.unwrap();
500 assert_eq!(received, batch_count);
501 }
502
503 #[test]
506 fn test_streaming_functions_registered() {
507 let ctx = create_streaming_context();
508 assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
510 assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
511 assert!(ctx.udf("session").is_ok(), "session UDF not registered");
512 assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
513 }
514
515 #[test]
516 fn test_streaming_functions_with_watermark() {
517 use std::sync::atomic::AtomicI64;
518
519 let ctx = create_session_context();
520 let wm = Arc::new(AtomicI64::new(42_000));
521 register_streaming_functions_with_watermark(&ctx, wm);
522
523 assert!(ctx.udf("tumble").is_ok());
524 assert!(ctx.udf("watermark").is_ok());
525 }
526
527 #[tokio::test]
528 async fn test_tumble_udf_via_datafusion() {
529 use arrow_array::{TimestampMicrosecondArray, TimestampMillisecondArray};
530 use arrow_schema::TimeUnit;
531
532 let ctx = create_streaming_context();
533
534 let schema = Arc::new(Schema::new(vec![
536 Field::new(
537 "event_time",
538 DataType::Timestamp(TimeUnit::Millisecond, None),
539 false,
540 ),
541 Field::new("value", DataType::Float64, false),
542 ]));
543
544 let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
545 let sender = take_test_sender(&source);
546 let provider = StreamingTableProvider::new("events", source);
547 ctx.register_table("events", Arc::new(provider)).unwrap();
548
549 let batch = RecordBatch::try_new(
553 Arc::clone(&schema),
554 vec![
555 Arc::new(TimestampMillisecondArray::from(vec![
556 60_000i64, 120_000, 360_000,
557 ])),
558 Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
559 ],
560 )
561 .unwrap();
562 sender.send(batch).await.unwrap();
563 drop(sender);
564
565 let df = ctx
568 .sql(
569 "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
570 value \
571 FROM events",
572 )
573 .await
574 .unwrap();
575
576 let batches = df.collect().await.unwrap();
577 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
578 assert_eq!(total_rows, 3);
579
580 let ws_col = batches[0]
583 .column(0)
584 .as_any()
585 .downcast_ref::<TimestampMicrosecondArray>()
586 .expect("window_start should be TimestampMicrosecond");
587 assert_eq!(ws_col.value(0), 0);
589 assert_eq!(ws_col.value(1), 0);
590 assert_eq!(ws_col.value(2), 300_000_000);
592 }
593
594 #[tokio::test]
595 async fn test_logical_plan_from_windowed_query() {
596 use arrow_schema::TimeUnit;
597
598 let ctx = create_streaming_context();
599
600 let schema = Arc::new(Schema::new(vec![
601 Field::new(
602 "event_time",
603 DataType::Timestamp(TimeUnit::Millisecond, None),
604 false,
605 ),
606 Field::new("value", DataType::Float64, false),
607 ]));
608
609 let source = Arc::new(ChannelStreamSource::new(schema));
610 let _sender = source.take_sender();
611 let provider = StreamingTableProvider::new("events", source);
612 ctx.register_table("events", Arc::new(provider)).unwrap();
613
614 let df = ctx
616 .sql(
617 "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
618 COUNT(*) as cnt \
619 FROM events \
620 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
621 )
622 .await;
623
624 assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
626 }
627
628 #[tokio::test]
629 async fn test_end_to_end_execute_streaming_sql() {
630 use crate::planner::StreamingPlanner;
631
632 let ctx = create_streaming_context();
633
634 let schema = Arc::new(Schema::new(vec![
635 Field::new("id", DataType::Int64, false),
636 Field::new("name", DataType::Utf8, true),
637 ]));
638
639 let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
640 let sender = take_test_sender(&source);
641 let provider = StreamingTableProvider::new("items", source);
642 ctx.register_table("items", Arc::new(provider)).unwrap();
643
644 let batch = RecordBatch::try_new(
645 Arc::clone(&schema),
646 vec![
647 Arc::new(Int64Array::from(vec![1, 2, 3])),
648 Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
649 ],
650 )
651 .unwrap();
652 sender.send(batch).await.unwrap();
653 drop(sender);
654
655 let mut planner = StreamingPlanner::new();
656 let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
657 .await
658 .unwrap();
659
660 match result {
661 StreamingSqlResult::Query(qr) => {
662 let mut stream = qr.stream;
663 let mut total = 0;
664 while let Some(batch) = stream.next().await {
665 total += batch.unwrap().num_rows();
666 }
667 assert_eq!(total, 2); }
669 StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
670 }
671 }
672
673 #[tokio::test]
674 async fn test_watermark_function_in_filter() {
675 use arrow_array::TimestampMillisecondArray;
676 use arrow_schema::TimeUnit;
677 use std::sync::atomic::AtomicI64;
678
679 let config = base_session_config()
681 .with_batch_size(8192)
682 .with_target_partitions(1);
683 let ctx = SessionContext::new_with_config(config);
684 let wm = Arc::new(AtomicI64::new(200_000)); register_streaming_functions_with_watermark(&ctx, wm);
686
687 let schema = Arc::new(Schema::new(vec![
688 Field::new(
689 "event_time",
690 DataType::Timestamp(TimeUnit::Millisecond, None),
691 false,
692 ),
693 Field::new("value", DataType::Float64, false),
694 ]));
695
696 let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
697 let sender = take_test_sender(&source);
698 let provider = StreamingTableProvider::new("events", source);
699 ctx.register_table("events", Arc::new(provider)).unwrap();
700
701 let batch = RecordBatch::try_new(
703 Arc::clone(&schema),
704 vec![
705 Arc::new(TimestampMillisecondArray::from(vec![
706 100_000i64, 200_000, 300_000,
707 ])),
708 Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
709 ],
710 )
711 .unwrap();
712 sender.send(batch).await.unwrap();
713 drop(sender);
714
715 let df = ctx
717 .sql("SELECT value FROM events WHERE event_time > watermark()")
718 .await
719 .unwrap();
720 let batches = df.collect().await.unwrap();
721 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
722 assert_eq!(total_rows, 1);
724 }
725
726 #[tokio::test]
727 async fn test_date_trunc_available() {
728 let ctx = create_streaming_context();
729 let df = ctx
730 .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
731 .await
732 .unwrap();
733 let batches = df.collect().await.unwrap();
734 assert_eq!(batches.len(), 1);
735 assert_eq!(batches[0].num_rows(), 1);
736 }
737
738 #[tokio::test]
739 async fn test_date_bin_available() {
740 let ctx = create_streaming_context();
741 let df = ctx
742 .sql(
743 "SELECT date_bin(\
744 INTERVAL '15 minutes', \
745 TIMESTAMP '2026-01-15 14:32:00', \
746 TIMESTAMP '2026-01-01 00:00:00')",
747 )
748 .await
749 .unwrap();
750 let batches = df.collect().await.unwrap();
751 assert_eq!(batches.len(), 1);
752 assert_eq!(batches[0].num_rows(), 1);
753 }
754
755 #[tokio::test]
756 async fn test_unnest_literal_array() {
757 let ctx = create_streaming_context();
758 let df = ctx
759 .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
760 .await
761 .unwrap();
762 let batches = df.collect().await.unwrap();
763 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
764 assert_eq!(total_rows, 3);
765 }
766
767 #[tokio::test]
768 async fn test_unnest_from_table_with_array_col() {
769 let ctx = create_streaming_context();
770 ctx.sql(
772 "CREATE TABLE arr_table (id INT, tags INT[]) \
773 AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
774 )
775 .await
776 .unwrap();
777 let df = ctx
778 .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
779 .await
780 .unwrap();
781 let batches = df.collect().await.unwrap();
782 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
783 assert_eq!(total_rows, 3);
785 }
786}