pub trait PipelineCallback:
Send
+ 'static
+ Send {
Show 19 methods
// Required methods
fn execute_cycle(
&mut self,
source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
watermark: i64,
) -> impl Future<Output = Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>> + Send;
fn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);
fn write_to_sinks(
&mut self,
results: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
) -> impl Future<Output = ()> + Send;
fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);
fn filter_late_rows(
&self,
source_name: &str,
batch: &RecordBatch,
) -> Option<RecordBatch>;
fn current_watermark(&self) -> i64;
fn maybe_checkpoint(
&mut self,
force: bool,
source_offsets: FxHashMap<String, SourceCheckpoint>,
) -> impl Future<Output = Option<u64>> + Send;
fn checkpoint_with_barrier(
&mut self,
source_checkpoints: FxHashMap<String, SourceCheckpoint>,
checkpoint_id: u64,
) -> impl Future<Output = BarrierOutcome> + Send;
fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);
fn poll_tables(&mut self) -> impl Future<Output = ()> + Send;
fn apply_control(&mut self, msg: ControlMsg);
// Provided methods
fn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) { ... }
fn is_leader(&self) -> bool { ... }
fn tick_idle_watermark(&mut self) { ... }
fn is_backpressured(&self) -> bool { ... }
fn has_deferred_input(&self) -> bool { ... }
fn publish_barrier(&self, epoch: u64, checkpoint_id: u64) { ... }
fn next_checkpoint_id(&self) -> Option<u64> { ... }
fn set_barrier_injectors(
&mut self,
injectors: Vec<(Arc<str>, CheckpointBarrierInjector)>,
) { ... }
}
Callback trait through which the coordinator interacts with the rest of the DB.
The trait exists as a test seam; the production implementation is `ConnectorPipelineCallback`.
Required Methods§
Sourcefn execute_cycle(
&mut self,
source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
watermark: i64,
) -> impl Future<Output = Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>> + Send
fn execute_cycle( &mut self, source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>, watermark: i64, ) -> impl Future<Output = Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>> + Send
Called with accumulated source batches to execute a SQL cycle.
Sourcefn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>)
fn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>)
Called with results to push to stream subscriptions.
Sourcefn write_to_sinks(
&mut self,
results: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
) -> impl Future<Output = ()> + Send
fn write_to_sinks( &mut self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>, ) -> impl Future<Output = ()> + Send
Called with results to write to sinks.
Sourcefn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch)
fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch)
Extract watermark from a batch for a given source.
Sourcefn filter_late_rows(
&self,
source_name: &str,
batch: &RecordBatch,
) -> Option<RecordBatch>
fn filter_late_rows( &self, source_name: &str, batch: &RecordBatch, ) -> Option<RecordBatch>
Filter late rows from a batch. Returns the filtered batch, or `None` if every row is late.
Sourcefn current_watermark(&self) -> i64
fn current_watermark(&self) -> i64
Get the current pipeline watermark.
Sourcefn maybe_checkpoint(
&mut self,
force: bool,
source_offsets: FxHashMap<String, SourceCheckpoint>,
) -> impl Future<Output = Option<u64>> + Send
fn maybe_checkpoint( &mut self, force: bool, source_offsets: FxHashMap<String, SourceCheckpoint>, ) -> impl Future<Output = Option<u64>> + Send
Perform a periodic (timer-based) checkpoint with at-least-once semantics.
For exactly-once semantics, use `checkpoint_with_barrier`.
Sourcefn checkpoint_with_barrier(
&mut self,
source_checkpoints: FxHashMap<String, SourceCheckpoint>,
checkpoint_id: u64,
) -> impl Future<Output = BarrierOutcome> + Send
fn checkpoint_with_barrier( &mut self, source_checkpoints: FxHashMap<String, SourceCheckpoint>, checkpoint_id: u64, ) -> impl Future<Output = BarrierOutcome> + Send
Called when all sources have aligned on a barrier.
`checkpoint_id` identifies the barrier round under which the offsets were
captured; implementations must not attribute them to a
different (e.g. newer, post-abandonment) announcement.
Sourcefn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64)
fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64)
Record cycle metrics.
Sourcefn poll_tables(&mut self) -> impl Future<Output = ()> + Send
fn poll_tables(&mut self) -> impl Future<Output = ()> + Send
Poll table sources for incremental CDC changes.
Sourcefn apply_control(&mut self, msg: ControlMsg)
fn apply_control(&mut self, msg: ControlMsg)
Apply a DDL control message (add/drop stream) to the running pipeline.
Provided Methods§
Sourcefn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>)
fn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>)
Update materialized view stores with cycle results.
Sourcefn is_leader(&self) -> bool
fn is_leader(&self) -> bool
Returns `true` if this node is the leader in cluster mode, or if it is running in single-node mode.
Sourcefn tick_idle_watermark(&mut self)
fn tick_idle_watermark(&mut self)
Per drain cycle, demote sources that have been idle past their timeout so that a quiet input doesn’t hold back the combined watermark. The default implementation is a no-op.
Sourcefn is_backpressured(&self) -> bool
fn is_backpressured(&self) -> bool
Returns true when internal buffers are near capacity.
Sourcefn has_deferred_input(&self) -> bool
fn has_deferred_input(&self) -> bool
Returns true when deferred operators have pending input to drain.
Sourcefn publish_barrier(&self, epoch: u64, checkpoint_id: u64)
fn publish_barrier(&self, epoch: u64, checkpoint_id: u64)
Forward a durable checkpoint epoch to external SUBSCRIBE consumers.
Sourcefn next_checkpoint_id(&self) -> Option<u64>
fn next_checkpoint_id(&self) -> Option<u64>
Get the next checkpoint ID, if checkpoint IDs are managed externally.
Sourcefn set_barrier_injectors(
&mut self,
injectors: Vec<(Arc<str>, CheckpointBarrierInjector)>,
)
fn set_barrier_injectors( &mut self, injectors: Vec<(Arc<str>, CheckpointBarrierInjector)>, )
Register the local source barrier injectors.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".