// laminar_db/handle.rs
1//! Handle types for query results, source access, and subscriptions.
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5use std::time::Duration;
6
7use arrow::array::RecordBatch;
8use arrow::datatypes::SchemaRef;
9
10use laminar_core::streaming::{Record, Subscription};
11
12use crate::catalog::{ArrowRecord, SourceEntry};
13use crate::DbError;
14
/// Result of executing a SQL statement.
///
/// The variant reflects the statement class: DDL, a streaming query,
/// a row-affecting statement, or a metadata query.
#[derive(Debug)]
pub enum ExecuteResult {
    /// DDL statement completed (CREATE, DROP, ALTER).
    Ddl(DdlInfo),
    /// Query is running, subscribe to results via the contained handle.
    Query(QueryHandle),
    /// Rows were affected (INSERT INTO); carries the affected-row count.
    RowsAffected(u64),
    /// Metadata result (SHOW, DESCRIBE) as a single Arrow `RecordBatch`.
    Metadata(RecordBatch),
}
27
28impl ExecuteResult {
29    /// Convert to `QueryHandle`, or error if this is not a query result.
30    ///
31    /// # Errors
32    /// Returns `DbError::InvalidOperation` if result is not a query.
33    pub fn into_query(self) -> Result<QueryHandle, DbError> {
34        match self {
35            Self::Query(q) => Ok(q),
36            _ => Err(DbError::InvalidOperation(
37                "Expected a query result".to_string(),
38            )),
39        }
40    }
41}
42
/// Information about a completed DDL statement.
#[derive(Debug, Clone)]
pub struct DdlInfo {
    /// Statement type, e.g. `"CREATE SOURCE"`.
    pub statement_type: String,
    /// Name of the object affected by the statement.
    pub object_name: String,
}
51
/// Handle to a running streaming query.
///
/// Carries the query's identity, output schema, SQL text, and (until
/// consumed) the result subscription.
#[derive(Debug)]
pub struct QueryHandle {
    /// Unique query ID.
    pub(crate) id: u64,
    /// Output schema of the query's result batches.
    pub(crate) schema: SchemaRef,
    /// Original SQL text of the query.
    pub(crate) sql: String,
    /// Result subscription; `None` once consumed by `subscribe`/`subscribe_raw`
    /// or dropped by `cancel`.
    pub(crate) subscription: Option<Subscription<ArrowRecord>>,
    /// Whether the query is still active; cleared by `cancel`.
    pub(crate) active: bool,
}
61
62impl QueryHandle {
63    /// Output schema.
64    #[must_use]
65    pub fn schema(&self) -> &SchemaRef {
66        &self.schema
67    }
68
69    /// SQL text.
70    #[must_use]
71    pub fn sql(&self) -> &str {
72        &self.sql
73    }
74
75    /// Query ID.
76    #[must_use]
77    pub fn id(&self) -> u64 {
78        self.id
79    }
80
81    /// Whether the query is still active.
82    #[must_use]
83    pub fn is_active(&self) -> bool {
84        self.active
85    }
86
87    pub(crate) fn subscribe_raw(&mut self) -> Result<Subscription<ArrowRecord>, DbError> {
88        self.subscription
89            .take()
90            .ok_or_else(|| DbError::InvalidOperation("Subscription already consumed".to_string()))
91    }
92
93    /// Subscribe to typed results. `T` must derive `FromRecordBatch`.
94    ///
95    /// # Errors
96    /// Returns error if subscription was already consumed.
97    pub fn subscribe<T: FromBatch>(&mut self) -> Result<TypedSubscription<T>, DbError> {
98        let sub = self.subscribe_raw()?;
99        Ok(TypedSubscription {
100            inner: sub,
101            _phantom: PhantomData,
102        })
103    }
104
105    /// Cancel the query and drop its subscription.
106    pub fn cancel(&mut self) {
107        self.active = false;
108        self.subscription = None;
109    }
110}
111
/// Deserialize rows from a `RecordBatch`. Auto-generated by `#[derive(FromRecordBatch)]`.
pub trait FromBatch: Sized {
    /// Deserialize the single row at index `row`.
    fn from_batch(batch: &RecordBatch, row: usize) -> Self;
    /// Deserialize every row of the batch, in order.
    fn from_batch_all(batch: &RecordBatch) -> Vec<Self>;
}
119
/// Typed subscription that deserializes `RecordBatch` rows.
pub struct TypedSubscription<T: FromBatch> {
    /// Underlying untyped subscription delivering Arrow batches.
    inner: Subscription<ArrowRecord>,
    /// Marker for the row type; no `T` values are stored.
    _phantom: PhantomData<T>,
}
125
126impl<T: FromBatch> TypedSubscription<T> {
127    pub(crate) fn from_raw(sub: Subscription<ArrowRecord>) -> Self {
128        Self {
129            inner: sub,
130            _phantom: PhantomData,
131        }
132    }
133
134    /// Non-blocking poll for the next batch.
135    pub fn poll(&mut self) -> Option<Vec<T>> {
136        self.inner.poll().map(|batch| T::from_batch_all(&batch))
137    }
138
139    /// Blocking receive.
140    ///
141    /// # Errors
142    /// Returns `RecvError` if the channel is closed.
143    pub fn recv(&mut self) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
144        self.inner.recv().map(|batch| T::from_batch_all(&batch))
145    }
146
147    /// Receive with timeout.
148    ///
149    /// # Errors
150    /// Returns `RecvError` on timeout or closed channel.
151    pub fn recv_timeout(
152        &mut self,
153        timeout: Duration,
154    ) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
155        self.inner
156            .recv_timeout(timeout)
157            .map(|batch| T::from_batch_all(&batch))
158    }
159
160    /// Callback-based drain. Return `false` from `f` to stop.
161    pub fn poll_each<F: FnMut(T) -> bool>(&mut self, max_batches: usize, mut f: F) -> usize {
162        let mut count = 0;
163        for _ in 0..max_batches {
164            match self.inner.poll() {
165                Some(batch) => {
166                    let items = T::from_batch_all(&batch);
167                    for item in items {
168                        count += 1;
169                        if !f(item) {
170                            return count;
171                        }
172                    }
173                }
174                None => break,
175            }
176        }
177        count
178    }
179}
180
181impl<T: FromBatch> std::fmt::Debug for TypedSubscription<T> {
182    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183        f.debug_struct("TypedSubscription").finish()
184    }
185}
186
/// Typed handle for pushing data into a registered source.
pub struct SourceHandle<T: Record> {
    /// Shared catalog entry for the source (name, schema, underlying source).
    entry: Arc<SourceEntry>,
    /// Marker for the record type; no `T` values are stored.
    _phantom: PhantomData<T>,
}
192
193impl<T: Record> SourceHandle<T> {
194    /// Validates that `T`'s schema matches the source schema.
195    pub(crate) fn new(entry: Arc<SourceEntry>) -> Result<Self, DbError> {
196        let rust_schema = T::schema();
197        let sql_schema = &entry.schema;
198
199        // Validate field count matches
200        if rust_schema.fields().len() != sql_schema.fields().len() {
201            return Err(DbError::SchemaMismatch(format!(
202                "Rust type has {} fields but source '{}' has {} columns",
203                rust_schema.fields().len(),
204                entry.name,
205                sql_schema.fields().len()
206            )));
207        }
208
209        Ok(Self {
210            entry,
211            _phantom: PhantomData,
212        })
213    }
214
215    /// Push a single record.
216    ///
217    /// # Errors
218    /// Returns `StreamingError` if the pipeline is not running.
219    #[allow(clippy::needless_pass_by_value)]
220    pub fn push(&self, record: T) -> Result<(), laminar_core::streaming::StreamingError> {
221        let batch = record.to_record_batch();
222        self.entry.push_and_buffer(batch)
223    }
224
225    /// Push multiple records, returns count successfully sent.
226    pub fn push_batch(&self, records: impl IntoIterator<Item = T>) -> usize {
227        const BATCH_SIZE: usize = 1024;
228        let mut count = 0;
229        let mut buffer = Vec::with_capacity(BATCH_SIZE);
230
231        for record in records {
232            buffer.push(record);
233            if buffer.len() >= BATCH_SIZE {
234                let batch = T::to_record_batch_from_iter(buffer.drain(..));
235                if self.push_arrow(batch).is_err() {
236                    return count;
237                }
238                count += BATCH_SIZE;
239            }
240        }
241
242        if !buffer.is_empty() {
243            let len = buffer.len();
244            let batch = T::to_record_batch_from_iter(buffer);
245            if self.push_arrow(batch).is_ok() {
246                count += len;
247            }
248        }
249        count
250    }
251
252    /// Push a raw `RecordBatch` (sent to pipeline and buffered for snapshots).
253    ///
254    /// # Errors
255    /// Returns `StreamingError` if the pipeline is not running.
256    pub fn push_arrow(
257        &self,
258        batch: RecordBatch,
259    ) -> Result<(), laminar_core::streaming::StreamingError> {
260        self.entry.push_and_buffer(batch)
261    }
262
263    /// Emit a watermark.
264    pub fn watermark(&self, timestamp: i64) {
265        self.entry.source.watermark(timestamp);
266    }
267
268    /// Current watermark.
269    #[must_use]
270    pub fn current_watermark(&self) -> i64 {
271        self.entry.source.current_watermark()
272    }
273
274    /// Buffered record count.
275    #[must_use]
276    pub fn pending(&self) -> usize {
277        self.entry.source.pending()
278    }
279
280    /// Buffer capacity.
281    #[must_use]
282    pub fn capacity(&self) -> usize {
283        self.entry.source.capacity()
284    }
285
286    /// True when buffer is >80% full.
287    #[must_use]
288    pub fn is_backpressured(&self) -> bool {
289        crate::metrics::is_backpressured(self.pending(), self.capacity())
290    }
291
292    /// Source name.
293    #[must_use]
294    pub fn name(&self) -> &str {
295        &self.entry.name
296    }
297
298    /// Schema.
299    #[must_use]
300    pub fn schema(&self) -> &SchemaRef {
301        &self.entry.schema
302    }
303
304    /// Max out-of-orderness, if configured.
305    #[must_use]
306    pub fn max_out_of_orderness(&self) -> Option<Duration> {
307        self.entry.max_out_of_orderness
308    }
309
310    /// Set the event-time column for watermark-based late-row filtering.
311    pub fn set_event_time_column(&self, column: &str) {
312        self.entry.source.set_event_time_column(column);
313    }
314}
315
316impl<T: Record> std::fmt::Debug for SourceHandle<T> {
317    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318        f.debug_struct("SourceHandle")
319            .field("name", &self.entry.name)
320            .field("pending", &self.pending())
321            .finish()
322    }
323}
324
/// Untyped handle for pushing raw `RecordBatch` data.
pub struct UntypedSourceHandle {
    /// Shared catalog entry for the source.
    entry: Arc<SourceEntry>,
}
329
330impl UntypedSourceHandle {
331    pub(crate) fn new(entry: Arc<SourceEntry>) -> Self {
332        Self { entry }
333    }
334
335    /// Push a raw `RecordBatch` (sent to pipeline and buffered for snapshots).
336    ///
337    /// # Errors
338    /// Returns `StreamingError` if the pipeline is not running.
339    pub fn push_arrow(
340        &self,
341        batch: RecordBatch,
342    ) -> Result<(), laminar_core::streaming::StreamingError> {
343        self.entry.push_and_buffer(batch)
344    }
345
346    /// Emit a watermark.
347    pub fn watermark(&self, timestamp: i64) {
348        self.entry.source.watermark(timestamp);
349    }
350
351    /// Current watermark.
352    #[must_use]
353    pub fn current_watermark(&self) -> i64 {
354        self.entry.source.current_watermark()
355    }
356
357    /// Buffered record count.
358    #[must_use]
359    pub fn pending(&self) -> usize {
360        self.entry.source.pending()
361    }
362
363    /// Buffer capacity.
364    #[must_use]
365    pub fn capacity(&self) -> usize {
366        self.entry.source.capacity()
367    }
368
369    /// True when buffer is >80% full.
370    #[must_use]
371    pub fn is_backpressured(&self) -> bool {
372        crate::metrics::is_backpressured(self.pending(), self.capacity())
373    }
374
375    /// Source name.
376    #[must_use]
377    pub fn name(&self) -> &str {
378        &self.entry.name
379    }
380
381    /// Schema.
382    #[must_use]
383    pub fn schema(&self) -> &SchemaRef {
384        &self.entry.schema
385    }
386
387    /// Max out-of-orderness, if configured.
388    #[must_use]
389    pub fn max_out_of_orderness(&self) -> Option<Duration> {
390        self.entry.max_out_of_orderness
391    }
392
393    /// Set the event-time column for watermark-based late-row filtering.
394    pub fn set_event_time_column(&self, column: &str) {
395        self.entry.source.set_event_time_column(column);
396    }
397}
398
399impl std::fmt::Debug for UntypedSourceHandle {
400    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401        f.debug_struct("UntypedSourceHandle")
402            .field("name", &self.entry.name)
403            .finish()
404    }
405}
406
/// Type of a node in the pipeline topology.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineNodeType {
    /// A data source (CREATE SOURCE).
    Source,
    /// A continuous stream (CREATE STREAM).
    Stream,
    /// A data sink (CREATE SINK).
    Sink,
}
417
/// A node in the pipeline topology graph.
#[derive(Debug, Clone)]
pub struct PipelineNode {
    /// Node name.
    pub name: String,
    /// Node type (source, stream, or sink).
    pub node_type: PipelineNodeType,
    /// Arrow schema; populated for sources only.
    pub schema: Option<SchemaRef>,
    /// SQL definition; populated for streams only.
    pub sql: Option<String>,
}
430
/// A directed edge in the pipeline topology graph.
#[derive(Debug, Clone)]
pub struct PipelineEdge {
    /// Name of the source node of the edge.
    pub from: String,
    /// Name of the target node of the edge.
    pub to: String,
}
439
/// The complete pipeline topology: nodes and edges.
#[derive(Debug, Clone)]
pub struct PipelineTopology {
    /// All nodes in the graph.
    pub nodes: Vec<PipelineNode>,
    /// All directed edges between nodes.
    pub edges: Vec<PipelineEdge>,
}
448
/// Metadata about a registered stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Stream name.
    pub name: String,
    /// Defining SQL query, if available.
    pub sql: Option<String>,
}
457
/// Information about a registered source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name.
    pub name: String,
    /// Arrow schema of the source.
    pub schema: SchemaRef,
    /// Watermark column, if configured.
    pub watermark_column: Option<String>,
}
468
/// Information about a registered sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name.
    pub name: String,
}
475
/// Information about a running query.
#[derive(Debug, Clone)]
pub struct QueryInfo {
    /// Unique query ID.
    pub id: u64,
    /// SQL text of the query.
    pub sql: String,
    /// Whether the query is still active.
    pub active: bool,
}