Skip to main content

laminar_db/
handle.rs

1//! Handle types for query results, source access, and subscriptions.
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5use std::time::Duration;
6
7use arrow::array::RecordBatch;
8use arrow::datatypes::SchemaRef;
9
10use laminar_core::streaming::{Record, Subscription};
11
12use crate::catalog::{ArrowRecord, SourceEntry};
13use crate::DbError;
14
/// Result of executing a SQL statement.
///
/// Each variant corresponds to one category of statement. Use
/// [`ExecuteResult::into_query`] to extract the handle of a streaming query.
#[derive(Debug)]
pub enum ExecuteResult {
    /// DDL statement completed (CREATE, DROP, ALTER).
    Ddl(DdlInfo),
    /// Query is running; subscribe to its results through the handle.
    Query(QueryHandle),
    /// Number of rows affected (INSERT INTO).
    RowsAffected(u64),
    /// Metadata result (SHOW, DESCRIBE), returned as a `RecordBatch`.
    Metadata(RecordBatch),
}
27
28impl ExecuteResult {
29    /// Convert to `QueryHandle`, or error if this is not a query result.
30    ///
31    /// # Errors
32    /// Returns `DbError::InvalidOperation` if result is not a query.
33    pub fn into_query(self) -> Result<QueryHandle, DbError> {
34        match self {
35            Self::Query(q) => Ok(q),
36            _ => Err(DbError::InvalidOperation(
37                "Expected a query result".to_string(),
38            )),
39        }
40    }
41}
42
/// Information about a completed DDL statement.
///
/// Carried by the `Ddl` variant of `ExecuteResult`.
#[derive(Debug, Clone)]
pub struct DdlInfo {
    /// Kind of statement that ran, e.g. `"CREATE SOURCE"`.
    pub statement_type: String,
    /// Name of the object the statement affected.
    pub object_name: String,
}
51
/// Handle to a running streaming query.
///
/// Dropping the handle cancels `cancel_token` (see the `Drop` impl).
#[derive(Debug)]
pub struct QueryHandle {
    /// Query identifier, exposed through `id()`.
    pub(crate) id: u64,
    /// Output schema, exposed through `schema()`.
    pub(crate) schema: SchemaRef,
    /// SQL text, exposed through `sql()`.
    pub(crate) sql: String,
    /// Result subscription; `None` once it has been taken by
    /// `subscribe_raw`/`subscribe` or cleared by `cancel`.
    pub(crate) subscription: Option<Subscription<ArrowRecord>>,
    /// Flag reported by `is_active()`; cleared by `cancel`.
    pub(crate) active: bool,
    /// Token cancelled by `cancel` and on drop.
    // NOTE(review): presumably observed by the task executing the query — confirm.
    pub(crate) cancel_token: tokio_util::sync::CancellationToken,
}
62
63impl QueryHandle {
64    /// Output schema.
65    #[must_use]
66    pub fn schema(&self) -> &SchemaRef {
67        &self.schema
68    }
69
70    /// SQL text.
71    #[must_use]
72    pub fn sql(&self) -> &str {
73        &self.sql
74    }
75
76    /// Query ID.
77    #[must_use]
78    pub fn id(&self) -> u64 {
79        self.id
80    }
81
82    /// Whether the query is still active.
83    #[must_use]
84    pub fn is_active(&self) -> bool {
85        self.active
86    }
87
88    /// Take the raw `RecordBatch` subscription for this query.
89    ///
90    /// The subscription is consumed on first call; subsequent calls error.
91    ///
92    /// # Errors
93    /// Returns `DbError::InvalidOperation` if the subscription was already consumed.
94    pub fn subscribe_raw(&mut self) -> Result<Subscription<ArrowRecord>, DbError> {
95        self.subscription
96            .take()
97            .ok_or_else(|| DbError::InvalidOperation("Subscription already consumed".to_string()))
98    }
99
100    /// Subscribe to typed results. `T` must derive `FromRecordBatch`.
101    ///
102    /// # Errors
103    /// Returns error if subscription was already consumed.
104    pub fn subscribe<T: FromBatch>(&mut self) -> Result<TypedSubscription<T>, DbError> {
105        let sub = self.subscribe_raw()?;
106        Ok(TypedSubscription {
107            inner: sub,
108            _phantom: PhantomData,
109        })
110    }
111
112    /// Cancel the query and drop its subscription.
113    pub fn cancel(&mut self) {
114        self.active = false;
115        self.subscription = None;
116        self.cancel_token.cancel();
117    }
118}
119
/// Cancels the handle's token when the handle goes out of scope, so a caller
/// that simply drops the handle still triggers cancellation.
impl Drop for QueryHandle {
    fn drop(&mut self) {
        // `cancel()` may already have cancelled the token; this call is made
        // unconditionally either way.
        self.cancel_token.cancel();
    }
}
125
/// Deserialize rows from a `RecordBatch`. Auto-generated by `#[derive(FromRecordBatch)]`.
///
/// Used by `TypedSubscription` to turn each received batch into `Vec<Self>`.
pub trait FromBatch: Sized {
    /// Decode the single row at index `row` of `batch`.
    // NOTE(review): behaviour for an out-of-range `row` is up to the
    // implementation (presumably a panic) — confirm against the derive macro.
    fn from_batch(batch: &RecordBatch, row: usize) -> Self;
    /// Decode every row of `batch`.
    fn from_batch_all(batch: &RecordBatch) -> Vec<Self>;
}
133
/// Typed subscription that deserializes `RecordBatch` rows.
///
/// Wraps a raw subscription and decodes each batch with `T::from_batch_all`.
pub struct TypedSubscription<T: FromBatch> {
    /// Underlying untyped subscription that batches are pulled from.
    inner: Subscription<ArrowRecord>,
    /// Ties the subscription to the row type `T` without storing a value.
    _phantom: PhantomData<T>,
}
139
140impl<T: FromBatch> TypedSubscription<T> {
141    pub(crate) fn from_raw(sub: Subscription<ArrowRecord>) -> Self {
142        Self {
143            inner: sub,
144            _phantom: PhantomData,
145        }
146    }
147
148    /// Non-blocking poll for the next batch.
149    pub fn poll(&mut self) -> Option<Vec<T>> {
150        self.inner.poll().map(|batch| T::from_batch_all(&batch))
151    }
152
153    /// Blocking receive.
154    ///
155    /// # Errors
156    /// Returns `RecvError` if the channel is closed.
157    pub fn recv(&mut self) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
158        self.inner.recv().map(|batch| T::from_batch_all(&batch))
159    }
160
161    /// Receive with timeout.
162    ///
163    /// # Errors
164    /// Returns `RecvError` on timeout or closed channel.
165    pub fn recv_timeout(
166        &mut self,
167        timeout: Duration,
168    ) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
169        self.inner
170            .recv_timeout(timeout)
171            .map(|batch| T::from_batch_all(&batch))
172    }
173
174    /// Callback-based drain. Return `false` from `f` to stop.
175    pub fn poll_each<F: FnMut(T) -> bool>(&mut self, max_batches: usize, mut f: F) -> usize {
176        let mut count = 0;
177        for _ in 0..max_batches {
178            match self.inner.poll() {
179                Some(batch) => {
180                    let items = T::from_batch_all(&batch);
181                    for item in items {
182                        count += 1;
183                        if !f(item) {
184                            return count;
185                        }
186                    }
187                }
188                None => break,
189            }
190        }
191        count
192    }
193}
194
195impl<T: FromBatch> std::fmt::Debug for TypedSubscription<T> {
196    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197        f.debug_struct("TypedSubscription").finish()
198    }
199}
200
/// Typed handle for pushing data into a registered source.
///
/// `T` is the record type that gets converted into Arrow batches on push.
pub struct SourceHandle<T: Record> {
    /// Shared catalog entry of the source this handle writes to.
    entry: Arc<SourceEntry>,
    /// Ties the handle to the record type `T` without storing a value.
    _phantom: PhantomData<T>,
}
206
207impl<T: Record> SourceHandle<T> {
208    /// Validates that `T`'s schema matches the source schema.
209    pub(crate) fn new(entry: Arc<SourceEntry>) -> Result<Self, DbError> {
210        let rust_schema = T::schema();
211        let sql_schema = &entry.schema;
212
213        // Validate field count matches
214        if rust_schema.fields().len() != sql_schema.fields().len() {
215            return Err(DbError::SchemaMismatch(format!(
216                "Rust type has {} fields but source '{}' has {} columns",
217                rust_schema.fields().len(),
218                entry.name,
219                sql_schema.fields().len()
220            )));
221        }
222
223        Ok(Self {
224            entry,
225            _phantom: PhantomData,
226        })
227    }
228
229    /// Push a single record.
230    ///
231    /// # Errors
232    /// Returns `StreamingError` if the pipeline is not running.
233    #[allow(clippy::needless_pass_by_value)]
234    pub fn push(&self, record: T) -> Result<(), laminar_core::streaming::StreamingError> {
235        let batch = record.to_record_batch();
236        self.entry.push_and_buffer(batch)
237    }
238
239    /// Push multiple records, returns count successfully sent.
240    pub fn push_batch(&self, records: impl IntoIterator<Item = T>) -> usize {
241        const BATCH_SIZE: usize = 1024;
242        let mut count = 0;
243        let mut buffer = Vec::with_capacity(BATCH_SIZE);
244
245        for record in records {
246            buffer.push(record);
247            if buffer.len() >= BATCH_SIZE {
248                let batch = T::to_record_batch_from_iter(buffer.drain(..));
249                if self.push_arrow(batch).is_err() {
250                    return count;
251                }
252                count += BATCH_SIZE;
253            }
254        }
255
256        if !buffer.is_empty() {
257            let len = buffer.len();
258            let batch = T::to_record_batch_from_iter(buffer);
259            if self.push_arrow(batch).is_ok() {
260                count += len;
261            }
262        }
263        count
264    }
265
266    /// Push a raw `RecordBatch` (sent to pipeline and buffered for snapshots).
267    ///
268    /// # Errors
269    /// Returns `StreamingError` if the pipeline is not running.
270    pub fn push_arrow(
271        &self,
272        batch: RecordBatch,
273    ) -> Result<(), laminar_core::streaming::StreamingError> {
274        self.entry.push_and_buffer(batch)
275    }
276
277    /// Emit a watermark.
278    pub fn watermark(&self, timestamp: i64) {
279        self.entry.source.watermark(timestamp);
280    }
281
282    /// Current watermark.
283    #[must_use]
284    pub fn current_watermark(&self) -> i64 {
285        self.entry.source.current_watermark()
286    }
287
288    /// Buffered record count.
289    #[must_use]
290    pub fn pending(&self) -> usize {
291        self.entry.source.pending()
292    }
293
294    /// Buffer capacity.
295    #[must_use]
296    pub fn capacity(&self) -> usize {
297        self.entry.source.capacity()
298    }
299
300    /// True when buffer is >80% full.
301    #[must_use]
302    pub fn is_backpressured(&self) -> bool {
303        crate::metrics::is_backpressured(self.pending(), self.capacity())
304    }
305
306    /// Source name.
307    #[must_use]
308    pub fn name(&self) -> &str {
309        &self.entry.name
310    }
311
312    /// Schema.
313    #[must_use]
314    pub fn schema(&self) -> &SchemaRef {
315        &self.entry.schema
316    }
317
318    /// Max out-of-orderness, if configured.
319    #[must_use]
320    pub fn max_out_of_orderness(&self) -> Option<Duration> {
321        self.entry.max_out_of_orderness
322    }
323
324    /// Set the event-time column for watermark-based late-row filtering.
325    pub fn set_event_time_column(&self, column: &str) {
326        self.entry.source.set_event_time_column(column);
327    }
328}
329
330impl<T: Record> std::fmt::Debug for SourceHandle<T> {
331    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332        f.debug_struct("SourceHandle")
333            .field("name", &self.entry.name)
334            .field("pending", &self.pending())
335            .finish()
336    }
337}
338
/// Untyped handle for pushing raw `RecordBatch` data.
///
/// Unlike `SourceHandle<T>`, it carries no record type and its constructor
/// performs no schema check.
pub struct UntypedSourceHandle {
    /// Shared catalog entry of the source this handle writes to.
    entry: Arc<SourceEntry>,
}
343
344impl UntypedSourceHandle {
345    pub(crate) fn new(entry: Arc<SourceEntry>) -> Self {
346        Self { entry }
347    }
348
349    /// Push a raw `RecordBatch` (sent to pipeline and buffered for snapshots).
350    ///
351    /// # Errors
352    /// Returns `StreamingError` if the pipeline is not running.
353    pub fn push_arrow(
354        &self,
355        batch: RecordBatch,
356    ) -> Result<(), laminar_core::streaming::StreamingError> {
357        self.entry.push_and_buffer(batch)
358    }
359
360    /// Emit a watermark.
361    pub fn watermark(&self, timestamp: i64) {
362        self.entry.source.watermark(timestamp);
363    }
364
365    /// Current watermark.
366    #[must_use]
367    pub fn current_watermark(&self) -> i64 {
368        self.entry.source.current_watermark()
369    }
370
371    /// Buffered record count.
372    #[must_use]
373    pub fn pending(&self) -> usize {
374        self.entry.source.pending()
375    }
376
377    /// Buffer capacity.
378    #[must_use]
379    pub fn capacity(&self) -> usize {
380        self.entry.source.capacity()
381    }
382
383    /// True when buffer is >80% full.
384    #[must_use]
385    pub fn is_backpressured(&self) -> bool {
386        crate::metrics::is_backpressured(self.pending(), self.capacity())
387    }
388
389    /// Source name.
390    #[must_use]
391    pub fn name(&self) -> &str {
392        &self.entry.name
393    }
394
395    /// Schema.
396    #[must_use]
397    pub fn schema(&self) -> &SchemaRef {
398        &self.entry.schema
399    }
400
401    /// Max out-of-orderness, if configured.
402    #[must_use]
403    pub fn max_out_of_orderness(&self) -> Option<Duration> {
404        self.entry.max_out_of_orderness
405    }
406
407    /// Set the event-time column for watermark-based late-row filtering.
408    pub fn set_event_time_column(&self, column: &str) {
409        self.entry.source.set_event_time_column(column);
410    }
411}
412
413impl std::fmt::Debug for UntypedSourceHandle {
414    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
415        f.debug_struct("UntypedSourceHandle")
416            .field("name", &self.entry.name)
417            .finish()
418    }
419}
420
/// Type of a node in the pipeline topology.
///
/// Each variant corresponds to the SQL statement that creates such a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineNodeType {
    /// A data source (CREATE SOURCE).
    Source,
    /// A continuous stream (CREATE STREAM).
    Stream,
    /// A data sink (CREATE SINK).
    Sink,
}
431
/// A node in the pipeline topology graph.
#[derive(Debug, Clone)]
pub struct PipelineNode {
    /// Node name.
    pub name: String,
    /// Whether the node is a source, stream, or sink.
    pub node_type: PipelineNodeType,
    /// Arrow schema (sources only); `None` for other node types.
    pub schema: Option<SchemaRef>,
    /// SQL definition (streams only); `None` for other node types.
    pub sql: Option<String>,
}
444
/// A directed edge in the pipeline topology graph.
///
/// Endpoints are stored as strings.
// NOTE(review): presumably these match `PipelineNode::name` — confirm.
#[derive(Debug, Clone)]
pub struct PipelineEdge {
    /// Node the edge starts at.
    pub from: String,
    /// Node the edge points to.
    pub to: String,
}
453
/// The complete pipeline topology: nodes and edges.
#[derive(Debug, Clone)]
pub struct PipelineTopology {
    /// All nodes of the graph.
    pub nodes: Vec<PipelineNode>,
    /// All directed edges between nodes.
    pub edges: Vec<PipelineEdge>,
}
462
/// Metadata about a registered stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Stream name.
    pub name: String,
    /// Defining SQL query, when one is available.
    pub sql: Option<String>,
}
471
/// Information about a registered source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name.
    pub name: String,
    /// Arrow schema of the source.
    pub schema: SchemaRef,
    /// Watermark column, if configured.
    pub watermark_column: Option<String>,
}
482
/// Information about a registered sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name.
    pub name: String,
}
489
/// JSON-serializable projection of a `MaterializedView` for the control-plane
/// HTTP API.
///
/// All fields are plain strings so the type can derive the serde traits.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MaterializedViewInfo {
    /// View name.
    pub name: String,
    /// SQL definition.
    pub sql: String,
    /// Current execution state (e.g. `"Running"`, `"Dropping"`), stored as
    /// the `Debug` rendering of the view's state.
    pub state: String,
}
501
502impl From<&laminar_core::mv::MaterializedView> for MaterializedViewInfo {
503    fn from(view: &laminar_core::mv::MaterializedView) -> Self {
504        Self {
505            name: view.name.clone(),
506            sql: view.sql.clone(),
507            state: format!("{:?}", view.state),
508        }
509    }
510}
511
/// Information about a running query.
///
/// A plain-data summary with the same `id`, `sql` and `active` values that
/// `QueryHandle` exposes through its accessors.
#[derive(Debug, Clone)]
pub struct QueryInfo {
    /// Query ID.
    pub id: u64,
    /// SQL text of the query.
    pub sql: String,
    /// Whether the query is active.
    pub active: bool,
}