// laminar_db/pipeline/callback.rs

1//! Pipeline callback trait and source registration types.
2//!
3//! Decouples the pipeline coordinator from `db.rs` internals so the
4//! TPC coordinator can drive SQL cycles, sink writes, and checkpoints
5//! through a narrow interface.
6
7use arrow_array::RecordBatch;
8use laminar_connectors::checkpoint::SourceCheckpoint;
9use laminar_connectors::config::ConnectorConfig;
10use laminar_connectors::connector::SourceConnector;
11use rustc_hash::FxHashMap;
12
13/// A registered source with its name and config.
/// A registered source with its name and config.
///
/// Bundles everything the pipeline coordinator needs to own and drive a
/// single source connector: the connector itself, the config used to open
/// it, and optional recovery state captured from a prior checkpoint.
pub struct SourceRegistration {
    /// Source name.
    pub name: String,
    /// The connector (owned).
    pub connector: Box<dyn SourceConnector>,
    /// Connector config (for open).
    pub config: ConnectorConfig,
    /// Whether this source supports replay from a checkpointed position.
    ///
    /// Sources that cannot replay (e.g. ephemeral feeds) presumably get
    /// at-most-once recovery — confirm against the coordinator's handling.
    pub supports_replay: bool,
    /// Checkpoint to restore on startup (set during recovery).
    ///
    /// When `Some`, the source adapter calls `connector.restore()` after
    /// `open()` to seek to the checkpointed position. This is how Kafka
    /// sources resume from their last checkpoint offset on recovery.
    pub restore_checkpoint: Option<SourceCheckpoint>,
}
30
31/// Callback trait for the coordinator to interact with the rest of the DB.
32///
33/// This decouples the pipeline module from db.rs internals.
/// Callback trait for the coordinator to interact with the rest of the DB.
///
/// This decouples the pipeline module from db.rs internals. Implementors
/// are driven by the TPC coordinator through SQL cycles, sink/stream
/// output, watermark tracking, and checkpointing.
#[async_trait::async_trait]
pub trait PipelineCallback: Send + 'static {
    /// Called with accumulated source batches to execute a SQL cycle.
    ///
    /// `source_batches` maps source name to the batches accumulated since
    /// the previous cycle; `watermark` is the watermark value for this
    /// cycle. Returns result batches keyed by string name (presumably the
    /// query or output name — confirm against the implementor), or an
    /// error message on failure.
    async fn execute_cycle(
        &mut self,
        source_batches: &FxHashMap<String, Vec<RecordBatch>>,
        watermark: i64,
    ) -> Result<FxHashMap<String, Vec<RecordBatch>>, String>;

    /// Called with results to push to stream subscriptions.
    ///
    /// Takes `&self`: pushing to subscribers does not mutate callback state.
    fn push_to_streams(&self, results: &FxHashMap<String, Vec<RecordBatch>>);

    /// Called with results to write to sinks.
    ///
    /// Infallible from the coordinator's perspective; any write errors are
    /// handled (or surfaced) inside the implementation.
    async fn write_to_sinks(&mut self, results: &FxHashMap<String, Vec<RecordBatch>>);

    /// Extract watermark from a batch for a given source.
    ///
    /// Takes `&mut self` so the implementor can fold the extracted value
    /// into its tracked watermark state; observe the result via
    /// [`current_watermark`](PipelineCallback::current_watermark).
    fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);

    /// Filter late rows from a batch. Returns the filtered batch (or None if all late).
    fn filter_late_rows(&self, source_name: &str, batch: &RecordBatch) -> Option<RecordBatch>;

    /// Get the current pipeline watermark.
    fn current_watermark(&self) -> i64;

    /// Perform a periodic (timer-based) checkpoint. Returns true if checkpoint was triggered.
    ///
    /// `force` requests a checkpoint regardless of the timer;
    /// `source_offsets` carries the latest known offset per source.
    ///
    /// **Semantics: at-least-once.** Timer-based checkpoints capture source
    /// offsets *before* operator state. On recovery the consumer replays
    /// from the offset, and operators may re-process records that were
    /// already processed before the crash (at-least-once, not exactly-once).
    ///
    /// For exactly-once semantics, use [`checkpoint_with_barrier`] instead,
    /// which captures offsets and operator state at a consistent cut across
    /// all sources.
    ///
    /// [`checkpoint_with_barrier`]: PipelineCallback::checkpoint_with_barrier
    async fn maybe_checkpoint(
        &mut self,
        force: bool,
        source_offsets: FxHashMap<String, SourceCheckpoint>,
    ) -> bool;

    /// Called when all sources have aligned on a barrier.
    ///
    /// Receives source checkpoints captured at the barrier point (consistent).
    /// The callback should snapshot operator state and persist the checkpoint.
    /// Returns true if the checkpoint succeeded.
    async fn checkpoint_with_barrier(
        &mut self,
        source_checkpoints: FxHashMap<String, SourceCheckpoint>,
    ) -> bool;

    /// Record cycle metrics.
    ///
    /// `events_ingested` is the row count consumed this cycle, `batches`
    /// the number of record batches, and `elapsed_ns` the cycle wall time
    /// in nanoseconds.
    fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);

    /// Poll table sources for incremental CDC changes.
    async fn poll_tables(&mut self);
}
91}