§DAG Pipeline Topology
Directed Acyclic Graph (DAG) topology data structures for complex streaming workflows with fan-out, fan-in, and shared intermediate stages.
§Overview
This module provides the topology layer for DAG pipelines:
- StreamingDag: The complete DAG topology with topological ordering
- DagBuilder: Fluent builder API for programmatic DAG construction
- DagNode/DagEdge: Adjacency list representation
- DagChannelType: Auto-derived channel types (SPSC/SPMC/MPSC)
- MulticastBuffer: Zero-copy SPMC multicast for shared stages
- RoutingTable: Pre-computed O(1) dispatch table
- DagExecutor: Ring 0 event processing engine
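To make "topological ordering" concrete, here is a minimal sketch using Kahn's algorithm over a plain edge list. It is an illustration only: `topological_order` and the index-based node representation are hypothetical and are not taken from `StreamingDag`, which may compute its ordering differently. The same pass also detects cycles, because nodes on or downstream of a cycle never reach an in-degree of zero.

```rust
use std::collections::VecDeque;

/// Hypothetical helper (not part of laminar_core): returns node indices in
/// topological order, or `None` if the edge list contains a cycle.
fn topological_order(node_count: usize, edges: &[(usize, usize)]) -> Option<Vec<usize>> {
    let mut in_degree = vec![0usize; node_count];
    let mut successors: Vec<Vec<usize>> = vec![Vec::new(); node_count];
    for &(from, to) in edges {
        successors[from].push(to);
        in_degree[to] += 1;
    }

    // Start from nodes with no incoming edges (the sources).
    let mut ready: VecDeque<usize> = (0..node_count).filter(|&n| in_degree[n] == 0).collect();
    let mut order = Vec::with_capacity(node_count);

    while let Some(node) = ready.pop_front() {
        order.push(node);
        for &next in &successors[node] {
            in_degree[next] -= 1;
            if in_degree[next] == 0 {
                ready.push_back(next);
            }
        }
    }

    // Nodes that were never emitted are on, or downstream of, a cycle.
    if order.len() == node_count {
        Some(order)
    } else {
        None
    }
}
```

Calling `topological_order(3, &[(0, 1), (1, 2)])` yields `Some(vec![0, 1, 2])`, while adding a back edge `(2, 0)` yields `None`.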
§Key Design Principles
- Channel type is auto-derived - SPSC/SPMC/MPSC inferred from topology
- Cycle detection - Rejected at construction time
- Schema validation - Connected edges must have compatible schemas
- Immutable once finalized - Topology is frozen after build()
- Zero-alloc hot path - Multicast and routing are allocation-free
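As a rough illustration of the first principle, the sketch below derives a channel kind for each edge from fan-out and fan-in counts. The names `ChannelKind` and `derive_channel_kinds` are hypothetical stand-ins, not the crate's `DagChannelType` API. Giving fan-out precedence when an edge has both a shared producer and a shared consumer is an assumption made here; the source does not say how that case is resolved.

```rust
/// Hypothetical stand-in for a derived channel type (not laminar_core's DagChannelType).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChannelKind {
    Spsc, // single producer, single consumer
    Spmc, // single producer, multiple consumers (fan-out)
    Mpsc, // multiple producers, single consumer (fan-in)
}

/// Derives one channel kind per edge, in the same order as `edges`.
fn derive_channel_kinds(node_count: usize, edges: &[(usize, usize)]) -> Vec<ChannelKind> {
    let mut fan_out = vec![0usize; node_count];
    let mut fan_in = vec![0usize; node_count];
    for &(from, to) in edges {
        fan_out[from] += 1;
        fan_in[to] += 1;
    }

    edges
        .iter()
        .map(|&(from, to)| {
            if fan_out[from] > 1 {
                // Assumption: fan-out takes precedence when both conditions hold.
                ChannelKind::Spmc
            } else if fan_in[to] > 1 {
                ChannelKind::Mpsc
            } else {
                ChannelKind::Spsc
            }
        })
        .collect()
}
```

For a source feeding one operator that fans out to two branches, `derive_channel_kinds(4, &[(0, 1), (1, 2), (1, 3)])` gives `[Spsc, Spmc, Spmc]`.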
§Architecture
┌─────────────────────────────────────────────────────────────────┐
│ RING 2: CONTROL PLANE │
│ DagBuilder constructs StreamingDag topology │
│ ┌──────────┐ ┌──────────────┐ ┌───────────────────┐ │
│ │DagBuilder│──▶│ StreamingDag │──▶│ RoutingTable │ │
│ │ (Ring 2) │ │ (immutable) │ │ (cache-aligned) │ │
│ └──────────┘ └──────────────┘ └───────────────────┘ │
│ │
│ MulticastBuffer<T> per shared stage (pre-allocated slots) │
└─────────────────────────────────────────────────────────────────┘
§Example
use laminar_core::dag::{DagBuilder, RoutingTable};

// `schema` is assumed to be a schema value already in scope.
let dag = DagBuilder::new()
    .source("trades", schema.clone())
    .operator("normalize", schema.clone())
    .connect("trades", "normalize")
    .fan_out("normalize", |b| {
        b.branch("vwap", schema.clone())
            .branch("anomaly", schema.clone())
    })
    .sink_for("vwap", "analytics", schema.clone())
    .sink_for("anomaly", "alerts", schema.clone())
    .build()?;

let routing = RoutingTable::from_dag(&dag);

assert_eq!(dag.node_count(), 5);
assert_eq!(dag.sources().len(), 1);
assert_eq!(dag.sinks().len(), 2);
Re-exports§
pub use builder::DagBuilder;
pub use builder::FanOutBuilder;
pub use changelog::DagChangelogPropagator;
pub use checkpoint::AlignmentResult;
pub use checkpoint::BarrierAligner;
pub use checkpoint::BarrierType;
pub use checkpoint::CheckpointBarrier;
pub use checkpoint::CheckpointId;
pub use checkpoint::DagCheckpointConfig;
pub use checkpoint::DagCheckpointCoordinator;
pub use error::DagError;
pub use executor::OperatorNodeMetrics;
pub use executor::DagExecutor;
pub use executor::DagExecutorMetrics;
pub use multicast::MulticastBuffer;
pub use recovery::DagCheckpointSnapshot;
pub use recovery::DagRecoveryManager;
pub use recovery::RecoveredDagState;
pub use recovery::SerializableOperatorState;
pub use routing::RoutingEntry;
pub use routing::RoutingTable;
pub use routing::MAX_PORTS;
pub use topology::DagChannelType;
pub use topology::DagEdge;
pub use topology::DagNode;
pub use topology::DagNodeType;
pub use topology::EdgeId;
pub use topology::NodeId;
pub use topology::PartitioningStrategy;
pub use topology::StatePartitionId;
pub use topology::StreamingDag;
pub use topology::MAX_FAN_OUT;
pub use watermark::DagWatermarkCheckpoint;
pub use watermark::DagWatermarkTracker;
Modules§
- builder: DAG builder API for programmatic topology construction.
- changelog: DAG-native changelog propagation.
- checkpoint: Chandy-Lamport barrier checkpointing for DAG pipelines.
- error: Error types for DAG topology operations.
- executor: Ring 0 DAG executor for event processing.
- multicast: Zero-copy multicast buffer for shared intermediate stages.
- recovery: Snapshot and recovery management for DAG checkpoints.
- routing: Pre-computed routing table for O(1) hot path dispatch.
- topology: DAG topology data structures.
- watermark: DAG-native watermark tracking.