// laminar_core/dag/executor.rs
1//! Ring 0 DAG executor for event processing.
2//!
3//! [`DagExecutor`] processes events through a finalized [`StreamingDag`] in
4//! topological order. It uses the pre-computed [`RoutingTable`] for O(1)
5//! dispatch and integrates with the [`Operator`] trait for operator invocation.
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌──────────────────────────────────────────────────────────────────┐
11//! │                      RING 0: HOT PATH                            │
12//! │                                                                  │
13//! │  process_event(source, event)                                    │
14//! │       │                                                          │
15//! │       ▼                                                          │
16//! │  ┌──────────┐   topological   ┌───────────┐   route_output()    │
17//! │  │  enqueue  │──────order────▶│  operator  │──────────────────┐  │
18//! │  │  (input   │                │  .process()│                  │  │
19//! │  │   queue)  │                └───────────┘                  │  │
20//! │  └──────────┘                                                │  │
21//! │       ▲                                                      │  │
22//! │       │                  ┌─────────────────┐                 │  │
23//! │       └──────────────────│  RoutingTable   │◀────────────────┘  │
24//! │         enqueue targets  │  O(1) lookup    │                    │
25//! │                          └─────────────────┘                    │
26//! └──────────────────────────────────────────────────────────────────┘
27//! ```
28//!
29//! # Latency Budget
30//!
31//! | Component | Budget |
32//! |-----------|--------|
33//! | Routing table lookup | < 50ns |
34//! | Operator dispatch | < 200ns |
35//! | Multicast to N consumers | < 100ns |
36//! | State access | < 200ns |
37//! | **Total** | **< 500ns** |
38
39use std::collections::VecDeque;
40
41use rustc_hash::FxHashMap;
42
43use crate::alloc::HotPathGuard;
44use crate::operator::{
45    Event, Operator, OperatorContext, OperatorState, Output, OutputVec, SideOutputData, Timer,
46};
47use crate::state::InMemoryStore;
48use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
49
50use super::checkpoint::CheckpointBarrier;
51use super::error::DagError;
52use super::routing::RoutingTable;
53use super::topology::{DagNodeType, NodeId, StreamingDag};
54use super::watermark::{DagWatermarkCheckpoint, DagWatermarkTracker};
55
/// Per-node runtime state (timer service, state store, watermark generator).
///
/// Created during executor construction (Ring 2) and used during event
/// processing (Ring 0). Temporarily moved out of the executor during
/// operator dispatch to satisfy Rust's borrow checker.
struct NodeRuntime {
    /// Timer service for this node; polled by `fire_timers` and handed to
    /// operators through `OperatorContext::timers`.
    timer_service: TimerService,
    /// State store for this node (defaults to [`InMemoryStore`]); exposed to
    /// operators as `OperatorContext::state`.
    state_store: Box<dyn crate::state::StateStore>,
    /// Watermark generator for this node; advanced whenever the DAG-level
    /// watermark tracker reports this node's watermark moved forward.
    watermark_generator: Box<dyn crate::time::WatermarkGenerator>,
}
69
impl Default for NodeRuntime {
    fn default() -> Self {
        Self {
            timer_service: TimerService::new(),
            state_store: Box::new(InMemoryStore::new()),
            // Bound of 0 — presumably zero tolerated out-of-orderness until a
            // real generator is configured. TODO(review): confirm against the
            // `time` module's semantics for BoundedOutOfOrdernessGenerator.
            watermark_generator: Box::new(BoundedOutOfOrdernessGenerator::new(0)),
        }
    }
}
79
/// Metrics tracked by the DAG executor.
///
/// Counters are updated during event processing and can be read
/// at any time for observability.
///
/// NOTE(review): within this file every counter increment is wrapped in
/// `#[cfg(feature = "dag-metrics")]`, so without that feature these all stay
/// zero. `backpressure_stalls` is never incremented in this file at all —
/// confirm it is updated elsewhere.
#[derive(Debug, Clone, Default)]
pub struct DagExecutorMetrics {
    /// Total events processed through operator dispatch.
    pub events_processed: u64,
    /// Total events routed to downstream nodes.
    pub events_routed: u64,
    /// Total multicast dispatches (fan-out to > 1 target).
    pub multicast_publishes: u64,
    /// Total backpressure stalls encountered.
    pub backpressure_stalls: u64,
    /// Total nodes skipped (empty input queue).
    pub nodes_skipped: u64,
    /// Total watermark advances processed via `process_watermark()`.
    pub watermarks_processed: u64,
}
99
/// Per-operator metrics tracked by the DAG executor.
///
/// Only populated when the `dag-metrics` feature is enabled.
/// Tracks event counts and cumulative processing time per operator node.
#[cfg(feature = "dag-metrics")]
#[derive(Debug, Clone, Default)]
pub struct OperatorNodeMetrics {
    /// Total events received by this operator.
    pub events_in: u64,
    /// Total events emitted by this operator (`Output::Event` variants only).
    pub events_out: u64,
    /// Total processing time in nanoseconds (wall-clock, measured with
    /// `Instant` around each `process()` call).
    pub total_time_ns: u64,
    /// Number of `process()` invocations.
    pub invocations: u64,
}
116
/// Ring 0 DAG executor for event processing.
///
/// Processes events through a finalized [`StreamingDag`] in topological order
/// using the pre-computed [`RoutingTable`] for O(1) dispatch.
///
/// # Construction
///
/// ```rust,ignore
/// let dag = DagBuilder::new()
///     .source("src", schema.clone())
///     .operator("transform", schema.clone())
///     .connect("src", "transform")
///     .sink_for("transform", "out", schema.clone())
///     .build()?;
///
/// let mut executor = DagExecutor::from_dag(&dag);
/// executor.register_operator(transform_id, Box::new(my_operator));
/// executor.process_event(src_id, event)?;
/// let outputs = executor.take_sink_outputs(out_id);
/// ```
pub struct DagExecutor {
    /// Registered operators, indexed by `NodeId.0`. `None` = passthrough.
    operators: Vec<Option<Box<dyn Operator>>>,
    /// Per-node runtime state (timer, state store, watermark generator).
    /// Entries are `None` only transiently, while moved out during dispatch.
    runtimes: Vec<Option<NodeRuntime>>,
    /// Pre-allocated input queues per node, indexed by `NodeId.0`.
    input_queues: Vec<VecDeque<Event>>,
    /// Collected sink outputs, indexed by `NodeId.0`. Also receives
    /// late-event / side-output / changelog events for the emitting node.
    sink_outputs: Vec<Vec<Event>>,
    /// Pre-computed routing table for O(1) dispatch.
    routing: RoutingTable,
    /// Topological execution order (from the finalized DAG).
    execution_order: Vec<NodeId>,
    /// Source node IDs.
    source_nodes: Vec<NodeId>,
    /// Sink node IDs.
    sink_nodes: Vec<NodeId>,
    /// Node types, indexed by `NodeId.0`.
    node_types: Vec<DagNodeType>,
    /// Total number of node slots allocated (max `NodeId.0` + 1).
    slot_count: usize,
    /// Number of incoming edges per node, indexed by `NodeId.0`.
    input_counts: Vec<usize>,
    /// Temporary buffer for draining input queues (avoids allocation);
    /// swapped out of `self` during `process_node` and restored after.
    temp_events: Vec<Event>,
    /// Executor metrics.
    metrics: DagExecutorMetrics,
    /// DAG-native watermark tracker for topology-aware propagation.
    watermark_tracker: DagWatermarkTracker,
    /// Per-operator node metrics (feature-gated).
    #[cfg(feature = "dag-metrics")]
    node_metrics: Vec<OperatorNodeMetrics>,
}
170
171impl DagExecutor {
172    /// Creates a new executor from a finalized [`StreamingDag`].
173    ///
174    /// Allocates all per-node state (input queues, runtimes, sink buffers)
175    /// up front in Ring 2. The hot path (`process_event`) is allocation-free.
176    ///
177    /// # Arguments
178    ///
179    /// * `dag` - A finalized `StreamingDag` topology
180    #[must_use]
181    pub fn from_dag(dag: &StreamingDag) -> Self {
182        let slot_count = dag.nodes().keys().map(|n| n.0).max().map_or(0, |n| n + 1) as usize;
183
184        let routing = RoutingTable::from_dag(dag);
185
186        let mut operators = Vec::with_capacity(slot_count);
187        let mut runtimes = Vec::with_capacity(slot_count);
188        let mut input_queues = Vec::with_capacity(slot_count);
189        let mut sink_outputs = Vec::with_capacity(slot_count);
190        let mut node_types = Vec::with_capacity(slot_count);
191        let mut input_counts = vec![0usize; slot_count];
192
193        for _ in 0..slot_count {
194            operators.push(None);
195            runtimes.push(Some(NodeRuntime::default()));
196            input_queues.push(VecDeque::with_capacity(16));
197            sink_outputs.push(Vec::new());
198            node_types.push(DagNodeType::StatelessOperator);
199        }
200
201        // Populate node types and input counts from the DAG.
202        for node in dag.nodes().values() {
203            let idx = node.id.0 as usize;
204            if idx < slot_count {
205                node_types[idx] = node.node_type;
206                input_counts[idx] = dag.incoming_edge_count(node.id);
207            }
208        }
209
210        let watermark_tracker = DagWatermarkTracker::from_dag(dag);
211
212        Self {
213            operators,
214            runtimes,
215            input_queues,
216            sink_outputs,
217            routing,
218            execution_order: dag.execution_order().to_vec(),
219            source_nodes: dag.sources().to_vec(),
220            sink_nodes: dag.sinks().to_vec(),
221            node_types,
222            slot_count,
223            input_counts,
224            temp_events: Vec::with_capacity(64),
225            metrics: DagExecutorMetrics::default(),
226            watermark_tracker,
227            #[cfg(feature = "dag-metrics")]
228            node_metrics: (0..slot_count)
229                .map(|_| OperatorNodeMetrics::default())
230                .collect(),
231        }
232    }
233
234    /// Registers an operator for a node.
235    ///
236    /// Nodes without registered operators act as passthrough: events are
237    /// forwarded to downstream nodes unchanged. This is the default for
238    /// source and sink nodes.
239    ///
240    /// # Arguments
241    ///
242    /// * `node` - The node ID to register the operator for
243    /// * `operator` - The operator implementation
244    pub fn register_operator(&mut self, node: NodeId, operator: Box<dyn Operator>) {
245        let idx = node.0 as usize;
246        if idx < self.slot_count {
247            self.operators[idx] = Some(operator);
248        }
249    }
250
251    /// Processes an event from a source node through the entire DAG.
252    ///
253    /// The event is enqueued at the source node, then all nodes are processed
254    /// in topological order. Events produced by operators are routed to
255    /// downstream nodes via the [`RoutingTable`]. Sink outputs are collected
256    /// and can be retrieved via [`take_sink_outputs()`](Self::take_sink_outputs).
257    ///
258    /// # Arguments
259    ///
260    /// * `source_node` - The source node to inject the event
261    /// * `event` - The event to process
262    ///
263    /// # Errors
264    ///
265    /// Returns [`DagError::NodeNotFound`] if the source node is out of bounds.
266    pub fn process_event(&mut self, source_node: NodeId, event: Event) -> Result<(), DagError> {
267        let idx = source_node.0 as usize;
268        if idx >= self.slot_count {
269            return Err(DagError::NodeNotFound(format!("{source_node}")));
270        }
271
272        self.input_queues[idx].push_back(event);
273        self.process_dag();
274        Ok(())
275    }
276
277    /// Advances watermark generators for all nodes in topological order.
278    ///
279    /// Called when an external source provides a watermark (e.g., via
280    /// `Source::watermark()`). Propagates the watermark through the DAG
281    /// so downstream operators can use it for late-data filtering and
282    /// window triggering.
283    ///
284    /// # Arguments
285    ///
286    /// * `source_node` - The source node that originated the watermark
287    /// * `watermark` - The watermark timestamp in milliseconds
288    ///
289    /// # Errors
290    ///
291    /// Returns [`DagError::NodeNotFound`] if `source_node` is out of bounds.
292    pub fn process_watermark(
293        &mut self,
294        source_node: NodeId,
295        watermark: i64,
296    ) -> Result<(), DagError> {
297        let idx = source_node.0 as usize;
298        if idx >= self.slot_count {
299            return Err(DagError::NodeNotFound(format!("{source_node}")));
300        }
301
302        // Use topology-aware propagation: only advance downstream nodes.
303        // Direct field access avoids collecting into a SmallVec.
304        for &(node_id, wm) in self
305            .watermark_tracker
306            .update_watermark(source_node, watermark)
307        {
308            let nidx = node_id.0 as usize;
309            if nidx < self.slot_count {
310                if let Some(ref mut rt) = self.runtimes[nidx] {
311                    rt.watermark_generator.advance_watermark(wm);
312                }
313            }
314        }
315
316        // Fire timers for any nodes whose watermark advanced
317        self.fire_timers(watermark);
318
319        #[cfg(feature = "dag-metrics")]
320        {
321            self.metrics.watermarks_processed += 1;
322        }
323        Ok(())
324    }
325
    /// Fires all expired timers for each node in topological order.
    ///
    /// Polls each node's [`TimerService`] for timers with
    /// `timestamp <= current_time`, calls [`Operator::on_timer()`] for each
    /// fired timer, and routes the resulting outputs downstream.
    ///
    /// This is used to trigger window closures after watermark advancement.
    ///
    /// # Arguments
    ///
    /// * `current_time` - The current event-time (typically the latest watermark)
    pub fn fire_timers(&mut self, current_time: i64) {
        // Index loop: iterating `&self.execution_order` would hold a shared
        // borrow of `self` across the `&mut self` calls below.
        for i in 0..self.execution_order.len() {
            let node_id = self.execution_order[i];
            let idx = node_id.0 as usize;
            if idx >= self.slot_count {
                continue;
            }

            // Move the operator and runtime out of `self` so
            // `route_all_outputs` (which needs `&mut self`) can run inside
            // the loop; both are restored below.
            let mut operator = self.operators[idx].take();
            let mut runtime = self.runtimes[idx].take();

            if let (Some(op), Some(rt)) = (&mut operator, &mut runtime) {
                let fired = rt.timer_service.poll_timers(current_time);
                for reg in fired {
                    let timer = Timer {
                        key: reg.key.unwrap_or_default(),
                        timestamp: reg.timestamp,
                    };
                    let mut ctx = OperatorContext {
                        event_time: reg.timestamp,
                        processing_time: current_time,
                        timers: &mut rt.timer_service,
                        state: rt.state_store.as_mut(),
                        watermark_generator: rt.watermark_generator.as_mut(),
                        operator_index: idx,
                    };
                    let outputs = op.on_timer(timer, &mut ctx);
                    // NOTE(review): while this node's runtime is moved out, a
                    // watermark output targeting this same node is silently
                    // skipped inside `route_all_outputs` — confirm intended.
                    self.route_all_outputs(node_id, outputs);
                }
            }

            // Restore the moved-out pieces before the next iteration.
            self.operators[idx] = operator;
            self.runtimes[idx] = runtime;
        }

        // Process any events generated by timer firings through the rest of the DAG.
        self.process_dag();
    }
375
376    /// Takes collected sink outputs for a given sink node.
377    ///
378    /// Returns all events that reached this sink during prior
379    /// `process_event` calls, draining the internal buffer.
380    #[must_use]
381    pub fn take_sink_outputs(&mut self, sink_node: NodeId) -> Vec<Event> {
382        let idx = sink_node.0 as usize;
383        if idx < self.slot_count {
384            std::mem::take(&mut self.sink_outputs[idx])
385        } else {
386            Vec::new()
387        }
388    }
389
390    /// Takes all sink outputs across all sink nodes.
391    #[must_use]
392    pub fn take_all_sink_outputs(&mut self) -> FxHashMap<NodeId, Vec<Event>> {
393        let mut outputs = FxHashMap::default();
394        // Index-based iteration avoids collecting sink_nodes into a SmallVec.
395        for i in 0..self.sink_nodes.len() {
396            let sink_id = self.sink_nodes[i];
397            let idx = sink_id.0 as usize;
398            if idx < self.slot_count {
399                let events = std::mem::take(&mut self.sink_outputs[idx]);
400                if !events.is_empty() {
401                    outputs.insert(sink_id, events);
402                }
403            }
404        }
405        outputs
406    }
407
    /// Returns a reference to the executor metrics.
    ///
    /// NOTE(review): in this file every counter increment is behind the
    /// `dag-metrics` feature, so without it these counters read zero.
    #[must_use]
    pub fn metrics(&self) -> &DagExecutorMetrics {
        &self.metrics
    }
413
    /// Returns per-operator node metrics, indexed by `NodeId.0`
    /// (only available with the `dag-metrics` feature).
    #[cfg(feature = "dag-metrics")]
    #[must_use]
    pub fn node_metrics(&self) -> &[OperatorNodeMetrics] {
        &self.node_metrics
    }
420
421    /// Resets all executor metrics to zero.
422    pub fn reset_metrics(&mut self) {
423        self.metrics = DagExecutorMetrics::default();
424        #[cfg(feature = "dag-metrics")]
425        {
426            for m in &mut self.node_metrics {
427                *m = OperatorNodeMetrics::default();
428            }
429        }
430    }
431
    /// Returns the source node IDs (valid entry points for `process_event`).
    #[must_use]
    pub fn source_nodes(&self) -> &[NodeId] {
        &self.source_nodes
    }

    /// Returns the sink node IDs (whose outputs `take_sink_outputs` drains).
    #[must_use]
    pub fn sink_nodes(&self) -> &[NodeId] {
        &self.sink_nodes
    }
443
444    /// Returns the node type for a given node ID.
445    #[must_use]
446    pub fn node_type(&self, node: NodeId) -> Option<DagNodeType> {
447        let idx = node.0 as usize;
448        if idx < self.slot_count {
449            Some(self.node_types[idx])
450        } else {
451            None
452        }
453    }
454
455    /// Checkpoints all registered operators.
456    ///
457    /// Returns a map of `NodeId` to `OperatorState` for all nodes
458    /// that have registered operators.
459    #[must_use]
460    pub fn checkpoint(&self) -> FxHashMap<NodeId, OperatorState> {
461        let mut states = FxHashMap::default();
462        for (idx, op) in self.operators.iter().enumerate() {
463            if let Some(operator) = op {
464                #[allow(clippy::cast_possible_truncation)]
465                // DAG node count bounded by topology (< u32::MAX)
466                let node_id = NodeId(idx as u32);
467                states.insert(node_id, operator.checkpoint());
468            }
469        }
470        states
471    }
472
473    /// Restores operator state from a checkpoint snapshot.
474    ///
475    /// Iterates the provided states and calls `operator.restore()` on each
476    /// registered operator.
477    ///
478    /// # Errors
479    ///
480    /// Returns [`DagError::RestoreFailed`] if any operator fails to restore.
481    pub fn restore(&mut self, states: &FxHashMap<NodeId, OperatorState>) -> Result<(), DagError> {
482        for (node_id, state) in states {
483            let idx = node_id.0 as usize;
484            if idx < self.slot_count {
485                if let Some(ref mut operator) = self.operators[idx] {
486                    operator
487                        .restore(state.clone())
488                        .map_err(|e| DagError::RestoreFailed {
489                            node_id: format!("{node_id}"),
490                            reason: e.to_string(),
491                        })?;
492                }
493            }
494        }
495        Ok(())
496    }
497
    /// Checkpoints the watermark tracker state.
    ///
    /// Pairs with [`restore_watermarks`](Self::restore_watermarks).
    #[must_use]
    pub fn checkpoint_watermarks(&self) -> DagWatermarkCheckpoint {
        self.watermark_tracker.checkpoint()
    }

    /// Restores watermark tracker state from a checkpoint produced by
    /// [`checkpoint_watermarks`](Self::checkpoint_watermarks).
    pub fn restore_watermarks(&mut self, checkpoint: &DagWatermarkCheckpoint) {
        self.watermark_tracker.restore(checkpoint);
    }
508
509    /// Injects events into a node's input queue.
510    ///
511    /// Used during recovery to repopulate queues with buffered events.
512    pub fn inject_events(&mut self, node_id: NodeId, events: Vec<Event>) {
513        let idx = node_id.0 as usize;
514        if idx < self.slot_count {
515            self.input_queues[idx].extend(events);
516        }
517    }
518
519    /// Returns the number of incoming edges for a node.
520    #[must_use]
521    pub fn input_count(&self, node_id: NodeId) -> usize {
522        let idx = node_id.0 as usize;
523        if idx < self.slot_count {
524            self.input_counts[idx]
525        } else {
526            0
527        }
528    }
529
530    /// Snapshots all registered operators in topological order.
531    ///
532    /// Takes the barrier for consistency (future use with epoch tracking).
533    /// In the synchronous single-threaded executor, topological ordering
534    /// guarantees upstream-first snapshots.
535    #[must_use]
536    pub fn process_checkpoint_barrier(
537        &mut self,
538        _barrier: &CheckpointBarrier,
539    ) -> FxHashMap<NodeId, OperatorState> {
540        let mut states = FxHashMap::default();
541        for &node_id in &self.execution_order {
542            let idx = node_id.0 as usize;
543            if idx < self.slot_count {
544                if let Some(ref operator) = self.operators[idx] {
545                    states.insert(node_id, operator.checkpoint());
546                }
547            }
548        }
549        states
550    }
551
    /// Processes all nodes in topological order.
    ///
    /// Drains input queues, dispatches to operators, and routes outputs
    /// to downstream nodes. Uses [`HotPathGuard`] for zero-allocation
    /// enforcement in debug builds.
    fn process_dag(&mut self) {
        let _guard = HotPathGuard::enter("dag_executor");

        // Index loop rather than `for &id in &self.execution_order`: an
        // iterator would hold a shared borrow of `self` across the
        // `&mut self` call to `process_node`.
        let order_len = self.execution_order.len();
        for i in 0..order_len {
            let node_id = self.execution_order[i];
            self.process_node(node_id);
        }
    }
566
    /// Processes a single node: drains its input queue, dispatches each event
    /// to the operator, and routes outputs downstream.
    ///
    /// Nodes with an empty input queue are skipped. Nodes without a
    /// registered operator (or whose runtime is unavailable) forward events
    /// unchanged via [`passthrough_output`].
    fn process_node(&mut self, node_id: NodeId) {
        let idx = node_id.0 as usize;

        if self.input_queues[idx].is_empty() {
            #[cfg(feature = "dag-metrics")]
            {
                self.metrics.nodes_skipped += 1;
            }
            return;
        }

        // Swap temp buffer out of self so the borrow checker allows
        // mutable access to other fields during the loop.
        let mut events = std::mem::take(&mut self.temp_events);
        events.clear();
        events.extend(self.input_queues[idx].drain(..));

        // Take operator and runtime out temporarily.
        // This lets us mutably access the rest of `self` for routing.
        let mut operator = self.operators[idx].take();
        let mut runtime = self.runtimes[idx].take();

        for event in events.drain(..) {
            #[cfg(feature = "dag-metrics")]
            {
                self.metrics.events_processed += 1;
                self.node_metrics[idx].events_in += 1;
            }

            #[cfg(feature = "dag-metrics")]
            let start = std::time::Instant::now();

            // Dispatch when both operator and runtime exist; otherwise
            // forward the event unchanged.
            let outputs = if let Some(op) = &mut operator {
                if let Some(rt) = &mut runtime {
                    let mut ctx = OperatorContext {
                        event_time: event.timestamp,
                        // NOTE(review): processing_time is hard-coded to 0
                        // here, while `fire_timers` passes a real timestamp —
                        // confirm this is intended.
                        processing_time: 0,
                        timers: &mut rt.timer_service,
                        state: rt.state_store.as_mut(),
                        watermark_generator: rt.watermark_generator.as_mut(),
                        operator_index: idx,
                    };
                    op.process(&event, &mut ctx)
                } else {
                    passthrough_output(event)
                }
            } else {
                passthrough_output(event)
            };

            #[cfg(feature = "dag-metrics")]
            {
                #[allow(clippy::cast_possible_truncation)]
                let elapsed = start.elapsed().as_nanos() as u64;
                self.node_metrics[idx].total_time_ns += elapsed;
                self.node_metrics[idx].invocations += 1;
            }

            // Route all output variants to downstream nodes.
            self.route_all_outputs(node_id, outputs);
        }

        // Put operator and runtime back.
        self.operators[idx] = operator;
        self.runtimes[idx] = runtime;
        // Return the (now-empty) buffer so its capacity is reused next call.
        self.temp_events = events;
    }
636
    /// Routes all output variants from a source node.
    ///
    /// Handles each [`Output`] variant:
    /// - `Event` → route to downstream nodes via [`route_output()`]
    /// - `Watermark` → feed into watermark tracker, advance downstream runtimes
    /// - `LateEvent` → collect in sink outputs for the source node
    /// - `SideOutput` → collect in sink outputs for the source node
    /// - `Changelog` → collect changelog event in sink outputs
    /// - `CheckpointComplete` / `Barrier` → no-op (consumed by coordinator)
    fn route_all_outputs(&mut self, source: NodeId, outputs: OutputVec) {
        use crate::operator::window::ChangelogRecord;

        for output in outputs {
            match output {
                Output::Event(out_event) => {
                    #[cfg(feature = "dag-metrics")]
                    {
                        let sidx = source.0 as usize;
                        if sidx < self.slot_count {
                            self.node_metrics[sidx].events_out += 1;
                        }
                    }
                    self.route_output(source, out_event);
                }
                Output::Watermark(wm) => {
                    // Topology-aware propagation: the tracker returns the
                    // downstream nodes whose watermark advanced. Borrowing
                    // the tracker and `runtimes` together is legal — they are
                    // disjoint fields of `self`.
                    for &(node_id, new_wm) in self.watermark_tracker.update_watermark(source, wm) {
                        let nidx = node_id.0 as usize;
                        if nidx < self.slot_count {
                            if let Some(ref mut rt) = self.runtimes[nidx] {
                                rt.watermark_generator.advance_watermark(new_wm);
                            }
                        }
                    }
                }
                // Late/side/changelog events are parked in the *emitting*
                // node's sink bucket rather than routed downstream.
                Output::LateEvent(late_event) => {
                    self.sink_outputs[source.0 as usize].push(late_event);
                }
                Output::SideOutput(data) => {
                    // Boxed payload; only the event itself is retained here.
                    let SideOutputData { event, .. } = *data;
                    self.sink_outputs[source.0 as usize].push(event);
                }
                Output::Changelog(ChangelogRecord { event, .. }) => {
                    self.sink_outputs[source.0 as usize].push(event);
                }
                Output::CheckpointComplete(_) | Output::Barrier(_) => {
                    // No-op: consumed by checkpoint coordinator
                }
            }
        }
    }
687
688    /// Routes an output event from a source node to its downstream targets.
689    ///
690    /// - **Terminal (sink)**: event is collected in `sink_outputs`.
691    /// - **Single target**: event is enqueued directly (no clone).
692    /// - **Multicast**: event is cloned to N-1 targets, moved to the last.
693    fn route_output(&mut self, source: NodeId, event: Event) {
694        let entry = self.routing.node_targets(source);
695
696        if entry.is_terminal() {
697            // Sink node: collect output.
698            self.sink_outputs[source.0 as usize].push(event);
699            return;
700        }
701
702        #[cfg(feature = "dag-metrics")]
703        {
704            self.metrics.events_routed += 1;
705        }
706
707        if entry.is_multicast {
708            #[cfg(feature = "dag-metrics")]
709            {
710                self.metrics.multicast_publishes += 1;
711            }
712            let targets = entry.target_ids();
713
714            // Clone to all targets except the last, which gets the moved value.
715            for &target_id in &targets[..targets.len() - 1] {
716                self.input_queues[target_id as usize].push_back(event.clone());
717            }
718            self.input_queues[targets[targets.len() - 1] as usize].push_back(event);
719        } else {
720            // Single target: enqueue directly (zero-copy move).
721            self.input_queues[entry.targets[0] as usize].push_back(event);
722        }
723    }
724}
725
// Manual Debug impl: operator/runtime/queue fields are omitted from the
// output, and `finish_non_exhaustive` renders a trailing `..` to signal it.
impl std::fmt::Debug for DagExecutor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DagExecutor")
            .field("slot_count", &self.slot_count)
            .field("source_nodes", &self.source_nodes)
            .field("sink_nodes", &self.sink_nodes)
            .field("execution_order", &self.execution_order)
            .field("metrics", &self.metrics)
            .finish_non_exhaustive()
    }
}
737
738/// Creates a passthrough output (forwards the event unchanged).
739#[inline]
740fn passthrough_output(event: Event) -> OutputVec {
741    let mut v = OutputVec::new();
742    v.push(Output::Event(event));
743    v
744}