Skip to main content

laminar_connectors/cdc/mysql/
gtid.rs

1//! MySQL GTID (Global Transaction Identifier) type.
2//!
3//! GTIDs uniquely identify transactions across a MySQL replication topology.
4//! Format: `source_id:transaction_id` where source_id is a UUID.
5//!
6//! # Examples
7//!
8//! ```
9//! use laminar_connectors::cdc::mysql::Gtid;
10//!
11//! let gtid: Gtid = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23".parse().unwrap();
12//! assert_eq!(gtid.transaction_id(), 23);
13//! ```
14
15use std::fmt;
16use std::str::FromStr;
17
18/// A MySQL Global Transaction Identifier (GTID).
19///
20/// GTIDs have the format `source_id:transaction_id` where:
21/// - `source_id` is a UUID identifying the server that originated the transaction
22/// - `transaction_id` is a 64-bit sequence number
23#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24pub struct Gtid {
25    /// Server UUID (source_id).
26    source_id: uuid::Uuid,
27    /// Transaction sequence number.
28    transaction_id: u64,
29}
30
31impl Gtid {
32    /// Creates a new GTID from components.
33    #[must_use]
34    pub fn new(source_id: uuid::Uuid, transaction_id: u64) -> Self {
35        Self {
36            source_id,
37            transaction_id,
38        }
39    }
40
41    /// Returns the server UUID (source_id).
42    #[must_use]
43    pub fn source_id(&self) -> uuid::Uuid {
44        self.source_id
45    }
46
47    /// Returns the transaction sequence number.
48    #[must_use]
49    pub fn transaction_id(&self) -> u64 {
50        self.transaction_id
51    }
52
53    /// Creates GTID from raw string representation.
54    ///
55    /// # Errors
56    ///
57    /// Returns error if the string is not a valid GTID format.
58    pub fn from_string(s: &str) -> Result<Self, GtidParseError> {
59        s.parse()
60    }
61}
62
63impl fmt::Display for Gtid {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        write!(f, "{}:{}", self.source_id, self.transaction_id)
66    }
67}
68
69impl FromStr for Gtid {
70    type Err = GtidParseError;
71
72    fn from_str(s: &str) -> Result<Self, Self::Err> {
73        let parts: Vec<&str> = s.split(':').collect();
74        if parts.len() != 2 {
75            return Err(GtidParseError::InvalidFormat(s.to_string()));
76        }
77
78        let source_id = uuid::Uuid::parse_str(parts[0])
79            .map_err(|e| GtidParseError::InvalidUuid(e.to_string()))?;
80
81        let transaction_id = parts[1]
82            .parse::<u64>()
83            .map_err(|_| GtidParseError::InvalidTransactionId(parts[1].to_string()))?;
84
85        Ok(Self {
86            source_id,
87            transaction_id,
88        })
89    }
90}
91
92impl PartialOrd for Gtid {
93    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
94        Some(self.cmp(other))
95    }
96}
97
98impl Ord for Gtid {
99    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
100        // Compare by source_id first, then transaction_id
101        match self.source_id.cmp(&other.source_id) {
102            std::cmp::Ordering::Equal => self.transaction_id.cmp(&other.transaction_id),
103            other => other,
104        }
105    }
106}
107
108/// A set of GTIDs representing a position in the replication stream.
109///
110/// Format: `uuid1:interval1,uuid2:interval2,...`
111/// where interval can be `n` or `n-m` (inclusive range).
112#[derive(Debug, Clone, PartialEq, Eq, Default)]
113pub struct GtidSet {
114    /// Map of source_id UUID to executed transaction ranges.
115    sets: std::collections::HashMap<uuid::Uuid, Vec<GtidRange>>,
116}
117
/// An inclusive span of transaction IDs belonging to one source.
///
/// The bounds are not validated: a value with `start > end` can be
/// constructed and simply contains no IDs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GtidRange {
    /// First transaction ID in the span (inclusive).
    pub start: u64,
    /// Last transaction ID in the span (inclusive).
    pub end: u64,
}

