laminar_core/operator/
mod.rs1use std::sync::Arc;
4
5use arrow_array::RecordBatch;
6use smallvec::SmallVec;
7
8pub type TimerKey = SmallVec<[u8; 16]>;
10
11#[derive(Debug, Clone)]
13pub struct Event {
14 pub timestamp: i64,
16 pub data: Arc<RecordBatch>,
18}
19
20impl Event {
21 #[must_use]
23 pub fn new(timestamp: i64, data: RecordBatch) -> Self {
24 Self {
25 timestamp,
26 data: Arc::new(data),
27 }
28 }
29}
30
31#[derive(Debug, Clone)]
33pub struct OperatorState {
34 pub operator_id: String,
36 pub version: u32,
38 pub data: Vec<u8>,
40}
41
42impl OperatorState {
43 #[must_use]
45 pub fn v1(operator_id: String, data: Vec<u8>) -> Self {
46 Self {
47 operator_id,
48 version: 1,
49 data,
50 }
51 }
52}
53
54#[derive(Debug, thiserror::Error)]
56pub enum OperatorError {
57 #[error("State access failed: {0}")]
59 StateAccessFailed(String),
60
61 #[error("Serialization failed: {0}")]
63 SerializationFailed(String),
64
65 #[error("Processing failed: {0}")]
67 ProcessingFailed(String),
68
69 #[error("Configuration error: {0}")]
71 ConfigError(String),
72}
73
74impl From<arrow_schema::ArrowError> for OperatorError {
75 fn from(e: arrow_schema::ArrowError) -> Self {
76 Self::SerializationFailed(e.to_string())
77 }
78}
79
80pub mod sliding_window;
81pub mod table_cache;
82pub mod window;
83
84#[cfg(test)]
85mod tests {
86 use super::*;
87 use arrow_array::{Int64Array, RecordBatch};
88 use std::sync::Arc;
89
90 #[test]
91 fn test_event_creation() {
92 let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
93 let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
94
95 let event = Event::new(12345, batch);
96
97 assert_eq!(event.timestamp, 12345);
98 assert_eq!(event.data.num_rows(), 3);
99 }
100}