/// Callback trait for the coordinator to interact with the rest of the DB.
///
/// This decouples the pipeline module from `db.rs` internals.
///
/// Several methods return a pinned, boxed, `Send` future bounded by the
/// `'async_trait` lifetime.
// NOTE(review): the `'life0` / `'async_trait` lifetimes look like the expanded
// form of `#[async_trait]` async methods — confirm against the original source.
pub trait PipelineCallback: Send + 'static {
// Required methods
/// Called with accumulated source batches to execute a SQL cycle.
///
/// `source_batches` maps a `String` key to the batches accumulated for it;
/// `watermark` is passed alongside. The future resolves to a map of result
/// batches on success, or a `String` error.
fn execute_cycle<'life0, 'life1, 'async_trait>(
&'life0 mut self,
source_batches: &'life1 FxHashMap<String, Vec<RecordBatch>>,
watermark: i64,
) -> Pin<Box<dyn Future<Output = Result<FxHashMap<String, Vec<RecordBatch>>, String>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
/// Called with results to push to stream subscriptions.
fn push_to_streams(&self, results: &FxHashMap<String, Vec<RecordBatch>>);
/// Called with results to write to sinks.
fn write_to_sinks<'life0, 'life1, 'async_trait>(
&'life0 mut self,
results: &'life1 FxHashMap<String, Vec<RecordBatch>>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
/// Extract watermark from a batch for a given source.
fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);
/// Filter late rows from a batch.
///
/// Returns the filtered batch, or `None` if all rows are late.
fn filter_late_rows(
&self,
source_name: &str,
batch: &RecordBatch,
) -> Option<RecordBatch>;
/// Get the current pipeline watermark.
fn current_watermark(&self) -> i64;
/// Perform a periodic (timer-based) checkpoint.
///
/// The future resolves to `true` if a checkpoint was triggered.
///
/// Semantics: at-least-once. Timer-based checkpoints capture source offsets
/// before operator state, so on recovery operators may re-process records
/// that were already processed before the crash. For exactly-once
/// semantics, use `checkpoint_with_barrier` instead.
fn maybe_checkpoint<'life0, 'async_trait>(
&'life0 mut self,
force: bool,
source_offsets: FxHashMap<String, SourceCheckpoint>,
) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
/// Called when all sources have aligned on a barrier.
///
/// Receives source checkpoints captured at the barrier point. The callback
/// should snapshot operator state and persist the checkpoint. The future
/// resolves to `true` if the checkpoint succeeded.
fn checkpoint_with_barrier<'life0, 'async_trait>(
&'life0 mut self,
source_checkpoints: FxHashMap<String, SourceCheckpoint>,
) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
/// Record cycle metrics.
fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);
// NOTE(review): no description for `poll_tables` is visible here; presumably
// it polls table sources for updates — confirm against the original source.
fn poll_tables<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
}Expand description
Callback trait for the coordinator to interact with the rest of the DB.
This decouples the pipeline module from db.rs internals.
Required Methods§
fn execute_cycle<'life0, 'life1, 'async_trait>(
    &'life0 mut self,
    source_batches: &'life1 FxHashMap<String, Vec<RecordBatch>>,
    watermark: i64,
) -> Pin<Box<dyn Future<Output = Result<FxHashMap<String, Vec<RecordBatch>>, String>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Called with accumulated source batches to execute a SQL cycle.
fn push_to_streams(&self, results: &FxHashMap<String, Vec<RecordBatch>>)
Called with results to push to stream subscriptions.
fn write_to_sinks<'life0, 'life1, 'async_trait>(
    &'life0 mut self,
    results: &'life1 FxHashMap<String, Vec<RecordBatch>>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Called with results to write to sinks.
fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch)
Extract watermark from a batch for a given source.
fn filter_late_rows(
    &self,
    source_name: &str,
    batch: &RecordBatch,
) -> Option<RecordBatch>
Filter late rows from a batch. Returns the filtered batch, or `None` if every row in the batch is late.
fn current_watermark(&self) -> i64
Get the current pipeline watermark.
fn maybe_checkpoint<'life0, 'async_trait>(
    &'life0 mut self,
    force: bool,
    source_offsets: FxHashMap<String, SourceCheckpoint>,
) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Perform a periodic (timer-based) checkpoint. Returns `true` if a checkpoint was triggered.
Semantics: at-least-once. Timer-based checkpoints capture source offsets before operator state. On recovery the consumer replays from the offset, and operators may re-process records that were already processed before the crash (at-least-once, not exactly-once).
For exactly-once semantics, use checkpoint_with_barrier instead,
which captures offsets and operator state at a consistent cut across
all sources.
fn checkpoint_with_barrier<'life0, 'async_trait>(
    &'life0 mut self,
    source_checkpoints: FxHashMap<String, SourceCheckpoint>,
) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Called when all sources have aligned on a barrier.
Receives the source checkpoints captured at the barrier point, which are therefore consistent with one another. The callback should snapshot operator state and persist the checkpoint. Returns `true` if the checkpoint succeeded.
fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64)
Record cycle metrics.