1use std::collections::VecDeque;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27
28use crate::alloc::HotPathGuard;
29use crate::budget::TaskBudget;
30use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output};
31use crate::state::{AHashMapStore, StateStore};
32use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
33
/// Destination for the outputs produced by the reactor.
///
/// Implementations must be `Send`.
pub trait Sink: Send {
    /// Hands a batch of outputs to the sink.
    ///
    /// The vector is passed mutably so an implementation can move the items out
    /// of it (for example with `drain` or `append`). The reactor clears the
    /// vector after the call returns, so anything left behind is discarded.
    ///
    /// # Errors
    ///
    /// Returns a [`SinkError`] when the outputs could not be written.
    fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError>;

    /// Flushes the sink; the reactor calls this once when `run` or `shutdown` finishes.
    ///
    /// # Errors
    ///
    /// Returns a [`SinkError`] when the flush fails.
    fn flush(&mut self) -> Result<(), SinkError>;
}
53
/// Errors a [`Sink`] can report.
#[derive(Debug, thiserror::Error)]
pub enum SinkError {
    /// A `write` call failed; the payload is a human-readable description.
    #[error("Write failed: {0}")]
    WriteFailed(String),

    /// A `flush` call failed; the payload is a human-readable description.
    #[error("Flush failed: {0}")]
    FlushFailed(String),

    /// The sink has been closed.
    #[error("Sink is closed")]
    Closed,
}
69
/// Tunable parameters for a `Reactor`.
#[derive(Debug, Clone)]
pub struct ReactorConfig {
    /// Maximum number of events handled in one processing pass.
    pub batch_size: usize,
    /// Index of the CPU core to pin the reactor thread to; `None` leaves the
    /// thread unpinned.
    pub cpu_affinity: Option<usize>,
    /// Time budget for one processing pass; it is checked after each event.
    pub max_iteration_time: Duration,
    /// Maximum number of events that may wait in the queue. Also used as the
    /// queue's initial capacity.
    pub event_buffer_size: usize,
    /// Bound handed to the out-of-orderness watermark generator.
    /// NOTE(review): presumably in the same unit as event timestamps — confirm.
    pub max_out_of_orderness: i64,
}

