// laminar_db/pipeline/callback.rs
1//! Pipeline callback trait and source registration types.
2//!
3//! Decouples the pipeline coordinator from `db.rs` internals so the
4//! TPC coordinator can drive SQL cycles, sink writes, and checkpoints
5//! through a narrow interface.
6
7use arrow_array::RecordBatch;
8use laminar_connectors::checkpoint::SourceCheckpoint;
9use laminar_connectors::config::ConnectorConfig;
10use laminar_connectors::connector::SourceConnector;
11use rustc_hash::FxHashMap;
12
/// A registered source with its name and config.
///
/// Bundles everything the pipeline coordinator needs to start a source
/// connector and, during recovery, resume it from a saved position.
pub struct SourceRegistration {
    /// Source name (the key used for this source's batches and checkpoints).
    pub name: String,
    /// The connector (owned; boxed since `SourceConnector` is a trait object).
    pub connector: Box<dyn SourceConnector>,
    /// Connector config (for open).
    pub config: ConnectorConfig,
    /// Whether this source supports replay from a checkpointed position.
    ///
    /// NOTE(review): presumably sources with `supports_replay == false`
    /// cannot honor `restore_checkpoint` — confirm against the adapter.
    pub supports_replay: bool,
    /// Checkpoint to restore on startup (set during recovery).
    ///
    /// When `Some`, the source adapter calls `connector.restore()` after
    /// `open()` to seek to the checkpointed position. This is how Kafka
    /// sources resume from their last checkpoint offset on recovery.
    pub restore_checkpoint: Option<SourceCheckpoint>,
}
30
/// Callback trait for the coordinator to interact with the rest of the DB.
///
/// This decouples the pipeline module from db.rs internals: the TPC
/// coordinator drives SQL cycles, sink writes, and checkpoints purely
/// through this interface.
///
/// Implementors must be `Send + 'static` so the coordinator can own the
/// callback on its own task/thread.
#[async_trait::async_trait]
pub trait PipelineCallback: Send + 'static {
    /// Called with accumulated source batches to execute a SQL cycle.
    ///
    /// `source_batches` maps source name to the batches accumulated since
    /// the last cycle; `watermark` is the pipeline watermark for the cycle.
    /// On success, returns per-query result batches; on failure, a
    /// human-readable error string.
    /// NOTE(review): result-map keys look like query/sink names — confirm.
    async fn execute_cycle(
        &mut self,
        source_batches: &FxHashMap<String, Vec<RecordBatch>>,
        watermark: i64,
    ) -> Result<FxHashMap<String, Vec<RecordBatch>>, String>;

    /// Called with results to push to stream subscriptions.
    /// Takes `&self`: must not require exclusive access to the callback.
    fn push_to_streams(&self, results: &FxHashMap<String, Vec<RecordBatch>>);

    /// Called with results to write to sinks.
    async fn write_to_sinks(&mut self, results: &FxHashMap<String, Vec<RecordBatch>>);

    /// Extract watermark from a batch for a given source.
    ///
    /// Updates the implementor's internal watermark state as a side effect;
    /// the merged value is observable via [`current_watermark`].
    ///
    /// [`current_watermark`]: PipelineCallback::current_watermark
    fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);

    /// Filter late rows from a batch. Returns the filtered batch (or None if all late).
    fn filter_late_rows(&self, source_name: &str, batch: &RecordBatch) -> Option<RecordBatch>;

    /// Get the current pipeline watermark.
    fn current_watermark(&self) -> i64;

    /// Perform a periodic (timer-based) checkpoint. Returns true if checkpoint was triggered.
    ///
    /// When `force` is true the implementor should checkpoint regardless of
    /// its timer; `source_offsets` carries the latest known offset per source.
    ///
    /// **Semantics: at-least-once.** Timer-based checkpoints capture source
    /// offsets *before* operator state. On recovery the consumer replays
    /// from the offset, and operators may re-process records that were
    /// already processed before the crash (at-least-once, not exactly-once).
    ///
    /// For exactly-once semantics, use [`checkpoint_with_barrier`] instead,
    /// which captures offsets and operator state at a consistent cut across
    /// all sources.
    ///
    /// [`checkpoint_with_barrier`]: PipelineCallback::checkpoint_with_barrier
    async fn maybe_checkpoint(
        &mut self,
        force: bool,
        source_offsets: FxHashMap<String, SourceCheckpoint>,
    ) -> bool;

    /// Called when all sources have aligned on a barrier.
    ///
    /// Receives source checkpoints captured at the barrier point (consistent).
    /// The callback should snapshot operator state and persist the checkpoint.
    /// Returns true if the checkpoint succeeded.
    async fn checkpoint_with_barrier(
        &mut self,
        source_checkpoints: FxHashMap<String, SourceCheckpoint>,
    ) -> bool;

    /// Record cycle metrics.
    ///
    /// `events_ingested` / `batches` count the cycle's input; `elapsed_ns`
    /// is the cycle's wall-clock duration in nanoseconds.
    fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);

    /// Poll table sources for incremental CDC changes.
    async fn poll_tables(&mut self);
}
91}