Skip to main content

laminar_core/operator/
mod.rs

1//! # Operator Module
2//!
3//! Streaming operators for transforming and processing events.
4//!
5//! ## Operator Types
6//!
7//! - **Stateless**: map, filter, flatmap
8//! - **Stateful**: window, aggregate, join
9//!
10//! All operators implement the `Operator` trait and can be composed into
11//! directed acyclic graphs (DAGs) for complex stream processing.
12
13use std::sync::Arc;
14
15use arrow_array::RecordBatch;
16use smallvec::SmallVec;
17
/// Key identifying a registered timer.
///
/// A `SmallVec` byte buffer with 16 bytes of inline storage: keys of up to
/// 16 bytes (the size targeted for window IDs) are held inline without a heap
/// allocation, while longer keys spill to the heap.
///
/// NOTE(review): this was previously described as "re-exported from the time
/// module", but it is declared here as a standalone alias — confirm it stays
/// in sync with the key type used by the time module.
pub type TimerKey = SmallVec<[u8; 16]>;
21
/// An event flowing through the system.
///
/// Pairs a timestamp with an Arrow `RecordBatch` payload. The payload is held
/// behind an `Arc`, so the derived `Clone` shares the batch instead of
/// duplicating it.
#[derive(Debug, Clone)]
pub struct Event {
    /// Timestamp of the event.
    // NOTE(review): the unit and time domain of this value are not stated in
    // this file — confirm against the time module before relying on either.
    pub timestamp: i64,
    /// Event payload as Arrow `RecordBatch` wrapped in `Arc` for zero-copy multicast.
    ///
    /// Cloning an `Event` increments the `Arc` reference count (~2ns, O(1))
    /// instead of copying all column `Arc` pointers (O(columns)).
    pub data: Arc<RecordBatch>,
}
33
34impl Event {
35    /// Create a new event, wrapping the batch in `Arc` for zero-copy sharing.
36    #[must_use]
37    pub fn new(timestamp: i64, data: RecordBatch) -> Self {
38        Self {
39            timestamp,
40            data: Arc::new(data),
41        }
42    }
43}
44
/// Output from an operator.
///
/// Operators return zero or more of these per call, collected in an
/// [`OutputVec`].
///
/// Infrequent variants (`SideOutput`, `CheckpointComplete`) are boxed
/// to keep the enum size small for the common hot-path variants (`Event`, `Watermark`).
#[derive(Debug)]
pub enum Output {
    /// Regular event output.
    Event(Event),
    /// Watermark update, carrying the new watermark value.
    Watermark(i64),
    /// Late event that arrived after the watermark (no side output configured).
    LateEvent(Event),
    /// Late event routed to a named side output (boxed — infrequent path).
    SideOutput(Box<SideOutputData>),
    /// Changelog record with Z-set weight.
    ///
    /// Used by `EmitStrategy::Changelog` to emit structured change records
    /// for CDC pipelines and cascading materialized views.
    Changelog(window::ChangelogRecord),
    /// Checkpoint completion with snapshotted operator states (boxed — infrequent path).
    ///
    /// Emitted when a `CheckpointRequest` is processed by a core thread.
    /// Carries the checkpoint ID and all operator states for persistence by Ring 1.
    CheckpointComplete(Box<CheckpointCompleteData>),
    /// Checkpoint barrier forwarded from a core thread.
    ///
    /// When a core receives a `CoreMessage::Barrier`, it flushes the reactor
    /// and forwards the barrier as an output so the coordinator can track
    /// barrier alignment across sources.
    ///
    /// `CheckpointBarrier` is 24 bytes (`#[repr(C)]`, `Copy`), small enough
    /// to store inline without boxing.
    Barrier(crate::checkpoint::CheckpointBarrier),
}
79
/// Data for a late event routed to a named side output.
///
/// Boxed inside [`Output::SideOutput`] to reduce enum size on the hot path.
#[derive(Debug)]
pub struct SideOutputData {
    /// The name of the side output to route to.
    ///
    /// Uses `Arc<str>` to avoid per-event String allocation — the name is
    /// typically shared across all late events for a given operator, so
    /// attaching it to another event only needs a reference-count bump.
    pub name: Arc<str>,
    /// The late event being routed.
    pub event: Event,
}
93
/// Data for a checkpoint completion.
///
/// Boxed inside [`Output::CheckpointComplete`] to reduce enum size on the hot path.
#[derive(Debug)]
pub struct CheckpointCompleteData {
    /// The checkpoint ID from the request that triggered this snapshot.
    pub checkpoint_id: u64,
    /// Snapshotted states from all operators on this core, one
    /// [`OperatorState`] entry per snapshot.
    pub operator_states: Vec<OperatorState>,
}
104
/// Collection type for operator outputs.
///
/// Uses `SmallVec` with an inline capacity of 4, so up to four outputs are
/// stored without a heap allocation; a fifth output spills the collection to
/// the heap. The size 4 is chosen based on typical operator patterns:
/// - 0 outputs: filter that drops events
/// - 1 output: most common case (map, regular processing)
/// - 2 outputs: event + watermark
/// - 3+ outputs: flatmap or window emission
pub type OutputVec = SmallVec<[Output; 4]>;
114
/// Context provided to operators during processing.
///
/// Bundles the current time values with mutable borrows of the timer
/// service, state store, and watermark generator. The borrows last for `'a`,
/// so the context cannot outlive the services it points at.
pub struct OperatorContext<'a> {
    /// Current event time.
    pub event_time: i64,
    /// Current processing time (system time in microseconds).
    pub processing_time: i64,
    /// Timer registration service.
    pub timers: &'a mut crate::time::TimerService,
    /// State store access (dynamically dispatched).
    pub state: &'a mut dyn crate::state::StateStore,
    /// Watermark generator (dynamically dispatched).
    pub watermark_generator: &'a mut dyn crate::time::WatermarkGenerator,
    /// Operator index in the chain.
    pub operator_index: usize,
}
130
/// Trait implemented by all streaming operators.
///
/// The `Send` supertrait bound means an operator value can be moved to
/// another thread.
pub trait Operator: Send {
    /// Process an incoming event.
    ///
    /// Takes the event by shared reference together with the mutable
    /// per-call context, and returns the outputs to emit (possibly none).
    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec;