impl Default for ReactorConfig {
    /// Returns the stock configuration: batches of 1024 events, no CPU pinning,
    /// a 5 µs pass budget, a 65536-slot queue and an out-of-orderness bound of 1000.
    fn default() -> Self {
        let pass_budget = Duration::from_micros(5);

        Self {
            batch_size: 1024,
            cpu_affinity: None,
            max_iteration_time: pass_budget,
            event_buffer_size: 65536,
            max_out_of_orderness: 1000,
        }
    }
}
96
/// Event loop that pushes queued events and fired timers through a chain of
/// operators and collects the resulting outputs.
pub struct Reactor {
    /// Settings supplied at construction time.
    config: ReactorConfig,
    /// Operator chain, applied to each event in insertion order.
    operators: Vec<Box<dyn Operator>>,
    /// Timer registry polled at the start of every processing pass.
    timer_service: TimerService,
    /// Events waiting to be processed (FIFO).
    event_queue: VecDeque<Event>,
    /// Outputs accumulated since the last drain to a caller or sink.
    output_buffer: Vec<Output>,
    /// State store handed to operators through `OperatorContext`.
    state_store: Box<dyn StateStore>,
    /// Watermark generator fed with every event timestamp.
    watermark_generator: Box<dyn WatermarkGenerator>,
    /// Highest event timestamp observed so far (starts at 0).
    current_event_time: i64,
    /// Construction instant; processing time is measured from here.
    start_time: Instant,
    /// Number of events taken off the queue and processed.
    events_processed: u64,
    /// First of two scratch buffers that alternate as operator input/output.
    operator_buffer_1: Vec<Output>,
    /// Second of the two alternating scratch buffers.
    operator_buffer_2: Vec<Output>,
    /// Optional destination for outputs when driven by `run` or `shutdown`.
    sink: Option<Box<dyn Sink>>,
    /// Shared flag that asks `run` to stop.
    shutdown: Arc<AtomicBool>,
}
118
119impl Reactor {
120 pub fn new(config: ReactorConfig) -> Result<Self, ReactorError> {
126 let event_queue = VecDeque::with_capacity(config.event_buffer_size);
127 let watermark_generator = Box::new(BoundedOutOfOrdernessGenerator::new(
128 config.max_out_of_orderness,
129 ));
130
131 Ok(Self {
132 config,
133 operators: Vec::new(),
134 timer_service: TimerService::new(),
135 event_queue,
136 output_buffer: Vec::with_capacity(1024),
137 state_store: Box::new(AHashMapStore::new()),
138 watermark_generator,
139 current_event_time: 0,
140 start_time: Instant::now(),
141 events_processed: 0,
142 operator_buffer_1: Vec::with_capacity(256),
143 operator_buffer_2: Vec::with_capacity(256),
144 sink: None,
145 shutdown: Arc::new(AtomicBool::new(false)),
146 })
147 }
148
/// Appends `operator` to the end of the operator chain.
///
/// Operators see each event in the order they were added.
pub fn add_operator(&mut self, operator: Box<dyn Operator>) {
    self.operators.push(operator);
}
153
/// Installs `sink` as the output destination, replacing any previous sink.
pub fn set_sink(&mut self, sink: Box<dyn Sink>) {
    self.sink = Some(sink);
}
158
/// Returns a shared handle to the shutdown flag.
///
/// Storing `true` through the handle makes `run` leave its loop.
#[must_use]
pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
    Arc::clone(&self.shutdown)
}
164
165 pub fn submit(&mut self, event: Event) -> Result<(), ReactorError> {
171 if self.event_queue.len() >= self.config.event_buffer_size {
172 return Err(ReactorError::QueueFull {
173 capacity: self.config.event_buffer_size,
174 });
175 }
176
177 self.event_queue.push_back(event);
178 Ok(())
179 }
180
181 pub fn submit_batch(&mut self, events: Vec<Event>) -> Result<(), ReactorError> {
187 let available = self.config.event_buffer_size - self.event_queue.len();
188 if events.len() > available {
189 return Err(ReactorError::QueueFull {
190 capacity: self.config.event_buffer_size,
191 });
192 }
193
194 self.event_queue.extend(events);
195 Ok(())
196 }
197
/// Runs one bounded processing pass: dispatches fired timers, then drains
/// queued events through the operator chain into `output_buffer`.
///
/// The event loop stops when the queue is empty, when `config.batch_size`
/// events have been handled, or when `config.max_iteration_time` has elapsed
/// since the pass started. Both limits are checked only after an event has
/// been processed, so a non-empty queue always makes progress.
fn process_events(&mut self) {
    // Guard labelled "Reactor::poll"; the binding keeps it alive until return.
    // NOTE(review): its effect is defined in `crate::alloc` — confirm there.
    let _guard = HotPathGuard::enter("Reactor::poll");

    // NOTE(review): presumably tracks the per-iteration budget — confirm in `crate::budget`.
    let _iteration_budget = TaskBudget::ring0_iteration();

    let poll_start = Instant::now();
    // Sampled once so every operator call in this pass sees the same processing time.
    let processing_time = self.get_processing_time();

    // Phase 1: timers the timer service reports for the current event time.
    let fired_timers = self.timer_service.poll_timers(self.current_event_time);
    for mut timer in fired_timers {
        if let Some(idx) = timer.operator_index {
            // Targeted timer: delivered only to the operator that owns it. A
            // timer whose index matches no operator is dropped silently.
            if let Some(operator) = self.operators.get_mut(idx) {
                let timer_key = timer.key.take().unwrap_or_default();
                let timer_for_operator = crate::operator::Timer {
                    key: timer_key,
                    timestamp: timer.timestamp,
                };

                let mut ctx = OperatorContext {
                    event_time: self.current_event_time,
                    processing_time,
                    timers: &mut self.timer_service,
                    state: self.state_store.as_mut(),
                    watermark_generator: self.watermark_generator.as_mut(),
                    operator_index: idx,
                };

                let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                self.output_buffer.extend(outputs);
            }
        } else {
            // Timer without an owner: broadcast to every operator.
            for (idx, operator) in self.operators.iter_mut().enumerate() {
                // `take()` moves the key out on the first iteration, so only the
                // first operator receives the real key; later operators get the
                // default key.
                let timer_key = timer.key.take().unwrap_or_default();
                let timer_for_operator = crate::operator::Timer {
                    key: timer_key,
                    timestamp: timer.timestamp,
                };

                let mut ctx = OperatorContext {
                    event_time: self.current_event_time,
                    processing_time,
                    timers: &mut self.timer_service,
                    state: self.state_store.as_mut(),
                    watermark_generator: self.watermark_generator.as_mut(),
                    operator_index: idx,
                };

                let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                self.output_buffer.extend(outputs);
            }
        }
    }

    // Phase 2: queued events.
    let mut events_in_batch = 0;
    while let Some(event) = self.event_queue.pop_front() {
        // Event time only moves forward.
        if event.timestamp > self.current_event_time {
            self.current_event_time = event.timestamp;
        }

        // A watermark triggered by this event is emitted ahead of the event's own outputs.
        if let Some(watermark) = self.watermark_generator.on_event(event.timestamp) {
            self.output_buffer
                .push(Output::Watermark(watermark.timestamp()));
        }

        // Seed the chain: buffer 1 starts out holding just the incoming event.
        self.operator_buffer_1.clear();
        self.operator_buffer_1.push(Output::Event(event));

        // Tracks which scratch buffer currently holds the input for the next operator.
        let mut current_buffer_is_1 = true;

        for (idx, operator) in self.operators.iter_mut().enumerate() {
            let (current_buffer, next_buffer) = if current_buffer_is_1 {
                (&mut self.operator_buffer_1, &mut self.operator_buffer_2)
            } else {
                (&mut self.operator_buffer_2, &mut self.operator_buffer_1)
            };

            next_buffer.clear();

            for output in current_buffer.drain(..) {
                if let Output::Event(event) = output {
                    let mut ctx = OperatorContext {
                        event_time: self.current_event_time,
                        processing_time,
                        timers: &mut self.timer_service,
                        state: self.state_store.as_mut(),
                        watermark_generator: self.watermark_generator.as_mut(),
                        operator_index: idx,
                    };

                    let operator_outputs = operator.process(&event, &mut ctx);
                    next_buffer.extend(operator_outputs);
                } else {
                    // Non-event outputs skip the remaining operators and are forwarded unchanged.
                    next_buffer.push(output);
                }
            }

            // This operator's output becomes the next operator's input.
            current_buffer_is_1 = !current_buffer_is_1;
        }

        // With no operators the event itself is still in buffer 1 and is emitted as-is.
        let final_buffer = if current_buffer_is_1 {
            &mut self.operator_buffer_1
        } else {
            &mut self.operator_buffer_2
        };
        // `append` moves the items out and leaves the scratch buffer empty.
        self.output_buffer.append(final_buffer);
        self.events_processed += 1;
        events_in_batch += 1;

        if events_in_batch >= self.config.batch_size {
            break;
        }

        if poll_start.elapsed() >= self.config.max_iteration_time {
            break;
        }
    }
}
336
/// Runs one processing pass and moves every buffered output into `output`.
///
/// Outputs are appended after whatever `output` already holds; the reactor's
/// internal buffer is left empty.
pub fn poll_into(&mut self, output: &mut Vec<Output>) {
    self.process_events();
    output.append(&mut self.output_buffer);
}
345
346 pub fn advance_watermark(&mut self, timestamp: i64) {
352 if timestamp > self.current_event_time {
354 self.current_event_time = timestamp;
355 }
356
357 if let Some(watermark) = self.watermark_generator.on_event(timestamp) {
359 self.output_buffer
360 .push(Output::Watermark(watermark.timestamp()));
361 }
362 }
363
364 pub fn trigger_checkpoint(&mut self) -> Vec<OperatorState> {
370 self.operators.iter().map(|op| op.checkpoint()).collect()
371 }
372
/// Collects a checkpoint from every operator into `buf`, in chain order.
///
/// `buf` is cleared first, so its previous contents are discarded while its
/// allocation is reused.
pub fn trigger_checkpoint_into(&mut self, buf: &mut Vec<OperatorState>) {
    buf.clear();
    buf.extend(self.operators.iter().map(|op| op.checkpoint()));
}
382
383 #[allow(clippy::cast_possible_truncation)] fn get_processing_time(&self) -> i64 {
386 let micros = self.start_time.elapsed().as_micros();
388 if micros > i64::MAX as u128 {
389 i64::MAX
390 } else {
391 micros as i64
392 }
393 }
394
/// Returns the number of events processed since the reactor was created.
#[must_use]
pub fn events_processed(&self) -> u64 {
    self.events_processed
}
400
/// Returns the number of events currently waiting in the queue.
#[must_use]
pub fn queue_size(&self) -> usize {
    self.event_queue.len()
}
406
407 #[allow(unused_variables)]
413 pub fn set_cpu_affinity(&self) -> Result<(), ReactorError> {
414 if let Some(cpu_id) = self.config.cpu_affinity {
415 #[cfg(target_os = "linux")]
416 {
417 use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
418 use std::mem;
419
420 #[allow(unsafe_code)]
424 unsafe {
425 let mut set: cpu_set_t = mem::zeroed();
426 CPU_ZERO(&mut set);
427 CPU_SET(cpu_id, &mut set);
428
429 let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
430 if result != 0 {
431 return Err(ReactorError::InitializationFailed(format!(
432 "Failed to set CPU affinity to core {}: {}",
433 cpu_id,
434 std::io::Error::last_os_error()
435 )));
436 }
437 }
438 }
439
440 #[cfg(target_os = "windows")]
441 {
442 use windows_sys::Win32::System::Threading::{
443 GetCurrentThread, SetThreadAffinityMask,
444 };
445
446 #[allow(unsafe_code)]
450 unsafe {
451 let mask: usize = 1 << cpu_id;
452 let result = SetThreadAffinityMask(GetCurrentThread(), mask);
453 if result == 0 {
454 return Err(ReactorError::InitializationFailed(format!(
455 "Failed to set CPU affinity to core {}: {}",
456 cpu_id,
457 std::io::Error::last_os_error()
458 )));
459 }
460 }
461 }
462
463 #[cfg(not(any(target_os = "linux", target_os = "windows")))]
464 {
465 tracing::warn!("CPU affinity is not implemented for this platform");
466 }
467 }
468 Ok(())
469 }
470
471 pub fn run(&mut self) -> Result<(), ReactorError> {
477 self.set_cpu_affinity()?;
478
479 while !self.shutdown.load(Ordering::Relaxed) {
480 self.process_events();
482
483 if !self.output_buffer.is_empty() {
486 if let Some(sink) = &mut self.sink {
487 if let Err(e) = sink.write(&mut self.output_buffer) {
488 tracing::error!("Failed to write to sink: {e}");
489 }
491 }
492 self.output_buffer.clear();
493 if self.output_buffer.capacity() > 65536 {
494 self.output_buffer.shrink_to(4096);
495 }
496 }
497
498 if self.event_queue.is_empty() {
503 std::hint::spin_loop();
504 }
505
506 if self.events_processed.is_multiple_of(1000) && self.shutdown.load(Ordering::Relaxed) {
508 break;
509 }
510 }
511
512 if let Some(sink) = &mut self.sink {
514 if let Err(e) = sink.flush() {
515 tracing::error!("Failed to flush sink during shutdown: {e}");
516 }
517 }
518
519 Ok(())
520 }
521
522 pub fn shutdown(&mut self) -> Result<(), ReactorError> {
528 self.shutdown.store(true, Ordering::Relaxed);
530
531 while !self.event_queue.is_empty() {
533 self.process_events();
534
535 if !self.output_buffer.is_empty() {
536 if let Some(sink) = &mut self.sink {
537 if let Err(e) = sink.write(&mut self.output_buffer) {
538 tracing::error!("Failed to write final outputs during shutdown: {e}");
539 }
540 }
541 self.output_buffer.clear();
542 }
543 }
544
545 if let Some(sink) = &mut self.sink {
547 if let Err(e) = sink.flush() {
548 tracing::error!("Failed to flush sink during shutdown: {e}");
549 }
550 }
551
552 Ok(())
553 }
554}
555
/// Errors reported by the reactor.
#[derive(Debug, thiserror::Error)]
pub enum ReactorError {
    /// Setup failed, e.g. the CPU affinity could not be applied.
    #[error("Initialization failed: {0}")]
    InitializationFailed(String),

