1use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use arrow_schema::SchemaRef;
19use datafusion_expr::Expr;
20
21use super::row::RowSchema;
22
/// Identifier attached to a [`Pipeline`] and its [`CompiledPipeline`].
///
/// A plain `u32` newtype: it is `Copy`, hashable, and compared by value.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct PipelineId(pub u32);
26
27#[derive(Debug, Clone)]
29pub enum PipelineStage {
30 Filter {
32 predicate: Expr,
34 },
35 Project {
37 expressions: Vec<(Expr, String)>,
39 },
40 KeyExtract {
42 key_exprs: Vec<Expr>,
44 },
45}
46
47#[derive(Debug, Clone)]
52pub enum PipelineBreaker {
53 Aggregate {
55 group_exprs: Vec<Expr>,
57 aggr_exprs: Vec<Expr>,
59 },
60 Sort {
62 order_exprs: Vec<Expr>,
64 },
65 Join {
67 join_type: String,
69 left_keys: Vec<Expr>,
71 right_keys: Vec<Expr>,
73 },
74 Sink,
76}
77
78#[derive(Debug, Clone)]
80pub struct Pipeline {
81 pub id: PipelineId,
83 pub stages: Vec<PipelineStage>,
85 pub input_schema: SchemaRef,
87 pub output_schema: SchemaRef,
89}
90
/// Status returned by a compiled pipeline for one event.
///
/// `#[repr(u8)]` fixes each variant's discriminant to the byte that the native
/// function returns (see [`PipelineFn`]).
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PipelineAction {
    /// Status byte `0`: the event is dropped.
    Drop = 0,
    /// Status byte `1`: the event is emitted.
    Emit = 1,
    /// Status byte `2`: the pipeline reported an error.
    Error = 2,
}

