// laminar_core/reactor/mod.rs
1//! # Reactor Module
2//!
3//! The core event loop for `LaminarDB`, implementing a single-threaded reactor pattern
4//! optimized for streaming workloads.
5//!
6//! ## Design Goals
7//!
8//! - **Zero allocations** during event processing
9//! - **CPU-pinned** execution for cache locality
10//! - **Lock-free** communication with other threads
11//! - **500K+ events/sec** per core throughput
12//!
13//! ## Architecture
14//!
15//! The reactor runs a tight event loop that:
16//! 1. Polls input sources for events
17//! 2. Routes events to operators
18//! 3. Manages operator state
19//! 4. Emits results to sinks
20//!
21//! Communication with Ring 1 (background tasks) happens via SPSC queues.
22
23use std::collections::VecDeque;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27
28use crate::alloc::HotPathGuard;
29use crate::budget::TaskBudget;
30use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output};
31use crate::state::{AHashMapStore, StateStore};
32use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
33
/// Trait for output sinks that consume reactor outputs.
///
/// Implementors must be `Send` so a sink can be installed on the reactor
/// thread after being constructed elsewhere.
pub trait Sink: Send {
    /// Write outputs to the sink.
    ///
    /// The sink should drain items from the provided vector, leaving it empty
    /// but preserving capacity so the caller can reuse the allocation.
    ///
    /// # Errors
    ///
    /// Returns an error if the sink cannot accept the outputs.
    fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError>;

    /// Flush any buffered data.
    ///
    /// Called by the reactor during shutdown; see [`SinkError::FlushFailed`].
    ///
    /// # Errors
    ///
    /// Returns an error if the flush operation fails.
    fn flush(&mut self) -> Result<(), SinkError>;
}
53
/// Errors that can occur in sinks.
#[derive(Debug, thiserror::Error)]
pub enum SinkError {
    /// Failed to write to sink (message carries sink-specific detail)
    #[error("Write failed: {0}")]
    WriteFailed(String),

    /// Failed to flush sink
    #[error("Flush failed: {0}")]
    FlushFailed(String),

    /// Sink is closed and can accept no further writes
    #[error("Sink is closed")]
    Closed,
}
69
/// Configuration for the reactor
#[derive(Debug, Clone)]
pub struct ReactorConfig {
    /// Maximum events to process per poll iteration
    pub batch_size: usize,
    /// CPU core to pin the reactor thread to (None = no pinning)
    pub cpu_affinity: Option<usize>,
    /// Maximum wall-clock time to spend in one iteration of event processing
    pub max_iteration_time: Duration,
    /// Size (capacity) of the inbound event buffer; `submit` fails beyond this
    pub event_buffer_size: usize,
    /// Maximum out-of-orderness for watermark generation (milliseconds)
    pub max_out_of_orderness: i64,
}
84
85impl Default for ReactorConfig {
86    fn default() -> Self {
87        Self {
88            batch_size: 1024,
89            cpu_affinity: None,
90            max_iteration_time: Duration::from_micros(5),
91            event_buffer_size: 65536,
92            max_out_of_orderness: 1000, // 1 second
93        }
94    }
95}
96
/// The main reactor for event processing
pub struct Reactor {
    config: ReactorConfig,
    /// Operator chain; events flow through operators in insertion order.
    operators: Vec<Box<dyn Operator>>,
    timer_service: TimerService,
    /// Inbound events awaiting processing (bounded by `config.event_buffer_size`).
    event_queue: VecDeque<Event>,
    /// Outputs accumulated by `process_events`, drained by poll/run/shutdown.
    output_buffer: Vec<Output>,
    state_store: Box<dyn StateStore>,
    watermark_generator: Box<dyn WatermarkGenerator>,
    /// Highest event timestamp observed so far (event-time clock).
    current_event_time: i64,
    /// Reference point for processing-time measurements.
    start_time: Instant,
    events_processed: u64,
    /// Pre-allocated buffers for operator chain processing
    /// We use two buffers and swap between them to avoid allocations
    operator_buffer_1: Vec<Output>,
    operator_buffer_2: Vec<Output>,
    /// Optional sink for outputs
    sink: Option<Box<dyn Sink>>,
    /// Shutdown flag for graceful termination
    shutdown: Arc<AtomicBool>,
}
118
119impl Reactor {
120    /// Creates a new reactor with the given configuration
121    ///
122    /// # Errors
123    ///
124    /// Currently does not return any errors, but may in the future if initialization fails
125    pub fn new(config: ReactorConfig) -> Result<Self, ReactorError> {
126        let event_queue = VecDeque::with_capacity(config.event_buffer_size);
127        let watermark_generator = Box::new(BoundedOutOfOrdernessGenerator::new(
128            config.max_out_of_orderness,
129        ));
130
131        Ok(Self {
132            config,
133            operators: Vec::new(),
134            timer_service: TimerService::new(),
135            event_queue,
136            output_buffer: Vec::with_capacity(1024),
137            state_store: Box::new(AHashMapStore::new()),
138            watermark_generator,
139            current_event_time: 0,
140            start_time: Instant::now(),
141            events_processed: 0,
142            operator_buffer_1: Vec::with_capacity(256),
143            operator_buffer_2: Vec::with_capacity(256),
144            sink: None,
145            shutdown: Arc::new(AtomicBool::new(false)),
146        })
147    }
148
    /// Register an operator in the processing chain.
    ///
    /// Operators are invoked in the order they were added.
    pub fn add_operator(&mut self, operator: Box<dyn Operator>) {
        self.operators.push(operator);
    }
153
    /// Set the output sink for the reactor.
    ///
    /// Replaces any previously installed sink.
    pub fn set_sink(&mut self, sink: Box<dyn Sink>) {
        self.sink = Some(sink);
    }
158
    /// Get a handle to the shutdown flag.
    ///
    /// Another thread can store `true` through this handle to make
    /// [`run`](Self::run) exit gracefully.
    #[must_use]
    pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.shutdown)
    }
