Skip to main content

laminar_db/pipeline/
callback.rs

1//! Pipeline callback trait and source registration types.
2//!
3//! Decouples the pipeline coordinator from `db.rs` internals so the
4//! TPC coordinator can drive SQL cycles, sink writes, and checkpoints
5//! through a narrow interface.
6
7use std::sync::Arc;
8
9use arrow_array::RecordBatch;
10use laminar_connectors::checkpoint::SourceCheckpoint;
11use laminar_connectors::config::ConnectorConfig;
12use laminar_connectors::connector::SourceConnector;
13use rustc_hash::FxHashMap;
14
/// Reason a barrier checkpoint was intentionally not taken.
///
/// A skip is a deliberate decision; it is distinct from a checkpoint that
/// was attempted and then failed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SkipReason {
    /// No execution cycle has run since the previous checkpoint.
    NoCyclesSinceLastCheckpoint,
    /// A sink write timed out, so the checkpoint is skipped in order to
    /// leave the replay window intact.
    PreservingReplayWindowAfterSinkTimeout,
}
24
25impl std::fmt::Display for SkipReason {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        f.write_str(match self {
28            SkipReason::NoCyclesSinceLastCheckpoint => "no_cycles_since_last_checkpoint",
29            SkipReason::PreservingReplayWindowAfterSinkTimeout => {
30                "preserving_replay_window_after_sink_timeout"
31            }
32        })
33    }
34}
35
36/// Outcome of a barrier-aligned checkpoint attempt. `Skipped` is logged
37/// at debug; `Failed` keeps the retry warning.
38#[derive(Debug)]
39pub enum BarrierOutcome {
40    /// Checkpoint committed at the given epoch.
41    Committed(u64),
42    /// Checkpoint is processing asynchronously in the background.
43    Async,
44    /// Deliberately skipped (see `SkipReason`).
45    Skipped(SkipReason),
46    /// Attempted and failed; retry on the next interval.
47    Failed,
48}
49
50/// A registered source with its name and config.
51pub struct SourceRegistration {
52    /// Source name.
53    pub name: String,
54    /// The connector (owned).
55    pub connector: Box<dyn SourceConnector>,
56    /// Connector config (for open).
57    pub config: ConnectorConfig,
58    /// Whether this source supports replay from a checkpointed position.
59    pub supports_replay: bool,
60    /// Checkpoint to restore on startup (set during recovery).
61    ///
62    /// When `Some`, the source adapter calls `connector.restore()` after
63    /// `open()` to seek to the checkpointed position. This is how Kafka
64    /// sources resume from their last checkpoint offset on recovery.
65    pub restore_checkpoint: Option<SourceCheckpoint>,
66}
67
68/// Callback trait for the coordinator to interact with the rest of the DB.
69/// Trait exists for test seam; production impl is `ConnectorPipelineCallback`.
70#[trait_variant::make(Send)]
71pub trait PipelineCallback: Send + 'static {
72    /// Called with accumulated source batches to execute a SQL cycle.
73    async fn execute_cycle(
74        &mut self,
75        source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
76        watermark: i64,
77    ) -> Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>;
78
79    /// Called with results to push to stream subscriptions.
80    fn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);
81
82    /// Update materialized view stores with cycle results.
83    fn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {
84        let _ = results;
85    }
86
87    /// Called with results to write to sinks.
88    async fn write_to_sinks(&mut self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);
89
90    /// Extract watermark from a batch for a given source.
91    fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);
92
93    /// Filter late rows from a batch. Returns the filtered batch (or None if all late).
94    fn filter_late_rows(&self, source_name: &str, batch: &RecordBatch) -> Option<RecordBatch>;
95
96    /// Get the current pipeline watermark.
97    fn current_watermark(&self) -> i64;
98
99    /// Returns `true` if this node is the leader in cluster mode, or if running in single-node mode.
100    fn is_leader(&self) -> bool {
101        true
102    }
103
104    /// Per drain cycle: demote sources idle past their timeout so a quiet
105    /// input doesn't hold back the combined watermark. Default: no-op.
106    fn tick_idle_watermark(&mut self) {}
107
108    /// Perform a periodic (timer-based) checkpoint. At-least-once semantics.
109    /// For exactly-once, use [`checkpoint_with_barrier`].
110    ///
111    /// [`checkpoint_with_barrier`]: PipelineCallback::checkpoint_with_barrier
112    async fn maybe_checkpoint(
113        &mut self,
114        force: bool,
115        source_offsets: FxHashMap<String, SourceCheckpoint>,
116    ) -> Option<u64>;
117
118    /// Called when all sources have aligned on a barrier.
119    /// `checkpoint_id` identifies the barrier round the offsets were
120    /// captured under — implementations must not attribute them to a
121    /// different (e.g. newer, post-abandonment) announcement.
122    async fn checkpoint_with_barrier(
123        &mut self,
124        source_checkpoints: FxHashMap<String, SourceCheckpoint>,
125        checkpoint_id: u64,
126    ) -> BarrierOutcome;
127
128    /// Record cycle metrics.
129    fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);
130
131    /// Poll table sources for incremental CDC changes.
132    async fn poll_tables(&mut self);
133
134    /// Apply a DDL control message (add/drop stream) to the running pipeline.
135    fn apply_control(&mut self, msg: super::ControlMsg);
136
137    /// Returns `true` when internal buffers are near capacity.
138    fn is_backpressured(&self) -> bool {
139        false
140    }
141
142    /// Returns `true` when deferred operators have pending input to drain.
143    fn has_deferred_input(&self) -> bool {
144        false
145    }
146
147    /// Forward a durable checkpoint epoch to external SUBSCRIBE consumers.
148    fn publish_barrier(&self, epoch: u64, checkpoint_id: u64) {
149        let _ = (epoch, checkpoint_id);
150    }
151
152    /// Get the next checkpoint ID if managed externally.
153    fn next_checkpoint_id(&self) -> Option<u64> {
154        None
155    }
156
157    /// Register the local source barrier injectors.
158    fn set_barrier_injectors(
159        &mut self,
160        injectors: Vec<(
161            Arc<str>,
162            laminar_core::checkpoint::CheckpointBarrierInjector,
163        )>,
164    ) {
165        let _ = injectors;
166    }
167}