Skip to main content

laminar_connectors/kafka/
partitioner.rs

//! Kafka partitioning strategies.
//!
//! A [`KafkaPartitioner`] determines which Kafka partition each record is
//! sent to. Three strategies are provided:
//!
//! - [`KeyHashPartitioner`]: Murmur2 hash of the record key (Kafka-compatible default)
//! - [`RoundRobinPartitioner`]: cycles through the partitions in order
//! - [`StickyPartitioner`]: stays on one partition until a record-count threshold is reached
9
/// Trait for determining the target Kafka partition for a record.
///
/// Implementations may be stateful (e.g., a round-robin counter), which is
/// why [`partition`](KafkaPartitioner::partition) takes `&mut self`.
pub trait KafkaPartitioner: Send + Sync {
    /// Returns the target partition for the given key.
    ///
    /// # Arguments
    ///
    /// * `key` - Record key bytes, or `None` for a record without a key.
    /// * `num_partitions` - Number of partitions to choose from. Callers are
    ///   expected to pass a positive value.
    ///
    /// # Returns
    ///
    /// `Some(index)` with the chosen partition, or `None` if the partitioner
    /// defers to the broker (librdkafka) default partitioning.
    fn partition(&mut self, key: Option<&[u8]>, num_partitions: i32) -> Option<i32>;