164
165    /// Submit an event for processing
166    ///
167    /// # Errors
168    ///
169    /// Returns `ReactorError::QueueFull` if the event queue is at capacity
170    pub fn submit(&mut self, event: Event) -> Result<(), ReactorError> {
171        if self.event_queue.len() >= self.config.event_buffer_size {
172            return Err(ReactorError::QueueFull {
173                capacity: self.config.event_buffer_size,
174            });
175        }
176
177        self.event_queue.push_back(event);
178        Ok(())
179    }
180
181    /// Submit multiple events for processing
182    ///
183    /// # Errors
184    ///
185    /// Returns `ReactorError::QueueFull` if there's insufficient capacity for all events
186    pub fn submit_batch(&mut self, events: Vec<Event>) -> Result<(), ReactorError> {
187        let available = self.config.event_buffer_size - self.event_queue.len();
188        if events.len() > available {
189            return Err(ReactorError::QueueFull {
190                capacity: self.config.event_buffer_size,
191            });
192        }
193
194        self.event_queue.extend(events);
195        Ok(())
196    }
197
    /// Processes one iteration of the event loop, filling `self.output_buffer`.
    ///
    /// Two phases: (1) fire expired timers, (2) drain queued events through
    /// the operator chain, bounded by `batch_size` and `max_iteration_time`.
    ///
    /// Separated from [`poll`] so that [`run`] and [`shutdown`] can access
    /// `self.output_buffer` directly without allocating.
    fn process_events(&mut self) {
        // Hot path guard - will panic on allocation when allocation-tracking is enabled
        let _guard = HotPathGuard::enter("Reactor::poll");

        // Task budget tracking - records metrics on drop
        let _iteration_budget = TaskBudget::ring0_iteration();

        let poll_start = Instant::now();
        let processing_time = self.get_processing_time();

        // 1. Fire expired timers
        let fired_timers = self.timer_service.poll_timers(self.current_event_time);
        for mut timer in fired_timers {
            if let Some(idx) = timer.operator_index {
                // Route to specific operator
                if let Some(operator) = self.operators.get_mut(idx) {
                    let timer_key = timer.key.take().unwrap_or_default();
                    let timer_for_operator = crate::operator::Timer {
                        key: timer_key,
                        timestamp: timer.timestamp,
                    };

                    // Context is rebuilt per call: it borrows several fields
                    // mutably, so it cannot outlive a single operator call.
                    let mut ctx = OperatorContext {
                        event_time: self.current_event_time,
                        processing_time,
                        timers: &mut self.timer_service,
                        state: self.state_store.as_mut(),
                        watermark_generator: self.watermark_generator.as_mut(),
                        operator_index: idx,
                    };

                    let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                    self.output_buffer.extend(outputs);
                }
            } else {
                // Legacy: Broadcast to all operators (warning: creates key contention)
                for (idx, operator) in self.operators.iter_mut().enumerate() {
                    // Move key out of timer (only first operator gets it!)
                    // NOTE(review): subsequent operators receive a default key
                    // here — confirm this is intended for the legacy path.
                    let timer_key = timer.key.take().unwrap_or_default();
                    let timer_for_operator = crate::operator::Timer {
                        key: timer_key,
                        timestamp: timer.timestamp,
                    };

                    let mut ctx = OperatorContext {
                        event_time: self.current_event_time,
                        processing_time,
                        timers: &mut self.timer_service,
                        state: self.state_store.as_mut(),
                        watermark_generator: self.watermark_generator.as_mut(),
                        operator_index: idx,
                    };

                    let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                    self.output_buffer.extend(outputs);
                }
            }
        }

        // 2. Process events
        let mut events_in_batch = 0;
        while let Some(event) = self.event_queue.pop_front() {
            // Update current event time (event-time clock is monotonic)
            if event.timestamp > self.current_event_time {
                self.current_event_time = event.timestamp;
            }

            // Generate watermark if needed
            if let Some(watermark) = self.watermark_generator.on_event(event.timestamp) {
                self.output_buffer
                    .push(Output::Watermark(watermark.timestamp()));
            }

            // Process through operator chain using pre-allocated buffers
            // Start with the event in buffer 1
            self.operator_buffer_1.clear();
            self.operator_buffer_1.push(Output::Event(event));

            let mut current_buffer_is_1 = true;

            for (idx, operator) in self.operators.iter_mut().enumerate() {
                // Determine which buffer to read from and which to write to
                // (ping-pong pattern: read one, write the other, then swap)
                let (current_buffer, next_buffer) = if current_buffer_is_1 {
                    (&mut self.operator_buffer_1, &mut self.operator_buffer_2)
                } else {
                    (&mut self.operator_buffer_2, &mut self.operator_buffer_1)
                };

                next_buffer.clear();

                for output in current_buffer.drain(..) {
                    if let Output::Event(event) = output {
                        let mut ctx = OperatorContext {
                            event_time: self.current_event_time,
                            processing_time,
                            timers: &mut self.timer_service,
                            state: self.state_store.as_mut(),
                            watermark_generator: self.watermark_generator.as_mut(),
                            operator_index: idx,
                        };

                        let operator_outputs = operator.process(&event, &mut ctx);
                        next_buffer.extend(operator_outputs);
                    } else {
                        // Pass through watermarks and late events
                        next_buffer.push(output);
                    }
                }

                // Swap buffers for next iteration
                current_buffer_is_1 = !current_buffer_is_1;
            }

            // Extend output buffer with final results.
            // After N swaps the "current" flag points at the buffer holding
            // the chain's final outputs.
            let final_buffer = if current_buffer_is_1 {
                &mut self.operator_buffer_1
            } else {
                &mut self.operator_buffer_2
            };
            self.output_buffer.append(final_buffer);
            self.events_processed += 1;
            events_in_batch += 1;

            // Check batch size limit
            if events_in_batch >= self.config.batch_size {
                break;
            }

            // Check time limit (checked between events, so one slow event can
            // still overrun the budget)
            if poll_start.elapsed() >= self.config.max_iteration_time {
                break;
            }
        }
    }
