Struct DagExecutor 

pub struct DagExecutor { /* private fields */ }
Ring 0 DAG executor for event processing.

Processes events through a finalized StreamingDag in topological order using the pre-computed RoutingTable for O(1) dispatch.
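The dispatch model described above can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical: `NodeId` as a plain `usize`, the toy `RoutingTable`, the `run_once` helper, and `i64` events are stand-ins, not the crate's types. Unlike the real hot path, this toy version allocates its queues on every call.

```rust
/// Hypothetical stand-in for the crate's node identifier.
type NodeId = usize;

/// A toy routing table: `targets[n]` lists the downstream nodes of node `n`.
/// Indexing a `Vec` by node ID is what makes each lookup O(1).
struct RoutingTable {
    targets: Vec<Vec<NodeId>>,
}

/// Pushes `event` into `source`, then walks `topo_order` once, forwarding
/// every queued event to the node's downstream targets. Events that reach a
/// node with no targets (a sink) are returned, tagged with the sink's ID.
fn run_once(
    routing: &RoutingTable,
    topo_order: &[NodeId],
    source: NodeId,
    event: i64,
) -> Vec<(NodeId, i64)> {
    let mut queues: Vec<Vec<i64>> = vec![Vec::new(); routing.targets.len()];
    let mut sink_outputs = Vec::new();
    queues[source].push(event);

    for &node in topo_order {
        // Drain this node's queue; upstream nodes were already visited,
        // so nothing new can arrive for it later in this pass.
        let pending = std::mem::take(&mut queues[node]);
        for ev in pending {
            let downstream = &routing.targets[node];
            if downstream.is_empty() {
                sink_outputs.push((node, ev));
            } else {
                for &next in downstream {
                    queues[next].push(ev);
                }
            }
        }
    }
    sink_outputs
}
```

Because nodes are visited in topological order, a single pass is enough: by the time a node is drained, every node that could feed it has already run.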

§Construction

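// `schema`, `my_operator`, `event`, and the node IDs (`src_id`, `transform_id`,
// `out_id`) are assumed to be defined by the caller.
let dag = DagBuilder::new()
    .source("src", schema.clone())
    .operator("transform", schema.clone())
    .connect("src", "transform")
    .sink_for("transform", "out", schema.clone())
    .build()?;

// Build the executor, attach the operator, then push one event through the DAG.
let mut executor = DagExecutor::from_dag(&dag);
executor.register_operator(transform_id, Box::new(my_operator));
executor.process_event(src_id, event)?;

// Drain whatever reached the "out" sink.
let outputs = executor.take_sink_outputs(out_id);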

Implementations§

impl DagExecutor

pub fn from_dag(dag: &StreamingDag) -> Self

Creates a new executor from a finalized StreamingDag.

Allocates all per-node state (input queues, runtimes, sink buffers) up front in Ring 2. The hot path (process_event) is allocation-free.

§Arguments
  • dag - A finalized StreamingDag topology

pub fn register_operator(&mut self, node: NodeId, operator: Box<dyn Operator>)

Registers an operator for a node.

Nodes without registered operators act as passthrough: events are forwarded to downstream nodes unchanged. This is the default for source and sink nodes.

§Arguments
  • node - The node ID to register the operator for
  • operator - The operator implementation
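The passthrough rule can be pictured as one optional operator slot per node. The sketch below is an illustration under stated assumptions: the `Operator` trait shown here is a simplified, hypothetical one (the crate's real trait is assumed to be richer), and `Slots`, `Double`, and `i64` events are invented for the example.

```rust
/// Hypothetical, simplified operator trait; not the crate's real `Operator`.
trait Operator {
    fn process(&mut self, event: i64) -> Vec<i64>;
}

/// Example operator that doubles each event.
struct Double;

impl Operator for Double {
    fn process(&mut self, event: i64) -> Vec<i64> {
        vec![event * 2]
    }
}

/// One optional operator slot per node, indexed by node ID.
struct Slots {
    operators: Vec<Option<Box<dyn Operator>>>,
}

impl Slots {
    /// Creates `node_count` empty slots, so every node starts as passthrough.
    fn new(node_count: usize) -> Self {
        Slots { operators: (0..node_count).map(|_| None).collect() }
    }

    /// Installs an operator for `node`, replacing any previous one.
    fn register(&mut self, node: usize, operator: Box<dyn Operator>) {
        self.operators[node] = Some(operator);
    }

    /// Runs the node's operator if one is registered; otherwise forwards
    /// the event unchanged (passthrough).
    fn apply(&mut self, node: usize, event: i64) -> Vec<i64> {
        match self.operators[node].as_mut() {
            Some(op) => op.process(event),
            None => vec![event],
        }
    }
}
```

With this layout, source and sink nodes need no special casing: their slots simply stay `None`.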

pub fn process_event( &mut self, source_node: NodeId, event: Event, ) -> Result<(), DagError>

Processes an event from a source node through the entire DAG.

The event is enqueued at the source node, then all nodes are processed in topological order. Events produced by operators are routed to downstream nodes via the RoutingTable. Sink outputs are collected and can be retrieved via take_sink_outputs().

§Arguments
  • source_node - The source node into which the event is injected
  • event - The event to process
§Errors

Returns DagError::NodeNotFound if the source node is out of bounds.
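As a rough sketch of the bounds check behind this error, the example below rejects an out-of-range node before touching any queue. The `DagError` enum here is a hypothetical stand-in (the real variant's payload is not shown in this page), and `enqueue` is an invented helper.

```rust
/// Hypothetical stand-in for the crate's error type; the payload carried by
/// the real `NodeNotFound` variant is an assumption.
#[derive(Debug, PartialEq)]
enum DagError {
    NodeNotFound(usize),
}

/// Pushes `event` onto the queue of `source`, or reports `NodeNotFound`
/// when `source` is not a valid index into `queues`.
fn enqueue(queues: &mut [Vec<i64>], source: usize, event: i64) -> Result<(), DagError> {
    let queue = queues
        .get_mut(source)
        .ok_or(DagError::NodeNotFound(source))?;
    queue.push(event);
    Ok(())
}
```

Callers would typically propagate the error with `?`, as the construction example does.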

pub fn process_watermark( &mut self, source_node: NodeId, watermark: i64, ) -> Result<(), DagError>

Advances watermark generators for all nodes in topological order.

Called when an external source provides a watermark (e.g., via Source::watermark()). Propagates the watermark through the DAG so downstream operators can use it for late-data filtering and window triggering.

§Arguments
  • source_node - The source node that originated the watermark
  • watermark - The watermark timestamp in milliseconds
§Errors

Returns DagError::NodeNotFound if source_node is out of bounds.
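One way to picture watermark propagation is sketched below. It assumes a convention common in streaming systems: a node with several inputs takes the minimum of its upstream watermarks, and a watermark never moves backwards. This page does not state which rule `DagExecutor` applies, so treat the rule, the `propagate_watermark` helper, and the adjacency layout as assumptions.

```rust
/// Propagates a new source watermark through the graph.
///
/// * `inputs[n]` lists the upstream nodes of node `n` (empty for sources).
/// * `topo_order` is a topological ordering of all node IDs.
/// * `watermarks[n]` holds the current watermark of node `n`, in milliseconds.
fn propagate_watermark(
    inputs: &[Vec<usize>],
    topo_order: &[usize],
    watermarks: &mut [i64],
    source: usize,
    watermark: i64,
) {
    // Watermarks are monotonic: never let a source go backwards.
    watermarks[source] = watermarks[source].max(watermark);

    for &node in topo_order {
        // Assumed rule: a node is only as far along as its slowest input.
        let min_upstream = inputs[node].iter().map(|&up| watermarks[up]).min();
        if let Some(wm) = min_upstream {
            watermarks[node] = watermarks[node].max(wm);
        }
    }
}
```

Under this rule a join node does not advance until every one of its inputs has reported a watermark, which is what keeps late-data filtering conservative.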

pub fn fire_timers(&mut self, current_time: i64)

Fires all expired timers for each node in topological order.

Polls each node’s TimerService for timers with timestamp <= current_time, calls Operator::on_timer() for each fired timer, and routes the resulting outputs downstream.

This is used to trigger window closures after watermark advancement.

§Arguments
  • current_time - The current event-time (typically the latest watermark)
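The `timestamp <= current_time` polling step can be sketched with a min-heap. `TimerQueue` below is a hypothetical illustration, not the crate's `TimerService`; it tracks bare timestamps only and leaves out the `Operator::on_timer()` callback and downstream routing.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical timer queue ordered by timestamp, earliest first.
struct TimerQueue {
    // `Reverse` turns the standard max-heap into a min-heap.
    heap: BinaryHeap<Reverse<i64>>,
}

impl TimerQueue {
    fn new() -> Self {
        TimerQueue { heap: BinaryHeap::new() }
    }

    /// Schedules a timer to fire at `timestamp`.
    fn register(&mut self, timestamp: i64) {
        self.heap.push(Reverse(timestamp));
    }

    /// Removes and returns every timer with `timestamp <= current_time`,
    /// in ascending timestamp order.
    fn poll_expired(&mut self, current_time: i64) -> Vec<i64> {
        let mut fired = Vec::new();
        while let Some(&Reverse(ts)) = self.heap.peek() {
            if ts > current_time {
                break;
            }
            self.heap.pop();
            fired.push(ts);
        }
        fired
    }
}
```

Passing the latest watermark as `current_time` means a window timer fires only once the watermark has reached or passed its timestamp.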

pub fn take_sink_outputs(&mut self, sink_node: NodeId) -> Vec<Event>

Takes collected sink outputs for a given sink node.

Returns all events that reached this sink during prior process_event calls, draining the internal buffer.
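The draining behavior can be sketched with `std::mem::take`, which swaps the buffer for an empty one and hands back the old contents. `SinkBuffers` is a hypothetical type, and returning an empty vector for an unknown sink is an assumption rather than documented behavior.

```rust
/// Hypothetical per-sink output buffers, indexed by sink node ID.
struct SinkBuffers {
    buffers: Vec<Vec<i64>>,
}

impl SinkBuffers {
    /// Returns everything collected for `sink` and leaves its buffer empty.
    /// An out-of-range `sink` yields an empty vector (an assumption).
    fn take(&mut self, sink: usize) -> Vec<i64> {
        self.buffers
            .get_mut(sink)
            .map(std::mem::take)
            .unwrap_or_default()
    }
}
```

A second call without intervening events returns nothing, so each output is handed to the caller exactly once. Note that `std::mem::take` leaves behind a buffer with no capacity; an implementation that wants to keep the hot path allocation-free might swap in a pre-sized buffer instead.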

pub fn take_all_sink_outputs(&mut self) -> FxHashMap<NodeId, Vec<Event>>

Takes all sink outputs across all sink nodes.

pub fn metrics(&self) -> &DagExecutorMetrics

Returns a reference to the executor metrics.

pub fn node_metrics(&self) -> &[OperatorNodeMetrics]

Returns per-operator node metrics (only available with the dag-metrics feature).

pub fn reset_metrics(&mut self)

Resets all executor metrics to zero.

pub fn source_nodes(&self) -> &[NodeId]

Returns the source node IDs.

pub fn sink_nodes(&self) -> &[NodeId]

Returns the sink node IDs.

pub fn node_type(&self, node: NodeId) -> Option<DagNodeType>

Returns the node type for a given node ID.

pub fn checkpoint(&self) -> FxHashMap<NodeId, OperatorState>

Checkpoints all registered operators.

Returns a map of NodeId to OperatorState for all nodes that have registered operators.

pub fn restore( &mut self, states: &FxHashMap<NodeId, OperatorState>, ) -> Result<(), DagError>

Restores operator state from a checkpoint snapshot.

Iterates over the provided states and, for each node that has a registered operator, calls that operator's restore() with the saved state.

§Errors

Returns DagError::RestoreFailed if any operator fails to restore.
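A checkpoint and restore round trip might look like the sketch below. It is an illustration only: the `Stateful` trait, the `Counter` operator, byte-vector state, `String` errors, and the use of the standard `HashMap` (instead of `FxHashMap`, `OperatorState`, and `DagError`) are all assumptions made to keep the example self-contained. Skipping states whose node has no registered operator is also an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical checkpointing interface; not the crate's `Operator` trait.
trait Stateful {
    fn checkpoint(&self) -> Vec<u8>;
    fn restore(&mut self, state: &[u8]) -> Result<(), String>;
}

/// Example operator whose entire state is a single counter.
struct Counter {
    count: u64,
}

impl Stateful for Counter {
    fn checkpoint(&self) -> Vec<u8> {
        self.count.to_le_bytes().to_vec()
    }

    fn restore(&mut self, state: &[u8]) -> Result<(), String> {
        if state.len() != 8 {
            return Err(format!("expected 8 bytes, got {}", state.len()));
        }
        let mut bytes = [0u8; 8];
        bytes.copy_from_slice(state);
        self.count = u64::from_le_bytes(bytes);
        Ok(())
    }
}

/// Snapshots every registered operator, keyed by node ID.
fn checkpoint_all(ops: &HashMap<usize, Counter>) -> HashMap<usize, Vec<u8>> {
    ops.iter().map(|(&id, op)| (id, op.checkpoint())).collect()
}

/// Restores each operator that has a saved state; the first failure aborts.
fn restore_all(
    ops: &mut HashMap<usize, Counter>,
    states: &HashMap<usize, Vec<u8>>,
) -> Result<(), String> {
    for (id, state) in states {
        if let Some(op) = ops.get_mut(id) {
            op.restore(state)?;
        }
    }
    Ok(())
}
```

Because the first failing operator aborts the loop, a failed restore can leave some operators restored and others not; callers would likely want to rebuild the executor rather than continue from that mixed state.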

pub fn checkpoint_watermarks(&self) -> DagWatermarkCheckpoint

Checkpoints the watermark tracker state.

pub fn restore_watermarks(&mut self, checkpoint: &DagWatermarkCheckpoint)

Restores watermark tracker state from a checkpoint.

pub fn inject_events(&mut self, node_id: NodeId, events: Vec<Event>)

Injects events into a node’s input queue.

Used during recovery to repopulate queues with buffered events.

pub fn input_count(&self, node_id: NodeId) -> usize

Returns the number of incoming edges for a node.

pub fn process_checkpoint_barrier( &mut self, _barrier: &CheckpointBarrier, ) -> FxHashMap<NodeId, OperatorState>

Snapshots all registered operators in topological order.

The barrier argument is accepted for API consistency and is reserved for future use with epoch tracking. In the synchronous single-threaded executor, topological ordering guarantees that upstream operators are snapshotted before their downstream operators.

Trait Implementations§

impl Debug for DagExecutor

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
§

impl<T> ArchivePointee for T

§

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.
§

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
§

impl<T> LayoutRaw for T

§

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.
§

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

§

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched.
§

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.
§

impl<T> Pointee for T

§

type Metadata = ()

The metadata type for pointers and references to this type.
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Creates a new Policy that returns Action::Follow only if both self and other return Action::Follow.
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Creates a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
§

impl<T> Scope for T

§

fn with<F, R>(self, f: F) -> R
where Self: Sized, F: FnOnce(Self) -> R,

Scoped with ownership.
§

fn with_ref<F, R>(&self, f: F) -> R
where F: FnOnce(&Self) -> R,

Scoped with reference.
§

fn with_mut<F, R>(&mut self, f: F) -> R
where F: FnOnce(&mut Self) -> R,

Scoped with mutable reference.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.