// laminar_db/pipeline/callback.rs
1//! Pipeline callback trait and source registration types.
2//!
3//! Decouples the pipeline coordinator from `db.rs` internals so the
4//! TPC coordinator can drive SQL cycles, sink writes, and checkpoints
5//! through a narrow interface.
6
7use std::sync::Arc;
8
9use arrow_array::RecordBatch;
10use laminar_connectors::checkpoint::SourceCheckpoint;
11use laminar_connectors::config::ConnectorConfig;
12use laminar_connectors::connector::SourceConnector;
13use rustc_hash::FxHashMap;
14
/// A registered source connector plus the metadata the pipeline
/// coordinator needs to open it and (on recovery) restore its position.
pub struct SourceRegistration {
    /// Source name. Used as the key for per-source checkpoints
    /// (see `FxHashMap<String, SourceCheckpoint>` in [`PipelineCallback`]).
    pub name: String,
    /// The connector (owned). Boxed trait object so heterogeneous
    /// connector implementations can be registered uniformly.
    pub connector: Box<dyn SourceConnector>,
    /// Connector config (for open).
    pub config: ConnectorConfig,
    /// Whether this source supports replay from a checkpointed position.
    /// Sources that cannot replay presumably get at-least-once delivery
    /// only — confirm against the coordinator's recovery path.
    pub supports_replay: bool,
    /// Checkpoint to restore on startup (set during recovery).
    ///
    /// When `Some`, the source adapter calls `connector.restore()` after
    /// `open()` to seek to the checkpointed position. This is how Kafka
    /// sources resume from their last checkpoint offset on recovery.
    /// `None` means a fresh start with no position to restore.
    pub restore_checkpoint: Option<SourceCheckpoint>,
}
32
/// Callback trait for the coordinator to interact with the rest of the DB.
/// Trait exists for test seam; production impl is `ConnectorPipelineCallback`.
///
/// The `trait_variant::make(Send)` attribute generates a variant whose
/// `async fn` futures are `Send`, so the coordinator can drive this
/// trait from a multi-threaded executor.
#[trait_variant::make(Send)]
pub trait PipelineCallback: Send + 'static {
    /// Called with accumulated source batches to execute a SQL cycle.
    ///
    /// Keys are source names; values are the batches accumulated for that
    /// source since the last cycle. Returns the per-output result batches,
    /// or a human-readable error string on failure.
    async fn execute_cycle(
        &mut self,
        source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
        watermark: i64,
    ) -> Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>;

    /// Called with results to push to stream subscriptions.
    fn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);

    /// Update materialized view stores with cycle results.
    ///
    /// Default is a no-op so callbacks without MV support (e.g. test
    /// doubles) need not implement it.
    fn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {
        let _ = results; // deliberately unused in the default impl
    }

    /// Called with results to write to sinks.
    ///
    /// NOTE(review): returns `()`, so sink write failures are not surfaced
    /// to the coordinator through this interface — confirm error handling
    /// happens inside the implementation.
    async fn write_to_sinks(&mut self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);

    /// Extract watermark from a batch for a given source.
    ///
    /// Takes `&mut self`: implementations update internal watermark state
    /// rather than returning a value (read it via [`current_watermark`]).
    ///
    /// [`current_watermark`]: PipelineCallback::current_watermark
    fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);

    /// Filter late rows from a batch. Returns the filtered batch (or None if all late).
    fn filter_late_rows(&self, source_name: &str, batch: &RecordBatch) -> Option<RecordBatch>;

    /// Get the current pipeline watermark.
    fn current_watermark(&self) -> i64;

    /// Perform a periodic (timer-based) checkpoint. At-least-once semantics.
    /// For exactly-once, use [`checkpoint_with_barrier`].
    ///
    /// `force` presumably bypasses the periodic timer check — confirm in
    /// the production impl. Returns `Some` when a checkpoint was actually
    /// taken (the `u64` looks like a checkpoint/epoch id — verify).
    ///
    /// [`checkpoint_with_barrier`]: PipelineCallback::checkpoint_with_barrier
    async fn maybe_checkpoint(
        &mut self,
        force: bool,
        source_offsets: FxHashMap<String, SourceCheckpoint>,
    ) -> Option<u64>;

    /// Called when all sources have aligned on a barrier.
    ///
    /// Exactly-once counterpart of [`maybe_checkpoint`]; same `Option<u64>`
    /// return convention.
    ///
    /// [`maybe_checkpoint`]: PipelineCallback::maybe_checkpoint
    async fn checkpoint_with_barrier(
        &mut self,
        source_checkpoints: FxHashMap<String, SourceCheckpoint>,
    ) -> Option<u64>;

    /// Record cycle metrics.
    fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);

    /// Poll table sources for incremental CDC changes.
    async fn poll_tables(&mut self);

    /// Apply a DDL control message (add/drop stream) to the running pipeline.
    fn apply_control(&mut self, msg: super::ControlMsg);

    /// Returns `true` when internal buffers are near capacity.
    ///
    /// Default: never backpressured.
    fn is_backpressured(&self) -> bool {
        false
    }

    /// Returns `true` when deferred operators have pending input to drain.
    ///
    /// Default: nothing deferred.
    fn has_deferred_input(&self) -> bool {
        false
    }
}