336
    /// Run one iteration, appending outputs to a caller-provided buffer.
    ///
    /// The caller can reuse the same `Vec` across iterations, retaining
    /// its capacity and avoiding per-poll allocation. The internal
    /// `output_buffer` is left empty (but with capacity) by the `append`.
    pub fn poll_into(&mut self, output: &mut Vec<Output>) {
        self.process_events();
        output.append(&mut self.output_buffer);
    }
345
346    /// Advances the watermark to the given timestamp.
347    ///
348    /// Called when an external watermark message arrives (e.g., from TPC coordination).
349    /// Updates the reactor's event time tracking and watermark generator state.
350    /// Any resulting watermark output will be included in the next `poll()` result.
351    pub fn advance_watermark(&mut self, timestamp: i64) {
352        // Update current event time if this watermark is newer
353        if timestamp > self.current_event_time {
354            self.current_event_time = timestamp;
355        }
356
357        // Feed the timestamp to the watermark generator so it can advance
358        if let Some(watermark) = self.watermark_generator.on_event(timestamp) {
359            self.output_buffer
360                .push(Output::Watermark(watermark.timestamp()));
361        }
362    }
363
364    /// Triggers a checkpoint by snapshotting all operator states.
365    ///
366    /// Called when a `CheckpointRequest` arrives from the control plane.
367    /// Collects the serialized state from each operator and returns it
368    /// for persistence by Ring 1.
369    pub fn trigger_checkpoint(&mut self) -> Vec<OperatorState> {
370        self.operators.iter().map(|op| op.checkpoint()).collect()
371    }
372
373    /// Triggers a checkpoint, appending operator states into a reusable buffer.
374    ///
375    /// Like [`trigger_checkpoint`](Self::trigger_checkpoint) but avoids
376    /// allocating a new `Vec` on every call — the caller provides a buffer
377    /// whose capacity is reused across checkpoint cycles.
378    pub fn trigger_checkpoint_into(&mut self, buf: &mut Vec<OperatorState>) {
379        buf.clear();
380        buf.extend(self.operators.iter().map(|op| op.checkpoint()));
381    }
382
383    /// Get current processing time in microseconds since reactor start
384    #[allow(clippy::cast_possible_truncation)] // Saturating conversion handles overflow on next line
385    fn get_processing_time(&self) -> i64 {
386        // Saturating conversion - after ~292 years this will saturate at i64::MAX
387        let micros = self.start_time.elapsed().as_micros();
388        if micros > i64::MAX as u128 {
389            i64::MAX
390        } else {
391            micros as i64
392        }
393    }
394
    /// Get the total number of events processed since the reactor started.
    #[must_use]
    pub fn events_processed(&self) -> u64 {
        self.events_processed
    }
