//! # DAG Pipeline Topology
//!
//! Directed Acyclic Graph (DAG) topology data structures for complex streaming
//! workflows with fan-out, fan-in, and shared intermediate stages.
//!
//! ## Overview
//!
//! This module provides the topology layer for DAG pipelines:
//!
//! - **`StreamingDag`**: The complete DAG topology with topological ordering
//! - **`DagBuilder`**: Fluent builder API for programmatic DAG construction
//! - **`DagNode`** / **`DagEdge`**: Adjacency list representation
//! - **`DagChannelType`**: Auto-derived channel types (SPSC/SPMC/MPSC)
//! - **`MulticastBuffer`**: Zero-copy SPMC multicast for shared stages
//! - **`RoutingTable`**: Pre-computed O(1) dispatch table
//! - **`DagExecutor`**: Ring 0 event processing engine
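//!
//! As a rough illustration of how a channel type can be inferred from the
//! topology, the sketch below classifies an edge by the fan-out of its
//! producer and the fan-in of its consumer. `ChannelKind` and
//! `derive_channel_kind` are hypothetical names used only for this example.
//! They are not the module's `DagChannelType` API, and the actual derivation
//! rules may differ.
//!
//! ```rust
//! /// Hypothetical stand-in for a channel classification.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! enum ChannelKind {
//!     /// Single producer, single consumer.
//!     Spsc,
//!     /// Single producer, multiple consumers (fan-out).
//!     Spmc,
//!     /// Multiple producers, single consumer (fan-in).
//!     Mpsc,
//! }
//!
//! /// Classifies an edge from the producer's fan-out and the consumer's fan-in.
//! /// Assumption: fan-out takes precedence when both counts exceed one.
//! fn derive_channel_kind(producer_fan_out: usize, consumer_fan_in: usize) -> ChannelKind {
//!     if producer_fan_out > 1 {
//!         ChannelKind::Spmc
//!     } else if consumer_fan_in > 1 {
//!         ChannelKind::Mpsc
//!     } else {
//!         ChannelKind::Spsc
//!     }
//! }
//! ```
//!
//! Under these assumptions, an edge leaving a node with two downstream
//! branches would be classified as SPMC, while an edge into a node that
//! merges several inputs would be classified as MPSC.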
//!
//! ## Key Design Principles
//!
//! 1. **Channel type is auto-derived** - SPSC/SPMC/MPSC inferred from topology
//! 2. **Cycle detection** - Cycles are rejected at construction time
//! 3. **Schema validation** - Connected edges must have compatible schemas
//! 4. **Immutable once finalized** - Topology is frozen after `build()`
//! 5. **Zero-alloc hot path** - Multicast and routing are allocation-free
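//!
//! One common way to reject cycles at construction time is Kahn's algorithm,
//! which also yields a topological order as a by-product. The sketch below
//! assumes nodes are dense `usize` indices and edges are `(from, to)` pairs.
//! `topological_order` is a hypothetical helper written for illustration; it
//! is not a function exported by this module, and the builder may use a
//! different technique.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! /// Returns a topological order of `node_count` nodes, or `None` if the
//! /// edges contain a cycle. Assumes every endpoint is `< node_count`.
//! fn topological_order(node_count: usize, edges: &[(usize, usize)]) -> Option<Vec<usize>> {
//!     let mut in_degree = vec![0usize; node_count];
//!     let mut successors = vec![Vec::new(); node_count];
//!     for &(from, to) in edges {
//!         successors[from].push(to);
//!         in_degree[to] += 1;
//!     }
//!
//!     // Start from nodes with no incoming edges (the sources).
//!     let mut ready: VecDeque<usize> =
//!         (0..node_count).filter(|&n| in_degree[n] == 0).collect();
//!     let mut order = Vec::with_capacity(node_count);
//!
//!     while let Some(node) = ready.pop_front() {
//!         order.push(node);
//!         for &next in &successors[node] {
//!             in_degree[next] -= 1;
//!             if in_degree[next] == 0 {
//!                 ready.push_back(next);
//!             }
//!         }
//!     }
//!
//!     // Any node left unvisited lies on, or downstream of, a cycle.
//!     if order.len() == node_count {
//!         Some(order)
//!     } else {
//!         None
//!     }
//! }
//! ```
//!
//! With this helper, a source feeding one operator that fans out to two
//! branches produces an order, whereas two nodes that point at each other
//! produce `None`.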
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                      RING 2: CONTROL PLANE                      │
//! │  DagBuilder constructs StreamingDag topology                    │
//! │  ┌──────────┐   ┌──────────────┐   ┌───────────────────┐        │
//! │  │DagBuilder│──▶│ StreamingDag │──▶│   RoutingTable    │        │
//! │  │ (Ring 2) │   │ (immutable)  │   │  (cache-aligned)  │        │
//! │  └──────────┘   └──────────────┘   └───────────────────┘        │
//! │                                                                 │
//! │  MulticastBuffer<T> per shared stage (pre-allocated slots)      │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
//!
//! ## Example
//!
//! ```rust,ignore
//! use laminar_core::dag::{DagBuilder, RoutingTable};
//!
//! // `schema` is assumed to be defined earlier; every node shares it here.
//! let dag = DagBuilder::new()
//!     .source("trades", schema.clone())
//!     .operator("normalize", schema.clone())
//!     .connect("trades", "normalize")
//!     .fan_out("normalize", |b| {
//!         b.branch("vwap", schema.clone())
//!             .branch("anomaly", schema.clone())
//!     })
//!     .sink_for("vwap", "analytics", schema.clone())
//!     .sink_for("anomaly", "alerts", schema.clone())
//!     .build()?;
//!
//! let routing = RoutingTable::from_dag(&dag);
//! assert_eq!(dag.node_count(), 5);
//! assert_eq!(dag.sources().len(), 1);
//! assert_eq!(dag.sinks().len(), 2);
//! ```

pub mod builder;
pub mod changelog;
pub mod checkpoint;
pub mod error;
pub mod executor;
pub mod multicast;
pub mod recovery;
pub mod routing;
pub mod topology;
pub mod watermark;

#[cfg(test)]
mod tests;

// Re-export key types
pub use builder::{DagBuilder, FanOutBuilder};
pub use changelog::DagChangelogPropagator;
pub use checkpoint::{
    AlignmentResult, BarrierAligner, BarrierType, CheckpointBarrier, CheckpointId,
    DagCheckpointConfig, DagCheckpointCoordinator,
};
pub use error::DagError;
#[cfg(feature = "dag-metrics")]
pub use executor::OperatorNodeMetrics;
pub use executor::{DagExecutor, DagExecutorMetrics};
pub use multicast::MulticastBuffer;
pub use recovery::{
    DagCheckpointSnapshot, DagRecoveryManager, RecoveredDagState, SerializableOperatorState,
};
pub use routing::{RoutingEntry, RoutingTable, MAX_PORTS};
pub use topology::{
    DagChannelType, DagEdge, DagNode, DagNodeType, EdgeId, NodeId, PartitioningStrategy,
    SharedStageMetadata, StatePartitionId, StreamingDag, MAX_FAN_OUT,
};
pub use watermark::{DagWatermarkCheckpoint, DagWatermarkTracker};