impl PipelineAction {
    /// Decodes a raw status byte produced by native pipeline code.
    ///
    /// `0` maps to [`PipelineAction::Drop`] and `1` to [`PipelineAction::Emit`].
    /// Every other byte, `2` included, maps to [`PipelineAction::Error`], so an
    /// unrecognised status is never silently treated as success.
    #[must_use]
    pub fn from_u8(v: u8) -> Self {
        if v == Self::Drop as u8 {
            Self::Drop
        } else if v == Self::Emit as u8 {
            Self::Emit
        } else {
            Self::Error
        }
    }
}
114
/// Signature of a natively compiled pipeline function.
///
/// It receives a pointer to the input row and a pointer to an output buffer, and
/// returns a status byte that [`PipelineAction::from_u8`] decodes.
/// NOTE(review): the buffers presumably follow the input/output `RowSchema`s of
/// the owning `CompiledPipeline` — confirm with the code generator.
pub type PipelineFn = unsafe extern "C" fn(input: *const u8, output: *mut u8) -> u8;
123
124#[derive(Debug)]
126pub struct PipelineStats {
127 pub events_processed: AtomicU64,
129 pub events_emitted: AtomicU64,
131 pub events_dropped: AtomicU64,
133 pub total_ns: AtomicU64,
135}
136
137impl PipelineStats {
138 #[must_use]
140 pub fn new() -> Self {
141 Self {
142 events_processed: AtomicU64::new(0),
143 events_emitted: AtomicU64::new(0),
144 events_dropped: AtomicU64::new(0),
145 total_ns: AtomicU64::new(0),
146 }
147 }
148
149 pub fn record(&self, action: PipelineAction, elapsed_ns: u64) {
151 self.events_processed.fetch_add(1, Ordering::Relaxed);
152 match action {
153 PipelineAction::Emit => {
154 self.events_emitted.fetch_add(1, Ordering::Relaxed);
155 }
156 PipelineAction::Drop => {
157 self.events_dropped.fetch_add(1, Ordering::Relaxed);
158 }
159 PipelineAction::Error => {}
160 }
161 self.total_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
162 }
163}
164
165impl Default for PipelineStats {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171pub struct CompiledPipeline {
173 pub id: PipelineId,
175 func: PipelineFn,
177 pub input_schema: Arc<RowSchema>,
179 pub output_schema: Arc<RowSchema>,
181 pub stats: PipelineStats,
183}
184
185impl CompiledPipeline {
186 pub fn new(
188 id: PipelineId,
189 func: PipelineFn,
190 input_schema: Arc<RowSchema>,
191 output_schema: Arc<RowSchema>,
192 ) -> Self {
193 Self {
194 id,
195 func,
196 input_schema,
197 output_schema,
198 stats: PipelineStats::new(),
199 }
200 }
201
202 #[inline]
209 pub unsafe fn execute(&self, input: *const u8, output: *mut u8) -> PipelineAction {
210 let result = (self.func)(input, output);
211 PipelineAction::from_u8(result)
212 }
213
214 #[must_use]
216 pub fn func(&self) -> PipelineFn {
217 self.func
218 }
219}
220
// SAFETY: NOTE(review) — the following is reasoning to confirm, not verified fact.
// Most fields are `Send + Sync` on their own:
// - `PipelineId` is a `u32` newtype.
// - function pointers (`PipelineFn`) are always `Send + Sync`.
// - `PipelineStats` only holds `AtomicU64`s.
// The schema fields are `Arc<RowSchema>`, which is `Send + Sync` only if `RowSchema`
// is. If it is, these impls are redundant, because the compiler would derive both
// auto traits. If it is not, these impls assert a property the compiler could not
// prove — confirm `RowSchema`'s thread-safety. The impls also assume the native code
// behind `func` tolerates being called from any thread; that cannot be checked from
// here.
unsafe impl Send for CompiledPipeline {}
// SAFETY: same reasoning as the `Send` impl directly above. `Sync` additionally lets
// several threads call `execute` concurrently through `&CompiledPipeline`, which
// presumes the native function is reentrant — NOTE(review): confirm with the code
// generator.
unsafe impl Sync for CompiledPipeline {}
224
225impl std::fmt::Debug for CompiledPipeline {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 f.debug_struct("CompiledPipeline")
228 .field("id", &self.id)
229 .field("func", &"<native fn>")
230 .finish_non_exhaustive()
231 }
232}
233
#[cfg(test)]
// `items_after_statements`: some tests declare local `extern "C"` fns after `let`s.
// `useless_vec`: NOTE(review) presumably silences `vec![Field::new(..)]` passed to
// `Schema::new`; kept as in the original.
#[allow(clippy::items_after_statements, clippy::useless_vec)]
mod tests {
    use super::*;

    /// Builds the single-column (`x: Int64`, non-nullable) row schema used by the
    /// compiled-pipeline tests.
    fn int64_row_schema() -> Arc<RowSchema> {
        use arrow_schema::{DataType, Field, Schema};

        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
        Arc::new(RowSchema::from_arrow(&schema).unwrap())
    }

    #[test]
    fn pipeline_action_repr() {
        assert_eq!(PipelineAction::Drop as u8, 0);
        assert_eq!(PipelineAction::Emit as u8, 1);
        assert_eq!(PipelineAction::Error as u8, 2);
    }

    #[test]
    fn pipeline_action_from_u8() {
        assert_eq!(PipelineAction::from_u8(0), PipelineAction::Drop);
        assert_eq!(PipelineAction::from_u8(1), PipelineAction::Emit);
        assert_eq!(PipelineAction::from_u8(2), PipelineAction::Error);
        assert_eq!(PipelineAction::from_u8(3), PipelineAction::Error);
        assert_eq!(PipelineAction::from_u8(255), PipelineAction::Error);
    }

    #[test]
    fn pipeline_id_equality() {
        assert_eq!(PipelineId(0), PipelineId(0));
        assert_ne!(PipelineId(0), PipelineId(1));
    }

