laminar_connectors/kafka/
partitioner.rs1pub trait KafkaPartitioner: Send + Sync {
14 fn partition(&mut self, key: Option<&[u8]>, num_partitions: i32) -> Option<i32>;
19
20 fn reset(&mut self);
22}
23
24#[derive(Debug, Default)]
29pub struct KeyHashPartitioner;
30
31impl KeyHashPartitioner {
32 #[must_use]
34 pub fn new() -> Self {
35 Self
36 }
37}
38
39impl KafkaPartitioner for KeyHashPartitioner {
40 fn partition(&mut self, key: Option<&[u8]>, num_partitions: i32) -> Option<i32> {
41 key.map(|k| {
42 let hash = murmur2(k) & 0x7fff_ffff;
43 #[allow(
44 clippy::cast_possible_truncation,
45 clippy::cast_possible_wrap,
46 clippy::cast_sign_loss
47 )]
48 let partition = (hash % num_partitions as u32) as i32;
49 partition
50 })
51 }
52
53 fn reset(&mut self) {}
54}
55
56#[derive(Debug)]
58pub struct RoundRobinPartitioner {
59 counter: u64,
60}
61
62impl RoundRobinPartitioner {
63 #[must_use]
65 pub fn new() -> Self {
66 Self { counter: 0 }
67 }
68}
69
70impl Default for RoundRobinPartitioner {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl KafkaPartitioner for RoundRobinPartitioner {
77 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
78 fn partition(&mut self, _key: Option<&[u8]>, num_partitions: i32) -> Option<i32> {
79 let partition = (self.counter % num_partitions as u64) as i32;
80 self.counter += 1;
81 Some(partition)
82 }
83
84 fn reset(&mut self) {}
85}
86
87#[derive(Debug)]
92pub struct StickyPartitioner {
93 current_partition: i32,
94 records_in_batch: usize,
95 batch_threshold: usize,
96}
97
98impl StickyPartitioner {
99 #[must_use]
101 pub fn new(batch_threshold: usize) -> Self {
102 Self {
103 current_partition: 0,
104 records_in_batch: 0,
105 batch_threshold,
106 }
107 }
108}
109
110impl KafkaPartitioner for StickyPartitioner {
111 fn partition(&mut self, _key: Option<&[u8]>, num_partitions: i32) -> Option<i32> {
112 if self.records_in_batch >= self.batch_threshold {
113 self.current_partition = (self.current_partition + 1) % num_partitions;
114 self.records_in_batch = 0;
115 }
116 self.records_in_batch += 1;
117 Some(self.current_partition)
118 }
119
120 fn reset(&mut self) {
121 self.records_in_batch = 0;
122 }
123}
124
125fn murmur2(data: &[u8]) -> u32 {
129 let seed: u32 = 0x9747_b28c;
130 let m: u32 = 0x5bd1_e995;
131 let r: u32 = 24;
132
133 let len = data.len();
134 #[allow(clippy::cast_possible_truncation)] let mut h: u32 = seed ^ (len as u32);
136
137 let chunks = len / 4;
138 for i in 0..chunks {
139 let offset = i * 4;
140 let mut k = u32::from_le_bytes([
141 data[offset],
142 data[offset + 1],
143 data[offset + 2],
144 data[offset + 3],
145 ]);
146 k = k.wrapping_mul(m);
147 k ^= k >> r;
148 k = k.wrapping_mul(m);
149 h = h.wrapping_mul(m);
150 h ^= k;
151 }
152
153 let remainder = len % 4;
154 let tail_start = chunks * 4;
155 if remainder >= 3 {
156 h ^= u32::from(data[tail_start + 2]) << 16;
157 }
158 if remainder >= 2 {
159 h ^= u32::from(data[tail_start + 1]) << 8;
160 }
161 if remainder >= 1 {
162 h ^= u32::from(data[tail_start]);
163 h = h.wrapping_mul(m);
164 }
165
166 h ^= h >> 13;
167 h = h.wrapping_mul(m);
168 h ^= h >> 15;
169
170 h
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176
177 #[test]
178 fn test_murmur2_known_values() {
179 let h = murmur2(b"");
181 assert_ne!(h, 0);
182
183 let h = murmur2(b"key1");
185 assert_ne!(h, 0);
186 }
187
188 #[test]
189 fn test_murmur2_deterministic() {
190 let h1 = murmur2(b"test-key");
191 let h2 = murmur2(b"test-key");
192 assert_eq!(h1, h2);
193
194 let h3 = murmur2(b"different-key");
195 assert_ne!(h1, h3);
196 }
197
198 #[test]
199 fn test_key_hash_partitioner_with_key() {
200 let mut p = KeyHashPartitioner::new();
201 let partition = p.partition(Some(b"order-123"), 6);
202 assert!(partition.is_some());
203 let part = partition.unwrap();
204 assert!((0..6).contains(&part));
205
206 let partition2 = p.partition(Some(b"order-123"), 6);
208 assert_eq!(partition, partition2);
209 }
210
211 #[test]
212 fn test_key_hash_partitioner_no_key() {
213 let mut p = KeyHashPartitioner::new();
214 assert_eq!(p.partition(None, 6), None);
215 }
216
217 #[test]
218 fn test_round_robin_partitioner() {
219 let mut p = RoundRobinPartitioner::new();
220 assert_eq!(p.partition(None, 3), Some(0));
221 assert_eq!(p.partition(None, 3), Some(1));
222 assert_eq!(p.partition(None, 3), Some(2));
223 assert_eq!(p.partition(None, 3), Some(0)); }
225
226 #[test]
227 fn test_round_robin_ignores_key() {
228 let mut p = RoundRobinPartitioner::new();
229 assert_eq!(p.partition(Some(b"key"), 3), Some(0));
230 assert_eq!(p.partition(Some(b"key"), 3), Some(1));
231 }
232
233 #[test]
234 fn test_sticky_partitioner() {
235 let mut p = StickyPartitioner::new(3);
236
237 assert_eq!(p.partition(None, 4), Some(0));
239 assert_eq!(p.partition(None, 4), Some(0));
240 assert_eq!(p.partition(None, 4), Some(0));
241
242 assert_eq!(p.partition(None, 4), Some(1));
244 assert_eq!(p.partition(None, 4), Some(1));
245 assert_eq!(p.partition(None, 4), Some(1));
246
247 assert_eq!(p.partition(None, 4), Some(2));
249 }
250
251 #[test]
252 fn test_sticky_partitioner_reset() {
253 let mut p = StickyPartitioner::new(2);
254 p.partition(None, 3);
255 p.partition(None, 3);
256
257 p.reset(); assert_eq!(p.partition(None, 3), Some(0));
261 assert_eq!(p.partition(None, 3), Some(0));
262 assert_eq!(p.partition(None, 3), Some(1));
264 }
265}