400
    /// Get the number of events currently waiting in the queue.
    #[must_use]
    pub fn queue_size(&self) -> usize {
        self.event_queue.len()
    }
406
    /// Set CPU affinity if configured.
    ///
    /// Pins the *calling* thread to `config.cpu_affinity` on Linux and
    /// Windows; other platforms log a warning and succeed. A no-op when
    /// `cpu_affinity` is `None`.
    ///
    /// # Errors
    ///
    /// Returns `ReactorError` if CPU affinity cannot be set (platform-specific)
    #[allow(unused_variables)]
    pub fn set_cpu_affinity(&self) -> Result<(), ReactorError> {
        if let Some(cpu_id) = self.config.cpu_affinity {
            #[cfg(target_os = "linux")]
            {
                use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
                use std::mem;

                // SAFETY: We're calling libc functions with valid parameters.
                // The cpu_set_t is properly initialized with CPU_ZERO.
                // The process ID 0 refers to the current thread.
                #[allow(unsafe_code)]
                unsafe {
                    let mut set: cpu_set_t = mem::zeroed();
                    CPU_ZERO(&mut set);
                    CPU_SET(cpu_id, &mut set);

                    let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
                    if result != 0 {
                        return Err(ReactorError::InitializationFailed(format!(
                            "Failed to set CPU affinity to core {}: {}",
                            cpu_id,
                            std::io::Error::last_os_error()
                        )));
                    }
                }
            }

            #[cfg(target_os = "windows")]
            {
                use windows_sys::Win32::System::Threading::{
                    GetCurrentThread, SetThreadAffinityMask,
                };

                // SAFETY: We're calling Windows API functions with valid parameters.
                // GetCurrentThread returns a pseudo-handle that doesn't need to be closed.
                // The mask is a valid CPU mask for the specified core.
                // NOTE(review): a mask of `1 << cpu_id` assumes cpu_id < 64 —
                // confirm callers never configure a larger core index here.
                #[allow(unsafe_code)]
                unsafe {
                    let mask: usize = 1 << cpu_id;
                    let result = SetThreadAffinityMask(GetCurrentThread(), mask);
                    if result == 0 {
                        return Err(ReactorError::InitializationFailed(format!(
                            "Failed to set CPU affinity to core {}: {}",
                            cpu_id,
                            std::io::Error::last_os_error()
                        )));
                    }
                }
            }

            #[cfg(not(any(target_os = "linux", target_os = "windows")))]
            {
                tracing::warn!("CPU affinity is not implemented for this platform");
            }
        }
        Ok(())
    }
