
Trait PipelineCallback 

pub trait PipelineCallback: Send + 'static {
    // Required methods
    fn execute_cycle(
        &mut self,
        source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
        watermark: i64,
    ) -> impl Future<Output = Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>> + Send;
    fn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);
    fn write_to_sinks(
        &mut self,
        results: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
    ) -> impl Future<Output = ()> + Send;
    fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);
    fn filter_late_rows(
        &self,
        source_name: &str,
        batch: &RecordBatch,
    ) -> Option<RecordBatch>;
    fn current_watermark(&self) -> i64;
    fn maybe_checkpoint(
        &mut self,
        force: bool,
        source_offsets: FxHashMap<String, SourceCheckpoint>,
    ) -> impl Future<Output = Option<u64>> + Send;
    fn checkpoint_with_barrier(
        &mut self,
        source_checkpoints: FxHashMap<String, SourceCheckpoint>,
        checkpoint_id: u64,
    ) -> impl Future<Output = BarrierOutcome> + Send;
    fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);
    fn poll_tables(&mut self) -> impl Future<Output = ()> + Send;
    fn apply_control(&mut self, msg: ControlMsg);

    // Provided methods
    fn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) { ... }
    fn is_leader(&self) -> bool { ... }
    fn tick_idle_watermark(&mut self) { ... }
    fn is_backpressured(&self) -> bool { ... }
    fn has_deferred_input(&self) -> bool { ... }
    fn publish_barrier(&self, epoch: u64, checkpoint_id: u64) { ... }
    fn next_checkpoint_id(&self) -> Option<u64> { ... }
    fn set_barrier_injectors(
        &mut self,
        injectors: Vec<(Arc<str>, CheckpointBarrierInjector)>,
    ) { ... }
}
Callback trait through which the coordinator interacts with the rest of the database. The trait exists as a test seam; the production implementation is ConnectorPipelineCallback.

Required Methods

fn execute_cycle( &mut self, source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>, watermark: i64, ) -> impl Future<Output = Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>> + Send

Called with accumulated source batches to execute a SQL cycle.

fn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>)

Called with results to push to stream subscriptions.

fn write_to_sinks( &mut self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>, ) -> impl Future<Output = ()> + Send

Called with results to write to sinks.

fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch)

Extract watermark from a batch for a given source.

fn filter_late_rows( &self, source_name: &str, batch: &RecordBatch, ) -> Option<RecordBatch>

Filter late rows from a batch. Returns the filtered batch, or None if every row is late.
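
The contract above can be sketched with a cut-down batch type. This is only an illustration: `MiniBatch` is a hypothetical stand-in for `RecordBatch`, the watermark is passed in explicitly rather than looked up per source, and treating a row as late when its event time is strictly below the watermark is an assumption, not something this page states.

```rust
/// Hypothetical stand-in for a RecordBatch: a single event-time column.
#[derive(Debug, Clone, PartialEq)]
struct MiniBatch {
    event_times: Vec<i64>,
}

/// Keep rows whose event time is at or after the watermark.
/// ASSUMPTION: "late" means event_time < watermark.
/// Returns None when no row survives, mirroring the documented contract.
fn filter_late_rows(batch: &MiniBatch, watermark: i64) -> Option<MiniBatch> {
    let kept: Vec<i64> = batch
        .event_times
        .iter()
        .copied()
        .filter(|&t| t >= watermark)
        .collect();

    if kept.is_empty() {
        None
    } else {
        Some(MiniBatch { event_times: kept })
    }
}
```

Returning `None` instead of an empty batch lets the caller skip the batch entirely without inspecting its row count.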

fn current_watermark(&self) -> i64

Get the current pipeline watermark.

fn maybe_checkpoint( &mut self, force: bool, source_offsets: FxHashMap<String, SourceCheckpoint>, ) -> impl Future<Output = Option<u64>> + Send

Perform a periodic (timer-based) checkpoint. This path provides at-least-once semantics; for exactly-once, use checkpoint_with_barrier.
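
A minimal synchronous sketch of the timer-plus-`force` decision, under stated assumptions: `PeriodicCheckpointer`, its fields, and the explicit `now` parameter are hypothetical, the source offsets are omitted, and the returned `u64` is assumed to be an epoch or ID for the checkpoint that was taken (this page does not say what the value means).

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper; not part of the documented API.
struct PeriodicCheckpointer {
    interval: Duration,
    last: Instant,
    next_epoch: u64,
}

impl PeriodicCheckpointer {
    fn new(interval: Duration) -> Self {
        Self { interval, last: Instant::now(), next_epoch: 1 }
    }

    /// Returns Some(epoch) if a checkpoint was taken, None if skipped.
    /// `now` is a parameter so the timer logic can be tested deterministically.
    fn maybe_checkpoint(&mut self, force: bool, now: Instant) -> Option<u64> {
        // Skip unless forced or the interval has elapsed since the last checkpoint.
        if !force && now.duration_since(self.last) < self.interval {
            return None;
        }
        let epoch = self.next_epoch;
        self.next_epoch += 1;
        self.last = now;
        Some(epoch)
    }
}
```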

fn checkpoint_with_barrier( &mut self, source_checkpoints: FxHashMap<String, SourceCheckpoint>, checkpoint_id: u64, ) -> impl Future<Output = BarrierOutcome> + Send

Called when all sources have aligned on a barrier. checkpoint_id identifies the barrier round the offsets were captured under. Implementations must not attribute them to a different announcement (e.g. a newer one issued after the original round was abandoned).
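
One way to honor the attribution rule is to compare the incoming `checkpoint_id` against the round currently in flight and refuse anything else. The sketch below assumes that approach; `BarrierTracker`, `MiniOutcome`, and `announce` are hypothetical names, offsets are simplified to `u64`, and the real `BarrierOutcome` variants are not shown on this page.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for BarrierOutcome.
#[derive(Debug, PartialEq)]
enum MiniOutcome {
    Committed(u64),
    Stale { expected: Option<u64>, got: u64 },
}

/// Hypothetical tracker for the barrier round currently in flight.
struct BarrierTracker {
    in_flight: Option<u64>,
    committed: HashMap<u64, HashMap<String, u64>>,
}

impl BarrierTracker {
    /// Announce a new round; any earlier unfinished round is abandoned.
    fn announce(&mut self, checkpoint_id: u64) {
        self.in_flight = Some(checkpoint_id);
    }

    /// Commit offsets only under the round they were captured for.
    fn checkpoint_with_barrier(
        &mut self,
        offsets: HashMap<String, u64>,
        checkpoint_id: u64,
    ) -> MiniOutcome {
        if self.in_flight != Some(checkpoint_id) {
            // Offsets from an abandoned round must not be credited to a newer one.
            return MiniOutcome::Stale { expected: self.in_flight, got: checkpoint_id };
        }
        self.committed.insert(checkpoint_id, offsets);
        self.in_flight = None;
        MiniOutcome::Committed(checkpoint_id)
    }
}
```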

fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64)

Record cycle metrics.
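
Because `record_cycle` takes `&self`, an implementation needs some form of interior mutability to accumulate counters. A minimal sketch using atomics follows; `CycleMetrics`, its fields, and `mean_cycle_ns` are hypothetical and not taken from the production implementation.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical metrics sink; counters are atomics so `&self` suffices.
#[derive(Default)]
struct CycleMetrics {
    events: AtomicU64,
    batches: AtomicU64,
    cycles: AtomicU64,
    total_ns: AtomicU64,
}

impl CycleMetrics {
    fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64) {
        // Relaxed ordering is enough for independent monotonic counters.
        self.events.fetch_add(events_ingested, Ordering::Relaxed);
        self.batches.fetch_add(batches, Ordering::Relaxed);
        self.total_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
        self.cycles.fetch_add(1, Ordering::Relaxed);
    }

    /// Mean cycle duration in nanoseconds, or None before the first cycle.
    fn mean_cycle_ns(&self) -> Option<u64> {
        let cycles = self.cycles.load(Ordering::Relaxed);
        if cycles == 0 {
            None
        } else {
            Some(self.total_ns.load(Ordering::Relaxed) / cycles)
        }
    }
}
```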

fn poll_tables(&mut self) -> impl Future<Output = ()> + Send

Poll table sources for incremental CDC changes.

fn apply_control(&mut self, msg: ControlMsg)

Apply a DDL control message (add/drop stream) to the running pipeline.

Provided Methods

fn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>)

Update materialized view stores with cycle results.

fn is_leader(&self) -> bool

Returns true if this node is the leader in cluster mode, or if running in single-node mode.

fn tick_idle_watermark(&mut self)

Called once per drain cycle. Demotes sources that have been idle past their timeout so that a quiet input doesn’t hold back the combined watermark. The default implementation is a no-op.
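
To illustrate why demotion matters, the sketch below assumes the combined watermark is the minimum over non-idle sources, which is a common convention but is not confirmed by this page. `WatermarkTracker`, `SourceState`, and the explicit `now` parameter are hypothetical.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical per-source state.
struct SourceState {
    watermark: i64,
    last_activity: Instant,
    idle: bool,
}

/// Hypothetical tracker; a single timeout is shared by all sources here.
struct WatermarkTracker {
    idle_timeout: Duration,
    sources: HashMap<String, SourceState>,
}

impl WatermarkTracker {
    /// Mark sources idle once they have been quiet for at least the timeout.
    fn tick_idle_watermark(&mut self, now: Instant) {
        let timeout = self.idle_timeout;
        for state in self.sources.values_mut() {
            if now.duration_since(state.last_activity) >= timeout {
                state.idle = true;
            }
        }
    }

    /// ASSUMPTION: combined watermark = minimum over non-idle sources.
    fn combined_watermark(&self) -> Option<i64> {
        self.sources
            .values()
            .filter(|s| !s.idle)
            .map(|s| s.watermark)
            .min()
    }
}
```

With one quiet source stuck at a low watermark, the combined value stays low until that source is demoted, after which the remaining active sources determine it.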

fn is_backpressured(&self) -> bool

Returns true when internal buffers are near capacity.

fn has_deferred_input(&self) -> bool

Returns true when deferred operators have pending input to drain.

fn publish_barrier(&self, epoch: u64, checkpoint_id: u64)

Forward a durable checkpoint epoch to external SUBSCRIBE consumers.

fn next_checkpoint_id(&self) -> Option<u64>

Get the next checkpoint ID if checkpoint IDs are managed externally.

fn set_barrier_injectors( &mut self, injectors: Vec<(Arc<str>, CheckpointBarrierInjector)>, )

Register the local source barrier injectors.

Dyn Compatibility

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".
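
The likely cause is the methods that return `impl Future`, since a method with an `impl Trait` return type cannot be called through `dyn Trait`. In practice this means callers take the callback as a generic parameter, and tests substitute a mock type. The sketch below shows that pattern with a one-method stand-in; `MiniCallback`, `CountingCallback`, `run_cycles`, and `block_on_ready` are all hypothetical, and the tiny executor is only suitable for futures that never actually wait.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical one-method stand-in for PipelineCallback.
trait MiniCallback: Send + 'static {
    fn poll_tables(&mut self) -> impl Future<Output = ()> + Send;
}

/// Test double that only counts calls.
struct CountingCallback {
    polls: u32,
}

impl MiniCallback for CountingCallback {
    // An async fn in the impl satisfies the `impl Future + Send` signature.
    async fn poll_tables(&mut self) {
        self.polls += 1;
    }
}

/// Generic over the callback; `&mut dyn MiniCallback` would not compile.
async fn run_cycles<C: MiniCallback>(cb: &mut C, cycles: u32) {
    for _ in 0..cycles {
        cb.poll_tables().await;
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor for futures that complete without waiting.
fn block_on_ready<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Static dispatch keeps the returned futures unboxed; the trade-off is that every consumer of the trait must itself be generic.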

Implementors