    /// Handle timer expiration.
    ///
    /// Receives the expired [`Timer`] by value and returns the outputs to
    /// emit (possibly none).
    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec;

    /// Checkpoint the operator's state.
    ///
    /// Takes `&self`, so producing the snapshot does not mutate the operator.
    fn checkpoint(&self) -> OperatorState;

    /// Restore from a checkpoint, consuming the supplied state.
    ///
    /// # Errors
    ///
    /// - Returns `OperatorError::StateAccessFailed` if the state cannot be accessed.
    /// - Returns `OperatorError::SerializationFailed` if the state cannot be deserialized.
    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError>;
}
150
/// A timer registration.
///
/// This is the value handed to [`Operator::on_timer`].
#[derive(Debug, Clone)]
pub struct Timer {
    /// Timer key (uses `SmallVec` to avoid heap allocation for keys up to 16 bytes).
    pub key: TimerKey,
    /// Expiration timestamp.
    // NOTE(review): presumably in the same time domain as `Event::timestamp` — confirm.
    pub timestamp: i64,
}
159
/// Serialized operator state for checkpointing.
///
/// Returned by [`Operator::checkpoint`] and passed back to
/// [`Operator::restore`].
#[derive(Debug, Clone)]
pub struct OperatorState {
    /// Operator ID.
    pub operator_id: String,
    /// Serialized state data as raw bytes; the encoding is chosen by the
    /// operator and is not interpreted here.
    pub data: Vec<u8>,
}
168
/// Errors that can occur in operators.
///
/// Every variant carries a free-form message; the `#[error]` attributes
/// define the `Display` text as a fixed prefix followed by that message.
#[derive(Debug, thiserror::Error)]
pub enum OperatorError {
    /// State access error.
    #[error("State access failed: {0}")]
    StateAccessFailed(String),

    /// Serialization error.
    #[error("Serialization failed: {0}")]
    SerializationFailed(String),

    /// Processing error.
    #[error("Processing failed: {0}")]
    ProcessingFailed(String),

    /// Configuration error (e.g., missing required builder field).
    #[error("Configuration error: {0}")]
    ConfigError(String),
}
188
189impl From<arrow_schema::ArrowError> for OperatorError {
190    fn from(e: arrow_schema::ArrowError) -> Self {
191        Self::SerializationFailed(e.to_string())
192    }
193}
194
/// Converts a DataFusion error into [`OperatorError::ProcessingFailed`],
/// keeping only the error's rendered message.
///
/// Only compiled when the `jit` feature is enabled.
#[cfg(feature = "jit")]
impl From<datafusion_common::DataFusionError> for OperatorError {
    fn from(err: datafusion_common::DataFusionError) -> Self {
        let message = err.to_string();
        Self::ProcessingFailed(message)
    }
}
201
// Public submodules of the operator module, listed alphabetically.
pub mod asof_join;
pub mod changelog;
pub mod lag_lead;
pub mod partitioned_topk;
pub mod session_window;
pub mod sliding_window;
pub mod stream_join;
pub mod table_cache;
pub mod temporal_join;
pub mod topk;
pub mod watermark_sort;
pub mod window;
pub mod window_sort;
215
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// `Event::new` keeps the given timestamp and exposes the batch's rows
    /// through the shared payload.
    #[test]
    fn test_event_creation() {
        // Single-column batch with three rows.
        let column: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let columns = vec![("col1", column)];
        let batch = RecordBatch::try_from_iter(columns).unwrap();

        let event = Event::new(12345, batch);

        assert_eq!(event.timestamp, 12345);
        assert_eq!(event.data.num_rows(), 3);
    }
}