laminar_db/pipeline/mod.rs
1//! Streaming connector pipeline. Each source connector runs as a tokio task
2//! pushing batches via crossfire mpsc to the `StreamingCoordinator`, which
3//! drives SQL execution cycles, routes results to sinks, and manages
4//! checkpoint barriers. See the `streaming_coordinator` submodule for the
5//! runtime topology.
6
7pub mod callback;
8pub mod config;
9pub mod streaming_coordinator;
10
11pub use callback::{PipelineCallback, SourceRegistration};
12pub use config::PipelineConfig;
13pub use streaming_coordinator::StreamingCoordinator;
14
15use arrow::datatypes::SchemaRef;
16use laminar_sql::parser::EmitClause;
17use laminar_sql::translator::{OrderOperatorConfig, WindowOperatorConfig};
18
19/// Control message sent from `LaminarDB` DDL handlers to the running
20/// `StreamingCoordinator` for live schema changes (add/drop streams).
21#[derive(Debug)]
22#[allow(clippy::large_enum_variant)]
23pub enum ControlMsg {
24 /// Add a new streaming query to the running pipeline.
25 AddStream {
26 /// Stream name.
27 name: String,
28 /// SQL query text.
29 sql: String,
30 /// EMIT clause (e.g., `OnWindowClose`).
31 emit_clause: Option<EmitClause>,
32 /// Window configuration (tumbling, hopping, session).
33 window_config: Option<WindowOperatorConfig>,
34 /// ORDER BY configuration.
35 order_config: Option<OrderOperatorConfig>,
36 },
37 /// Remove a streaming query from the running pipeline.
38 DropStream {
39 /// Stream name to remove.
40 name: String,
41 },
42 /// Register a new source schema (for queries that depend on a source
43 /// created after `start()`).
44 AddSourceSchema {
45 /// Source name.
46 name: String,
47 /// Arrow schema.
48 schema: SchemaRef,
49 },
50}