Skip to main content

PipelineCallback

Trait PipelineCallback 

Source
pub trait PipelineCallback: Send + 'static {
    // Required methods
    fn execute_cycle<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        source_batches: &'life1 FxHashMap<String, Vec<RecordBatch>>,
        watermark: i64,
    ) -> Pin<Box<dyn Future<Output = Result<FxHashMap<String, Vec<RecordBatch>>, String>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn push_to_streams(&self, results: &FxHashMap<String, Vec<RecordBatch>>);
    fn write_to_sinks<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        results: &'life1 FxHashMap<String, Vec<RecordBatch>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);
    fn filter_late_rows(
        &self,
        source_name: &str,
        batch: &RecordBatch,
    ) -> Option<RecordBatch>;
    fn current_watermark(&self) -> i64;
    fn maybe_checkpoint<'life0, 'async_trait>(
        &'life0 mut self,
        force: bool,
        source_offsets: FxHashMap<String, SourceCheckpoint>,
    ) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn checkpoint_with_barrier<'life0, 'async_trait>(
        &'life0 mut self,
        source_checkpoints: FxHashMap<String, SourceCheckpoint>,
    ) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);
    fn poll_tables<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
}
Expand description

Callback trait for the coordinator to interact with the rest of the DB.

This decouples the pipeline module from db.rs internals.

Required Methods§

Source

fn execute_cycle<'life0, 'life1, 'async_trait>( &'life0 mut self, source_batches: &'life1 FxHashMap<String, Vec<RecordBatch>>, watermark: i64, ) -> Pin<Box<dyn Future<Output = Result<FxHashMap<String, Vec<RecordBatch>>, String>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Called with accumulated source batches to execute a SQL cycle.

Source

fn push_to_streams(&self, results: &FxHashMap<String, Vec<RecordBatch>>)

Called with results to push to stream subscriptions.

Source

fn write_to_sinks<'life0, 'life1, 'async_trait>( &'life0 mut self, results: &'life1 FxHashMap<String, Vec<RecordBatch>>, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Called with results to write to sinks.

Source

fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch)

Extract watermark from a batch for a given source.

Source

fn filter_late_rows( &self, source_name: &str, batch: &RecordBatch, ) -> Option<RecordBatch>

Filter late rows from a batch. Returns the filtered batch (or None if all late).

Source

fn current_watermark(&self) -> i64

Get the current pipeline watermark.

Source

fn maybe_checkpoint<'life0, 'async_trait>( &'life0 mut self, force: bool, source_offsets: FxHashMap<String, SourceCheckpoint>, ) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Perform a periodic (timer-based) checkpoint. Returns true if checkpoint was triggered.

Semantics: at-least-once. Timer-based checkpoints capture source offsets before operator state. On recovery the consumer replays from the offset, and operators may re-process records that were already processed before the crash (at-least-once, not exactly-once).

For exactly-once semantics, use checkpoint_with_barrier instead, which captures offsets and operator state at a consistent cut across all sources.

Source

fn checkpoint_with_barrier<'life0, 'async_trait>( &'life0 mut self, source_checkpoints: FxHashMap<String, SourceCheckpoint>, ) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Called when all sources have aligned on a barrier.

Receives source checkpoints captured at the barrier point (consistent). The callback should snapshot operator state and persist the checkpoint. Returns true if the checkpoint succeeded.

Source

fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64)

Record cycle metrics.

Source

fn poll_tables<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Poll table sources for incremental CDC changes.

Implementors§