impl GtidRange {
    /// Builds a range covering exactly one transaction ID.
    #[must_use]
    pub fn single(id: u64) -> Self {
        Self::range(id, id)
    }

    /// Builds the inclusive range `start..=end`.
    #[must_use]
    pub fn range(start: u64, end: u64) -> Self {
        Self { start, end }
    }

    /// Reports whether `id` lies within `start..=end`.
    #[must_use]
    pub fn contains(&self, id: u64) -> bool {
        (self.start..=self.end).contains(&id)
    }
}
146
147impl GtidSet {
148    /// Creates an empty GTID set.
149    #[must_use]
150    pub fn new() -> Self {
151        Self::default()
152    }
153
154    /// Adds a GTID to the set.
155    pub fn add(&mut self, gtid: &Gtid) {
156        let ranges = self.sets.entry(gtid.source_id).or_default();
157
158        // Try to extend existing range or add new one
159        let tid = gtid.transaction_id;
160
161        // Simple implementation: just add as new range
162        // A production implementation would merge adjacent ranges
163        ranges.push(GtidRange::single(tid));
164    }
165
166    /// Returns true if the set contains the given GTID.
167    #[must_use]
168    pub fn contains(&self, gtid: &Gtid) -> bool {
169        self.sets
170            .get(&gtid.source_id)
171            .is_some_and(|ranges| ranges.iter().any(|r| r.contains(gtid.transaction_id)))
172    }
173
174    /// Returns true if the set is empty.
175    #[must_use]
176    pub fn is_empty(&self) -> bool {
177        self.sets.is_empty()
178    }
179
180    /// Returns the number of unique source IDs.
181    #[must_use]
182    pub fn source_count(&self) -> usize {
183        self.sets.len()
184    }
185
186    /// Iterates over source UUIDs and their transaction ranges.
187    pub fn iter_sets(&self) -> impl Iterator<Item = (&uuid::Uuid, &Vec<GtidRange>)> {
188        self.sets.iter()
189    }
190}
191
192impl fmt::Display for GtidSet {
193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194        let mut parts = Vec::new();
195        for (source_id, ranges) in &self.sets {
196            for range in ranges {
197                if range.start == range.end {
198                    parts.push(format!("{source_id}:{}", range.start));
199                } else {
200                    parts.push(format!("{source_id}:{}-{}", range.start, range.end));
201                }
202            }
203        }
204        write!(f, "{}", parts.join(","))
205    }
206}
207
208impl FromStr for GtidSet {
209    type Err = GtidParseError;
210
211    fn from_str(s: &str) -> Result<Self, Self::Err> {
212        if s.is_empty() {
213            return Ok(Self::new());
214        }
215
216        let mut sets = std::collections::HashMap::new();
217
218        for part in s.split(',') {
219            let part = part.trim();
220            if part.is_empty() {
221                continue;
222            }
223
224            // Format: uuid:n or uuid:n-m
225            let colon_pos = part
226                .rfind(':')
227                .ok_or_else(|| GtidParseError::InvalidFormat(part.to_string()))?;
228
229            let uuid_str = &part[..colon_pos];
230            let range_str = &part[colon_pos + 1..];
231
232            let source_id = uuid::Uuid::parse_str(uuid_str)
233                .map_err(|e| GtidParseError::InvalidUuid(e.to_string()))?;
234
235            let range = if let Some(dash_pos) = range_str.find('-') {
236                let start = range_str[..dash_pos]
237                    .parse::<u64>()
238                    .map_err(|_| GtidParseError::InvalidTransactionId(range_str.to_string()))?;
239                let end = range_str[dash_pos + 1..]
240                    .parse::<u64>()
241                    .map_err(|_| GtidParseError::InvalidTransactionId(range_str.to_string()))?;
242                GtidRange::range(start, end)
243            } else {
244                let id = range_str
245                    .parse::<u64>()
246                    .map_err(|_| GtidParseError::InvalidTransactionId(range_str.to_string()))?;
247                GtidRange::single(id)
248            };
249
250            sets.entry(source_id).or_insert_with(Vec::new).push(range);
251        }
252
253        Ok(Self { sets })
254    }
255}
256
257/// Errors from parsing GTID strings.
258#[derive(Debug, Clone, thiserror::Error)]
259#[allow(clippy::enum_variant_names)] // "Invalid" prefix is descriptive for parse errors
260pub enum GtidParseError {
261    /// Invalid GTID format.
262    #[error("invalid GTID format: {0}")]
263    InvalidFormat(String),
264
265    /// Invalid UUID in GTID.
266    #[error("invalid UUID: {0}")]
267    InvalidUuid(String),
268
269    /// Invalid transaction ID.
270    #[error("invalid transaction ID: {0}")]
271    InvalidTransactionId(String),
272}
273
#[cfg(test)]
mod tests {
    use super::*;