    /// Processing an event failed; the payload describes the failure.
    #[error("Event processing failed: {0}")]
    EventProcessingFailed(String),

    /// Shutting down failed; the payload describes the failure.
    #[error("Shutdown failed: {0}")]
    ShutdownFailed(String),

    /// The event queue cannot accept the submitted event(s).
    #[error("Event queue full (capacity: {capacity})")]
    QueueFull {
        /// Configured maximum number of queued events.
        capacity: usize,
    },
}
578
579#[cfg(test)]
580use crate::operator::{CheckpointCompleteData, SideOutputData};
581
/// Test-only sink that prints every output to stdout.
#[cfg(test)]
pub struct StdoutSink;

#[cfg(test)]
impl Sink for StdoutSink {
    /// Drains `outputs` and prints one line per item, formatted by variant.
    fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError> {
        outputs.drain(..).for_each(|output| {
            // Render the line first, then print it in one place.
            let line = match output {
                Output::Event(event) => format!(
                    "Event: timestamp={}, data={:?}",
                    event.timestamp, event.data
                ),
                Output::Watermark(timestamp) => format!("Watermark: {timestamp}"),
                Output::LateEvent(event) => format!(
                    "Late Event (dropped): timestamp={}, data={:?}",
                    event.timestamp, event.data
                ),
                Output::SideOutput(side_output) => {
                    let SideOutputData { name, event } = *side_output;
                    format!(
                        "Side Output [{}]: timestamp={}, data={:?}",
                        name, event.timestamp, event.data
                    )
                }
                Output::Changelog(record) => format!(
                    "Changelog: op={:?}, weight={}, emit_ts={}, event_ts={}, data={:?}",
                    record.operation,
                    record.weight,
                    record.emit_timestamp,
                    record.event.timestamp,
                    record.event.data
                ),
                Output::CheckpointComplete(data) => {
                    let CheckpointCompleteData {
                        checkpoint_id,
                        operator_states,
                    } = *data;
                    format!(
                        "Checkpoint: id={checkpoint_id}, operators={}",
                        operator_states.len()
                    )
                }
                Output::Barrier(barrier) => format!(
                    "Barrier: checkpoint_id={}, epoch={}",
                    barrier.checkpoint_id, barrier.epoch
                ),
            };
            println!("{line}");
        });

        Ok(())
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> Result<(), SinkError> {
        Ok(())
    }
}
648
/// Test-only sink that keeps every written output in memory.
#[cfg(test)]
#[derive(Default)]
pub struct BufferingSink {
    /// Outputs received so far, in write order.
    buffer: Vec<Output>,
}

#[cfg(test)]
impl BufferingSink {
    /// Creates an empty buffering sink.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns everything written so far and leaves the sink empty.
    #[must_use]
    pub fn take_buffer(&mut self) -> Vec<Output> {
        std::mem::take(&mut self.buffer)
    }
}

#[cfg(test)]
impl Sink for BufferingSink {
    /// Moves all items from `outputs` into the internal buffer; never fails.
    fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError> {
        self.buffer.append(outputs);
        Ok(())
    }

