laminar_core/operator/mod.rs
1//! # Operator Module
2//!
3//! Streaming operators for transforming and processing events.
4//!
5//! ## Operator Types
6//!
7//! - **Stateless**: map, filter, flatmap
8//! - **Stateful**: window, aggregate, join
9//!
10//! All operators implement the `Operator` trait and can be composed into
11//! directed acyclic graphs (DAGs) for complex stream processing.
12
13use std::sync::Arc;
14
15use arrow_array::RecordBatch;
16use smallvec::SmallVec;
17
18/// Timer key type optimized for window IDs (16 bytes).
19/// Re-exported from time module for convenience.
20pub type TimerKey = SmallVec<[u8; 16]>;
21
22/// An event flowing through the system
23#[derive(Debug, Clone)]
24pub struct Event {
25 /// Timestamp of the event
26 pub timestamp: i64,
27 /// Event payload as Arrow `RecordBatch` wrapped in `Arc` for zero-copy multicast.
28 ///
29 /// Cloning an `Event` increments the `Arc` reference count (~2ns, O(1))
30 /// instead of copying all column `Arc` pointers (O(columns)).
31 pub data: Arc<RecordBatch>,
32}
33
34impl Event {
35 /// Create a new event, wrapping the batch in `Arc` for zero-copy sharing.
36 #[must_use]
37 pub fn new(timestamp: i64, data: RecordBatch) -> Self {
38 Self {
39 timestamp,
40 data: Arc::new(data),
41 }
42 }
43}
44
45/// Output from an operator
46///
47/// Infrequent variants (`SideOutput`, `CheckpointComplete`) are boxed
48/// to keep the enum size small for the common hot-path variants (`Event`, `Watermark`).
49#[derive(Debug)]
50pub enum Output {
51 /// Regular event output
52 Event(Event),
53 /// Watermark update
54 Watermark(i64),
55 /// Late event that arrived after watermark (no side output configured)
56 LateEvent(Event),
57 /// Late event routed to a named side output (boxed — infrequent path).
58 SideOutput(Box<SideOutputData>),
59 /// Changelog record with Z-set weight.
60 ///
61 /// Used by `EmitStrategy::Changelog` to emit structured change records
62 /// for CDC pipelines and cascading materialized views.
63 Changelog(window::ChangelogRecord),
64 /// Checkpoint completion with snapshotted operator states (boxed — infrequent path).
65 ///
66 /// Emitted when a `CheckpointRequest` is processed by a core thread.
67 /// Carries the checkpoint ID and all operator states for persistence by Ring 1.
68 CheckpointComplete(Box<CheckpointCompleteData>),
69 /// Checkpoint barrier forwarded from a core thread.
70 ///
71 /// When a core receives a `CoreMessage::Barrier`, it flushes the reactor
72 /// and forwards the barrier as an output so the coordinator can track
73 /// barrier alignment across sources.
74 ///
75 /// `CheckpointBarrier` is 24 bytes (`#[repr(C)]`, `Copy`), small enough
76 /// to store inline without boxing.
77 Barrier(crate::checkpoint::CheckpointBarrier),
78}
79
80/// Data for a late event routed to a named side output.
81///
82/// Boxed inside [`Output::SideOutput`] to reduce enum size on the hot path.
83#[derive(Debug)]
84pub struct SideOutputData {
85 /// The name of the side output to route to.
86 ///
87 /// Uses `Arc<str>` to avoid per-event String allocation — the name is
88 /// typically shared across all late events for a given operator.
89 pub name: Arc<str>,
90 /// The late event
91 pub event: Event,
92}
93
94/// Data for a checkpoint completion.
95///
96/// Boxed inside [`Output::CheckpointComplete`] to reduce enum size on the hot path.
97#[derive(Debug)]
98pub struct CheckpointCompleteData {
99 /// The checkpoint ID from the request
100 pub checkpoint_id: u64,
101 /// Snapshotted states from all operators on this core
102 pub operator_states: Vec<OperatorState>,
103}
104
105/// Collection type for operator outputs.
106///
107/// Uses `SmallVec` to avoid heap allocation for common cases (0-3 outputs).
108/// The size 4 is chosen based on typical operator patterns:
109/// - 0 outputs: filter that drops events
110/// - 1 output: most common case (map, regular processing)
111/// - 2 outputs: event + watermark
112/// - 3+ outputs: flatmap or window emission
113pub type OutputVec = SmallVec<[Output; 4]>;
114
115/// Context provided to operators during processing
116pub struct OperatorContext<'a> {
117 /// Current event time
118 pub event_time: i64,
119 /// Current processing time (system time in microseconds)
120 pub processing_time: i64,
121 /// Timer registration
122 pub timers: &'a mut crate::time::TimerService,
123 /// State store access
124 pub state: &'a mut dyn crate::state::StateStore,
125 /// Watermark generator
126 pub watermark_generator: &'a mut dyn crate::time::WatermarkGenerator,
127 /// Operator index in the chain
128 pub operator_index: usize,
129}
130
131/// Trait implemented by all streaming operators
132pub trait Operator: Send {
133 /// Process an incoming event
134 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec;
135
136 /// Handle timer expiration
137 fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec;
138
139 /// Checkpoint the operator's state
140 fn checkpoint(&self) -> OperatorState;
141
142 /// Restore from a checkpoint
143 ///
144 /// # Errors
145 ///
146 /// Returns `OperatorError::StateAccessFailed` if the state cannot be accessed
147 /// Returns `OperatorError::SerializationFailed` if the state cannot be deserialized
148 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError>;
149}
150
151/// A timer registration
152#[derive(Debug, Clone)]
153pub struct Timer {
154 /// Timer key (uses `SmallVec` to avoid heap allocation for keys up to 16 bytes)
155 pub key: TimerKey,
156 /// Expiration timestamp
157 pub timestamp: i64,
158}
159
/// An operator's state, serialized for inclusion in a checkpoint.
#[derive(Debug, Clone)]
pub struct OperatorState {
    /// Identifier of the operator this state belongs to.
    pub operator_id: String,
    /// Opaque serialized state bytes.
    pub data: Vec<u8>,
}
168
169/// Errors that can occur in operators
170#[derive(Debug, thiserror::Error)]
171pub enum OperatorError {
172 /// State access error
173 #[error("State access failed: {0}")]
174 StateAccessFailed(String),
175
176 /// Serialization error
177 #[error("Serialization failed: {0}")]
178 SerializationFailed(String),
179
180 /// Processing error
181 #[error("Processing failed: {0}")]
182 ProcessingFailed(String),
183
184 /// Configuration error (e.g., missing required builder field)
185 #[error("Configuration error: {0}")]
186 ConfigError(String),
187}
188
189impl From<arrow_schema::ArrowError> for OperatorError {
190 fn from(e: arrow_schema::ArrowError) -> Self {
191 Self::SerializationFailed(e.to_string())
192 }
193}
194
/// Converts a DataFusion error into [`OperatorError::ProcessingFailed`],
/// using its `Display` text as the message.
///
/// Only compiled when the `jit` feature is enabled.
#[cfg(feature = "jit")]
impl From<datafusion_common::DataFusionError> for OperatorError {
    fn from(err: datafusion_common::DataFusionError) -> Self {
        let message = err.to_string();
        OperatorError::ProcessingFailed(message)
    }
}
201
202pub mod asof_join;
203pub mod changelog;
204pub mod lag_lead;
205pub mod partitioned_topk;
206pub mod session_window;
207pub mod sliding_window;
208pub mod stream_join;
209pub mod table_cache;
210pub mod temporal_join;
211pub mod topk;
212pub mod watermark_sort;
213pub mod window;
214pub mod window_sort;
215
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// Builds a single-column (`col1`) batch with three `Int64` rows.
    fn three_row_batch() -> RecordBatch {
        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap()
    }

    #[test]
    fn test_event_creation() {
        let event = Event::new(12345, three_row_batch());

        assert_eq!(event.timestamp, 12345);
        assert_eq!(event.data.num_rows(), 3);
    }

    #[test]
    fn test_event_clone_shares_batch() {
        // `Event` documents zero-copy multicast: a clone must share the same
        // `Arc<RecordBatch>` rather than copy the batch.
        let event = Event::new(7, three_row_batch());
        let copy = event.clone();

        assert_eq!(copy.timestamp, 7);
        assert!(Arc::ptr_eq(&event.data, &copy.data));
        assert_eq!(Arc::strong_count(&event.data), 2);
    }

    #[test]
    fn test_output_vec_holds_four_outputs_inline() {
        // `OutputVec` has inline capacity 4: four outputs stay on the stack,
        // and the fifth spills to the heap.
        let mut outputs = OutputVec::new();
        for wm in 0..4 {
            outputs.push(Output::Watermark(wm));
        }
        assert!(!outputs.spilled());

        outputs.push(Output::Watermark(4));
        assert!(outputs.spilled());
    }
}
232}