    #[test]
    fn pipeline_stats_record() {
        let stats = PipelineStats::new();
        stats.record(PipelineAction::Emit, 100);
        stats.record(PipelineAction::Drop, 50);
        stats.record(PipelineAction::Emit, 80);

        assert_eq!(stats.events_processed.load(Ordering::Relaxed), 3);
        assert_eq!(stats.events_emitted.load(Ordering::Relaxed), 2);
        assert_eq!(stats.events_dropped.load(Ordering::Relaxed), 1);
        assert_eq!(stats.total_ns.load(Ordering::Relaxed), 230);
    }

    #[test]
    fn pipeline_stats_error_counts_only_as_processed() {
        let stats = PipelineStats::new();
        stats.record(PipelineAction::Error, 10);

        assert_eq!(stats.events_processed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.events_emitted.load(Ordering::Relaxed), 0);
        assert_eq!(stats.events_dropped.load(Ordering::Relaxed), 0);
        assert_eq!(stats.total_ns.load(Ordering::Relaxed), 10);
    }

    #[test]
    fn pipeline_stats_default_is_zero() {
        let stats = PipelineStats::default();
        assert_eq!(stats.events_processed.load(Ordering::Relaxed), 0);
        assert_eq!(stats.events_emitted.load(Ordering::Relaxed), 0);
        assert_eq!(stats.events_dropped.load(Ordering::Relaxed), 0);
        assert_eq!(stats.total_ns.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn compiled_pipeline_execute() {
        unsafe extern "C" fn always_emit(_input: *const u8, _output: *mut u8) -> u8 {
            1
        }

        let row_schema = int64_row_schema();
        let row_size = row_schema.min_row_size();
        let compiled = CompiledPipeline::new(
            PipelineId(0),
            always_emit,
            Arc::clone(&row_schema),
            row_schema,
        );

        let input = vec![0u8; row_size];
        let mut output = vec![0u8; row_size];
        let action = unsafe { compiled.execute(input.as_ptr(), output.as_mut_ptr()) };
        assert_eq!(action, PipelineAction::Emit);
    }

    #[test]
    fn compiled_pipeline_drop_action() {
        unsafe extern "C" fn always_drop(_input: *const u8, _output: *mut u8) -> u8 {
            0
        }

        let row_schema = int64_row_schema();
        let row_size = row_schema.min_row_size();
        let compiled = CompiledPipeline::new(
            PipelineId(1),
            always_drop,
            Arc::clone(&row_schema),
            row_schema,
        );

        let input = vec![0u8; row_size];
        let mut output = vec![0u8; row_size];
        let action = unsafe { compiled.execute(input.as_ptr(), output.as_mut_ptr()) };
        assert_eq!(action, PipelineAction::Drop);
    }

    #[test]
    fn compiled_pipeline_unknown_status_is_error() {
        unsafe extern "C" fn bogus_status(_input: *const u8, _output: *mut u8) -> u8 {
            7
        }

        let row_schema = int64_row_schema();
        let row_size = row_schema.min_row_size();
        let compiled = CompiledPipeline::new(
            PipelineId(2),
            bogus_status,
            Arc::clone(&row_schema),
            row_schema,
        );

        let input = vec![0u8; row_size];
        let mut output = vec![0u8; row_size];
        let action = unsafe { compiled.execute(input.as_ptr(), output.as_mut_ptr()) };
        assert_eq!(action, PipelineAction::Error);
    }

    #[test]
    fn pipeline_debug_impl() {
        unsafe extern "C" fn noop(_: *const u8, _: *mut u8) -> u8 {
            0
        }

        let row_schema = int64_row_schema();
        let compiled =
            CompiledPipeline::new(PipelineId(42), noop, Arc::clone(&row_schema), row_schema);
        let rendered = format!("{compiled:?}");
        assert!(rendered.contains("42"));
        assert!(rendered.contains("native fn"));
    }
}