    /// No-op; always succeeds.
    fn flush(&mut self) -> Result<(), SinkError> {
        Ok(())
    }
}
682
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::OutputVec;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// Builds a single-column `Int64` record batch to use as an event payload.
    fn int_batch(column: &str, values: Vec<i64>) -> RecordBatch {
        let array = Arc::new(Int64Array::from(values));
        RecordBatch::try_from_iter(vec![(column, array as _)]).unwrap()
    }

    /// Creates a reactor with the default configuration.
    fn default_reactor() -> Reactor {
        Reactor::new(ReactorConfig::default()).unwrap()
    }

    /// Operator that re-emits every event unchanged and ignores timers.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
            let mut forwarded = OutputVec::new();
            forwarded.push(Output::Event(event.clone()));
            forwarded
        }

        fn on_timer(
            &mut self,
            _timer: crate::operator::Timer,
            _ctx: &mut OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> crate::operator::OperatorState {
            crate::operator::OperatorState {
                operator_id: String::from("passthrough"),
                data: Vec::new(),
            }
        }

        fn restore(
            &mut self,
            _state: crate::operator::OperatorState,
        ) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    /// The default configuration exposes the documented batch and queue sizes.
    #[test]
    fn test_default_config() {
        let defaults = ReactorConfig::default();
        assert_eq!(defaults.batch_size, 1024);
        assert_eq!(defaults.event_buffer_size, 65536);
    }

    /// Constructing a reactor from the default configuration succeeds.
    #[test]
    fn test_reactor_creation() {
        let created = Reactor::new(ReactorConfig::default());
        assert!(created.is_ok());
    }

    /// Adding an operator grows the operator chain.
    #[test]
    fn test_reactor_add_operator() {
        let mut reactor = default_reactor();

        reactor.add_operator(Box::new(PassthroughOperator));

        assert_eq!(reactor.operators.len(), 1);
    }

    /// A submitted event ends up in the queue.
    #[test]
    fn test_reactor_submit_event() {
        let mut reactor = default_reactor();
        let event = Event::new(12345, int_batch("col1", vec![1, 2, 3]));

        assert!(reactor.submit(event).is_ok());
        assert_eq!(reactor.queue_size(), 1);
    }

    /// Polling processes the queued event and yields outputs.
    #[test]
    fn test_reactor_poll_processes_events() {
        let mut reactor = default_reactor();
        reactor.add_operator(Box::new(PassthroughOperator));

        let event = Event::new(12345, int_batch("col1", vec![1, 2, 3]));
        reactor.submit(event).unwrap();

        let mut collected = Vec::new();
        reactor.poll_into(&mut collected);

        assert!(!collected.is_empty());
        assert_eq!(reactor.events_processed(), 1);
        assert_eq!(reactor.queue_size(), 0);
    }

    /// Submitting beyond the configured queue capacity is rejected.
    #[test]
    fn test_reactor_queue_full() {
        let mut reactor = Reactor::new(ReactorConfig {
            event_buffer_size: 2,
            ..ReactorConfig::default()
        })
        .unwrap();

        let payload = int_batch("col1", vec![1]);

        // Fill the queue up to its capacity.
        for timestamp in 0..2_i64 {
            let accepted = reactor.submit(Event::new(timestamp, payload.clone()));
            assert!(accepted.is_ok());
        }

        // The next event no longer fits.
        let overflow = reactor.submit(Event::new(100, payload));
        assert!(matches!(overflow, Err(ReactorError::QueueFull { .. })));
    }

    /// Each poll handles at most `batch_size` events.
    #[test]
    fn test_reactor_batch_processing() {
        let mut reactor = Reactor::new(ReactorConfig {
            batch_size: 2,
            // Generous time budget so only the batch size limits a pass.
            max_iteration_time: Duration::from_secs(1),
            ..ReactorConfig::default()
        })
        .unwrap();
        reactor.add_operator(Box::new(PassthroughOperator));

        let payload = int_batch("col1", vec![1]);
        for timestamp in 0..5_i64 {
            reactor.submit(Event::new(timestamp, payload.clone())).unwrap();
        }

        // Expected (events processed, events still queued) after each poll.
        let mut buf = Vec::new();
        for (processed, queued) in [(2, 3), (4, 1), (5, 0)] {
            buf.clear();
            reactor.poll_into(&mut buf);
            assert_eq!(reactor.events_processed(), processed);
            assert_eq!(reactor.queue_size(), queued);
        }
    }

    /// Polling still returns outputs to the caller when a sink is installed.
    #[test]
    fn test_reactor_with_sink() {
        let mut reactor = default_reactor();
        reactor.set_sink(Box::new(BufferingSink::new()));
        reactor.add_operator(Box::new(PassthroughOperator));

        reactor
            .submit(Event::new(1000, int_batch("value", vec![42])))
            .unwrap();

        let mut collected = Vec::new();
        reactor.poll_into(&mut collected);

        assert!(!collected.is_empty());
        assert!(collected.iter().any(|o| matches!(o, Output::Event(_))));
    }

    /// Shutdown sets the shared flag and drains the queue.
    #[test]
    fn test_reactor_shutdown() {
        let mut reactor = default_reactor();

        let flag = reactor.shutdown_handle();
        assert!(!flag.load(Ordering::Relaxed));

        let payload = int_batch("col", vec![1]);
        for timestamp in (0_i64..5000).step_by(1000) {
            reactor.submit(Event::new(timestamp, payload.clone())).unwrap();
        }

        reactor.shutdown().unwrap();

        assert!(flag.load(Ordering::Relaxed));
        assert_eq!(reactor.queue_size(), 0);
    }
}