Skip to main content

laminar_sql/datafusion/
mod.rs

1//! DataFusion integration for SQL processing.
2
3/// Marker UDFs for the `ai_*` SQL functions (rewritten by the AI operator).
4pub mod ai_udf;
5mod bridge;
6mod channel_source;
7/// Cross-instance hash repartition for distributed GROUP BY. Gated on
8/// `cluster` because it pulls in the shuffle transport.
9#[cfg(feature = "cluster")]
10pub mod cluster_repartition;
11/// Lambda higher-order functions for arrays and maps (F-SCHEMA-015 Tier 3)
12pub mod complex_type_lambda;
13/// Array, Struct, and Map scalar UDFs (F-SCHEMA-015)
14pub mod complex_type_udf;
15/// Pull-path distributed table scan that unions every node's local rows.
16/// Gated on `cluster` because it pulls in the cross-node query client.
17#[cfg(feature = "cluster")]
18pub mod distributed_scan;
19mod exec;
20/// End-to-end streaming SQL execution
21pub mod execute;
22/// Format bridge UDFs for inline format conversion
23pub mod format_bridge_udf;
24/// LaminarDB streaming JSON extension UDFs (F-SCHEMA-013)
25pub mod json_extensions;
26/// SQL/JSON path query compiler and scalar UDFs
27pub mod json_path;
28/// JSON table-valued functions (array/object expansion)
29pub mod json_tvf;
30/// JSONB binary format types for JSON UDF evaluation
31pub mod json_types;
32/// PostgreSQL-compatible JSON aggregate UDAFs
33pub mod json_udaf;
34/// PostgreSQL-compatible JSON scalar UDFs
35pub mod json_udf;
36/// Live source provider for streaming execution with plan caching
37pub mod live_source;
38/// Lookup join plan node for DataFusion.
39pub mod lookup_join;
40/// Physical execution plan and extension planner for lookup joins.
41pub mod lookup_join_exec;
42/// Processing-time UDF for `PROCTIME()` support
43pub mod proctime_udf;
44mod source;
45mod table_provider;
46/// Dynamic watermark filter for scan-level late-data pruning
47/// Watermark UDF for current watermark access
48pub mod watermark_udf;
49/// Window function UDFs (TUMBLE, HOP, SESSION, CUMULATE)
50pub mod window_udf;
51
52pub use ai_udf::{ai_function_markers, AiFunctionMarker};
53pub use bridge::{BridgeSendError, BridgeSender, BridgeStream, BridgeTrySendError, StreamBridge};
54pub use channel_source::ChannelStreamSource;
55pub use complex_type_lambda::{
56    register_lambda_functions, ArrayFilter, ArrayReduce, ArrayTransform, MapFilter,
57    MapTransformValues,
58};
59pub use complex_type_udf::{
60    register_complex_type_functions, MapContainsKey, MapFromArrays, MapKeys, MapValues, StructDrop,
61    StructExtract, StructMerge, StructRename, StructSet,
62};
63#[cfg(feature = "cluster")]
64pub use distributed_scan::{DistributedScanExec, DistributedTableProvider};
65pub use exec::StreamingScanExec;
66pub use execute::{execute_streaming_sql, DdlResult, QueryResult, StreamingSqlResult};
67pub use format_bridge_udf::{FromJsonUdf, ParseEpochUdf, ParseTimestampUdf, ToJsonUdf};
68pub use json_extensions::{
69    register_json_extensions, JsonInferSchema, JsonToColumns, JsonbDeepMerge, JsonbExcept,
70    JsonbFlatten, JsonbMerge, JsonbPick, JsonbRenameKeys, JsonbStripNulls, JsonbUnflatten,
71};
72pub use json_path::{CompiledJsonPath, JsonPathStep, JsonbPathExistsUdf, JsonbPathMatchUdf};
73pub use json_tvf::{
74    register_json_table_functions, JsonbArrayElementsTextTvf, JsonbArrayElementsTvf,
75    JsonbEachTextTvf, JsonbEachTvf, JsonbObjectKeysTvf,
76};
77pub use json_udaf::{JsonAgg, JsonObjectAgg};
78pub use json_udf::{
79    JsonBuildArray, JsonBuildObject, JsonTypeof, JsonbContainedBy, JsonbContains, JsonbExists,
80    JsonbExistsAll, JsonbExistsAny, JsonbGet, JsonbGetIdx, JsonbGetPath, JsonbGetPathText,
81    JsonbGetText, JsonbGetTextIdx, ToJsonb,
82};
83pub use live_source::{LiveSourceHandle, LiveSourceProvider};
84pub use lookup_join_exec::{
85    LookupJoinExec, LookupJoinExtensionPlanner, LookupSnapshot, LookupTableRegistry,
86    PartialLookupJoinExec, PartialLookupState, RegisteredLookup, VersionedLookupJoinExec,
87    VersionedLookupState,
88};
89pub use proctime_udf::ProcTimeUdf;
90pub use source::{SortColumn, StreamSource, StreamSourceRef};
91pub use table_provider::StreamingTableProvider;
92pub use watermark_udf::WatermarkUdf;
93pub use window_udf::{
94    CumulateWindowEnd, CumulateWindowStart, HopWindowEnd, HopWindowStart, SessionWindowStart,
95    TumbleWindowEnd, TumbleWindowStart,
96};
97
98use std::sync::atomic::AtomicI64;
99use std::sync::Arc;
100
101use datafusion::execution::SessionStateBuilder;
102use datafusion::prelude::*;
103use datafusion_expr::ScalarUDF;
104
105use crate::planner::streaming_optimizer::{StreamingPhysicalValidator, StreamingValidatorMode};
106
107/// Returns a base `SessionConfig` with identifier normalization disabled.
108///
109/// DataFusion's default behaviour lowercases all unquoted SQL identifiers
110/// (per the SQL standard). LaminarDB disables this so that mixed-case
111/// column names from external sources (Kafka, CDC, WebSocket) can be
112/// referenced without double-quoting.
113#[must_use]
114pub fn base_session_config() -> SessionConfig {
115    let mut config = SessionConfig::new();
116    config.options_mut().sql_parser.enable_ident_normalization = false;
117    // Single partition for streaming micro-batch execution. Multi-partition
118    // plans contain stateful operators (RepartitionExec) that cannot be
119    // reused across cycles, causing panics on cached physical plans.
120    config = config.with_target_partitions(1);
121    config
122}
123
124/// Creates a `DataFusion` session context with identifier normalization
125/// disabled.
126///
127/// Suitable for ad-hoc / non-streaming queries (filters, lookups).
128/// For streaming workloads prefer [`create_streaming_context`].
129#[must_use]
130pub fn create_session_context() -> SessionContext {
131    SessionContext::new_with_config(base_session_config())
132}
133
134/// Creates a `DataFusion` session context configured for streaming queries.
135///
136/// The context is configured with:
137/// - Batch size of 8192 (balanced for streaming throughput)
138/// - Single partition (streaming sources are typically not partitioned)
139/// - Identifier normalization disabled (mixed-case columns work unquoted)
140/// - All streaming UDFs registered (TUMBLE, HOP, SESSION, WATERMARK)
141/// - `StreamingPhysicalValidator` in `Reject` mode (blocks unsafe plans)
142///
143/// The watermark UDF is initialized with no watermark set (returns NULL).
144/// Use [`register_streaming_functions_with_watermark`] to provide a live
145/// watermark source.
146///
147/// # Example
148///
149/// ```rust,ignore
150/// let ctx = create_streaming_context();
151/// ctx.register_table("events", provider)?;
152/// let df = ctx.sql("SELECT * FROM events").await?;
153/// ```
154#[must_use]
155pub fn create_streaming_context() -> SessionContext {
156    create_streaming_context_with_validator(StreamingValidatorMode::Reject)
157}
158
159/// Creates a streaming context with a configurable validator mode.
160///
161/// Same as [`create_streaming_context`] but allows choosing how the
162/// [`StreamingPhysicalValidator`] handles plan violations.
163///
164/// Use [`StreamingValidatorMode::Off`] to get the previous behaviour
165/// (no plan-time validation).
166#[must_use]
167pub fn create_streaming_context_with_validator(mode: StreamingValidatorMode) -> SessionContext {
168    let config = base_session_config().with_batch_size(8192);
169
170    let ctx = if matches!(mode, StreamingValidatorMode::Off) {
171        SessionContext::new_with_config(config)
172    } else {
173        // Build a default state to get the standard optimizer rules, then
174        // prepend our streaming validator so it fires before DataFusion's
175        // built-in SanityCheckPlan (which produces generic error messages).
176        let default_state = SessionStateBuilder::new()
177            .with_config(config.clone())
178            .with_default_features()
179            .build();
180        let mut rules: Vec<
181            Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
182        > = vec![Arc::new(StreamingPhysicalValidator::new(mode))];
183        #[cfg(feature = "cluster")]
184        rules.push(Arc::new(cluster_repartition::DistributedJoinRule));
185        rules.extend(default_state.physical_optimizers().iter().cloned());
186
187        let state = SessionStateBuilder::new()
188            .with_config(config)
189            .with_default_features()
190            .with_physical_optimizer_rules(rules)
191            .build();
192        SessionContext::new_with_state(state)
193    };
194
195    register_streaming_functions(&ctx);
196    ctx
197}
198
199/// Window-time, JSON, complex-type, lambda, and `proctime()` UDFs —
200/// every streaming UDF except `watermark()`. Pulled out of the public
201/// `register_streaming_functions*` entry points so they share a single
202/// list and stay in sync.
203fn register_non_watermark_udfs(ctx: &SessionContext) {
204    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowStart::new()));
205    ctx.register_udf(ScalarUDF::new_from_impl(TumbleWindowEnd::new()));
206    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowStart::new()));
207    ctx.register_udf(ScalarUDF::new_from_impl(HopWindowEnd::new()));
208    ctx.register_udf(ScalarUDF::new_from_impl(SessionWindowStart::new()));
209    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowStart::new()));
210    ctx.register_udf(ScalarUDF::new_from_impl(CumulateWindowEnd::new()));
211    ctx.register_udf(ScalarUDF::new_from_impl(ProcTimeUdf::new()));
212    for marker in ai_function_markers() {
213        ctx.register_udf(marker);
214    }
215    register_json_functions(ctx);
216    register_json_extensions(ctx);
217    register_complex_type_functions(ctx);
218    register_lambda_functions(ctx);
219}
220
221/// Registers `LaminarDB` streaming UDFs with a session context. The
222/// `watermark()` UDF is registered in unset mode (always returns NULL);
223/// use [`register_streaming_functions_with_watermark`] to provide a
224/// live watermark source from Ring 0.
225pub fn register_streaming_functions(ctx: &SessionContext) {
226    register_non_watermark_udfs(ctx);
227    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::unset()));
228}
229
230/// Registers streaming UDFs with a live watermark source — same as
231/// [`register_streaming_functions`] but `watermark()` reads
232/// `watermark_ms` (in milliseconds since epoch; values < 0 mean "no
233/// watermark", returning NULL).
234pub fn register_streaming_functions_with_watermark(
235    ctx: &SessionContext,
236    watermark_ms: Arc<AtomicI64>,
237) {
238    register_non_watermark_udfs(ctx);
239    ctx.register_udf(ScalarUDF::new_from_impl(WatermarkUdf::new(watermark_ms)));
240}
241
242/// Registers all PostgreSQL-compatible JSON UDFs and UDAFs
243/// with the given `SessionContext`.
244pub fn register_json_functions(ctx: &SessionContext) {
245    // Extraction operators
246    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGet::new()));
247    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetIdx::new()));
248    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetText::new()));
249    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetTextIdx::new()));
250    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPath::new()));
251    ctx.register_udf(ScalarUDF::new_from_impl(JsonbGetPathText::new()));
252
253    // Existence operators
254    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExists::new()));
255    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAny::new()));
256    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExistsAll::new()));
257
258    // Containment operators
259    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContains::new()));
260    ctx.register_udf(ScalarUDF::new_from_impl(JsonbContainedBy::new()));
261
262    // Interrogation / construction
263    ctx.register_udf(ScalarUDF::new_from_impl(JsonTypeof::new()));
264    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildObject::new()));
265    ctx.register_udf(ScalarUDF::new_from_impl(JsonBuildArray::new()));
266    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonb::new()));
267
268    // Aggregates
269    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(JsonAgg::new()));
270    ctx.register_udaf(datafusion_expr::AggregateUDF::new_from_impl(
271        JsonObjectAgg::new(),
272    ));
273
274    // Format bridge functions
275    ctx.register_udf(ScalarUDF::new_from_impl(ParseEpochUdf::new()));
276    ctx.register_udf(ScalarUDF::new_from_impl(ParseTimestampUdf::new()));
277    ctx.register_udf(ScalarUDF::new_from_impl(ToJsonUdf::new()));
278    ctx.register_udf(ScalarUDF::new_from_impl(FromJsonUdf::new()));
279
280    // JSON path query functions (scalar)
281    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathExistsUdf::new()));
282    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPathMatchUdf::new()));
283
284    // JSON table-valued functions
285    register_json_table_functions(ctx);
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::execution::FunctionRegistry;
    use futures::StreamExt;
    use std::sync::Arc;

    /// Schema shared by most tests: `id: Int64` (non-null) and
    /// `value: Float64` (nullable).
    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Take the sender from a `ChannelStreamSource`, panicking if already taken.
    fn take_test_sender(source: &ChannelStreamSource) -> super::bridge::BridgeSender {
        source.take_sender().expect("sender already taken")
    }

    /// Builds an `(id, value)` batch against `schema` from parallel vectors.
    fn test_batch(schema: &Arc<Schema>, ids: Vec<i64>, values: Vec<f64>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_create_streaming_context() {
        let ctx = create_streaming_context();
        let state = ctx.state();
        let config = state.config();

        assert_eq!(config.batch_size(), 8192);
        assert_eq!(config.target_partitions(), 1);
    }

    #[tokio::test]
    async fn test_full_query_pipeline() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        // Create source and take the sender (important for channel closure)
        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send test data
        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        sender
            .send(test_batch(&schema, vec![4, 5], vec![40.0, 50.0]))
            .await
            .unwrap();
        drop(sender); // Close the channel

        // Execute query
        let df = ctx.sql("SELECT * FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Verify results
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);
    }

    #[tokio::test]
    async fn test_query_with_projection() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2], vec![100.0, 200.0]))
            .await
            .unwrap();
        drop(sender);

        // Query only the id column
        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 1);
        assert_eq!(batches[0].schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_query_with_filter() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(
                &schema,
                vec![1, 2, 3, 4, 5],
                vec![10.0, 20.0, 30.0, 40.0, 50.0],
            ))
            .await
            .unwrap();
        drop(sender);

        // Filter for value > 25
        let df = ctx
            .sql("SELECT * FROM events WHERE value > 25")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3); // 30, 40, 50
    }

    #[tokio::test]
    async fn test_unbounded_aggregation_rejected() {
        // An unwindowed aggregate over an unbounded stream cannot produce a
        // final result, so the streaming context must refuse to run it.
        // (GROUP BY aggregation over unbounded streams is handled by Ring 0,
        // not by DataFusion.)
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![1, 2, 3], vec![10.0, 20.0, 30.0]))
            .await
            .unwrap();
        drop(sender);

        // Building the logical plan succeeds...
        let df = ctx.sql("SELECT COUNT(*) as cnt FROM events").await.unwrap();

        // ...but collecting it (physical planning + execution) must fail.
        let result = df.collect().await;
        assert!(
            result.is_err(),
            "Aggregation on unbounded stream should fail"
        );
    }

    /// Rows sent out of order must all come back from a plain projection.
    ///
    /// No ORDER BY is issued: ordering of unbounded streams is handled by
    /// Ring 0 rather than DataFusion, so this only checks row delivery.
    #[tokio::test]
    async fn test_query_unordered_input_returns_all_rows() {
        let ctx = create_streaming_context();
        let schema = test_schema();

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        sender
            .send(test_batch(&schema, vec![3, 1, 2], vec![30.0, 10.0, 20.0]))
            .await
            .unwrap();
        drop(sender);

        let df = ctx.sql("SELECT id, value FROM events").await.unwrap();
        let batches = df.collect().await.unwrap();

        // Only the row count is checked; output order is not asserted.
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_bridge_throughput() {
        // Benchmark-style test for bridge performance
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10000);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        let batch_count = 1000;
        let batch = test_batch(&schema, vec![1, 2, 3, 4, 5], vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        // Spawn sender task
        let send_task = tokio::spawn(async move {
            for _ in 0..batch_count {
                sender.send(batch.clone()).await.unwrap();
            }
        });

        // Receive all batches
        let mut received = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            received += 1;
            if received == batch_count {
                break;
            }
        }

        send_task.await.unwrap();
        assert_eq!(received, batch_count);
    }

    // ── Integration Tests ──────────────────────────────────────────

    #[test]
    fn test_streaming_functions_registered() {
        let ctx = create_streaming_context();
        // Spot-check the core window and watermark UDFs
        assert!(ctx.udf("tumble").is_ok(), "tumble UDF not registered");
        assert!(ctx.udf("hop").is_ok(), "hop UDF not registered");
        assert!(ctx.udf("session").is_ok(), "session UDF not registered");
        assert!(ctx.udf("watermark").is_ok(), "watermark UDF not registered");
    }

    #[test]
    fn test_streaming_functions_with_watermark() {
        use std::sync::atomic::AtomicI64;

        let ctx = create_session_context();
        let wm = Arc::new(AtomicI64::new(42_000));
        register_streaming_functions_with_watermark(&ctx, wm);

        assert!(ctx.udf("tumble").is_ok());
        assert!(ctx.udf("watermark").is_ok());
    }

    #[tokio::test]
    async fn test_tumble_udf_via_datafusion() {
        use arrow_array::{TimestampMicrosecondArray, TimestampMillisecondArray};
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        // Create schema with timestamp and value columns
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Send events across two 5-minute windows:
        // Window [0, 300_000): timestamps 60_000, 120_000
        // Window [300_000, 600_000): timestamps 360_000
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    60_000i64, 120_000, 360_000,
                ])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Verify the tumble UDF computes correct window starts via DataFusion
        // (GROUP BY aggregation and ORDER BY on unbounded streams are handled by Ring 0)
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as window_start, \
                 value \
                 FROM events",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        // tumble() returns microsecond timestamps for lakehouse-sink compat;
        // expected window starts are scaled accordingly (× 1000).
        let ws_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("window_start should be TimestampMicrosecond");
        // 60_000 and 120_000 ms → window [0, 300_000) ms, start = 0 µs.
        assert_eq!(ws_col.value(0), 0);
        assert_eq!(ws_col.value(1), 0);
        // 360_000 ms → window [300_000, 600_000) ms, start = 300_000_000 µs.
        assert_eq!(ws_col.value(2), 300_000_000);
    }

    #[tokio::test]
    async fn test_logical_plan_from_windowed_query() {
        use arrow_schema::TimeUnit;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(schema));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Create a LogicalPlan for a windowed query
        let df = ctx
            .sql(
                "SELECT tumble(event_time, INTERVAL '5' MINUTE) as w, \
                 COUNT(*) as cnt \
                 FROM events \
                 GROUP BY tumble(event_time, INTERVAL '5' MINUTE)",
            )
            .await;

        // Should succeed in creating the logical plan (UDFs are registered)
        assert!(df.is_ok(), "Failed to create logical plan: {df:?}");
    }

    #[tokio::test]
    async fn test_end_to_end_execute_streaming_sql() {
        use crate::planner::StreamingPlanner;

        let ctx = create_streaming_context();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("items", source);
        ctx.register_table("items", Arc::new(provider)).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let result = execute_streaming_sql("SELECT id FROM items WHERE id > 1", &ctx, &mut planner)
            .await
            .unwrap();

        match result {
            StreamingSqlResult::Query(qr) => {
                let mut stream = qr.stream;
                let mut total = 0;
                while let Some(batch) = stream.next().await {
                    total += batch.unwrap().num_rows();
                }
                assert_eq!(total, 2); // id=2, id=3
            }
            StreamingSqlResult::Ddl(_) => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_watermark_function_in_filter() {
        use arrow_array::TimestampMillisecondArray;
        use arrow_schema::TimeUnit;
        use std::sync::atomic::AtomicI64;

        // Create context with a specific watermark value. `base_session_config`
        // already pins a single target partition.
        let config = base_session_config().with_batch_size(8192);
        let ctx = SessionContext::new_with_config(config);
        let wm = Arc::new(AtomicI64::new(200_000)); // watermark at 200s
        register_streaming_functions_with_watermark(&ctx, wm);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value", DataType::Float64, false),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let sender = take_test_sender(&source);
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Events: 100s, 200s, 300s - watermark is at 200s
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![
                    100_000i64, 200_000, 300_000,
                ])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        // Filter events after watermark
        let df = ctx
            .sql("SELECT value FROM events WHERE event_time > watermark()")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Only event at 300s is after watermark (200s)
        assert_eq!(total_rows, 1);
    }

    #[tokio::test]
    async fn test_date_trunc_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT date_trunc('hour', TIMESTAMP '2026-01-15 14:30:00')")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_date_bin_available() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql(
                "SELECT date_bin(\
                 INTERVAL '15 minutes', \
                 TIMESTAMP '2026-01-15 14:32:00', \
                 TIMESTAMP '2026-01-01 00:00:00')",
            )
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_unnest_literal_array() {
        let ctx = create_streaming_context();
        let df = ctx
            .sql("SELECT unnest(make_array(1, 2, 3)) AS val")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);
    }

    #[tokio::test]
    async fn test_unnest_from_table_with_array_col() {
        let ctx = create_streaming_context();
        // Register a table with an array column
        ctx.sql(
            "CREATE TABLE arr_table (id INT, tags INT[]) \
             AS VALUES (1, make_array(10, 20)), (2, make_array(30))",
        )
        .await
        .unwrap();
        let df = ctx
            .sql("SELECT id, unnest(tags) AS tag FROM arr_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // Row 1: [10,20] → 2 rows, Row 2: [30] → 1 row = 3 total
        assert_eq!(total_rows, 3);
    }
}