    const TEST_UUID: &str = "3E11FA47-71CA-11E1-9E33-C80AA9429562";

    /// Parses `TEST_UUID:<txn>` into a [`Gtid`], panicking on failure.
    fn gtid(txn: u64) -> Gtid {
        format!("{TEST_UUID}:{txn}").parse().unwrap()
    }

    /// Parses `TEST_UUID:<intervals>` into a [`GtidSet`], panicking on failure.
    fn set_of(intervals: &str) -> GtidSet {
        format!("{TEST_UUID}:{intervals}").parse().unwrap()
    }

    #[test]
    fn test_gtid_parse() {
        let parsed = gtid(23);
        assert_eq!(parsed.transaction_id(), 23);
        assert_eq!(
            parsed.source_id(),
            uuid::Uuid::parse_str(TEST_UUID).unwrap()
        );
    }

    #[test]
    fn test_gtid_display() {
        let rendered = gtid(42).to_string();
        assert!(rendered.contains(":42"));
    }

    #[test]
    fn test_gtid_parse_invalid() {
        // No separator, bad UUID, and bad transaction ID respectively.
        assert!("invalid".parse::<Gtid>().is_err());
        assert!("not-a-uuid:123".parse::<Gtid>().is_err());
        assert!(format!("{TEST_UUID}:not-a-number")
            .parse::<Gtid>()
            .is_err());
    }

    #[test]
    fn test_gtid_ordering() {
        assert!(gtid(1) < gtid(2));
    }

    #[test]
    fn test_gtid_set_empty() {
        let empty = GtidSet::new();
        assert!(empty.is_empty());
        assert_eq!(empty.source_count(), 0);
    }

    #[test]
    fn test_gtid_set_add_and_contains() {
        let mut set = GtidSet::new();
        let member = gtid(5);

        assert!(!set.contains(&member));
        set.add(&member);
        assert!(set.contains(&member));
        assert_eq!(set.source_count(), 1);
    }

    #[test]
    fn test_gtid_set_parse_single() {
        let set = set_of("1");
        assert!(set.contains(&gtid(1)));
    }

    #[test]
    fn test_gtid_set_parse_range() {
        let set = set_of("1-10");

        // Both bounds are inclusive; 11 is the first ID outside the range.
        assert!(set.contains(&gtid(1)));
        assert!(set.contains(&gtid(5)));
        assert!(set.contains(&gtid(10)));
        assert!(!set.contains(&gtid(11)));
    }

    #[test]
    fn test_gtid_set_parse_multiple() {
        let uuid2 = "4E11FA47-71CA-11E1-9E33-C80AA9429563";
        let set: GtidSet = format!("{TEST_UUID}:1,{uuid2}:5").parse().unwrap();
        assert_eq!(set.source_count(), 2);
    }

    #[test]
    fn test_gtid_set_display() {
        let mut set = GtidSet::new();
        set.add(&gtid(42));
        let rendered = set.to_string();
        assert!(rendered.contains(":42"));
    }

    #[test]
    fn test_gtid_range_contains() {
        let span = GtidRange::range(5, 10);
        assert!(!span.contains(4));
        assert!(span.contains(5));
        assert!(span.contains(7));
        assert!(span.contains(10));
        assert!(!span.contains(11));
    }

    #[test]
    fn test_gtid_range_single() {
        let one = GtidRange::single(5);
        assert!(!one.contains(4));
        assert!(one.contains(5));
        assert!(!one.contains(6));
    }
}