470
    /// Runs the event loop continuously until shutdown.
    ///
    /// Each iteration processes a batch of events, forwards outputs to the
    /// sink (if any), then spin-hints when idle. Sink write errors are logged
    /// and processing continues; the sink is flushed once on exit.
    ///
    /// # Errors
    ///
    /// Returns `ReactorError` if CPU affinity cannot be set or if shutdown fails
    pub fn run(&mut self) -> Result<(), ReactorError> {
        self.set_cpu_affinity()?;

        while !self.shutdown.load(Ordering::Relaxed) {
            // Process events into self.output_buffer (zero-alloc)
            self.process_events();

            // Send outputs to sink if configured.
            // Sink drains output_buffer, preserving capacity for reuse.
            if !self.output_buffer.is_empty() {
                if let Some(sink) = &mut self.sink {
                    if let Err(e) = sink.write(&mut self.output_buffer) {
                        tracing::error!("Failed to write to sink: {e}");
                        // Continue processing even if sink fails
                    }
                }
                // Clear covers the no-sink case (a sink should already have
                // drained the buffer); shrink caps unbounded capacity growth.
                self.output_buffer.clear();
                if self.output_buffer.capacity() > 65536 {
                    self.output_buffer.shrink_to(4096);
                }
            }

            // If no events to process, emit a CPU spin hint (PAUSE on x86,
            // YIELD on ARM) instead of the heavier yield_now() syscall.
            // In a thread-per-core design the reactor owns its CPU, so a
            // lightweight spin hint is preferred over a kernel-mediated yield.
            if self.event_queue.is_empty() {
                std::hint::spin_loop();
            }

            // Periodically check for shutdown signal
            // NOTE(review): the `while` condition already re-checks shutdown
            // every iteration, so this extra check looks redundant — confirm
            // whether it can be removed.
            if self.events_processed.is_multiple_of(1000) && self.shutdown.load(Ordering::Relaxed) {
                break;
            }
        }

        // Flush sink before shutdown
        if let Some(sink) = &mut self.sink {
            if let Err(e) = sink.flush() {
                tracing::error!("Failed to flush sink during shutdown: {e}");
            }
        }

        Ok(())
    }
521
    /// Stops the reactor gracefully.
    ///
    /// Sets the shutdown flag, drains every queued event through the operator
    /// chain (writing outputs to the sink as it goes), then flushes the sink.
    /// Sink errors are logged, not propagated, so the drain always completes.
    ///
    /// # Errors
    ///
    /// Currently does not return any errors, but may in the future if shutdown fails
    pub fn shutdown(&mut self) -> Result<(), ReactorError> {
        // Signal shutdown
        self.shutdown.store(true, Ordering::Relaxed);

        // Process remaining events (zero-alloc drain).
        // process_events is batch-limited, so loop until the queue is empty.
        while !self.event_queue.is_empty() {
            self.process_events();

            if !self.output_buffer.is_empty() {
                if let Some(sink) = &mut self.sink {
                    if let Err(e) = sink.write(&mut self.output_buffer) {
                        tracing::error!("Failed to write final outputs during shutdown: {e}");
                    }
                }
                self.output_buffer.clear();
            }
        }

        // Final flush
        if let Some(sink) = &mut self.sink {
            if let Err(e) = sink.flush() {
                tracing::error!("Failed to flush sink during shutdown: {e}");
            }
        }

        Ok(())
    }
