Skip to main content

laminar_db/pipeline/
mod.rs

1//! Streaming connector pipeline. Each source connector runs as a tokio task
2//! pushing batches via crossfire mpsc to the `StreamingCoordinator`, which
3//! drives SQL execution cycles, routes results to sinks, and manages
4//! checkpoint barriers. See the `streaming_coordinator` submodule for the
5//! runtime topology.
6
7pub mod callback;
8pub mod config;
9pub mod streaming_coordinator;
10
11pub use callback::{PipelineCallback, SourceRegistration};
12pub use config::PipelineConfig;
13pub use streaming_coordinator::StreamingCoordinator;
14
15use arrow::datatypes::SchemaRef;
16use laminar_sql::parser::EmitClause;
17use laminar_sql::translator::{OrderOperatorConfig, WindowOperatorConfig};
18
19/// Control message sent from `LaminarDB` DDL handlers to the running `StreamingCoordinator`
20/// to change the live pipeline: add a stream, drop a stream, or register a source schema.
21#[derive(Debug)]
22#[allow(clippy::large_enum_variant)] // NOTE(review): presumably due to `AddStream`'s size — confirm
23pub enum ControlMsg {
24    /// Add a new streaming query to the running pipeline.
25    AddStream {
26        /// Name of the stream to add.
27        name: String,
28        /// SQL text of the streaming query.
29        sql: String,
30        /// Optional EMIT clause (e.g., `OnWindowClose`).
31        emit_clause: Option<EmitClause>,
32        /// Optional window configuration (tumbling, hopping, session).
33        window_config: Option<WindowOperatorConfig>,
34        /// Optional ORDER BY configuration.
35        order_config: Option<OrderOperatorConfig>,
36    },
37    /// Remove a streaming query from the running pipeline.
38    DropStream {
39        /// Name of the stream to remove.
40        name: String,
41    },
42    /// Register a new source schema (for queries that depend on a source
43    /// created after `start()`).
44    AddSourceSchema {
45        /// Name of the source being registered.
46        name: String,
47        /// Arrow schema describing the source.
48        schema: SchemaRef,
49    },
50}