Skip to main content

laminar_core/shuffle/
message.rs

1//! Logical messages carried on a shuffle stream.
2//!
3//! The wire encoding is gRPC/protobuf (`proto/shuffle.proto`, `ShuffleFrame`);
4//! the `<->` conversion lives in [`super::transport`]. A `VnodeData`'s batch is
5//! Arrow IPC-encoded with a per-stage streaming encoder (see
6//! [`crate::serialization::BatchStreamEncoder`]): the schema rides only the first
7//! frame of each stage and later frames are schema-less continuations. This
8//! assumes a stage's schema is stable for the life of a connection.
9
10use arrow_array::RecordBatch;
11
12use crate::checkpoint::barrier::CheckpointBarrier;
13
14/// Maximum Arrow IPC payload accepted for a single `VnodeData` frame: 64 MiB.
15pub const MAX_PAYLOAD_BYTES: usize = 64 * 1024 * 1024;
16
17/// Logical message carried on a shuffle connection.
18#[derive(Debug, Clone, PartialEq)]
19pub enum ShuffleMessage {
20    /// A checkpoint barrier (Chandy-Lamport).
21    Barrier(CheckpointBarrier),
22    /// Peer identifying itself during the connection handshake.
23    Hello(u64),
24    /// A batch of rows pre-routed to `vnode`, tagged with the logical `stage`
25    /// (the operator / MV name) it belongs to. The stage lets a receiver shared
26    /// by multiple sharded operators demux frames to the correct one instead of
27    /// cross-feeding them.
28    VnodeData(String, u32, RecordBatch),
29    /// Sender announcing graceful shutdown with a brief reason.
30    Close(String),
31}