// `HashMap` is on the project's disallowed-types list; the catalog uses it
// deliberately for name -> entry lookups, so the lint is opted out file-wide.
#![allow(clippy::disallowed_types)]

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use parking_lot::RwLock;
use tokio::sync::Notify;

use laminar_core::streaming::{self, BackpressureStrategy, SourceConfig, WaitStrategy};
15
/// A single unit of streaming data: a wrapper around an Arrow [`RecordBatch`]
/// so batches can flow through `laminar_core` streaming channels as records.
#[derive(Clone, Debug)]
pub(crate) struct ArrowRecord {
    // The wrapped Arrow batch carried through the channel.
    pub(crate) batch: RecordBatch,
}
22
impl laminar_core::streaming::Record for ArrowRecord {
    /// Trait-level static schema.
    ///
    /// Returns an empty schema — presumably because the actual schema is
    /// tracked per source in `SourceEntry::schema` rather than at the record
    /// level. NOTE(review): confirm callers never rely on this being populated.
    fn schema() -> SchemaRef {
        Arc::new(arrow::datatypes::Schema::empty())
    }

    /// Converts this record back into an Arrow batch by cloning the inner batch.
    fn to_record_batch(&self) -> RecordBatch {
        self.batch.clone()
    }
}
34
/// Fixed-capacity ring buffer of the most recently pushed batches.
///
/// Writers claim a position via the atomic `tail` counter and store the batch
/// under that slot's mutex; readers take a best-effort snapshot of all slots.
struct SnapshotRing {
    // One mutex-guarded slot per ring position; `None` until first written.
    slots: Box<[parking_lot::Mutex<Option<RecordBatch>>]>,
    // Monotonically increasing push count; `tail % capacity` is the next slot.
    tail: AtomicUsize,
    // Number of slots; always >= 1 (see `SnapshotRing::new`).
    capacity: usize,
}
46
47impl SnapshotRing {
48 fn new(capacity: usize) -> Self {
49 let cap = capacity.max(1);
50 let slots: Vec<_> = (0..cap).map(|_| parking_lot::Mutex::new(None)).collect();
51 Self {
52 slots: slots.into_boxed_slice(),
53 tail: AtomicUsize::new(0),
54 capacity: cap,
55 }
56 }
57
58 fn push(&self, batch: RecordBatch) {
59 let idx = self.tail.fetch_add(1, Ordering::Relaxed) % self.capacity;
61 *self.slots[idx].lock() = Some(batch);
62 }
63
64 fn snapshot(&self) -> Vec<RecordBatch> {
65 let tail = self.tail.load(Ordering::Acquire);
66 let count = tail.min(self.capacity);
67 let start = if tail <= self.capacity {
69 0
70 } else {
71 tail % self.capacity
72 };
73 let mut result = Vec::with_capacity(count);
74 for i in 0..count {
75 let idx = (start + i) % self.capacity;
76 if let Some(batch) = self.slots[idx].lock().as_ref() {
77 result.push(batch.clone());
78 }
79 }
80 result
81 }
82}
83
/// Catalog entry for a registered streaming source: its schema, event-time
/// configuration, the channel endpoints, and a ring buffer for snapshots.
pub struct SourceEntry {
    // Source name as registered in the catalog.
    pub name: String,
    // Arrow schema of batches this source carries.
    pub schema: SchemaRef,
    // Column used to derive watermarks, if event-time is configured.
    pub watermark_column: Option<String>,
    // Allowed out-of-orderness window for watermarking, if any.
    pub max_out_of_orderness: Option<Duration>,
    // Runtime-togglable flag; starts false (see `SourceCatalog::register_source`).
    pub is_processing_time: std::sync::atomic::AtomicBool,
    // Producer endpoint for pushing records into the stream.
    pub(crate) source: streaming::Source<ArrowRecord>,
    // Endpoint downstream consumers attach to.
    pub(crate) sink: streaming::Sink<ArrowRecord>,
    // Ring of recently pushed batches, served by `snapshot()`.
    buffer: SnapshotRing,
    // Notified once per successful push so pollers can wake on new data.
    data_notify: Arc<Notify>,
}
105
106impl SourceEntry {
107 pub(crate) fn push_and_buffer(
113 &self,
114 batch: RecordBatch,
115 ) -> Result<(), laminar_core::streaming::StreamingError> {
116 self.source.push_arrow(batch.clone())?;
117 self.buffer.push(batch);
118 self.data_notify.notify_one();
122 Ok(())
123 }
124
125 pub(crate) fn snapshot(&self) -> Vec<RecordBatch> {
127 self.buffer.snapshot()
128 }
129
130 pub(crate) fn data_notify(&self) -> Arc<Notify> {
132 Arc::clone(&self.data_notify)
133 }
134}
135
/// Catalog entry for a registered sink: a named output fed by one input stream.
#[allow(dead_code)]
pub(crate) struct SinkEntry {
    // Sink name as registered in the catalog.
    pub(crate) name: String,
    // Name of the source/stream this sink reads from.
    pub(crate) input: String,
}
144
/// Catalog entry for a registered continuous query.
pub(crate) struct QueryEntry {
    // Monotonically assigned id (see `SourceCatalog::register_query`).
    pub(crate) id: u64,
    // Original SQL text of the query.
    pub(crate) sql: String,
    // Set to false once the query is deactivated; entries are never removed.
    pub(crate) active: bool,
}
154
/// Catalog entry for a named intermediate stream with its channel endpoints.
#[allow(dead_code)]
pub(crate) struct StreamEntry {
    // Stream name as registered in the catalog.
    pub(crate) name: String,
    // Producer endpoint for pushing records into the stream.
    pub(crate) source: streaming::Source<ArrowRecord>,
    // Endpoint used to create subscriptions (see `get_stream_subscription`).
    pub(crate) sink: streaming::Sink<ArrowRecord>,
}
165
/// Thread-safe registry of sources, sinks, streams, and continuous queries.
///
/// Each collection sits behind its own `RwLock`, so operations on unrelated
/// kinds of entries do not contend with each other.
pub struct SourceCatalog {
    sources: RwLock<HashMap<String, Arc<SourceEntry>>>,
    sinks: RwLock<HashMap<String, SinkEntry>>,
    streams: RwLock<HashMap<String, Arc<StreamEntry>>>,
    queries: RwLock<HashMap<u64, QueryEntry>>,
    // Next id handed out by `register_query`; starts at 1.
    next_query_id: AtomicU64,
    // Channel buffer size used when a source/stream does not specify one.
    default_buffer_size: usize,
    // Backpressure strategy used when a source/stream does not specify one.
    default_backpressure: BackpressureStrategy,
}
176
177impl SourceCatalog {
178 #[must_use]
180 pub fn new(buffer_size: usize, backpressure: BackpressureStrategy) -> Self {
181 Self {
182 sources: RwLock::new(HashMap::new()),
183 sinks: RwLock::new(HashMap::new()),
184 streams: RwLock::new(HashMap::new()),
185 queries: RwLock::new(HashMap::new()),
186 next_query_id: AtomicU64::new(1),
187 default_buffer_size: buffer_size,
188 default_backpressure: backpressure,
189 }
190 }
191
192 #[allow(clippy::too_many_arguments)]
194 pub(crate) fn register_source(
195 &self,
196 name: &str,
197 schema: SchemaRef,
198 watermark_column: Option<String>,
199 max_out_of_orderness: Option<Duration>,
200 buffer_size: Option<usize>,
201 backpressure: Option<BackpressureStrategy>,
202 ) -> Result<Arc<SourceEntry>, crate::DbError> {
203 let mut sources = self.sources.write();
204 if sources.contains_key(name) {
205 return Err(crate::DbError::SourceAlreadyExists(name.to_string()));
206 }
207
208 let buf_size = buffer_size.unwrap_or(self.default_buffer_size);
209 let bp = backpressure.unwrap_or(self.default_backpressure);
210
211 let config = SourceConfig {
212 channel: streaming::ChannelConfig {
213 buffer_size: buf_size,
214 backpressure: bp,
215 wait_strategy: WaitStrategy::SpinYield,
216 track_stats: false,
217 },
218 name: Some(name.to_string()),
219 };
220
221 let (source, sink) = streaming::create_with_config::<ArrowRecord>(config);
222
223 let entry = Arc::new(SourceEntry {
224 name: name.to_string(),
225 schema,
226 watermark_column,
227 max_out_of_orderness,
228 is_processing_time: std::sync::atomic::AtomicBool::new(false),
229 source,
230 sink,
231 buffer: SnapshotRing::new(buf_size),
232 data_notify: Arc::new(Notify::new()),
233 });
234
235 sources.insert(name.to_string(), Arc::clone(&entry));
236 Ok(entry)
237 }
238
239 pub(crate) fn register_source_or_replace(
241 &self,
242 name: &str,
243 schema: SchemaRef,
244 watermark_column: Option<String>,
245 max_out_of_orderness: Option<Duration>,
246 buffer_size: Option<usize>,
247 backpressure: Option<BackpressureStrategy>,
248 ) -> Arc<SourceEntry> {
249 self.sources.write().remove(name);
251 self.register_source(
253 name,
254 schema,
255 watermark_column,
256 max_out_of_orderness,
257 buffer_size,
258 backpressure,
259 )
260 .unwrap()
261 }
262
263 pub fn get_source(&self, name: &str) -> Option<Arc<SourceEntry>> {
265 self.sources.read().get(name).cloned()
266 }
267
268 pub fn drop_source(&self, name: &str) -> bool {
270 self.sources.write().remove(name).is_some()
271 }
272
273 pub(crate) fn register_sink(&self, name: &str, input: &str) -> Result<(), crate::DbError> {
275 let mut sinks = self.sinks.write();
276 if sinks.contains_key(name) {
277 return Err(crate::DbError::SinkAlreadyExists(name.to_string()));
278 }
279 sinks.insert(
280 name.to_string(),
281 SinkEntry {
282 name: name.to_string(),
283 input: input.to_string(),
284 },
285 );
286 Ok(())
287 }
288
289 pub fn drop_sink(&self, name: &str) -> bool {
291 self.sinks.write().remove(name).is_some()
292 }
293
294 pub(crate) fn register_stream(&self, name: &str) -> Result<(), crate::DbError> {
296 let mut streams = self.streams.write();
297 if streams.contains_key(name) {
298 return Err(crate::DbError::StreamAlreadyExists(name.to_string()));
299 }
300
301 let config = SourceConfig {
302 channel: streaming::ChannelConfig {
303 buffer_size: self.default_buffer_size,
304 backpressure: self.default_backpressure,
305 wait_strategy: WaitStrategy::SpinYield,
306 track_stats: false,
307 },
308 name: Some(name.to_string()),
309 };
310
311 let (source, sink) = streaming::create_with_config::<ArrowRecord>(config);
312
313 streams.insert(
314 name.to_string(),
315 Arc::new(StreamEntry {
316 name: name.to_string(),
317 source,
318 sink,
319 }),
320 );
321 Ok(())
322 }
323
324 pub(crate) fn get_stream_subscription(
326 &self,
327 name: &str,
328 ) -> Option<streaming::Subscription<ArrowRecord>> {
329 self.streams
330 .read()
331 .get(name)
332 .map(|entry| entry.sink.subscribe())
333 }
334
335 pub(crate) fn get_stream_entry(&self, name: &str) -> Option<Arc<StreamEntry>> {
337 self.streams.read().get(name).cloned()
338 }
339
340 pub(crate) fn get_stream_source(&self, name: &str) -> Option<streaming::Source<ArrowRecord>> {
342 self.streams
343 .read()
344 .get(name)
345 .map(|entry| entry.source.clone())
346 }
347
348 pub fn drop_stream(&self, name: &str) -> bool {
350 self.streams.write().remove(name).is_some()
351 }
352
353 pub fn list_streams(&self) -> Vec<String> {
355 self.streams.read().keys().cloned().collect()
356 }
357
358 pub fn list_sources(&self) -> Vec<String> {
360 self.sources.read().keys().cloned().collect()
361 }
362
363 pub fn list_sinks(&self) -> Vec<String> {
365 self.sinks.read().keys().cloned().collect()
366 }
367
368 pub fn get_sink_input(&self, name: &str) -> Option<String> {
370 self.sinks.read().get(name).map(|e| e.input.clone())
371 }
372
373 pub(crate) fn register_query(&self, sql: &str) -> u64 {
375 let id = self.next_query_id.fetch_add(1, Ordering::Relaxed);
376 let mut queries = self.queries.write();
377 queries.insert(
378 id,
379 QueryEntry {
380 id,
381 sql: sql.to_string(),
382 active: true,
383 },
384 );
385 id
386 }
387
388 pub(crate) fn deactivate_query(&self, id: u64) -> bool {
390 if let Some(entry) = self.queries.write().get_mut(&id) {
391 entry.active = false;
392 true
393 } else {
394 false
395 }
396 }
397
398 pub(crate) fn list_queries(&self) -> Vec<(u64, String, bool)> {
400 self.queries
401 .read()
402 .values()
403 .map(|q| (q.id, q.sql.clone(), q.active))
404 .collect()
405 }
406
407 pub fn describe_source(&self, name: &str) -> Option<SchemaRef> {
409 self.sources.read().get(name).map(|e| e.schema.clone())
410 }
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};

    /// Two-column schema (`id: Int64`, `value: Float64`) shared by the tests.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, false),
        ]))
    }

    // Registering a fresh source succeeds and makes it retrievable by name.
    #[test]
    fn test_register_source() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let result = catalog.register_source("test", test_schema(), None, None, None, None);
        assert!(result.is_ok());
        assert!(catalog.get_source("test").is_some());
    }

    // A second registration under the same name is rejected with the
    // dedicated error variant.
    #[test]
    fn test_register_duplicate_source() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        catalog
            .register_source("test", test_schema(), None, None, None, None)
            .unwrap();
        let result = catalog.register_source("test", test_schema(), None, None, None, None);
        assert!(matches!(
            result,
            Err(crate::DbError::SourceAlreadyExists(_))
        ));
    }

    // Dropping a source removes it from the catalog.
    #[test]
    fn test_drop_source() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        catalog
            .register_source("test", test_schema(), None, None, None, None)
            .unwrap();
        assert!(catalog.drop_source("test"));
        assert!(catalog.get_source("test").is_none());
    }

    // list_sources returns every registered name (order unspecified, hence
    // the sort before comparing).
    #[test]
    fn test_list_sources() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        catalog
            .register_source("a", test_schema(), None, None, None, None)
            .unwrap();
        catalog
            .register_source("b", test_schema(), None, None, None, None)
            .unwrap();
        let mut names = catalog.list_sources();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    // Sinks register by name and show up in list_sinks.
    #[test]
    fn test_register_sink() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        assert!(catalog.register_sink("output", "events").is_ok());
        assert_eq!(catalog.list_sinks(), vec!["output"]);
    }

    // Query ids start at 1 and newly registered queries are active.
    #[test]
    fn test_register_query() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let id = catalog.register_query("SELECT * FROM events");
        assert_eq!(id, 1);
        let queries = catalog.list_queries();
        assert_eq!(queries.len(), 1);
        assert!(queries[0].2);
    }

    // Deactivation flips the active flag without removing the entry.
    #[test]
    fn test_deactivate_query() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let id = catalog.register_query("SELECT * FROM events");
        catalog.deactivate_query(id);
        let queries = catalog.list_queries();
        assert!(!queries[0].2);
    }

    // describe_source exposes the schema the source was registered with.
    #[test]
    fn test_describe_source() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let schema = test_schema();
        catalog
            .register_source("test", schema.clone(), None, None, None, None)
            .unwrap();
        let result = catalog.describe_source("test");
        assert!(result.is_some());
        assert_eq!(result.unwrap().fields().len(), 2);
    }

    // register_source_or_replace swaps in the new definition even when the
    // name already exists.
    #[test]
    fn test_or_replace() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        catalog
            .register_source("test", test_schema(), None, None, None, None)
            .unwrap();
        let entry = catalog.register_source_or_replace(
            "test",
            test_schema(),
            Some("ts".into()),
            None,
            None,
            None,
        );
        assert_eq!(entry.watermark_column, Some("ts".to_string()));
    }

    // push_and_buffer both forwards the batch to the channel and records it
    // in the snapshot ring.
    #[test]
    fn test_push_and_buffer_snapshot() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let schema = test_schema();
        let entry = catalog
            .register_source("test", schema.clone(), None, None, None, None)
            .unwrap();

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![1])),
                Arc::new(arrow::array::Float64Array::from(vec![1.5])),
            ],
        )
        .unwrap();

        entry.push_and_buffer(batch).unwrap();
        let snap = entry.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].num_rows(), 1);
    }

    // With a ring of capacity 2, pushing 3 batches evicts the oldest, so the
    // first snapshot entry is the batch with id 1 (id 0 was dropped).
    #[test]
    fn test_buffer_capacity_drops_oldest() {
        let catalog = SourceCatalog::new(2, BackpressureStrategy::DropOldest);
        let schema = test_schema();
        let entry = catalog
            .register_source("test", schema.clone(), None, None, None, None)
            .unwrap();

        let values: [(i64, f64); 3] = [(0, 1.0), (1, 2.0), (2, 3.0)];
        for (id, val) in values {
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(arrow::array::Int64Array::from(vec![id])),
                    Arc::new(arrow::array::Float64Array::from(vec![val])),
                ],
            )
            .unwrap();
            entry.push_and_buffer(batch).unwrap();
        }

        let snap = entry.snapshot();
        assert_eq!(snap.len(), 2);
        let col = snap[0]
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        assert_eq!(col.value(0), 1);
    }
}