1use std::marker::PhantomData;
4use std::sync::Arc;
5use std::time::Duration;
6
7use arrow::array::RecordBatch;
8use arrow::datatypes::SchemaRef;
9
10use laminar_core::streaming::{Record, Subscription};
11
12use crate::catalog::{ArrowRecord, SourceEntry};
13use crate::DbError;
14
/// Outcome of executing a SQL statement against the database.
#[derive(Debug)]
pub enum ExecuteResult {
    /// A DDL statement completed; carries information about what was created/dropped.
    Ddl(DdlInfo),
    /// A streaming query was registered; results are consumed through the handle.
    Query(QueryHandle),
    /// A statement completed and reports how many rows it affected.
    RowsAffected(u64),
    /// A metadata/introspection statement produced a single result batch.
    Metadata(RecordBatch),
}
27
28impl ExecuteResult {
29 pub fn into_query(self) -> Result<QueryHandle, DbError> {
34 match self {
35 Self::Query(q) => Ok(q),
36 _ => Err(DbError::InvalidOperation(
37 "Expected a query result".to_string(),
38 )),
39 }
40 }
41}
42
/// Information about a completed DDL statement.
#[derive(Debug, Clone)]
pub struct DdlInfo {
    /// The kind of statement that ran (e.g. the statement's leading keyword).
    pub statement_type: String,
    /// Name of the object the statement operated on.
    pub object_name: String,
}
51
/// Handle to a registered streaming query.
///
/// Holds the query's output schema, original SQL text, and (until consumed)
/// the subscription that delivers result batches.
#[derive(Debug)]
pub struct QueryHandle {
    // Unique identifier assigned when the query was registered.
    pub(crate) id: u64,
    // Arrow schema of the query's output rows.
    pub(crate) schema: SchemaRef,
    // Original SQL text the query was created from.
    pub(crate) sql: String,
    // One-shot: `take()`n by `subscribe_raw`, set to `None` on `cancel`.
    pub(crate) subscription: Option<Subscription<ArrowRecord>>,
    // Cleared by `cancel`; reported by `is_active`.
    pub(crate) active: bool,
}
61
62impl QueryHandle {
63 #[must_use]
65 pub fn schema(&self) -> &SchemaRef {
66 &self.schema
67 }
68
69 #[must_use]
71 pub fn sql(&self) -> &str {
72 &self.sql
73 }
74
75 #[must_use]
77 pub fn id(&self) -> u64 {
78 self.id
79 }
80
81 #[must_use]
83 pub fn is_active(&self) -> bool {
84 self.active
85 }
86
87 pub(crate) fn subscribe_raw(&mut self) -> Result<Subscription<ArrowRecord>, DbError> {
88 self.subscription
89 .take()
90 .ok_or_else(|| DbError::InvalidOperation("Subscription already consumed".to_string()))
91 }
92
93 pub fn subscribe<T: FromBatch>(&mut self) -> Result<TypedSubscription<T>, DbError> {
98 let sub = self.subscribe_raw()?;
99 Ok(TypedSubscription {
100 inner: sub,
101 _phantom: PhantomData,
102 })
103 }
104
105 pub fn cancel(&mut self) {
107 self.active = false;
108 self.subscription = None;
109 }
110}
111
/// Conversion from rows of an Arrow [`RecordBatch`] into a Rust type.
///
/// Implementors define how a single row, and a whole batch of rows, map to
/// values of `Self`.
pub trait FromBatch: Sized {
    /// Builds one value from row index `row` of `batch`.
    fn from_batch(batch: &RecordBatch, row: usize) -> Self;
    /// Builds values for every row of `batch`.
    fn from_batch_all(batch: &RecordBatch) -> Vec<Self>;
}
119
/// A subscription whose Arrow batches are converted to `T` on receipt.
pub struct TypedSubscription<T: FromBatch> {
    // Underlying untyped subscription delivering Arrow record batches.
    inner: Subscription<ArrowRecord>,
    // Marks the target row type without storing a `T`.
    _phantom: PhantomData<T>,
}
125
126impl<T: FromBatch> TypedSubscription<T> {
127 pub(crate) fn from_raw(sub: Subscription<ArrowRecord>) -> Self {
128 Self {
129 inner: sub,
130 _phantom: PhantomData,
131 }
132 }
133
134 pub fn poll(&mut self) -> Option<Vec<T>> {
136 self.inner.poll().map(|batch| T::from_batch_all(&batch))
137 }
138
139 pub fn recv(&mut self) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
144 self.inner.recv().map(|batch| T::from_batch_all(&batch))
145 }
146
147 pub fn recv_timeout(
152 &mut self,
153 timeout: Duration,
154 ) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
155 self.inner
156 .recv_timeout(timeout)
157 .map(|batch| T::from_batch_all(&batch))
158 }
159
160 pub fn poll_each<F: FnMut(T) -> bool>(&mut self, max_batches: usize, mut f: F) -> usize {
162 let mut count = 0;
163 for _ in 0..max_batches {
164 match self.inner.poll() {
165 Some(batch) => {
166 let items = T::from_batch_all(&batch);
167 for item in items {
168 count += 1;
169 if !f(item) {
170 return count;
171 }
172 }
173 }
174 None => break,
175 }
176 }
177 count
178 }
179}
180
181impl<T: FromBatch> std::fmt::Debug for TypedSubscription<T> {
182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 f.debug_struct("TypedSubscription").finish()
184 }
185}
186
/// Typed handle for pushing records of type `T` into a registered source.
pub struct SourceHandle<T: Record> {
    // Catalog entry backing this source (schema, buffer, watermark state).
    entry: Arc<SourceEntry>,
    // Marks the record type without storing a `T`.
    _phantom: PhantomData<T>,
}
192
193impl<T: Record> SourceHandle<T> {
194 pub(crate) fn new(entry: Arc<SourceEntry>) -> Result<Self, DbError> {
196 let rust_schema = T::schema();
197 let sql_schema = &entry.schema;
198
199 if rust_schema.fields().len() != sql_schema.fields().len() {
201 return Err(DbError::SchemaMismatch(format!(
202 "Rust type has {} fields but source '{}' has {} columns",
203 rust_schema.fields().len(),
204 entry.name,
205 sql_schema.fields().len()
206 )));
207 }
208
209 Ok(Self {
210 entry,
211 _phantom: PhantomData,
212 })
213 }
214
215 #[allow(clippy::needless_pass_by_value)]
220 pub fn push(&self, record: T) -> Result<(), laminar_core::streaming::StreamingError> {
221 let batch = record.to_record_batch();
222 self.entry.push_and_buffer(batch)
223 }
224
225 pub fn push_batch(&self, records: impl IntoIterator<Item = T>) -> usize {
227 const BATCH_SIZE: usize = 1024;
228 let mut count = 0;
229 let mut buffer = Vec::with_capacity(BATCH_SIZE);
230
231 for record in records {
232 buffer.push(record);
233 if buffer.len() >= BATCH_SIZE {
234 let batch = T::to_record_batch_from_iter(buffer.drain(..));
235 if self.push_arrow(batch).is_err() {
236 return count;
237 }
238 count += BATCH_SIZE;
239 }
240 }
241
242 if !buffer.is_empty() {
243 let len = buffer.len();
244 let batch = T::to_record_batch_from_iter(buffer);
245 if self.push_arrow(batch).is_ok() {
246 count += len;
247 }
248 }
249 count
250 }
251
252 pub fn push_arrow(
257 &self,
258 batch: RecordBatch,
259 ) -> Result<(), laminar_core::streaming::StreamingError> {
260 self.entry.push_and_buffer(batch)
261 }
262
263 pub fn watermark(&self, timestamp: i64) {
265 self.entry.source.watermark(timestamp);
266 }
267
268 #[must_use]
270 pub fn current_watermark(&self) -> i64 {
271 self.entry.source.current_watermark()
272 }
273
274 #[must_use]
276 pub fn pending(&self) -> usize {
277 self.entry.source.pending()
278 }
279
280 #[must_use]
282 pub fn capacity(&self) -> usize {
283 self.entry.source.capacity()
284 }
285
286 #[must_use]
288 pub fn is_backpressured(&self) -> bool {
289 crate::metrics::is_backpressured(self.pending(), self.capacity())
290 }
291
292 #[must_use]
294 pub fn name(&self) -> &str {
295 &self.entry.name
296 }
297
298 #[must_use]
300 pub fn schema(&self) -> &SchemaRef {
301 &self.entry.schema
302 }
303
304 #[must_use]
306 pub fn max_out_of_orderness(&self) -> Option<Duration> {
307 self.entry.max_out_of_orderness
308 }
309
310 pub fn set_event_time_column(&self, column: &str) {
312 self.entry.source.set_event_time_column(column);
313 }
314}
315
316impl<T: Record> std::fmt::Debug for SourceHandle<T> {
317 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318 f.debug_struct("SourceHandle")
319 .field("name", &self.entry.name)
320 .field("pending", &self.pending())
321 .finish()
322 }
323}
324
/// Handle for pushing raw Arrow batches into a source without a typed wrapper.
pub struct UntypedSourceHandle {
    // Catalog entry backing this source (schema, buffer, watermark state).
    entry: Arc<SourceEntry>,
}
329
330impl UntypedSourceHandle {
331 pub(crate) fn new(entry: Arc<SourceEntry>) -> Self {
332 Self { entry }
333 }
334
335 pub fn push_arrow(
340 &self,
341 batch: RecordBatch,
342 ) -> Result<(), laminar_core::streaming::StreamingError> {
343 self.entry.push_and_buffer(batch)
344 }
345
346 pub fn watermark(&self, timestamp: i64) {
348 self.entry.source.watermark(timestamp);
349 }
350
351 #[must_use]
353 pub fn current_watermark(&self) -> i64 {
354 self.entry.source.current_watermark()
355 }
356
357 #[must_use]
359 pub fn pending(&self) -> usize {
360 self.entry.source.pending()
361 }
362
363 #[must_use]
365 pub fn capacity(&self) -> usize {
366 self.entry.source.capacity()
367 }
368
369 #[must_use]
371 pub fn is_backpressured(&self) -> bool {
372 crate::metrics::is_backpressured(self.pending(), self.capacity())
373 }
374
375 #[must_use]
377 pub fn name(&self) -> &str {
378 &self.entry.name
379 }
380
381 #[must_use]
383 pub fn schema(&self) -> &SchemaRef {
384 &self.entry.schema
385 }
386
387 #[must_use]
389 pub fn max_out_of_orderness(&self) -> Option<Duration> {
390 self.entry.max_out_of_orderness
391 }
392
393 pub fn set_event_time_column(&self, column: &str) {
395 self.entry.source.set_event_time_column(column);
396 }
397}
398
399impl std::fmt::Debug for UntypedSourceHandle {
400 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401 f.debug_struct("UntypedSourceHandle")
402 .field("name", &self.entry.name)
403 .finish()
404 }
405}
406
/// Role of a node in the pipeline topology.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineNodeType {
    /// Data enters the pipeline here.
    Source,
    /// An intermediate stream/transformation.
    Stream,
    /// Data leaves the pipeline here.
    Sink,
}
417
/// A single node in the pipeline topology.
#[derive(Debug, Clone)]
pub struct PipelineNode {
    /// Node name (unique within the topology — TODO confirm).
    pub name: String,
    /// Whether the node is a source, stream, or sink.
    pub node_type: PipelineNodeType,
    /// Output schema, when known for this node kind.
    pub schema: Option<SchemaRef>,
    /// SQL text that defined the node, when applicable.
    pub sql: Option<String>,
}
430
/// A directed edge between two named nodes in the pipeline topology.
#[derive(Debug, Clone)]
pub struct PipelineEdge {
    /// Name of the upstream node.
    pub from: String,
    /// Name of the downstream node.
    pub to: String,
}
439
/// Snapshot of the pipeline graph: all nodes and the edges connecting them.
#[derive(Debug, Clone)]
pub struct PipelineTopology {
    /// All nodes in the pipeline.
    pub nodes: Vec<PipelineNode>,
    /// Directed edges between nodes, referenced by name.
    pub edges: Vec<PipelineEdge>,
}
448
/// Descriptive metadata for a registered stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Stream name.
    pub name: String,
    /// SQL text that defined the stream, when applicable.
    pub sql: Option<String>,
}
457
/// Descriptive metadata for a registered source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name.
    pub name: String,
    /// Arrow schema of the source's records.
    pub schema: SchemaRef,
    /// Column used for event-time watermarks, if configured.
    pub watermark_column: Option<String>,
}
468
/// Descriptive metadata for a registered sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name.
    pub name: String,
}
475
/// Descriptive metadata for a registered query.
#[derive(Debug, Clone)]
pub struct QueryInfo {
    /// Unique id assigned to the query.
    pub id: u64,
    /// SQL text the query was created from.
    pub sql: String,
    /// Whether the query is still active (not cancelled).
    pub active: bool,
}