laminar_core/dag/executor.rs
//! Ring 0 DAG executor for event processing.
//!
//! [`DagExecutor`] processes events through a finalized [`StreamingDag`] in
//! topological order. It uses the pre-computed [`RoutingTable`] for O(1)
//! dispatch and integrates with the [`Operator`] trait for operator invocation.
//!
//! # Architecture
//!
//! ```text
//! ┌──────────────────────────────────────────────────────────────────┐
//! │                         RING 0: HOT PATH                         │
//! │                                                                  │
//! │  process_event(source, event)                                    │
//! │       │                                                          │
//! │       ▼                                                          │
//! │  ┌──────────┐   topological   ┌───────────┐   route_output()     │
//! │  │ enqueue  │──────order─────▶│ operator  │──────────────────┐   │
//! │  │ (input   │                 │ .process()│                  │   │
//! │  │  queue)  │                 └───────────┘                  │   │
//! │  └──────────┘                                                │   │
//! │       ▲                                                      │   │
//! │       │              ┌─────────────────┐                     │   │
//! │       └──────────────│  RoutingTable   │◀────────────────────┘   │
//! │   enqueue targets    │  O(1) lookup    │                         │
//! │                      └─────────────────┘                         │
//! └──────────────────────────────────────────────────────────────────┘
//! ```
//!
//! # Latency Budget
//!
//! | Component | Budget |
//! |-----------|--------|
//! | Routing table lookup | < 50ns |
//! | Operator dispatch | < 200ns |
//! | Multicast to N consumers | < 100ns |
//! | State access | < 200ns |
//! | **Total** | **< 500ns** |
38
39use std::collections::VecDeque;
40
41use rustc_hash::FxHashMap;
42
43use crate::alloc::HotPathGuard;
44use crate::operator::{
45 Event, Operator, OperatorContext, OperatorState, Output, OutputVec, SideOutputData, Timer,
46};
47use crate::state::InMemoryStore;
48use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
49
50use super::checkpoint::CheckpointBarrier;
51use super::error::DagError;
52use super::routing::RoutingTable;
53use super::topology::{DagNodeType, NodeId, StreamingDag};
54use super::watermark::{DagWatermarkCheckpoint, DagWatermarkTracker};
55
/// Per-node runtime state (timer service, state store, watermark generator).
///
/// One instance is created per node slot when the executor is built. During
/// dispatch (`process_node`, `fire_timers`) the executor moves the runtime
/// out of its slot with `Option::take`, lends its parts to the operator via
/// [`OperatorContext`], and then puts it back; this keeps the rest of the
/// executor mutably borrowable while the operator runs.
struct NodeRuntime {
    /// Timer service for this node; polled by `fire_timers` and exposed to
    /// the operator as `OperatorContext::timers`.
    timer_service: TimerService,
    /// State store for this node, exposed to the operator as
    /// `OperatorContext::state`.
    state_store: Box<dyn crate::state::StateStore>,
    /// Watermark generator for this node; advanced when the watermark
    /// tracker reports a new watermark for the node.
    watermark_generator: Box<dyn crate::time::WatermarkGenerator>,
}
69
70impl Default for NodeRuntime {
71 fn default() -> Self {
72 Self {
73 timer_service: TimerService::new(),
74 state_store: Box::new(InMemoryStore::new()),
75 watermark_generator: Box::new(BoundedOutOfOrdernessGenerator::new(0)),
76 }
77 }
78}
79
/// Metrics tracked by the DAG executor.
///
/// The struct itself is always available, but every increment of these
/// counters in this file is compiled only when the `dag-metrics` feature is
/// enabled; without that feature all fields stay at zero.
#[derive(Debug, Clone, Default)]
pub struct DagExecutorMetrics {
    /// Total events taken from an input queue and dispatched (to an operator
    /// or as passthrough).
    pub events_processed: u64,
    /// Total output events forwarded to at least one downstream node.
    /// Counted once per event, not once per target.
    pub events_routed: u64,
    /// Total multicast dispatches (fan-out to > 1 target).
    pub multicast_publishes: u64,
    /// Total backpressure stalls encountered.
    // NOTE(review): no code in this file increments this counter — confirm
    // whether it is maintained elsewhere.
    pub backpressure_stalls: u64,
    /// Total nodes skipped (empty input queue).
    pub nodes_skipped: u64,
    /// Total watermark advances processed via `process_watermark()`.
    pub watermarks_processed: u64,
}
99
/// Per-operator metrics tracked by the DAG executor.
///
/// Only compiled when the `dag-metrics` feature is enabled. The executor
/// keeps one entry per node slot, indexed by `NodeId.0`.
#[cfg(feature = "dag-metrics")]
#[derive(Debug, Clone, Default)]
pub struct OperatorNodeMetrics {
    /// Total events drained from this node's input queue.
    pub events_in: u64,
    /// Total `Output::Event` values emitted by this node (from event
    /// processing or timer callbacks).
    pub events_out: u64,
    /// Cumulative wall-clock time spent dispatching events, in nanoseconds.
    pub total_time_ns: u64,
    /// Number of dispatches. Incremented once per drained event, including
    /// passthrough dispatches on nodes without a registered operator.
    pub invocations: u64,
}
116
/// Ring 0 DAG executor for event processing.
///
/// Processes events through a finalized [`StreamingDag`] in topological order
/// using the pre-computed [`RoutingTable`] for O(1) dispatch.
///
/// All per-node vectors are indexed by `NodeId.0` and have `slot_count`
/// entries.
///
/// # Construction
///
/// ```rust,ignore
/// let dag = DagBuilder::new()
///     .source("src", schema.clone())
///     .operator("transform", schema.clone())
///     .connect("src", "transform")
///     .sink_for("transform", "out", schema.clone())
///     .build()?;
///
/// let mut executor = DagExecutor::from_dag(&dag);
/// executor.register_operator(transform_id, Box::new(my_operator));
/// executor.process_event(src_id, event)?;
/// let outputs = executor.take_sink_outputs(out_id);
/// ```
pub struct DagExecutor {
    /// Registered operators, indexed by `NodeId.0`. `None` = passthrough.
    operators: Vec<Option<Box<dyn Operator>>>,
    /// Per-node runtime state (timer, state store, watermark generator).
    /// A slot is `None` only while its runtime is lent out during dispatch.
    runtimes: Vec<Option<NodeRuntime>>,
    /// Input queues per node, indexed by `NodeId.0`.
    input_queues: Vec<VecDeque<Event>>,
    /// Collected outputs, indexed by `NodeId.0`. Besides events that reach a
    /// terminal node, this also receives late events, side outputs, and
    /// changelog events, stored under the node that emitted them.
    sink_outputs: Vec<Vec<Event>>,
    /// Pre-computed routing table for O(1) dispatch.
    routing: RoutingTable,
    /// Topological execution order (from the finalized DAG).
    execution_order: Vec<NodeId>,
    /// Source node IDs.
    source_nodes: Vec<NodeId>,
    /// Sink node IDs.
    sink_nodes: Vec<NodeId>,
    /// Node types, indexed by `NodeId.0`. Slots with no DAG node keep the
    /// placeholder `DagNodeType::StatelessOperator`.
    node_types: Vec<DagNodeType>,
    /// Total number of node slots allocated (largest node id plus one).
    slot_count: usize,
    /// Number of incoming edges per node, indexed by `NodeId.0`.
    input_counts: Vec<usize>,
    /// Scratch buffer reused by `process_node` when draining an input queue,
    /// so a fresh `Vec` is not created for every node visit.
    temp_events: Vec<Event>,
    /// Executor metrics.
    metrics: DagExecutorMetrics,
    /// DAG-native watermark tracker for topology-aware propagation.
    watermark_tracker: DagWatermarkTracker,
    /// Per-operator node metrics (feature-gated).
    #[cfg(feature = "dag-metrics")]
    node_metrics: Vec<OperatorNodeMetrics>,
}
170
171impl DagExecutor {
172 /// Creates a new executor from a finalized [`StreamingDag`].
173 ///
174 /// Allocates all per-node state (input queues, runtimes, sink buffers)
175 /// up front in Ring 2. The hot path (`process_event`) is allocation-free.
176 ///
177 /// # Arguments
178 ///
179 /// * `dag` - A finalized `StreamingDag` topology
180 #[must_use]
181 pub fn from_dag(dag: &StreamingDag) -> Self {
182 let slot_count = dag.nodes().keys().map(|n| n.0).max().map_or(0, |n| n + 1) as usize;
183
184 let routing = RoutingTable::from_dag(dag);
185
186 let mut operators = Vec::with_capacity(slot_count);
187 let mut runtimes = Vec::with_capacity(slot_count);
188 let mut input_queues = Vec::with_capacity(slot_count);
189 let mut sink_outputs = Vec::with_capacity(slot_count);
190 let mut node_types = Vec::with_capacity(slot_count);
191 let mut input_counts = vec![0usize; slot_count];
192
193 for _ in 0..slot_count {
194 operators.push(None);
195 runtimes.push(Some(NodeRuntime::default()));
196 input_queues.push(VecDeque::with_capacity(16));
197 sink_outputs.push(Vec::new());
198 node_types.push(DagNodeType::StatelessOperator);
199 }
200
201 // Populate node types and input counts from the DAG.
202 for node in dag.nodes().values() {
203 let idx = node.id.0 as usize;
204 if idx < slot_count {
205 node_types[idx] = node.node_type;
206 input_counts[idx] = dag.incoming_edge_count(node.id);
207 }
208 }
209
210 let watermark_tracker = DagWatermarkTracker::from_dag(dag);
211
212 Self {
213 operators,
214 runtimes,
215 input_queues,
216 sink_outputs,
217 routing,
218 execution_order: dag.execution_order().to_vec(),
219 source_nodes: dag.sources().to_vec(),
220 sink_nodes: dag.sinks().to_vec(),
221 node_types,
222 slot_count,
223 input_counts,
224 temp_events: Vec::with_capacity(64),
225 metrics: DagExecutorMetrics::default(),
226 watermark_tracker,
227 #[cfg(feature = "dag-metrics")]
228 node_metrics: (0..slot_count)
229 .map(|_| OperatorNodeMetrics::default())
230 .collect(),
231 }
232 }
233
234 /// Registers an operator for a node.
235 ///
236 /// Nodes without registered operators act as passthrough: events are
237 /// forwarded to downstream nodes unchanged. This is the default for
238 /// source and sink nodes.
239 ///
240 /// # Arguments
241 ///
242 /// * `node` - The node ID to register the operator for
243 /// * `operator` - The operator implementation
244 pub fn register_operator(&mut self, node: NodeId, operator: Box<dyn Operator>) {
245 let idx = node.0 as usize;
246 if idx < self.slot_count {
247 self.operators[idx] = Some(operator);
248 }
249 }
250
251 /// Processes an event from a source node through the entire DAG.
252 ///
253 /// The event is enqueued at the source node, then all nodes are processed
254 /// in topological order. Events produced by operators are routed to
255 /// downstream nodes via the [`RoutingTable`]. Sink outputs are collected
256 /// and can be retrieved via [`take_sink_outputs()`](Self::take_sink_outputs).
257 ///
258 /// # Arguments
259 ///
260 /// * `source_node` - The source node to inject the event
261 /// * `event` - The event to process
262 ///
263 /// # Errors
264 ///
265 /// Returns [`DagError::NodeNotFound`] if the source node is out of bounds.
266 pub fn process_event(&mut self, source_node: NodeId, event: Event) -> Result<(), DagError> {
267 let idx = source_node.0 as usize;
268 if idx >= self.slot_count {
269 return Err(DagError::NodeNotFound(format!("{source_node}")));
270 }
271
272 self.input_queues[idx].push_back(event);
273 self.process_dag();
274 Ok(())
275 }
276
277 /// Advances watermark generators for all nodes in topological order.
278 ///
279 /// Called when an external source provides a watermark (e.g., via
280 /// `Source::watermark()`). Propagates the watermark through the DAG
281 /// so downstream operators can use it for late-data filtering and
282 /// window triggering.
283 ///
284 /// # Arguments
285 ///
286 /// * `source_node` - The source node that originated the watermark
287 /// * `watermark` - The watermark timestamp in milliseconds
288 ///
289 /// # Errors
290 ///
291 /// Returns [`DagError::NodeNotFound`] if `source_node` is out of bounds.
292 pub fn process_watermark(
293 &mut self,
294 source_node: NodeId,
295 watermark: i64,
296 ) -> Result<(), DagError> {
297 let idx = source_node.0 as usize;
298 if idx >= self.slot_count {
299 return Err(DagError::NodeNotFound(format!("{source_node}")));
300 }
301
302 // Use topology-aware propagation: only advance downstream nodes.
303 // Direct field access avoids collecting into a SmallVec.
304 for &(node_id, wm) in self
305 .watermark_tracker
306 .update_watermark(source_node, watermark)
307 {
308 let nidx = node_id.0 as usize;
309 if nidx < self.slot_count {
310 if let Some(ref mut rt) = self.runtimes[nidx] {
311 rt.watermark_generator.advance_watermark(wm);
312 }
313 }
314 }
315
316 // Fire timers for any nodes whose watermark advanced
317 self.fire_timers(watermark);
318
319 #[cfg(feature = "dag-metrics")]
320 {
321 self.metrics.watermarks_processed += 1;
322 }
323 Ok(())
324 }
325
326 /// Fires all expired timers for each node in topological order.
327 ///
328 /// Polls each node's [`TimerService`] for timers with
329 /// `timestamp <= current_time`, calls [`Operator::on_timer()`] for each
330 /// fired timer, and routes the resulting outputs downstream.
331 ///
332 /// This is used to trigger window closures after watermark advancement.
333 ///
334 /// # Arguments
335 ///
336 /// * `current_time` - The current event-time (typically the latest watermark)
337 pub fn fire_timers(&mut self, current_time: i64) {
338 for i in 0..self.execution_order.len() {
339 let node_id = self.execution_order[i];
340 let idx = node_id.0 as usize;
341 if idx >= self.slot_count {
342 continue;
343 }
344
345 let mut operator = self.operators[idx].take();
346 let mut runtime = self.runtimes[idx].take();
347
348 if let (Some(op), Some(rt)) = (&mut operator, &mut runtime) {
349 let fired = rt.timer_service.poll_timers(current_time);
350 for reg in fired {
351 let timer = Timer {
352 key: reg.key.unwrap_or_default(),
353 timestamp: reg.timestamp,
354 };
355 let mut ctx = OperatorContext {
356 event_time: reg.timestamp,
357 processing_time: current_time,
358 timers: &mut rt.timer_service,
359 state: rt.state_store.as_mut(),
360 watermark_generator: rt.watermark_generator.as_mut(),
361 operator_index: idx,
362 };
363 let outputs = op.on_timer(timer, &mut ctx);
364 self.route_all_outputs(node_id, outputs);
365 }
366 }
367
368 self.operators[idx] = operator;
369 self.runtimes[idx] = runtime;
370 }
371
372 // Process any events generated by timer firings through the rest of the DAG.
373 self.process_dag();
374 }
375
376 /// Takes collected sink outputs for a given sink node.
377 ///
378 /// Returns all events that reached this sink during prior
379 /// `process_event` calls, draining the internal buffer.
380 #[must_use]
381 pub fn take_sink_outputs(&mut self, sink_node: NodeId) -> Vec<Event> {
382 let idx = sink_node.0 as usize;
383 if idx < self.slot_count {
384 std::mem::take(&mut self.sink_outputs[idx])
385 } else {
386 Vec::new()
387 }
388 }
389
390 /// Takes all sink outputs across all sink nodes.
391 #[must_use]
392 pub fn take_all_sink_outputs(&mut self) -> FxHashMap<NodeId, Vec<Event>> {
393 let mut outputs = FxHashMap::default();
394 // Index-based iteration avoids collecting sink_nodes into a SmallVec.
395 for i in 0..self.sink_nodes.len() {
396 let sink_id = self.sink_nodes[i];
397 let idx = sink_id.0 as usize;
398 if idx < self.slot_count {
399 let events = std::mem::take(&mut self.sink_outputs[idx]);
400 if !events.is_empty() {
401 outputs.insert(sink_id, events);
402 }
403 }
404 }
405 outputs
406 }
407
/// Returns a reference to the executor metrics.
///
/// The counters are only incremented when the `dag-metrics` feature is
/// enabled; otherwise the returned values remain at their defaults.
#[must_use]
pub fn metrics(&self) -> &DagExecutorMetrics {
    &self.metrics
}
413
/// Returns per-operator node metrics (only available with `dag-metrics` feature).
///
/// The slice has one entry per node slot and is indexed by `NodeId.0`.
#[cfg(feature = "dag-metrics")]
#[must_use]
pub fn node_metrics(&self) -> &[OperatorNodeMetrics] {
    &self.node_metrics
}
420
421 /// Resets all executor metrics to zero.
422 pub fn reset_metrics(&mut self) {
423 self.metrics = DagExecutorMetrics::default();
424 #[cfg(feature = "dag-metrics")]
425 {
426 for m in &mut self.node_metrics {
427 *m = OperatorNodeMetrics::default();
428 }
429 }
430 }
431
/// Returns the source node IDs, as copied from the DAG when the executor
/// was built.
#[must_use]
pub fn source_nodes(&self) -> &[NodeId] {
    &self.source_nodes
}
437
/// Returns the sink node IDs, as copied from the DAG when the executor
/// was built.
#[must_use]
pub fn sink_nodes(&self) -> &[NodeId] {
    &self.sink_nodes
}
443
444 /// Returns the node type for a given node ID.
445 #[must_use]
446 pub fn node_type(&self, node: NodeId) -> Option<DagNodeType> {
447 let idx = node.0 as usize;
448 if idx < self.slot_count {
449 Some(self.node_types[idx])
450 } else {
451 None
452 }
453 }
454
455 /// Checkpoints all registered operators.
456 ///
457 /// Returns a map of `NodeId` to `OperatorState` for all nodes
458 /// that have registered operators.
459 #[must_use]
460 pub fn checkpoint(&self) -> FxHashMap<NodeId, OperatorState> {
461 let mut states = FxHashMap::default();
462 for (idx, op) in self.operators.iter().enumerate() {
463 if let Some(operator) = op {
464 #[allow(clippy::cast_possible_truncation)]
465 // DAG node count bounded by topology (< u32::MAX)
466 let node_id = NodeId(idx as u32);
467 states.insert(node_id, operator.checkpoint());
468 }
469 }
470 states
471 }
472
473 /// Restores operator state from a checkpoint snapshot.
474 ///
475 /// Iterates the provided states and calls `operator.restore()` on each
476 /// registered operator.
477 ///
478 /// # Errors
479 ///
480 /// Returns [`DagError::RestoreFailed`] if any operator fails to restore.
481 pub fn restore(&mut self, states: &FxHashMap<NodeId, OperatorState>) -> Result<(), DagError> {
482 for (node_id, state) in states {
483 let idx = node_id.0 as usize;
484 if idx < self.slot_count {
485 if let Some(ref mut operator) = self.operators[idx] {
486 operator
487 .restore(state.clone())
488 .map_err(|e| DagError::RestoreFailed {
489 node_id: format!("{node_id}"),
490 reason: e.to_string(),
491 })?;
492 }
493 }
494 }
495 Ok(())
496 }
497
/// Checkpoints the watermark tracker state.
///
/// Delegates to the tracker's own `checkpoint()`; per-node watermark
/// generators held in the node runtimes are not part of the result.
#[must_use]
pub fn checkpoint_watermarks(&self) -> DagWatermarkCheckpoint {
    self.watermark_tracker.checkpoint()
}
503
/// Restores watermark tracker state from a checkpoint.
///
/// Only the tracker is updated here; this method does not advance the
/// per-node watermark generators.
pub fn restore_watermarks(&mut self, checkpoint: &DagWatermarkCheckpoint) {
    self.watermark_tracker.restore(checkpoint);
}
508
509 /// Injects events into a node's input queue.
510 ///
511 /// Used during recovery to repopulate queues with buffered events.
512 pub fn inject_events(&mut self, node_id: NodeId, events: Vec<Event>) {
513 let idx = node_id.0 as usize;
514 if idx < self.slot_count {
515 self.input_queues[idx].extend(events);
516 }
517 }
518
519 /// Returns the number of incoming edges for a node.
520 #[must_use]
521 pub fn input_count(&self, node_id: NodeId) -> usize {
522 let idx = node_id.0 as usize;
523 if idx < self.slot_count {
524 self.input_counts[idx]
525 } else {
526 0
527 }
528 }
529
530 /// Snapshots all registered operators in topological order.
531 ///
532 /// Takes the barrier for consistency (future use with epoch tracking).
533 /// In the synchronous single-threaded executor, topological ordering
534 /// guarantees upstream-first snapshots.
535 #[must_use]
536 pub fn process_checkpoint_barrier(
537 &mut self,
538 _barrier: &CheckpointBarrier,
539 ) -> FxHashMap<NodeId, OperatorState> {
540 let mut states = FxHashMap::default();
541 for &node_id in &self.execution_order {
542 let idx = node_id.0 as usize;
543 if idx < self.slot_count {
544 if let Some(ref operator) = self.operators[idx] {
545 states.insert(node_id, operator.checkpoint());
546 }
547 }
548 }
549 states
550 }
551
552 /// Processes all nodes in topological order.
553 ///
554 /// Drains input queues, dispatches to operators, and routes outputs
555 /// to downstream nodes. Uses [`HotPathGuard`] for zero-allocation
556 /// enforcement in debug builds.
557 fn process_dag(&mut self) {
558 let _guard = HotPathGuard::enter("dag_executor");
559
560 let order_len = self.execution_order.len();
561 for i in 0..order_len {
562 let node_id = self.execution_order[i];
563 self.process_node(node_id);
564 }
565 }
566
567 /// Processes a single node: drains its input queue, dispatches each event
568 /// to the operator, and routes outputs downstream.
569 fn process_node(&mut self, node_id: NodeId) {
570 let idx = node_id.0 as usize;
571
572 if self.input_queues[idx].is_empty() {
573 #[cfg(feature = "dag-metrics")]
574 {
575 self.metrics.nodes_skipped += 1;
576 }
577 return;
578 }
579
580 // Swap temp buffer out of self so the borrow checker allows
581 // mutable access to other fields during the loop.
582 let mut events = std::mem::take(&mut self.temp_events);
583 events.clear();
584 events.extend(self.input_queues[idx].drain(..));
585
586 // Take operator and runtime out temporarily.
587 // This lets us mutably access the rest of `self` for routing.
588 let mut operator = self.operators[idx].take();
589 let mut runtime = self.runtimes[idx].take();
590
591 for event in events.drain(..) {
592 #[cfg(feature = "dag-metrics")]
593 {
594 self.metrics.events_processed += 1;
595 self.node_metrics[idx].events_in += 1;
596 }
597
598 #[cfg(feature = "dag-metrics")]
599 let start = std::time::Instant::now();
600
601 let outputs = if let Some(op) = &mut operator {
602 if let Some(rt) = &mut runtime {
603 let mut ctx = OperatorContext {
604 event_time: event.timestamp,
605 processing_time: 0,
606 timers: &mut rt.timer_service,
607 state: rt.state_store.as_mut(),
608 watermark_generator: rt.watermark_generator.as_mut(),
609 operator_index: idx,
610 };
611 op.process(&event, &mut ctx)
612 } else {
613 passthrough_output(event)
614 }
615 } else {
616 passthrough_output(event)
617 };
618
619 #[cfg(feature = "dag-metrics")]
620 {
621 #[allow(clippy::cast_possible_truncation)]
622 let elapsed = start.elapsed().as_nanos() as u64;
623 self.node_metrics[idx].total_time_ns += elapsed;
624 self.node_metrics[idx].invocations += 1;
625 }
626
627 // Route all output variants to downstream nodes.
628 self.route_all_outputs(node_id, outputs);
629 }
630
631 // Put operator and runtime back.
632 self.operators[idx] = operator;
633 self.runtimes[idx] = runtime;
634 self.temp_events = events;
635 }
636
637 /// Routes all output variants from a source node.
638 ///
639 /// Handles each [`Output`] variant:
640 /// - `Event` → route to downstream nodes via [`route_output()`]
641 /// - `Watermark` → feed into watermark tracker, advance downstream runtimes
642 /// - `LateEvent` → collect in sink outputs for the source node
643 /// - `SideOutput` → collect in sink outputs for the source node
644 /// - `Changelog` → collect changelog event in sink outputs
645 /// - `CheckpointComplete` → no-op (consumed by coordinator)
646 fn route_all_outputs(&mut self, source: NodeId, outputs: OutputVec) {
647 use crate::operator::window::ChangelogRecord;
648
649 for output in outputs {
650 match output {
651 Output::Event(out_event) => {
652 #[cfg(feature = "dag-metrics")]
653 {
654 let sidx = source.0 as usize;
655 if sidx < self.slot_count {
656 self.node_metrics[sidx].events_out += 1;
657 }
658 }
659 self.route_output(source, out_event);
660 }
661 Output::Watermark(wm) => {
662 for &(node_id, new_wm) in self.watermark_tracker.update_watermark(source, wm) {
663 let nidx = node_id.0 as usize;
664 if nidx < self.slot_count {
665 if let Some(ref mut rt) = self.runtimes[nidx] {
666 rt.watermark_generator.advance_watermark(new_wm);
667 }
668 }
669 }
670 }
671 Output::LateEvent(late_event) => {
672 self.sink_outputs[source.0 as usize].push(late_event);
673 }
674 Output::SideOutput(data) => {
675 let SideOutputData { event, .. } = *data;
676 self.sink_outputs[source.0 as usize].push(event);
677 }
678 Output::Changelog(ChangelogRecord { event, .. }) => {
679 self.sink_outputs[source.0 as usize].push(event);
680 }
681 Output::CheckpointComplete(_) | Output::Barrier(_) => {
682 // No-op: consumed by checkpoint coordinator
683 }
684 }
685 }
686 }
687
688 /// Routes an output event from a source node to its downstream targets.
689 ///
690 /// - **Terminal (sink)**: event is collected in `sink_outputs`.
691 /// - **Single target**: event is enqueued directly (no clone).
692 /// - **Multicast**: event is cloned to N-1 targets, moved to the last.
693 fn route_output(&mut self, source: NodeId, event: Event) {
694 let entry = self.routing.node_targets(source);
695
696 if entry.is_terminal() {
697 // Sink node: collect output.
698 self.sink_outputs[source.0 as usize].push(event);
699 return;
700 }
701
702 #[cfg(feature = "dag-metrics")]
703 {
704 self.metrics.events_routed += 1;
705 }
706
707 if entry.is_multicast {
708 #[cfg(feature = "dag-metrics")]
709 {
710 self.metrics.multicast_publishes += 1;
711 }
712 let targets = entry.target_ids();
713
714 // Clone to all targets except the last, which gets the moved value.
715 for &target_id in &targets[..targets.len() - 1] {
716 self.input_queues[target_id as usize].push_back(event.clone());
717 }
718 self.input_queues[targets[targets.len() - 1] as usize].push_back(event);
719 } else {
720 // Single target: enqueue directly (zero-copy move).
721 self.input_queues[entry.targets[0] as usize].push_back(event);
722 }
723 }
724}
725
726impl std::fmt::Debug for DagExecutor {
727 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
728 f.debug_struct("DagExecutor")
729 .field("slot_count", &self.slot_count)
730 .field("source_nodes", &self.source_nodes)
731 .field("sink_nodes", &self.sink_nodes)
732 .field("execution_order", &self.execution_order)
733 .field("metrics", &self.metrics)
734 .finish_non_exhaustive()
735 }
736}
737
738/// Creates a passthrough output (forwards the event unchanged).
739#[inline]
740fn passthrough_output(event: Event) -> OutputVec {
741 let mut v = OutputVec::new();
742 v.push(Output::Event(event));
743 v
744}