laminar_db/pipeline/callback.rs
1//! Pipeline callback trait and source registration types.
2//!
3//! Decouples the pipeline coordinator from `db.rs` internals so the
4//! TPC coordinator can drive SQL cycles, sink writes, and checkpoints
5//! through a narrow interface.
6
7use std::sync::Arc;
8
9use arrow_array::RecordBatch;
10use laminar_connectors::checkpoint::SourceCheckpoint;
11use laminar_connectors::config::ConnectorConfig;
12use laminar_connectors::connector::SourceConnector;
13use rustc_hash::FxHashMap;
14
/// Reason a barrier checkpoint was intentionally not taken.
///
/// This is distinct from a checkpoint that was attempted and then failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SkipReason {
    /// No execution cycles have run since the previous checkpoint.
    NoCyclesSinceLastCheckpoint,
    /// A sink write timed out, so the checkpoint is skipped to keep the
    /// replay window intact.
    PreservingReplayWindowAfterSinkTimeout,
}

impl std::fmt::Display for SkipReason {
    /// Writes the fixed `snake_case` label for this reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the label first so the formatter call stays a one-liner.
        let label = match *self {
            Self::NoCyclesSinceLastCheckpoint => "no_cycles_since_last_checkpoint",
            Self::PreservingReplayWindowAfterSinkTimeout => {
                "preserving_replay_window_after_sink_timeout"
            }
        };
        f.write_str(label)
    }
}
35
36/// Outcome of a barrier-aligned checkpoint attempt. `Skipped` is logged
37/// at debug; `Failed` keeps the retry warning.
38#[derive(Debug)]
39pub enum BarrierOutcome {
40 /// Checkpoint committed at the given epoch.
41 Committed(u64),
42 /// Checkpoint is processing asynchronously in the background.
43 Async,
44 /// Deliberately skipped (see `SkipReason`).
45 Skipped(SkipReason),
46 /// Attempted and failed; retry on the next interval.
47 Failed,
48}
49
50/// A registered source with its name and config.
51pub struct SourceRegistration {
52 /// Source name.
53 pub name: String,
54 /// The connector (owned).
55 pub connector: Box<dyn SourceConnector>,
56 /// Connector config (for open).
57 pub config: ConnectorConfig,
58 /// Whether this source supports replay from a checkpointed position.
59 pub supports_replay: bool,
60 /// Checkpoint to restore on startup (set during recovery).
61 ///
62 /// When `Some`, the source adapter calls `connector.restore()` after
63 /// `open()` to seek to the checkpointed position. This is how Kafka
64 /// sources resume from their last checkpoint offset on recovery.
65 pub restore_checkpoint: Option<SourceCheckpoint>,
66}
67
68/// Callback trait for the coordinator to interact with the rest of the DB.
69/// Trait exists for test seam; production impl is `ConnectorPipelineCallback`.
70#[trait_variant::make(Send)]
71pub trait PipelineCallback: Send + 'static {
72 /// Called with accumulated source batches to execute a SQL cycle.
73 async fn execute_cycle(
74 &mut self,
75 source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
76 watermark: i64,
77 ) -> Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>;
78
79 /// Called with results to push to stream subscriptions.
80 fn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);
81
82 /// Update materialized view stores with cycle results.
83 fn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {
84 let _ = results;
85 }
86
87 /// Called with results to write to sinks.
88 async fn write_to_sinks(&mut self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);
89
90 /// Extract watermark from a batch for a given source.
91 fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);
92
93 /// Filter late rows from a batch. Returns the filtered batch (or None if all late).
94 fn filter_late_rows(&self, source_name: &str, batch: &RecordBatch) -> Option<RecordBatch>;
95
96 /// Get the current pipeline watermark.
97 fn current_watermark(&self) -> i64;
98
99 /// Returns `true` if this node is the leader in cluster mode, or if running in single-node mode.
100 fn is_leader(&self) -> bool {
101 true
102 }
103
104 /// Per drain cycle: demote sources idle past their timeout so a quiet
105 /// input doesn't hold back the combined watermark. Default: no-op.
106 fn tick_idle_watermark(&mut self) {}
107
108 /// Perform a periodic (timer-based) checkpoint. At-least-once semantics.
109 /// For exactly-once, use [`checkpoint_with_barrier`].
110 ///
111 /// [`checkpoint_with_barrier`]: PipelineCallback::checkpoint_with_barrier
112 async fn maybe_checkpoint(
113 &mut self,
114 force: bool,
115 source_offsets: FxHashMap<String, SourceCheckpoint>,
116 ) -> Option<u64>;
117
118 /// Called when all sources have aligned on a barrier.
119 /// `checkpoint_id` identifies the barrier round the offsets were
120 /// captured under — implementations must not attribute them to a
121 /// different (e.g. newer, post-abandonment) announcement.
122 async fn checkpoint_with_barrier(
123 &mut self,
124 source_checkpoints: FxHashMap<String, SourceCheckpoint>,
125 checkpoint_id: u64,
126 ) -> BarrierOutcome;
127
128 /// Record cycle metrics.
129 fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);
130
131 /// Poll table sources for incremental CDC changes.
132 async fn poll_tables(&mut self);
133
134 /// Apply a DDL control message (add/drop stream) to the running pipeline.
135 fn apply_control(&mut self, msg: super::ControlMsg);
136
137 /// Returns `true` when internal buffers are near capacity.
138 fn is_backpressured(&self) -> bool {
139 false
140 }
141
142 /// Returns `true` when deferred operators have pending input to drain.
143 fn has_deferred_input(&self) -> bool {
144 false
145 }
146
147 /// Forward a durable checkpoint epoch to external SUBSCRIBE consumers.
148 fn publish_barrier(&self, epoch: u64, checkpoint_id: u64) {
149 let _ = (epoch, checkpoint_id);
150 }
151
152 /// Get the next checkpoint ID if managed externally.
153 fn next_checkpoint_id(&self) -> Option<u64> {
154 None
155 }
156
157 /// Register the local source barrier injectors.
158 fn set_barrier_injectors(
159 &mut self,
160 injectors: Vec<(
161 Arc<str>,
162 laminar_core::checkpoint::CheckpointBarrierInjector,
163 )>,
164 ) {
165 let _ = injectors;
166 }
167}