// laminar_db/pipeline/callback.rs
//! Pipeline callback trait and source registration types.
//!
//! Decouples the pipeline coordinator from `db.rs` internals so the
//! TPC coordinator can drive SQL cycles, sink writes, and checkpoints
//! through a narrow interface.

use std::sync::Arc;

use arrow_array::RecordBatch;
use laminar_connectors::checkpoint::SourceCheckpoint;
use laminar_connectors::config::ConnectorConfig;
use laminar_connectors::connector::SourceConnector;
use rustc_hash::FxHashMap;

/// A registered source with its name and config.
///
/// Built when a source is declared and handed to the pipeline
/// coordinator; the coordinator owns the connector from then on.
pub struct SourceRegistration {
    /// Source name (the key under which this source's batches appear).
    pub name: String,
    /// The connector (owned; boxed for dynamic dispatch over connector kinds).
    pub connector: Box<dyn SourceConnector>,
    /// Connector config (passed when the connector is opened).
    pub config: ConnectorConfig,
    /// Whether this source supports replay from a checkpointed position.
    ///
    /// Sources that cannot replay fall back to at-least-once behavior on
    /// recovery — NOTE(review): inferred from `restore_checkpoint` below;
    /// confirm against the coordinator's recovery path.
    pub supports_replay: bool,
    /// Checkpoint to restore on startup (set during recovery).
    ///
    /// When `Some`, the source adapter calls `connector.restore()` after
    /// `open()` to seek to the checkpointed position. This is how Kafka
    /// sources resume from their last checkpoint offset on recovery.
    pub restore_checkpoint: Option<SourceCheckpoint>,
}
/// Callback trait for the coordinator to interact with the rest of the DB.
///
/// Trait exists as a test seam; the production impl is
/// `ConnectorPipelineCallback`. The `trait_variant` attribute below
/// additionally generates a variant of this trait whose `async fn`s
/// yield `Send` futures, per that crate's documented behavior.
#[trait_variant::make(Send)]
pub trait PipelineCallback: Send + 'static {
    /// Called with accumulated source batches to execute a SQL cycle.
    ///
    /// `source_batches` maps source name to the batches accumulated since
    /// the last cycle; `watermark` is the pipeline watermark for this
    /// cycle. Returns the cycle's result batches keyed by name, or a
    /// human-readable error string on failure.
    async fn execute_cycle(
        &mut self,
        source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
        watermark: i64,
    ) -> Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>;

    /// Called with results to push to stream subscriptions.
    fn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);

    /// Update materialized view stores with cycle results.
    ///
    /// Default is a no-op so impls without MV support need not override;
    /// `results` is deliberately ignored.
    fn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {
        let _ = results;
    }

    /// Called with results to write to sinks.
    async fn write_to_sinks(&mut self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);

    /// Extract watermark from a batch for a given source.
    ///
    /// Takes `&mut self`: the impl is expected to advance its internal
    /// watermark state as a side effect rather than return a value.
    fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);

    /// Filter late rows from a batch. Returns the filtered batch (or None if all late).
    fn filter_late_rows(&self, source_name: &str, batch: &RecordBatch) -> Option<RecordBatch>;

    /// Get the current pipeline watermark.
    fn current_watermark(&self) -> i64;

    /// Perform a periodic (timer-based) checkpoint. At-least-once semantics.
    /// For exactly-once, use [`checkpoint_with_barrier`].
    ///
    /// `force` requests a checkpoint regardless of the timer;
    /// `source_offsets` carries each source's position to persist.
    /// Returns `Some(id)` when a checkpoint was actually taken —
    /// NOTE(review): the meaning of the `u64` (epoch vs. checkpoint id)
    /// is not visible here; confirm against the impl.
    ///
    /// [`checkpoint_with_barrier`]: PipelineCallback::checkpoint_with_barrier
    async fn maybe_checkpoint(
        &mut self,
        force: bool,
        source_offsets: FxHashMap<String, SourceCheckpoint>,
    ) -> Option<u64>;

    /// Called when all sources have aligned on a barrier.
    ///
    /// Exactly-once counterpart to [`maybe_checkpoint`]; persists the
    /// barrier-aligned `source_checkpoints`.
    ///
    /// [`maybe_checkpoint`]: PipelineCallback::maybe_checkpoint
    async fn checkpoint_with_barrier(
        &mut self,
        source_checkpoints: FxHashMap<String, SourceCheckpoint>,
    ) -> Option<u64>;

    /// Record cycle metrics.
    fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);

    /// Poll table sources for incremental CDC changes.
    async fn poll_tables(&mut self);

    /// Apply a DDL control message (add/drop stream) to the running pipeline.
    fn apply_control(&mut self, msg: super::ControlMsg);

    /// Returns `true` when internal buffers are near capacity.
    ///
    /// Default: never backpressured.
    fn is_backpressured(&self) -> bool {
        false
    }

    /// Returns `true` when deferred operators have pending input to drain.
    ///
    /// Default: no deferred input.
    fn has_deferred_input(&self) -> bool {
        false
    }
}
98}