Skip to main content

laminar_core/operator/
mod.rs

1//! Streaming operator types and window assigners for stream processing.
2
3use std::sync::Arc;
4
5use arrow_array::RecordBatch;
6use smallvec::SmallVec;
7
8/// Timer key type optimized for window IDs (16 bytes).
9pub type TimerKey = SmallVec<[u8; 16]>;
10
11/// An event flowing through the system.
12#[derive(Debug, Clone)]
13pub struct Event {
14    /// Timestamp of the event
15    pub timestamp: i64,
16    /// Event payload as Arrow `RecordBatch` wrapped in `Arc` for zero-copy multicast.
17    pub data: Arc<RecordBatch>,
18}
19
20impl Event {
21    /// Create a new event, wrapping the batch in `Arc` for zero-copy sharing.
22    #[must_use]
23    pub fn new(timestamp: i64, data: RecordBatch) -> Self {
24        Self {
25            timestamp,
26            data: Arc::new(data),
27        }
28    }
29}
30
/// Checkpoint snapshot of one operator's state in serialized form.
#[derive(Debug, Clone)]
pub struct OperatorState {
    /// Identifier of the operator this snapshot belongs to.
    pub operator_id: String,
    /// Version of the state format, used to detect forward/backward
    /// compatibility.
    pub version: u32,
    /// The serialized state bytes.
    pub data: Vec<u8>,
}

impl OperatorState {
    /// Builds a snapshot tagged with state format version 1.
    ///
    /// `operator_id` and `data` are stored as given.
    #[must_use]
    pub fn v1(operator_id: String, data: Vec<u8>) -> Self {
        let version = 1;
        Self {
            operator_id,
            version,
            data,
        }
    }
}
53
54/// Errors that can occur in operators.
55#[derive(Debug, thiserror::Error)]
56pub enum OperatorError {
57    /// State access error
58    #[error("State access failed: {0}")]
59    StateAccessFailed(String),
60
61    /// Serialization error
62    #[error("Serialization failed: {0}")]
63    SerializationFailed(String),
64
65    /// Processing error
66    #[error("Processing failed: {0}")]
67    ProcessingFailed(String),
68
69    /// Configuration error (e.g., missing required builder field)
70    #[error("Configuration error: {0}")]
71    ConfigError(String),
72}
73
74impl From<arrow_schema::ArrowError> for OperatorError {
75    fn from(e: arrow_schema::ArrowError) -> Self {
76        Self::SerializationFailed(e.to_string())
77    }
78}
79
80pub mod sliding_window;
81pub mod table_cache;
82pub mod window;
83
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// `Event::new` keeps the timestamp and exposes the wrapped batch's rows.
    #[test]
    fn test_event_creation() {
        // Single-column batch with three rows.
        let values = Int64Array::from(vec![1, 2, 3]);
        let batch = RecordBatch::try_from_iter(vec![("col1", Arc::new(values) as _)]).unwrap();

        let event = Event::new(12345, batch);

        assert_eq!(event.timestamp, 12345);
        assert_eq!(event.data.num_rows(), 3);
    }
}