laminar_connectors/cdc/mysql/
gtid.rs1use std::fmt;
16use std::str::FromStr;
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24pub struct Gtid {
25 source_id: uuid::Uuid,
27 transaction_id: u64,
29}
30
31impl Gtid {
32 #[must_use]
34 pub fn new(source_id: uuid::Uuid, transaction_id: u64) -> Self {
35 Self {
36 source_id,
37 transaction_id,
38 }
39 }
40
41 #[must_use]
43 pub fn source_id(&self) -> uuid::Uuid {
44 self.source_id
45 }
46
47 #[must_use]
49 pub fn transaction_id(&self) -> u64 {
50 self.transaction_id
51 }
52
53 pub fn from_string(s: &str) -> Result<Self, GtidParseError> {
59 s.parse()
60 }
61}
62
63impl fmt::Display for Gtid {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 write!(f, "{}:{}", self.source_id, self.transaction_id)
66 }
67}
68
69impl FromStr for Gtid {
70 type Err = GtidParseError;
71
72 fn from_str(s: &str) -> Result<Self, Self::Err> {
73 let parts: Vec<&str> = s.split(':').collect();
74 if parts.len() != 2 {
75 return Err(GtidParseError::InvalidFormat(s.to_string()));
76 }
77
78 let source_id = uuid::Uuid::parse_str(parts[0])
79 .map_err(|e| GtidParseError::InvalidUuid(e.to_string()))?;
80
81 let transaction_id = parts[1]
82 .parse::<u64>()
83 .map_err(|_| GtidParseError::InvalidTransactionId(parts[1].to_string()))?;
84
85 Ok(Self {
86 source_id,
87 transaction_id,
88 })
89 }
90}
91
92impl PartialOrd for Gtid {
93 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
94 Some(self.cmp(other))
95 }
96}
97
98impl Ord for Gtid {
99 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
100 match self.source_id.cmp(&other.source_id) {
102 std::cmp::Ordering::Equal => self.transaction_id.cmp(&other.transaction_id),
103 other => other,
104 }
105 }
106}
107
108#[derive(Debug, Clone, PartialEq, Eq, Default)]
113pub struct GtidSet {
114 sets: std::collections::HashMap<uuid::Uuid, Vec<GtidRange>>,
116}
117
/// A range of transaction ids with both endpoints included.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GtidRange {
    /// First id in the range (inclusive).
    pub start: u64,
    /// Last id in the range (inclusive).
    pub end: u64,
}
126
127impl GtidRange {
128 #[must_use]
130 pub fn single(id: u64) -> Self {
131 Self { start: id, end: id }
132 }
133
134 #[must_use]
136 pub fn range(start: u64, end: u64) -> Self {
137 Self { start, end }
138 }
139
140 #[must_use]
142 pub fn contains(&self, id: u64) -> bool {
143 id >= self.start && id <= self.end
144 }
145}
146
147impl GtidSet {
148 #[must_use]
150 pub fn new() -> Self {
151 Self::default()
152 }
153
154 pub fn add(&mut self, gtid: &Gtid) {
156 let ranges = self.sets.entry(gtid.source_id).or_default();
157
158 let tid = gtid.transaction_id;
160
161 ranges.push(GtidRange::single(tid));
164 }
165
166 #[must_use]
168 pub fn contains(&self, gtid: &Gtid) -> bool {
169 self.sets
170 .get(>id.source_id)
171 .is_some_and(|ranges| ranges.iter().any(|r| r.contains(gtid.transaction_id)))
172 }
173
174 #[must_use]
176 pub fn is_empty(&self) -> bool {
177 self.sets.is_empty()
178 }
179
180 #[must_use]
182 pub fn source_count(&self) -> usize {
183 self.sets.len()
184 }
185
186 pub fn iter_sets(&self) -> impl Iterator<Item = (&uuid::Uuid, &Vec<GtidRange>)> {
188 self.sets.iter()
189 }
190}
191
192impl fmt::Display for GtidSet {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 let mut parts = Vec::new();
195 for (source_id, ranges) in &self.sets {
196 for range in ranges {
197 if range.start == range.end {
198 parts.push(format!("{source_id}:{}", range.start));
199 } else {
200 parts.push(format!("{source_id}:{}-{}", range.start, range.end));
201 }
202 }
203 }
204 write!(f, "{}", parts.join(","))
205 }
206}
207
208impl FromStr for GtidSet {
209 type Err = GtidParseError;
210
211 fn from_str(s: &str) -> Result<Self, Self::Err> {
212 if s.is_empty() {
213 return Ok(Self::new());
214 }
215
216 let mut sets = std::collections::HashMap::new();
217
218 for part in s.split(',') {
219 let part = part.trim();
220 if part.is_empty() {
221 continue;
222 }
223
224 let colon_pos = part
226 .rfind(':')
227 .ok_or_else(|| GtidParseError::InvalidFormat(part.to_string()))?;
228
229 let uuid_str = &part[..colon_pos];
230 let range_str = &part[colon_pos + 1..];
231
232 let source_id = uuid::Uuid::parse_str(uuid_str)
233 .map_err(|e| GtidParseError::InvalidUuid(e.to_string()))?;
234
235 let range = if let Some(dash_pos) = range_str.find('-') {
236 let start = range_str[..dash_pos]
237 .parse::<u64>()
238 .map_err(|_| GtidParseError::InvalidTransactionId(range_str.to_string()))?;
239 let end = range_str[dash_pos + 1..]
240 .parse::<u64>()
241 .map_err(|_| GtidParseError::InvalidTransactionId(range_str.to_string()))?;
242 GtidRange::range(start, end)
243 } else {
244 let id = range_str
245 .parse::<u64>()
246 .map_err(|_| GtidParseError::InvalidTransactionId(range_str.to_string()))?;
247 GtidRange::single(id)
248 };
249
250 sets.entry(source_id).or_insert_with(Vec::new).push(range);
251 }
252
253 Ok(Self { sets })
254 }
255}
256
257#[derive(Debug, Clone, thiserror::Error)]
259#[allow(clippy::enum_variant_names)] pub enum GtidParseError {
261 #[error("invalid GTID format: {0}")]
263 InvalidFormat(String),
264
265 #[error("invalid UUID: {0}")]
267 InvalidUuid(String),
268
269 #[error("invalid transaction ID: {0}")]
271 InvalidTransactionId(String),
272}
273
/// Unit tests for GTID parsing, ordering, and set membership.
#[cfg(test)]
mod tests {
    use super::*;