554}
555
/// Errors that can occur in the reactor
#[derive(Debug, thiserror::Error)]
pub enum ReactorError {
    /// Failed to initialize the reactor (e.g. CPU pinning failed)
    #[error("Initialization failed: {0}")]
    InitializationFailed(String),

    /// Event processing error
    #[error("Event processing failed: {0}")]
    EventProcessingFailed(String),

    /// Shutdown error
    #[error("Shutdown failed: {0}")]
    ShutdownFailed(String),

    /// Event queue is full; returned by `submit`/`submit_batch`
    #[error("Event queue full (capacity: {capacity})")]
    QueueFull {
        /// The configured capacity of the event queue
        capacity: usize,
    },
}
578
579#[cfg(test)]
580use crate::operator::{CheckpointCompleteData, SideOutputData};
581
#[cfg(test)]
/// A simple sink that prints each output to stdout (for testing only).
pub struct StdoutSink;
585
#[cfg(test)]
impl Sink for StdoutSink {
    /// Drains every output and prints a one-line human-readable summary per
    /// variant. Never fails.
    fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError> {
        for output in outputs.drain(..) {
            match output {
                Output::Event(event) => {
                    println!(
                        "Event: timestamp={}, data={:?}",
                        event.timestamp, event.data
                    );
                }
                Output::Watermark(timestamp) => {
                    println!("Watermark: {timestamp}");
                }
                Output::LateEvent(event) => {
                    println!(
                        "Late Event (dropped): timestamp={}, data={:?}",
                        event.timestamp, event.data
                    );
                }
                Output::SideOutput(side_output) => {
                    // Boxed payload — unbox before destructuring.
                    let SideOutputData { name, event } = *side_output;
                    println!(
                        "Side Output [{}]: timestamp={}, data={:?}",
                        name, event.timestamp, event.data
                    );
                }
                Output::Changelog(record) => {
                    println!(
                        "Changelog: op={:?}, weight={}, emit_ts={}, event_ts={}, data={:?}",
                        record.operation,
                        record.weight,
                        record.emit_timestamp,
                        record.event.timestamp,
                        record.event.data
                    );
                }
                Output::CheckpointComplete(data) => {
                    let CheckpointCompleteData {
                        checkpoint_id,
                        operator_states,
                    } = *data;
                    println!(
                        "Checkpoint: id={checkpoint_id}, operators={}",
                        operator_states.len()
                    );
                }
                Output::Barrier(barrier) => {
                    println!(
                        "Barrier: checkpoint_id={}, epoch={}",
                        barrier.checkpoint_id, barrier.epoch
                    );
                }
            }
        }
        Ok(())
    }

    /// stdout needs no explicit flush here; always succeeds.
    fn flush(&mut self) -> Result<(), SinkError> {
        Ok(())
    }
}
648
#[cfg(test)]
/// A buffering sink that collects outputs in memory (for testing).
#[derive(Default)]
pub struct BufferingSink {
    // All outputs written so far, in arrival order.
    buffer: Vec<Output>,
}
655
#[cfg(test)]
impl BufferingSink {
    /// Construct an empty buffering sink.
    #[must_use]
    pub fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    /// Move out everything buffered so far, leaving the sink empty.
    #[must_use]
    pub fn take_buffer(&mut self) -> Vec<Output> {
        std::mem::replace(&mut self.buffer, Vec::new())
    }
}
670
#[cfg(test)]
impl Sink for BufferingSink {
    /// Drain the caller's outputs into the internal buffer.
    fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError> {
        self.buffer.extend(outputs.drain(..));
        Ok(())
    }

