laminar_core/shuffle/message.rs
1//! Logical messages carried on a shuffle stream.
2//!
3//! The wire encoding is gRPC/protobuf (`proto/shuffle.proto`, `ShuffleFrame`);
4//! the `<->` conversion lives in [`super::transport`]. A `VnodeData`'s batch is
5//! Arrow IPC-encoded with a per-stage streaming encoder (see
6//! [`crate::serialization::BatchStreamEncoder`]): the schema rides only the first
7//! frame of each stage and later frames are schema-less continuations. This
8//! assumes a stage's schema is stable for the life of a connection.
9
10use arrow_array::RecordBatch;
11
12use crate::checkpoint::barrier::CheckpointBarrier;
13
14/// Maximum Arrow IPC payload accepted for a single `VnodeData` frame: 64 MiB.
15pub const MAX_PAYLOAD_BYTES: usize = 64 * 1024 * 1024;
16
17/// Logical message carried on a shuffle connection.
18#[derive(Debug, Clone, PartialEq)]
19pub enum ShuffleMessage {
20 /// A checkpoint barrier (Chandy-Lamport).
21 Barrier(CheckpointBarrier),
22 /// Peer identifying itself during the connection handshake.
23 Hello(u64),
24 /// A batch of rows pre-routed to `vnode`, tagged with the logical `stage`
25 /// (the operator / MV name) it belongs to. The stage lets a receiver shared
26 /// by multiple sharded operators demux frames to the correct one instead of
27 /// cross-feeding them.
28 VnodeData(String, u32, RecordBatch),
29 /// Sender announcing graceful shutdown with a brief reason.
30 Close(String),
31}