    /// Sample source UUID shared by the tests below.
    const TEST_UUID: &str = "3E11FA47-71CA-11E1-9E33-C80AA9429562";

    #[test]
    fn test_gtid_parse() {
        let gtid: Gtid = format!("{TEST_UUID}:23").parse().unwrap();
        assert_eq!(gtid.transaction_id(), 23);
        assert_eq!(gtid.source_id(), uuid::Uuid::parse_str(TEST_UUID).unwrap());
    }

    #[test]
    fn test_gtid_display() {
        let gtid: Gtid = format!("{TEST_UUID}:42").parse().unwrap();
        let s = gtid.to_string();
        assert!(s.contains(":42"));
    }

    #[test]
    fn test_gtid_parse_invalid() {
        // No colon, bad UUID, and non-numeric transaction id respectively.
        assert!("invalid".parse::<Gtid>().is_err());
        assert!("not-a-uuid:123".parse::<Gtid>().is_err());
        assert!(format!("{TEST_UUID}:not-a-number").parse::<Gtid>().is_err());
    }

    #[test]
    fn test_gtid_ordering() {
        let gtid1: Gtid = format!("{TEST_UUID}:1").parse().unwrap();
        let gtid2: Gtid = format!("{TEST_UUID}:2").parse().unwrap();
        assert!(gtid1 < gtid2);
    }

    #[test]
    fn test_gtid_set_empty() {
        let set = GtidSet::new();
        assert!(set.is_empty());
        assert_eq!(set.source_count(), 0);
    }

    #[test]
    fn test_gtid_set_add_and_contains() {
        let mut set = GtidSet::new();
        let gtid: Gtid = format!("{TEST_UUID}:5").parse().unwrap();

        assert!(!set.contains(&gtid));
        set.add(&gtid);
        assert!(set.contains(&gtid));
        assert_eq!(set.source_count(), 1);
    }

    #[test]
    fn test_gtid_set_parse_single() {
        let set: GtidSet = format!("{TEST_UUID}:1").parse().unwrap();
        let gtid: Gtid = format!("{TEST_UUID}:1").parse().unwrap();
        assert!(set.contains(&gtid));
    }

    #[test]
    fn test_gtid_set_parse_range() {
        let set: GtidSet = format!("{TEST_UUID}:1-10").parse().unwrap();
        let gtid1: Gtid = format!("{TEST_UUID}:1").parse().unwrap();
        let gtid5: Gtid = format!("{TEST_UUID}:5").parse().unwrap();
        let gtid10: Gtid = format!("{TEST_UUID}:10").parse().unwrap();
        let gtid11: Gtid = format!("{TEST_UUID}:11").parse().unwrap();

        // Both endpoints are inclusive; 11 lies just past the range.
        assert!(set.contains(&gtid1));
        assert!(set.contains(&gtid5));
        assert!(set.contains(&gtid10));
        assert!(!set.contains(&gtid11));
    }

    #[test]
    fn test_gtid_set_parse_multiple() {
        let uuid2 = "4E11FA47-71CA-11E1-9E33-C80AA9429563";
        let set: GtidSet = format!("{TEST_UUID}:1,{uuid2}:5").parse().unwrap();
        assert_eq!(set.source_count(), 2);
    }

    #[test]
    fn test_gtid_set_display() {
        let mut set = GtidSet::new();
        let gtid: Gtid = format!("{TEST_UUID}:42").parse().unwrap();
        set.add(&gtid);
        let s = set.to_string();
        assert!(s.contains(":42"));
    }

    #[test]
    fn test_gtid_range_contains() {
        let range = GtidRange::range(5, 10);
        assert!(!range.contains(4));
        assert!(range.contains(5));
        assert!(range.contains(7));
        assert!(range.contains(10));
        assert!(!range.contains(11));
    }

    #[test]
    fn test_gtid_range_single() {
        let range = GtidRange::single(5);
        assert!(!range.contains(4));
        assert!(range.contains(5));
        assert!(!range.contains(6));
    }
}