    /// Nothing buffered outside memory; always succeeds.
    fn flush(&mut self) -> Result<(), SinkError> {
        Ok(())
    }
}
682
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::OutputVec;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    // Mock operator for testing: emits every input event unchanged.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        fn on_timer(
            &mut self,
            _timer: crate::operator::Timer,
            _ctx: &mut OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        // Empty checkpoint payload — this operator is stateless.
        fn checkpoint(&self) -> crate::operator::OperatorState {
            crate::operator::OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(
            &mut self,
            _state: crate::operator::OperatorState,
        ) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    #[test]
    fn test_default_config() {
        let config = ReactorConfig::default();
        assert_eq!(config.batch_size, 1024);
        assert_eq!(config.event_buffer_size, 65536);
    }

    #[test]
    fn test_reactor_creation() {
        let config = ReactorConfig::default();
        let reactor = Reactor::new(config);
        assert!(reactor.is_ok());
    }

    #[test]
    fn test_reactor_add_operator() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let operator = Box::new(PassthroughOperator);
        reactor.add_operator(operator);

        assert_eq!(reactor.operators.len(), 1);
    }

    #[test]
    fn test_reactor_submit_event() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        assert!(reactor.submit(event).is_ok());
        assert_eq!(reactor.queue_size(), 1);
    }

    #[test]
    fn test_reactor_poll_processes_events() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Add a passthrough operator
        reactor.add_operator(Box::new(PassthroughOperator));

        // Submit an event
        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        reactor.submit(event.clone()).unwrap();

        // Poll should process the event
        let mut outputs = Vec::new();
        reactor.poll_into(&mut outputs);
        assert!(!outputs.is_empty());
        assert_eq!(reactor.events_processed(), 1);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_queue_full() {
        let config = ReactorConfig {
            event_buffer_size: 2, // Very small buffer
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        // Fill the queue
        for i in 0..2 {
            let event = Event::new(i64::from(i), batch.clone());
            assert!(reactor.submit(event).is_ok());
        }

        // Next submit should fail
        let event = Event::new(100, batch);
        assert!(matches!(
            reactor.submit(event),
            Err(ReactorError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_reactor_batch_processing() {
        let config = ReactorConfig {
            batch_size: 2,                              // Small batch size
            max_iteration_time: Duration::from_secs(1), // time-limit not under test
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        // Submit 5 events
        for i in 0..5 {
            let event = Event::new(i64::from(i), batch.clone());
            reactor.submit(event).unwrap();
        }

        let mut buf = Vec::new();
        // First poll should process only batch_size events
        reactor.poll_into(&mut buf);
        assert_eq!(reactor.events_processed(), 2);
        assert_eq!(reactor.queue_size(), 3);

        // Second poll should process 2 more
        buf.clear();
        reactor.poll_into(&mut buf);
        assert_eq!(reactor.events_processed(), 4);
        assert_eq!(reactor.queue_size(), 1);

        // Third poll should process the last one
        buf.clear();
        reactor.poll_into(&mut buf);
        assert_eq!(reactor.events_processed(), 5);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_with_sink() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Add a buffering sink
        let sink = Box::new(BufferingSink::new());
        reactor.set_sink(sink);

        // Add passthrough operator
        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![42]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        let event = Event::new(1000, batch);

        // Submit an event
        reactor.submit(event).unwrap();

        // Process
        let mut outputs = Vec::new();
        reactor.poll_into(&mut outputs);
        // Should get the event output (and possibly a watermark)
        assert!(!outputs.is_empty());

        // Verify we got the event
        assert!(outputs.iter().any(|o| matches!(o, Output::Event(_))));
    }

    #[test]
    fn test_reactor_shutdown() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Get shutdown handle
        let shutdown_handle = reactor.shutdown_handle();
        assert!(!shutdown_handle.load(Ordering::Relaxed));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col", array as _)]).unwrap();

        // Submit some events
        for i in 0..5 {
            reactor.submit(Event::new(i * 1000, batch.clone())).unwrap();
        }

        // Shutdown should process remaining events
        reactor.shutdown().unwrap();
        assert!(shutdown_handle.load(Ordering::Relaxed));
        assert_eq!(reactor.queue_size(), 0);
    }
}
899}