1use std::marker::PhantomData;
4use std::sync::Arc;
5use std::time::Duration;
6
7use arrow::array::RecordBatch;
8use arrow::datatypes::SchemaRef;
9
10use laminar_core::streaming::{Record, Subscription};
11
12use crate::catalog::{ArrowRecord, SourceEntry};
13use crate::DbError;
14
15#[derive(Debug)]
17pub enum ExecuteResult {
18 Ddl(DdlInfo),
20 Query(QueryHandle),
22 RowsAffected(u64),
24 Metadata(RecordBatch),
26}
27
28impl ExecuteResult {
29 pub fn into_query(self) -> Result<QueryHandle, DbError> {
34 match self {
35 Self::Query(q) => Ok(q),
36 _ => Err(DbError::InvalidOperation(
37 "Expected a query result".to_string(),
38 )),
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
45pub struct DdlInfo {
46 pub statement_type: String,
48 pub object_name: String,
50}
51
52#[derive(Debug)]
54pub struct QueryHandle {
55 pub(crate) id: u64,
56 pub(crate) schema: SchemaRef,
57 pub(crate) sql: String,
58 pub(crate) subscription: Option<Subscription<ArrowRecord>>,
59 pub(crate) active: bool,
60 pub(crate) cancel_token: tokio_util::sync::CancellationToken,
61}
62
63impl QueryHandle {
64 #[must_use]
66 pub fn schema(&self) -> &SchemaRef {
67 &self.schema
68 }
69
70 #[must_use]
72 pub fn sql(&self) -> &str {
73 &self.sql
74 }
75
76 #[must_use]
78 pub fn id(&self) -> u64 {
79 self.id
80 }
81
82 #[must_use]
84 pub fn is_active(&self) -> bool {
85 self.active
86 }
87
88 pub fn subscribe_raw(&mut self) -> Result<Subscription<ArrowRecord>, DbError> {
95 self.subscription
96 .take()
97 .ok_or_else(|| DbError::InvalidOperation("Subscription already consumed".to_string()))
98 }
99
100 pub fn subscribe<T: FromBatch>(&mut self) -> Result<TypedSubscription<T>, DbError> {
105 let sub = self.subscribe_raw()?;
106 Ok(TypedSubscription {
107 inner: sub,
108 _phantom: PhantomData,
109 })
110 }
111
112 pub fn cancel(&mut self) {
114 self.active = false;
115 self.subscription = None;
116 self.cancel_token.cancel();
117 }
118}
119
120impl Drop for QueryHandle {
121 fn drop(&mut self) {
122 self.cancel_token.cancel();
123 }
124}
125
126pub trait FromBatch: Sized {
128 fn from_batch(batch: &RecordBatch, row: usize) -> Self;
130 fn from_batch_all(batch: &RecordBatch) -> Vec<Self>;
132}
133
134pub struct TypedSubscription<T: FromBatch> {
136 inner: Subscription<ArrowRecord>,
137 _phantom: PhantomData<T>,
138}
139
140impl<T: FromBatch> TypedSubscription<T> {
141 pub(crate) fn from_raw(sub: Subscription<ArrowRecord>) -> Self {
142 Self {
143 inner: sub,
144 _phantom: PhantomData,
145 }
146 }
147
148 pub fn poll(&mut self) -> Option<Vec<T>> {
150 self.inner.poll().map(|batch| T::from_batch_all(&batch))
151 }
152
153 pub fn recv(&mut self) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
158 self.inner.recv().map(|batch| T::from_batch_all(&batch))
159 }
160
161 pub fn recv_timeout(
166 &mut self,
167 timeout: Duration,
168 ) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
169 self.inner
170 .recv_timeout(timeout)
171 .map(|batch| T::from_batch_all(&batch))
172 }
173
174 pub fn poll_each<F: FnMut(T) -> bool>(&mut self, max_batches: usize, mut f: F) -> usize {
176 let mut count = 0;
177 for _ in 0..max_batches {
178 match self.inner.poll() {
179 Some(batch) => {
180 let items = T::from_batch_all(&batch);
181 for item in items {
182 count += 1;
183 if !f(item) {
184 return count;
185 }
186 }
187 }
188 None => break,
189 }
190 }
191 count
192 }
193}
194
195impl<T: FromBatch> std::fmt::Debug for TypedSubscription<T> {
196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 f.debug_struct("TypedSubscription").finish()
198 }
199}
200
201pub struct SourceHandle<T: Record> {
203 entry: Arc<SourceEntry>,
204 _phantom: PhantomData<T>,
205}
206
207impl<T: Record> SourceHandle<T> {
208 pub(crate) fn new(entry: Arc<SourceEntry>) -> Result<Self, DbError> {
210 let rust_schema = T::schema();
211 let sql_schema = &entry.schema;
212
213 if rust_schema.fields().len() != sql_schema.fields().len() {
215 return Err(DbError::SchemaMismatch(format!(
216 "Rust type has {} fields but source '{}' has {} columns",
217 rust_schema.fields().len(),
218 entry.name,
219 sql_schema.fields().len()
220 )));
221 }
222
223 Ok(Self {
224 entry,
225 _phantom: PhantomData,
226 })
227 }
228
229 #[allow(clippy::needless_pass_by_value)]
234 pub fn push(&self, record: T) -> Result<(), laminar_core::streaming::StreamingError> {
235 let batch = record.to_record_batch();
236 self.entry.push_and_buffer(batch)
237 }
238
239 pub fn push_batch(&self, records: impl IntoIterator<Item = T>) -> usize {
241 const BATCH_SIZE: usize = 1024;
242 let mut count = 0;
243 let mut buffer = Vec::with_capacity(BATCH_SIZE);
244
245 for record in records {
246 buffer.push(record);
247 if buffer.len() >= BATCH_SIZE {
248 let batch = T::to_record_batch_from_iter(buffer.drain(..));
249 if self.push_arrow(batch).is_err() {
250 return count;
251 }
252 count += BATCH_SIZE;
253 }
254 }
255
256 if !buffer.is_empty() {
257 let len = buffer.len();
258 let batch = T::to_record_batch_from_iter(buffer);
259 if self.push_arrow(batch).is_ok() {
260 count += len;
261 }
262 }
263 count
264 }
265
266 pub fn push_arrow(
271 &self,
272 batch: RecordBatch,
273 ) -> Result<(), laminar_core::streaming::StreamingError> {
274 self.entry.push_and_buffer(batch)
275 }
276
277 pub fn watermark(&self, timestamp: i64) {
279 self.entry.source.watermark(timestamp);
280 }
281
282 #[must_use]
284 pub fn current_watermark(&self) -> i64 {
285 self.entry.source.current_watermark()
286 }
287
288 #[must_use]
290 pub fn pending(&self) -> usize {
291 self.entry.source.pending()
292 }
293
294 #[must_use]
296 pub fn capacity(&self) -> usize {
297 self.entry.source.capacity()
298 }
299
300 #[must_use]
302 pub fn is_backpressured(&self) -> bool {
303 crate::metrics::is_backpressured(self.pending(), self.capacity())
304 }
305
306 #[must_use]
308 pub fn name(&self) -> &str {
309 &self.entry.name
310 }
311
312 #[must_use]
314 pub fn schema(&self) -> &SchemaRef {
315 &self.entry.schema
316 }
317
318 #[must_use]
320 pub fn max_out_of_orderness(&self) -> Option<Duration> {
321 self.entry.max_out_of_orderness
322 }
323
324 pub fn set_event_time_column(&self, column: &str) {
326 self.entry.source.set_event_time_column(column);
327 }
328}
329
330impl<T: Record> std::fmt::Debug for SourceHandle<T> {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 f.debug_struct("SourceHandle")
333 .field("name", &self.entry.name)
334 .field("pending", &self.pending())
335 .finish()
336 }
337}
338
339pub struct UntypedSourceHandle {
341 entry: Arc<SourceEntry>,
342}
343
344impl UntypedSourceHandle {
345 pub(crate) fn new(entry: Arc<SourceEntry>) -> Self {
346 Self { entry }
347 }
348
349 pub fn push_arrow(
354 &self,
355 batch: RecordBatch,
356 ) -> Result<(), laminar_core::streaming::StreamingError> {
357 self.entry.push_and_buffer(batch)
358 }
359
360 pub fn watermark(&self, timestamp: i64) {
362 self.entry.source.watermark(timestamp);
363 }
364
365 #[must_use]
367 pub fn current_watermark(&self) -> i64 {
368 self.entry.source.current_watermark()
369 }
370
371 #[must_use]
373 pub fn pending(&self) -> usize {
374 self.entry.source.pending()
375 }
376
377 #[must_use]
379 pub fn capacity(&self) -> usize {
380 self.entry.source.capacity()
381 }
382
383 #[must_use]
385 pub fn is_backpressured(&self) -> bool {
386 crate::metrics::is_backpressured(self.pending(), self.capacity())
387 }
388
389 #[must_use]
391 pub fn name(&self) -> &str {
392 &self.entry.name
393 }
394
395 #[must_use]
397 pub fn schema(&self) -> &SchemaRef {
398 &self.entry.schema
399 }
400
401 #[must_use]
403 pub fn max_out_of_orderness(&self) -> Option<Duration> {
404 self.entry.max_out_of_orderness
405 }
406
407 pub fn set_event_time_column(&self, column: &str) {
409 self.entry.source.set_event_time_column(column);
410 }
411}
412
413impl std::fmt::Debug for UntypedSourceHandle {
414 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
415 f.debug_struct("UntypedSourceHandle")
416 .field("name", &self.entry.name)
417 .finish()
418 }
419}
420
421#[derive(Debug, Clone, Copy, PartialEq, Eq)]
423pub enum PipelineNodeType {
424 Source,
426 Stream,
428 Sink,
430}
431
432#[derive(Debug, Clone)]
434pub struct PipelineNode {
435 pub name: String,
437 pub node_type: PipelineNodeType,
439 pub schema: Option<SchemaRef>,
441 pub sql: Option<String>,
443}
444
445#[derive(Debug, Clone)]
447pub struct PipelineEdge {
448 pub from: String,
450 pub to: String,
452}
453
454#[derive(Debug, Clone)]
456pub struct PipelineTopology {
457 pub nodes: Vec<PipelineNode>,
459 pub edges: Vec<PipelineEdge>,
461}
462
463#[derive(Debug, Clone)]
465pub struct StreamInfo {
466 pub name: String,
468 pub sql: Option<String>,
470}
471
472#[derive(Debug, Clone)]
474pub struct SourceInfo {
475 pub name: String,
477 pub schema: SchemaRef,
479 pub watermark_column: Option<String>,
481}
482
483#[derive(Debug, Clone)]
485pub struct SinkInfo {
486 pub name: String,
488}
489
490#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
493pub struct MaterializedViewInfo {
494 pub name: String,
496 pub sql: String,
498 pub state: String,
500}
501
502impl From<&laminar_core::mv::MaterializedView> for MaterializedViewInfo {
503 fn from(view: &laminar_core::mv::MaterializedView) -> Self {
504 Self {
505 name: view.name.clone(),
506 sql: view.sql.clone(),
507 state: format!("{:?}", view.state),
508 }
509 }
510}
511
512#[derive(Debug, Clone)]
514pub struct QueryInfo {
515 pub id: u64,
517 pub sql: String,
519 pub active: bool,
521}