laminar_core/compiler/
policy.rs1use std::time::Duration;
8
/// Configuration values describing when a batch should be emitted.
///
/// This type only stores the limits; how they are enforced is up to the code
/// that consumes the policy, which is not part of this file.
///
/// The policy can be built fluently from [`BatchPolicy::default`] with the
/// `with_*` methods, and two policies compare equal when all fields match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchPolicy {
    /// Row-count limit for a batch.
    ///
    /// NOTE(review): presumably a batch is flushed once it holds this many
    /// rows — confirm against the consumer.
    pub max_rows: usize,
    /// Time limit associated with a batch.
    ///
    /// NOTE(review): presumably the longest a row may wait before the batch
    /// is flushed — confirm against the consumer.
    pub max_latency: Duration,
    /// Whether a watermark should trigger a flush.
    ///
    /// NOTE(review): exact watermark semantics are defined by the consumer.
    pub flush_on_watermark: bool,
}

impl Default for BatchPolicy {
    /// Returns the default policy: `max_rows = 1024`, `max_latency = 10 ms`,
    /// and `flush_on_watermark = true`.
    fn default() -> Self {
        Self {
            max_rows: 1024,
            max_latency: Duration::from_millis(10),
            flush_on_watermark: true,
        }
    }
}

impl BatchPolicy {
    /// Returns this policy with `max_rows` replaced; other fields are kept.
    #[must_use]
    pub const fn with_max_rows(mut self, max_rows: usize) -> Self {
        self.max_rows = max_rows;
        self
    }

    /// Returns this policy with `max_latency` replaced; other fields are kept.
    #[must_use]
    pub const fn with_max_latency(mut self, max_latency: Duration) -> Self {
        self.max_latency = max_latency;
        self
    }

    /// Returns this policy with `flush_on_watermark` replaced; other fields
    /// are kept.
    #[must_use]
    pub const fn with_flush_on_watermark(mut self, flush_on_watermark: bool) -> Self {
        self.flush_on_watermark = flush_on_watermark;
        self
    }
}
63
/// Selects how backpressure should be handled.
///
/// This enum only names the strategy; the behaviour behind each variant is
/// implemented by the consumer, which is not part of this file. The default
/// is [`BackpressureStrategy::DropNewest`]. Values compare equal when they
/// are the same variant with the same payload.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum BackpressureStrategy {
    /// Default strategy.
    ///
    /// NOTE(review): presumably the most recently arrived data is discarded
    /// under pressure — confirm against the consumer.
    #[default]
    DropNewest,
    /// NOTE(review): presumably the upstream source is paused until pressure
    /// subsides — confirm against the consumer.
    PauseSource,
    /// NOTE(review): presumably overflow is written to disk — confirm against
    /// the consumer.
    SpillToDisk {
        /// Byte limit associated with spilling (presumably an upper bound on
        /// spilled data — verify against the consumer).
        max_spill_bytes: usize,
    },
}
78
#[cfg(test)]
mod tests {
    use super::*;

    /// `BatchPolicy::default()` yields 1024 rows, 10 ms, and watermark flush on.
    #[test]
    fn batch_policy_defaults() {
        let BatchPolicy {
            max_rows,
            max_latency,
            flush_on_watermark,
        } = BatchPolicy::default();

        assert_eq!(max_rows, 1024);
        assert_eq!(max_latency, Duration::from_millis(10));
        assert!(flush_on_watermark);
    }

    /// Chained `with_*` calls override every field of the default policy.
    #[test]
    fn batch_policy_builder() {
        let tuned = BatchPolicy::default()
            .with_max_rows(512)
            .with_max_latency(Duration::from_millis(5))
            .with_flush_on_watermark(false);

        let BatchPolicy {
            max_rows,
            max_latency,
            flush_on_watermark,
        } = tuned;
        assert_eq!(max_rows, 512);
        assert_eq!(max_latency, Duration::from_millis(5));
        assert!(!flush_on_watermark);
    }

    /// The default strategy is `DropNewest`.
    #[test]
    fn backpressure_strategy_default() {
        assert!(matches!(
            BackpressureStrategy::default(),
            BackpressureStrategy::DropNewest
        ));
    }

    /// `SpillToDisk` carries the byte limit it was constructed with.
    #[test]
    fn backpressure_strategy_spill() {
        const ONE_MIB: usize = 1024 * 1024;

        let strategy = BackpressureStrategy::SpillToDisk {
            max_spill_bytes: ONE_MIB,
        };
        if let BackpressureStrategy::SpillToDisk { max_spill_bytes } = strategy {
            assert_eq!(max_spill_bytes, ONE_MIB);
        } else {
            panic!("expected SpillToDisk");
        }
    }

    /// The `Debug` output names the type, a field, and the default row count.
    #[test]
    fn batch_policy_debug() {
        let policy = BatchPolicy::default();
        let rendered = format!("{policy:?}");
        for needle in ["BatchPolicy", "max_rows", "1024"] {
            assert!(rendered.contains(needle));
        }
    }

    /// A clone carries the same field values as its source.
    #[test]
    fn batch_policy_clone() {
        let source = BatchPolicy::default().with_max_rows(256);
        let copy = source.clone();

        assert_eq!(copy.max_rows, 256);
        assert_eq!(
            (copy.max_latency, copy.flush_on_watermark),
            (source.max_latency, source.flush_on_watermark)
        );
    }
}