    /// Resets the partitioner state (e.g., on epoch boundary).
    ///
    /// What counts as "state" is up to the implementation; a stateless
    /// partitioner implements this as a no-op.
    fn reset(&mut self);
}
23
24/// Key-hash partitioner using Murmur2 (Kafka-compatible).
25///
26/// Produces the same partition assignment as Kafka's `DefaultPartitioner`
27/// for the same key bytes.
28#[derive(Debug, Default)]
29pub struct KeyHashPartitioner;
30
31impl KeyHashPartitioner {
32    /// Uses Kafka-compatible Murmur2 hash on the key bytes.
33    #[must_use]
34    pub fn new() -> Self {
35        Self
36    }
37}
38
39impl KafkaPartitioner for KeyHashPartitioner {
40    fn partition(&mut self, key: Option<&[u8]>, num_partitions: i32) -> Option<i32> {
41        key.map(|k| {
42            let hash = murmur2(k) & 0x7fff_ffff;
43            #[allow(
44                clippy::cast_possible_truncation,
45                clippy::cast_possible_wrap,
46                clippy::cast_sign_loss
47            )]
48            let partition = (hash % num_partitions as u32) as i32;
49            partition
50        })
51    }
52
53    fn reset(&mut self) {}
54}
55
56/// Round-robin partitioner distributing records evenly across partitions.
57#[derive(Debug)]
58pub struct RoundRobinPartitioner {
59    counter: u64,
60}
61
62impl RoundRobinPartitioner {
63    /// Cycles through partitions sequentially, ignoring keys.
64    #[must_use]
65    pub fn new() -> Self {
66        Self { counter: 0 }
67    }
68}
69
70impl Default for RoundRobinPartitioner {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl KafkaPartitioner for RoundRobinPartitioner {
77    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
78    fn partition(&mut self, _key: Option<&[u8]>, num_partitions: i32) -> Option<i32> {
79        let partition = (self.counter % num_partitions as u64) as i32;
80        self.counter += 1;
81        Some(partition)
82    }
83
84    fn reset(&mut self) {}
85}
86
87/// Sticky partitioner that batches records to the same partition.
88///
89/// Once `batch_threshold` records have been sent to one partition, rotates
90/// to the next. Reduces broker round-trips (Kafka KIP-794).
91#[derive(Debug)]
92pub struct StickyPartitioner {
93    current_partition: i32,
94    records_in_batch: usize,
95    batch_threshold: usize,
96}
97
98impl StickyPartitioner {
99    /// Creates a new sticky partitioner with the given batch threshold.
100    #[must_use]
101    pub fn new(batch_threshold: usize) -> Self {
102        Self {
103            current_partition: 0,
104            records_in_batch: 0,
105            batch_threshold,
106        }
107    }
108}
109
110impl KafkaPartitioner for StickyPartitioner {
111    fn partition(&mut self, _key: Option<&[u8]>, num_partitions: i32) -> Option<i32> {
112        if self.records_in_batch >= self.batch_threshold {
113            self.current_partition = (self.current_partition + 1) % num_partitions;
114            self.records_in_batch = 0;
115        }
116        self.records_in_batch += 1;
117        Some(self.current_partition)
118    }
119
120    fn reset(&mut self) {
121        self.records_in_batch = 0;
122    }
123}
124
/// Murmur2 hash function compatible with Kafka's `DefaultPartitioner`.
///
/// This is the 32-bit version used by Kafka for key-based partitioning:
/// the input is consumed as little-endian 32-bit words, followed by up to
/// three trailing bytes and a final avalanche step.
///
/// Returns the raw 32-bit hash. Kafka's Java implementation returns the same
/// bits as a signed `int`.
fn murmur2(data: &[u8]) -> u32 {
    /// Fixed seed used by Kafka.
    const SEED: u32 = 0x9747_b28c;
    /// MurmurHash2 multiplication constant.
    const M: u32 = 0x5bd1_e995;
    /// MurmurHash2 shift applied to each mixed word.
    const R: u32 = 24;

    // MurmurHash2 spec: the seed is XORed with the length truncated to 32 bits.
    #[allow(clippy::cast_possible_truncation)]
    let mut h = SEED ^ (data.len() as u32);

    // Body: mix in one little-endian word at a time.
    let mut words = data.chunks_exact(4);
    for word in &mut words {
        let mut k = u32::from_le_bytes([word[0], word[1], word[2], word[3]]);
        k = k.wrapping_mul(M);
        k ^= k >> R;
        k = k.wrapping_mul(M);
        h = h.wrapping_mul(M) ^ k;
    }

    // Tail: the 1-3 leftover bytes are XORed in at their little-endian
    // position (XOR is order-independent), then mixed once.
    let tail = words.remainder();
    if !tail.is_empty() {
        for (index, &byte) in tail.iter().enumerate() {
            h ^= u32::from(byte) << (8 * index);
        }
        h = h.wrapping_mul(M);
    }

    // Final avalanche so the last bytes affect every output bit.
    h ^= h >> 13;
    h = h.wrapping_mul(M);
    h ^= h >> 15;

    h
}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins `murmur2` to reference outputs of Kafka's Java `Utils.murmur2`.
    #[test]
    fn test_murmur2_known_values() {
        // Empty key: the seed alone goes through the final mixing.
        assert_ne!(murmur2(b""), 0);

        // The Java implementation returns a signed `int`; the signed value is
        // noted next to each unsigned bit pattern.
        assert_eq!(murmur2(b"21"), 0xC5F2_F8EC); // -973932308
        assert_eq!(murmur2(b"abc"), 0x1C94_221B); // 479470107
        assert_eq!(murmur2(b"foobar"), 0xD0E4_7BBE); // -790332482
    }

    /// Equal keys hash equally; different keys are expected to differ.
    #[test]
    fn test_murmur2_deterministic() {
        let h1 = murmur2(b"test-key");
        let h2 = murmur2(b"test-key");
        assert_eq!(h1, h2);

        let h3 = murmur2(b"different-key");
        assert_ne!(h1, h3);
    }

    /// A keyed record lands on a stable, in-range partition.
    #[test]
    fn test_key_hash_partitioner_with_key() {
        let mut p = KeyHashPartitioner::new();
        let partition = p.partition(Some(b"order-123"), 6);
        let part = partition.expect("a keyed record must be assigned a partition");
        assert!((0..6).contains(&part));

        // Same key → same partition
        let partition2 = p.partition(Some(b"order-123"), 6);
        assert_eq!(partition, partition2);
    }

    /// Without a key the key-hash partitioner defers (`None`).
    #[test]
    fn test_key_hash_partitioner_no_key() {
        let mut p = KeyHashPartitioner::new();
        assert_eq!(p.partition(None, 6), None);
    }

    /// Partitions are visited in order and wrap around.
    #[test]
    fn test_round_robin_partitioner() {
        let mut p = RoundRobinPartitioner::new();
        assert_eq!(p.partition(None, 3), Some(0));
        assert_eq!(p.partition(None, 3), Some(1));
        assert_eq!(p.partition(None, 3), Some(2));
        assert_eq!(p.partition(None, 3), Some(0)); // wraps
    }

    /// The key has no influence on round-robin assignment.
    #[test]
    fn test_round_robin_ignores_key() {
        let mut p = RoundRobinPartitioner::new();
        assert_eq!(p.partition(Some(b"key"), 3), Some(0));
        assert_eq!(p.partition(Some(b"key"), 3), Some(1));
    }

    /// The sticky partitioner rotates after every `batch_threshold` records.
    #[test]
    fn test_sticky_partitioner() {
        let mut p = StickyPartitioner::new(3);

        // First 3 records go to partition 0
        assert_eq!(p.partition(None, 4), Some(0));
        assert_eq!(p.partition(None, 4), Some(0));
        assert_eq!(p.partition(None, 4), Some(0));

        // 4th record rotates to partition 1
        assert_eq!(p.partition(None, 4), Some(1));
        assert_eq!(p.partition(None, 4), Some(1));
        assert_eq!(p.partition(None, 4), Some(1));

        // 7th record rotates to partition 2
        assert_eq!(p.partition(None, 4), Some(2));
    }

    /// `reset` restarts the batch count but keeps the current partition.
    #[test]
    fn test_sticky_partitioner_reset() {
        let mut p = StickyPartitioner::new(2);
        p.partition(None, 3);
        p.partition(None, 3);

        p.reset(); // resets count but keeps current partition

        // After reset, stays on current partition
        assert_eq!(p.partition(None, 3), Some(0));
        assert_eq!(p.partition(None, 3), Some(0));
        // Now rotates
        assert_eq!(p.partition(None, 3), Some(1));
    }
}