// laminar_core/dag/mod.rs

//! # DAG Pipeline Topology
//!
//! Directed Acyclic Graph (DAG) topology data structures for complex streaming
//! workflows with fan-out, fan-in, and shared intermediate stages.
//!
//! ## Overview
//!
//! This module provides the topology layer for DAG pipelines:
//!
//! - **`StreamingDag`**: The complete DAG topology with topological ordering
//! - **`DagBuilder`**: Fluent builder API for programmatic DAG construction
//! - **`DagNode`** / **`DagEdge`**: Adjacency list representation
//! - **`DagChannelType`**: Auto-derived channel types (SPSC/SPMC/MPSC)
//! - **`MulticastBuffer`**: Zero-copy SPMC multicast for shared stages
//! - **`RoutingTable`**: Pre-computed O(1) dispatch table
//! - **`DagExecutor`**: Ring 0 event processing engine
//!
//! ## Key Design Principles
//!
//! 1. **Channel type is auto-derived** - SPSC/SPMC/MPSC inferred from topology
//! 2. **Cycle detection** - Rejected at construction time
//! 3. **Schema validation** - Connected edges must have compatible schemas
//! 4. **Immutable once finalized** - Topology is frozen after `build()`
//! 5. **Zero-alloc hot path** - Multicast and routing are allocation-free
//!
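//! The following is a minimal sketch of principles 1 and 2, not this crate's
//! implementation. `ChannelKind`, `derive_channel`, and `topo_order` are
//! hypothetical names, nodes are plain `usize` indices, and giving fan-out
//! precedence over fan-in is an assumption. Cycle detection is illustrated
//! with Kahn's algorithm, which may differ from what `build()` actually does.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! /// Hypothetical stand-in for `DagChannelType`; the real enum may differ.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! enum ChannelKind {
//!     Spsc,
//!     Spmc,
//!     Mpsc,
//! }
//!
//! /// Derives the channel kind for an edge from the producer's out-degree
//! /// and the consumer's in-degree.
//! /// Assumption: fan-out at the producer takes precedence over fan-in.
//! fn derive_channel(producer_out_degree: usize, consumer_in_degree: usize) -> ChannelKind {
//!     if producer_out_degree > 1 {
//!         ChannelKind::Spmc
//!     } else if consumer_in_degree > 1 {
//!         ChannelKind::Mpsc
//!     } else {
//!         ChannelKind::Spsc
//!     }
//! }
//!
//! /// Kahn's algorithm: returns a topological order of the nodes, or `None`
//! /// if the edges contain a cycle.
//! fn topo_order(node_count: usize, edges: &[(usize, usize)]) -> Option<Vec<usize>> {
//!     let mut in_degree = vec![0usize; node_count];
//!     let mut successors: Vec<Vec<usize>> = vec![Vec::new(); node_count];
//!     for &(from, to) in edges {
//!         successors[from].push(to);
//!         in_degree[to] += 1;
//!     }
//!
//!     // Start from every node that has no incoming edges.
//!     let mut ready: VecDeque<usize> =
//!         (0..node_count).filter(|&n| in_degree[n] == 0).collect();
//!     let mut order = Vec::with_capacity(node_count);
//!
//!     while let Some(node) = ready.pop_front() {
//!         order.push(node);
//!         for &next in &successors[node] {
//!             in_degree[next] -= 1;
//!             if in_degree[next] == 0 {
//!                 ready.push_back(next);
//!             }
//!         }
//!     }
//!
//!     // Nodes on a cycle never reach in-degree zero, so they are never emitted.
//!     (order.len() == node_count).then_some(order)
//! }
//! ```
//!
//! With edges `0 -> 1`, `1 -> 2`, and `1 -> 3`, `topo_order(4, ..)` returns
//! `Some(vec![0, 1, 2, 3])`; adding `3 -> 0` closes a cycle and it returns
//! `None`. Under the stated assumption, `derive_channel(2, 1)` for the edge
//! `1 -> 2` gives `ChannelKind::Spmc`.
//!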
//! ## Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                     RING 2: CONTROL PLANE                       │
//! │  DagBuilder constructs StreamingDag topology                    │
//! │  ┌──────────┐   ┌──────────────┐   ┌───────────────────┐       │
//! │  │DagBuilder│──▶│ StreamingDag │──▶│ RoutingTable      │       │
//! │  │ (Ring 2) │   │  (immutable) │   │ (cache-aligned)   │       │
//! │  └──────────┘   └──────────────┘   └───────────────────┘       │
//! │                                                                 │
//! │  MulticastBuffer<T> per shared stage (pre-allocated slots)      │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
//!
//! ## Example
//!
//! ```rust,ignore
//! use laminar_core::dag::{DagBuilder, RoutingTable};
//!
//! // `schema` is assumed to be a schema value already in scope, and the
//! // enclosing function is assumed to return a `Result` so that `?` works.
//! let dag = DagBuilder::new()
//!     .source("trades", schema.clone())
//!     .operator("normalize", schema.clone())
//!     .connect("trades", "normalize")
//!     .fan_out("normalize", |b| {
//!         b.branch("vwap", schema.clone())
//!          .branch("anomaly", schema.clone())
//!     })
//!     .sink_for("vwap", "analytics", schema.clone())
//!     .sink_for("anomaly", "alerts", schema.clone())
//!     .build()?;
//!
//! let routing = RoutingTable::from_dag(&dag);
//! assert_eq!(dag.node_count(), 5);
//! assert_eq!(dag.sources().len(), 1);
//! assert_eq!(dag.sinks().len(), 2);
//! ```

pub mod builder;
pub mod changelog;
pub mod checkpoint;
pub mod error;
pub mod executor;
pub mod multicast;
pub mod recovery;
pub mod routing;
pub mod topology;
pub mod watermark;

#[cfg(test)]
mod tests;

// Re-export key types
pub use builder::{DagBuilder, FanOutBuilder};
pub use changelog::DagChangelogPropagator;
pub use checkpoint::{
    AlignmentResult, BarrierAligner, BarrierType, CheckpointBarrier, CheckpointId,
    DagCheckpointConfig, DagCheckpointCoordinator,
};
pub use error::DagError;
#[cfg(feature = "dag-metrics")]
pub use executor::OperatorNodeMetrics;
pub use executor::{DagExecutor, DagExecutorMetrics};
pub use multicast::MulticastBuffer;
pub use recovery::{
    DagCheckpointSnapshot, DagRecoveryManager, RecoveredDagState, SerializableOperatorState,
};
pub use routing::{RoutingEntry, RoutingTable, MAX_PORTS};
pub use topology::{
    DagChannelType, DagEdge, DagNode, DagNodeType, EdgeId, NodeId, PartitioningStrategy,
    SharedStageMetadata, StatePartitionId, StreamingDag, MAX_FAN_OUT,
};
pub use watermark::{DagWatermarkCheckpoint, DagWatermarkTracker};