Skip to main content

Module dag

Module dag 

Source
Expand description

§DAG Pipeline Topology

Directed Acyclic Graph (DAG) topology data structures for complex streaming workflows with fan-out, fan-in, and shared intermediate stages.

§Overview

This module provides the topology layer for DAG pipelines:

  • StreamingDag: The complete DAG topology with topological ordering
  • DagBuilder: Fluent builder API for programmatic DAG construction
  • DagNode / DagEdge: Adjacency list representation
  • DagChannelType: Auto-derived channel types (SPSC/SPMC/MPSC)
  • MulticastBuffer: Zero-copy SPMC multicast for shared stages
  • RoutingTable: Pre-computed O(1) dispatch table
  • DagExecutor: Ring 0 event processing engine

§Key Design Principles

  1. Channel type is auto-derived - SPSC/SPMC/MPSC inferred from topology
  2. Cycle detection - Rejected at construction time
  3. Schema validation - Connected edges must have compatible schemas
  4. Immutable once finalized - Topology is frozen after build()
  5. Zero-alloc hot path - Multicast and routing are allocation-free

§Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     RING 2: CONTROL PLANE                       │
│  DagBuilder constructs StreamingDag topology                    │
│  ┌──────────┐   ┌──────────────┐   ┌───────────────────┐       │
│  │DagBuilder│──▶│ StreamingDag │──▶│ RoutingTable      │       │
│  │ (Ring 2) │   │  (immutable) │   │ (cache-aligned)   │       │
│  └──────────┘   └──────────────┘   └───────────────────┘       │
│                                                                 │
│  MulticastBuffer<T> per shared stage (pre-allocated slots)      │
└─────────────────────────────────────────────────────────────────┘

§Example

use laminar_core::dag::{DagBuilder, RoutingTable};

let dag = DagBuilder::new()
    .source("trades", schema.clone())
    .operator("normalize", schema.clone())
    .connect("trades", "normalize")
    .fan_out("normalize", |b| {
        b.branch("vwap", schema.clone())
         .branch("anomaly", schema.clone())
    })
    .sink_for("vwap", "analytics", schema.clone())
    .sink_for("anomaly", "alerts", schema.clone())
    .build()?;

let routing = RoutingTable::from_dag(&dag);
assert_eq!(dag.node_count(), 5);
assert_eq!(dag.sources().len(), 1);
assert_eq!(dag.sinks().len(), 2);

Re-exports§

pub use builder::DagBuilder;
pub use builder::FanOutBuilder;
pub use changelog::DagChangelogPropagator;
pub use checkpoint::AlignmentResult;
pub use checkpoint::BarrierAligner;
pub use checkpoint::BarrierType;
pub use checkpoint::CheckpointBarrier;
pub use checkpoint::CheckpointId;
pub use checkpoint::DagCheckpointConfig;
pub use checkpoint::DagCheckpointCoordinator;
pub use error::DagError;
pub use executor::OperatorNodeMetrics;
pub use executor::DagExecutor;
pub use executor::DagExecutorMetrics;
pub use multicast::MulticastBuffer;
pub use recovery::DagCheckpointSnapshot;
pub use recovery::DagRecoveryManager;
pub use recovery::RecoveredDagState;
pub use recovery::SerializableOperatorState;
pub use routing::RoutingEntry;
pub use routing::RoutingTable;
pub use routing::MAX_PORTS;
pub use topology::DagChannelType;
pub use topology::DagEdge;
pub use topology::DagNode;
pub use topology::DagNodeType;
pub use topology::EdgeId;
pub use topology::NodeId;
pub use topology::PartitioningStrategy;
pub use topology::SharedStageMetadata;
pub use topology::StatePartitionId;
pub use topology::StreamingDag;
pub use topology::MAX_FAN_OUT;
pub use watermark::DagWatermarkCheckpoint;
pub use watermark::DagWatermarkTracker;

Modules§

builder
DAG builder API for programmatic topology construction.
changelog
DAG-native changelog propagation.
checkpoint
Chandy-Lamport barrier checkpointing for DAG pipelines.
error
Error types for DAG topology operations.
executor
Ring 0 DAG executor for event processing.
multicast
Zero-copy multicast buffer for shared intermediate stages.
recovery
Snapshot and recovery management for DAG checkpoints.
routing
Pre-computed routing table for O(1) hot path dispatch.
topology
DAG topology data structures.
watermark
DAG-native watermark tracking.