Skip to main content

laminar_connectors/cdc/postgres/
decoder.rs

1//! `PostgreSQL` `pgoutput` logical replication protocol decoder.
2//!
3//! Implements a binary protocol parser for the `pgoutput` output plugin
4//! used by `PostgreSQL` logical replication (PG 10+). Parses WAL stream
5//! bytes into structured [`WalMessage`] variants.
6//!
7//! # Protocol Reference
8//!
9//! See `PostgreSQL` docs: "Logical Replication Message Formats"
10//! (<https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html>)
11
12use super::lsn::Lsn;
13use super::types::PgColumn;
14
/// Microseconds between the Unix epoch (1970-01-01T00:00:00Z) and the
/// `PostgreSQL` epoch (2000-01-01T00:00:00Z).
///
/// 10_957 days (30 years including 7 leap days) * 86_400 s/day * 1_000_000 us/s.
const PG_EPOCH_OFFSET_US: i64 = 10_957 * 86_400 * 1_000_000;
18
/// A decoded WAL message from the `pgoutput` protocol.
///
/// Each variant corresponds to one message-type byte dispatched by
/// [`decode_message`]: `B`, `C`, `R`, `I`, `U`, `D`, `T`, `O` and `Y`.
/// Any other leading byte is reported as [`DecoderError::UnknownMessageType`].
#[derive(Debug, Clone, PartialEq)]
pub enum WalMessage {
    /// Transaction begin (`B`).
    Begin(BeginMessage),
    /// Transaction commit (`C`).
    Commit(CommitMessage),
    /// Relation (table) metadata (`R`).
    Relation(RelationMessage),
    /// Row inserted (`I`).
    Insert(InsertMessage),
    /// Row updated (`U`).
    Update(UpdateMessage),
    /// Row deleted (`D`).
    Delete(DeleteMessage),
    /// Table(s) truncated (`T`).
    Truncate(TruncateMessage),
    /// Replication-origin information (`O`).
    Origin(OriginMessage),
    /// Custom type definition (`Y`).
    Type(TypeMessage),
}
41
/// Transaction begin message (`B`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BeginMessage {
    /// LSN of the final record of the transaction.
    pub final_lsn: Lsn,
    /// Commit timestamp in milliseconds since the Unix epoch, converted from
    /// the wire value (microseconds since 2000-01-01) by
    /// [`pg_timestamp_to_unix_ms`].
    pub commit_ts_ms: i64,
    /// Transaction ID (XID).
    pub xid: u32,
}
52
/// Transaction commit message (`C`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitMessage {
    /// Flags byte (currently unused by `PostgreSQL`); stored exactly as received.
    pub flags: u8,
    /// LSN of the commit record.
    pub commit_lsn: Lsn,
    /// End LSN of the transaction.
    pub end_lsn: Lsn,
    /// Commit timestamp in milliseconds since the Unix epoch, converted from
    /// the wire value (microseconds since 2000-01-01) by
    /// [`pg_timestamp_to_unix_ms`].
    pub commit_ts_ms: i64,
}
65
/// Relation (table schema) message (`R`).
///
/// Row-change messages ([`InsertMessage`], [`UpdateMessage`],
/// [`DeleteMessage`]) identify their table only by `relation_id`, so a
/// consumer must remember the most recent `RelationMessage` per OID to
/// interpret their tuples.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelationMessage {
    /// Relation OID.
    pub relation_id: u32,
    /// Schema (namespace) name.
    pub namespace: String,
    /// Table name.
    pub name: String,
    /// Replica identity setting as the raw ASCII byte: `b'd'` (default),
    /// `b'n'` (nothing), `b'f'` (full) or `b'i'` (index).
    pub replica_identity: u8,
    /// Column descriptors in the order sent by the server. `is_key` is taken
    /// from bit 0 of each column's flags byte.
    pub columns: Vec<PgColumn>,
}
80
/// Row insert message (`I`).
#[derive(Debug, Clone, PartialEq)]
pub struct InsertMessage {
    /// Relation OID of the target table (see [`RelationMessage`]).
    pub relation_id: u32,
    /// The new row data (the tuple following the `'N'` tag).
    pub new_tuple: TupleData,
}
89
/// Row update message (`U`).
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMessage {
    /// Relation OID of the target table (see [`RelationMessage`]).
    pub relation_id: u32,
    /// Old row data, present only when the server sent an old-tuple section:
    /// tag `'K'` (old key columns) or `'O'` (the full old row, as sent under
    /// REPLICA IDENTITY FULL). `None` when the message went straight to the
    /// new tuple (`'N'`).
    pub old_tuple: Option<TupleData>,
    /// The new row data (the tuple following the `'N'` tag).
    pub new_tuple: TupleData,
}
100
/// Row delete message (`D`).
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteMessage {
    /// Relation OID of the target table (see [`RelationMessage`]).
    pub relation_id: u32,
    /// The old row data: key columns only (tag `'K'`) or the full old row
    /// (tag `'O'`, REPLICA IDENTITY FULL).
    pub old_tuple: TupleData,
}
109
/// Table truncate message (`T`). A single `TRUNCATE` may cover several tables.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TruncateMessage {
    /// Relation OIDs of truncated tables, in the order sent by the server.
    pub relation_ids: Vec<u32>,
    /// Option bit flags: bit 0 (`1`) = CASCADE, bit 1 (`2`) = RESTART IDENTITY.
    pub options: u8,
}
118
/// Origin message (`O`): identifies the replication origin the current
/// transaction's changes came from (i.e. they were replicated into this
/// server from another node).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OriginMessage {
    /// LSN of the commit on the origin server.
    pub origin_lsn: Lsn,
    /// Name of the replication origin.
    pub name: String,
}
127
/// Custom type definition message (`Y`), describing a non-built-in data type
/// referenced by a relation's columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TypeMessage {
    /// Type OID.
    pub type_id: u32,
    /// Schema (namespace) name; per the protocol docs the server sends an
    /// empty string for `pg_catalog`.
    pub namespace: String,
    /// Type name.
    pub name: String,
}
138
/// Tuple data containing column values.
#[derive(Debug, Clone, PartialEq)]
pub struct TupleData {
    /// Column values in the order sent, which matches the column order of the
    /// relation's most recent [`RelationMessage`].
    pub columns: Vec<ColumnValue>,
}
145
/// A single column value in a tuple.
///
/// Binary-format (`'b'`) column values are not represented; the decoder
/// rejects them as invalid data.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnValue {
    /// SQL NULL (wire tag `'n'`).
    Null,
    /// Unchanged TOASTed value (wire tag `'u'`): the server did not send the
    /// data, so the consumer must keep the previous value.
    Unchanged,
    /// Text-format value (wire tag `'t'`), validated as UTF-8.
    Text(String),
}
156
157impl ColumnValue {
158    /// Returns the text value if present.
159    #[must_use]
160    pub fn as_text(&self) -> Option<&str> {
161        match self {
162            ColumnValue::Text(s) => Some(s),
163            _ => None,
164        }
165    }
166
167    /// Returns `true` if the value is NULL.
168    #[must_use]
169    pub fn is_null(&self) -> bool {
170        matches!(self, ColumnValue::Null)
171    }
172}
173
/// Errors from the `pgoutput` protocol decoder.
///
/// `Display` messages are generated by `thiserror` from the `#[error]`
/// attributes below.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DecoderError {
    /// The buffer ended before a fixed-size or length-prefixed field was
    /// complete.
    #[error("unexpected end of data at offset {offset}, need {needed} bytes")]
    UnexpectedEof {
        /// Cursor position (byte index into the message) where the read started.
        offset: usize,
        /// Number of bytes the read required.
        needed: usize,
    },

    /// The leading message-type byte is not one the decoder handles.
    #[error("unknown message type: 0x{0:02X}")]
    UnknownMessageType(u8),

    /// Structurally invalid data: empty message, negative count/length,
    /// unexpected tag byte, or an unterminated string.
    #[error("invalid data: {0}")]
    InvalidData(String),

    /// A string or text column was not valid UTF-8; holds the byte offset
    /// where that string's bytes begin.
    #[error("invalid UTF-8 at offset {0}")]
    InvalidUtf8(usize),
}
198
/// A forward-only reader over a borrowed byte buffer.
///
/// Multi-byte integers are decoded big-endian (network order). `pos` is the
/// index of the next unread byte; it only advances after a read's bounds
/// check succeeds, so it never exceeds `data.len()`.
struct Cursor<'a> {
    /// The complete message being decoded (including the type byte).
    data: &'a [u8],
    /// Index of the next unread byte in `data`.
    pos: usize,
}
204
205impl<'a> Cursor<'a> {
206    fn new(data: &'a [u8]) -> Self {
207        Self { data, pos: 0 }
208    }
209
210    fn remaining(&self) -> usize {
211        self.data.len().saturating_sub(self.pos)
212    }
213
214    fn read_u8(&mut self) -> Result<u8, DecoderError> {
215        if self.pos >= self.data.len() {
216            return Err(DecoderError::UnexpectedEof {
217                offset: self.pos,
218                needed: 1,
219            });
220        }
221        let val = self.data[self.pos];
222        self.pos += 1;
223        Ok(val)
224    }
225
226    fn read_i16(&mut self) -> Result<i16, DecoderError> {
227        self.check_remaining(2)?;
228        let val = i16::from_be_bytes([self.data[self.pos], self.data[self.pos + 1]]);
229        self.pos += 2;
230        Ok(val)
231    }
232
233    fn read_i32(&mut self) -> Result<i32, DecoderError> {
234        self.check_remaining(4)?;
235        let bytes: [u8; 4] = self.data[self.pos..self.pos + 4].try_into().map_err(|_| {
236            DecoderError::UnexpectedEof {
237                offset: self.pos,
238                needed: 4,
239            }
240        })?;
241        let val = i32::from_be_bytes(bytes);
242        self.pos += 4;
243        Ok(val)
244    }
245
246    fn read_u32(&mut self) -> Result<u32, DecoderError> {
247        self.check_remaining(4)?;
248        let bytes: [u8; 4] = self.data[self.pos..self.pos + 4].try_into().map_err(|_| {
249            DecoderError::UnexpectedEof {
250                offset: self.pos,
251                needed: 4,
252            }
253        })?;
254        let val = u32::from_be_bytes(bytes);
255        self.pos += 4;
256        Ok(val)
257    }
258
259    fn read_i64(&mut self) -> Result<i64, DecoderError> {
260        self.check_remaining(8)?;
261        let bytes: [u8; 8] = self.data[self.pos..self.pos + 8].try_into().map_err(|_| {
262            DecoderError::UnexpectedEof {
263                offset: self.pos,
264                needed: 8,
265            }
266        })?;
267        let val = i64::from_be_bytes(bytes);
268        self.pos += 8;
269        Ok(val)
270    }
271
272    fn read_u64(&mut self) -> Result<u64, DecoderError> {
273        self.check_remaining(8)?;
274        let bytes: [u8; 8] = self.data[self.pos..self.pos + 8].try_into().map_err(|_| {
275            DecoderError::UnexpectedEof {
276                offset: self.pos,
277                needed: 8,
278            }
279        })?;
280        let val = u64::from_be_bytes(bytes);
281        self.pos += 8;
282        Ok(val)
283    }
284
285    /// Reads a null-terminated string.
286    fn read_cstring(&mut self) -> Result<String, DecoderError> {
287        let start = self.pos;
288        let nul_pos = self.data[self.pos..]
289            .iter()
290            .position(|&b| b == 0)
291            .ok_or(DecoderError::InvalidData("unterminated string".to_string()))?;
292
293        let s = std::str::from_utf8(&self.data[self.pos..self.pos + nul_pos])
294            .map_err(|_| DecoderError::InvalidUtf8(start))?;
295
296        self.pos += nul_pos + 1; // skip the NUL byte
297        Ok(s.to_string())
298    }
299
300    fn read_bytes(&mut self, len: usize) -> Result<&'a [u8], DecoderError> {
301        self.check_remaining(len)?;
302        let slice = &self.data[self.pos..self.pos + len];
303        self.pos += len;
304        Ok(slice)
305    }
306
307    fn check_remaining(&self, needed: usize) -> Result<(), DecoderError> {
308        if self.remaining() < needed {
309            return Err(DecoderError::UnexpectedEof {
310                offset: self.pos,
311                needed,
312            });
313        }
314        Ok(())
315    }
316}
317
318/// Converts a `PostgreSQL` timestamp (microseconds since 2000-01-01) to
319/// milliseconds since Unix epoch (1970-01-01).
320#[must_use]
321pub fn pg_timestamp_to_unix_ms(pg_us: i64) -> i64 {
322    (pg_us + PG_EPOCH_OFFSET_US) / 1000
323}
324
325/// Decodes a single `pgoutput` WAL message from raw bytes.
326///
327/// # Errors
328///
329/// Returns [`DecoderError`] if the data is truncated, malformed, or
330/// contains an unknown message type.
331pub fn decode_message(data: &[u8]) -> Result<WalMessage, DecoderError> {
332    if data.is_empty() {
333        return Err(DecoderError::InvalidData("empty message".to_string()));
334    }
335
336    let mut cur = Cursor::new(data);
337    let msg_type = cur.read_u8()?;
338
339    match msg_type {
340        b'B' => decode_begin(&mut cur),
341        b'C' => decode_commit(&mut cur),
342        b'R' => decode_relation(&mut cur),
343        b'I' => decode_insert(&mut cur),
344        b'U' => decode_update(&mut cur),
345        b'D' => decode_delete(&mut cur),
346        b'T' => decode_truncate(&mut cur),
347        b'O' => decode_origin(&mut cur),
348        b'Y' => decode_type(&mut cur),
349        _ => Err(DecoderError::UnknownMessageType(msg_type)),
350    }
351}
352
353fn decode_begin(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
354    let final_lsn = Lsn::new(cur.read_u64()?);
355    let commit_ts_us = cur.read_i64()?;
356    let xid = cur.read_u32()?;
357    Ok(WalMessage::Begin(BeginMessage {
358        final_lsn,
359        commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
360        xid,
361    }))
362}
363
364fn decode_commit(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
365    let flags = cur.read_u8()?;
366    let commit_lsn = Lsn::new(cur.read_u64()?);
367    let end_lsn = Lsn::new(cur.read_u64()?);
368    let commit_ts_us = cur.read_i64()?;
369    Ok(WalMessage::Commit(CommitMessage {
370        flags,
371        commit_lsn,
372        end_lsn,
373        commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
374    }))
375}
376
377fn decode_relation(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
378    let relation_id = cur.read_u32()?;
379    let namespace = cur.read_cstring()?;
380    let name = cur.read_cstring()?;
381    let replica_identity = cur.read_u8()?;
382    let n_cols_raw = cur.read_i16()?;
383    let n_cols = usize::try_from(n_cols_raw)
384        .map_err(|_| DecoderError::InvalidData(format!("negative column count: {n_cols_raw}")))?;
385
386    let mut columns = Vec::with_capacity(n_cols);
387    for _ in 0..n_cols {
388        let flags = cur.read_u8()?;
389        let col_name = cur.read_cstring()?;
390        let type_oid = cur.read_u32()?;
391        let type_modifier = cur.read_i32()?;
392        columns.push(PgColumn::new(
393            col_name,
394            type_oid,
395            type_modifier,
396            flags & 1 != 0,
397        ));
398    }
399
400    Ok(WalMessage::Relation(RelationMessage {
401        relation_id,
402        namespace,
403        name,
404        replica_identity,
405        columns,
406    }))
407}
408
409fn decode_insert(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
410    let relation_id = cur.read_u32()?;
411    let tag = cur.read_u8()?;
412    if tag != b'N' {
413        return Err(DecoderError::InvalidData(format!(
414            "expected 'N' tag in INSERT, got 0x{tag:02X}"
415        )));
416    }
417    let new_tuple = decode_tuple_data(cur)?;
418    Ok(WalMessage::Insert(InsertMessage {
419        relation_id,
420        new_tuple,
421    }))
422}
423
424fn decode_update(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
425    let relation_id = cur.read_u32()?;
426    let tag = cur.read_u8()?;
427
428    let (old_tuple, new_tuple) = match tag {
429        // No old tuple, just new
430        b'N' => (None, decode_tuple_data(cur)?),
431        // Old key tuple followed by new
432        b'K' | b'O' => {
433            let old = decode_tuple_data(cur)?;
434            let new_tag = cur.read_u8()?;
435            if new_tag != b'N' {
436                return Err(DecoderError::InvalidData(format!(
437                    "expected 'N' tag after old tuple in UPDATE, got 0x{new_tag:02X}"
438                )));
439            }
440            let new = decode_tuple_data(cur)?;
441            (Some(old), new)
442        }
443        _ => {
444            return Err(DecoderError::InvalidData(format!(
445                "unexpected tag in UPDATE: 0x{tag:02X}"
446            )));
447        }
448    };
449
450    Ok(WalMessage::Update(UpdateMessage {
451        relation_id,
452        old_tuple,
453        new_tuple,
454    }))
455}
456
457fn decode_delete(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
458    let relation_id = cur.read_u32()?;
459    let tag = cur.read_u8()?;
460    if tag != b'K' && tag != b'O' {
461        return Err(DecoderError::InvalidData(format!(
462            "expected 'K' or 'O' tag in DELETE, got 0x{tag:02X}"
463        )));
464    }
465    let old_tuple = decode_tuple_data(cur)?;
466    Ok(WalMessage::Delete(DeleteMessage {
467        relation_id,
468        old_tuple,
469    }))
470}
471
472fn decode_truncate(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
473    let n_relations = cur.read_u32()? as usize;
474    let options = cur.read_u8()?;
475    let mut relation_ids = Vec::with_capacity(n_relations);
476    for _ in 0..n_relations {
477        relation_ids.push(cur.read_u32()?);
478    }
479    Ok(WalMessage::Truncate(TruncateMessage {
480        relation_ids,
481        options,
482    }))
483}
484
485fn decode_origin(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
486    let origin_lsn = Lsn::new(cur.read_u64()?);
487    let name = cur.read_cstring()?;
488    Ok(WalMessage::Origin(OriginMessage { origin_lsn, name }))
489}
490
491fn decode_type(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
492    let type_id = cur.read_u32()?;
493    let namespace = cur.read_cstring()?;
494    let name = cur.read_cstring()?;
495    Ok(WalMessage::Type(TypeMessage {
496        type_id,
497        namespace,
498        name,
499    }))
500}
501
502fn decode_tuple_data(cur: &mut Cursor<'_>) -> Result<TupleData, DecoderError> {
503    let n_cols_raw = cur.read_i16()?;
504    let n_cols = usize::try_from(n_cols_raw)
505        .map_err(|_| DecoderError::InvalidData(format!("negative column count: {n_cols_raw}")))?;
506    let mut columns = Vec::with_capacity(n_cols);
507
508    for _ in 0..n_cols {
509        let col_type = cur.read_u8()?;
510        match col_type {
511            b'n' => columns.push(ColumnValue::Null),
512            b'u' => columns.push(ColumnValue::Unchanged),
513            b't' => {
514                let len_raw = cur.read_i32()?;
515                let len = usize::try_from(len_raw).map_err(|_| {
516                    DecoderError::InvalidData(format!("negative text length: {len_raw}"))
517                })?;
518                let data = cur.read_bytes(len)?;
519                let text = std::str::from_utf8(data)
520                    .map_err(|_| DecoderError::InvalidUtf8(cur.pos - len))?;
521                columns.push(ColumnValue::Text(text.to_string()));
522            }
523            _ => {
524                return Err(DecoderError::InvalidData(format!(
525                    "unknown column type: 0x{col_type:02X}"
526                )));
527            }
528        }
529    }
530
531    Ok(TupleData { columns })
532}
533
#[cfg(test)]
mod tests {
    use super::*;

    // ── Test helpers: build binary pgoutput messages ──

    /// Fluent builder for raw `pgoutput` message bytes.
    ///
    /// Integers are appended big-endian (network order), the same encoding the
    /// decoder reads.
    struct MessageBuilder {
        buf: Vec<u8>,
    }

    impl MessageBuilder {
        /// Starts a message whose first byte is the type tag `msg_type`.
        fn new(msg_type: u8) -> Self {
            Self {
                buf: vec![msg_type],
            }
        }

        fn u8(mut self, v: u8) -> Self {
            self.buf.push(v);
            self
        }

        fn i16(mut self, v: i16) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        fn i32(mut self, v: i32) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        fn u32(mut self, v: u32) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        fn i64(mut self, v: i64) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        fn u64(mut self, v: u64) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        /// Appends `s` followed by a NUL terminator.
        fn cstring(mut self, s: &str) -> Self {
            self.buf.extend_from_slice(s.as_bytes());
            self.buf.push(0);
            self
        }

        /// Appends raw bytes verbatim; used to build malformed input.
        fn bytes(mut self, b: &[u8]) -> Self {
            self.buf.extend_from_slice(b);
            self
        }

        /// Appends a text-format (`'t'`) tuple column.
        fn text_col(mut self, s: &str) -> Self {
            let len = i32::try_from(s.len()).expect("test text column length fits in i32");
            self.buf.push(b't');
            self.buf.extend_from_slice(&len.to_be_bytes());
            self.buf.extend_from_slice(s.as_bytes());
            self
        }

        /// Appends a NULL (`'n'`) tuple column.
        fn null_col(mut self) -> Self {
            self.buf.push(b'n');
            self
        }

        /// Appends an unchanged-TOAST (`'u'`) tuple column.
        fn unchanged_col(mut self) -> Self {
            self.buf.push(b'u');
            self
        }

        fn build(self) -> Vec<u8> {
            self.buf
        }
    }

    // ── Begin ──

    #[test]
    fn test_decode_begin() {
        // 2024-01-01 00:00:00 UTC in PG microseconds.
        // 2000-01-01 .. 2024-01-01 is 8_766 days = exactly 757_382_400 seconds.
        let pg_ts_us: i64 = 757_382_400_000_000;
        let data = MessageBuilder::new(b'B')
            .u64(0x1234_ABCD) // final_lsn
            .i64(pg_ts_us) // commit_ts (PG epoch)
            .u32(42) // xid
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Begin(b) => {
                assert_eq!(b.final_lsn.as_u64(), 0x1234_ABCD);
                assert_eq!(b.xid, 42);
                // 2024-01-01T00:00:00Z as Unix milliseconds.
                assert_eq!(b.commit_ts_ms, 1_704_067_200_000);
            }
            _ => panic!("expected Begin"),
        }
    }

    // ── Commit ──

    #[test]
    fn test_decode_commit() {
        let pg_ts_us: i64 = 757_382_400_000_000;
        let data = MessageBuilder::new(b'C')
            .u8(0) // flags
            .u64(0x100) // commit_lsn
            .u64(0x200) // end_lsn
            .i64(pg_ts_us) // commit_ts
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Commit(c) => {
                assert_eq!(c.flags, 0);
                assert_eq!(c.commit_lsn.as_u64(), 0x100);
                assert_eq!(c.end_lsn.as_u64(), 0x200);
                assert_eq!(c.commit_ts_ms, pg_timestamp_to_unix_ms(pg_ts_us));
            }
            _ => panic!("expected Commit"),
        }
    }

    // ── Relation ──

    #[test]
    fn test_decode_relation() {
        let data = MessageBuilder::new(b'R')
            .u32(16384) // relation_id
            .cstring("public") // namespace
            .cstring("users") // name
            .u8(b'd') // replica_identity = default
            .i16(2) // n_cols
            // Column 1: id (key)
            .u8(1) // flags = key
            .cstring("id")
            .u32(20) // int8 OID
            .i32(-1) // type_modifier
            // Column 2: name (not key)
            .u8(0) // flags
            .cstring("name")
            .u32(25) // text OID
            .i32(-1)
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Relation(r) => {
                assert_eq!(r.relation_id, 16384);
                assert_eq!(r.namespace, "public");
                assert_eq!(r.name, "users");
                assert_eq!(r.replica_identity, b'd');
                assert_eq!(r.columns.len(), 2);
                assert_eq!(r.columns[0].name, "id");
                assert!(r.columns[0].is_key);
                assert_eq!(r.columns[0].type_oid, 20);
                assert_eq!(r.columns[1].name, "name");
                assert!(!r.columns[1].is_key);
            }
            _ => panic!("expected Relation"),
        }
    }

    #[test]
    fn test_decode_relation_zero_columns() {
        let data = MessageBuilder::new(b'R')
            .u32(1)
            .cstring("")
            .cstring("empty")
            .u8(b'n')
            .i16(0)
            .build();

        match decode_message(&data).unwrap() {
            WalMessage::Relation(r) => {
                assert_eq!(r.namespace, "");
                assert!(r.columns.is_empty());
            }
            _ => panic!("expected Relation"),
        }
    }

    #[test]
    fn test_decode_relation_negative_column_count() {
        let data = MessageBuilder::new(b'R')
            .u32(16384)
            .cstring("public")
            .cstring("t")
            .u8(b'd')
            .i16(-1)
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidData(_))
        ));
    }

    // ── Insert ──

    #[test]
    fn test_decode_insert() {
        let data = MessageBuilder::new(b'I')
            .u32(16384) // relation_id
            .u8(b'N') // new tuple tag
            .i16(3) // n_cols
            .text_col("42") // id
            .text_col("Alice") // name
            .null_col() // nullable field
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Insert(ins) => {
                assert_eq!(ins.relation_id, 16384);
                assert_eq!(ins.new_tuple.columns.len(), 3);
                assert_eq!(ins.new_tuple.columns[0].as_text(), Some("42"));
                assert_eq!(ins.new_tuple.columns[1].as_text(), Some("Alice"));
                assert!(ins.new_tuple.columns[2].is_null());
            }
            _ => panic!("expected Insert"),
        }
    }

    #[test]
    fn test_decode_insert_empty_text_column() {
        let data = MessageBuilder::new(b'I')
            .u32(1)
            .u8(b'N')
            .i16(1)
            .text_col("")
            .build();

        match decode_message(&data).unwrap() {
            WalMessage::Insert(ins) => {
                assert_eq!(ins.new_tuple.columns, vec![ColumnValue::Text(String::new())]);
            }
            _ => panic!("expected Insert"),
        }
    }

    // ── Update (no old tuple) ──

    #[test]
    fn test_decode_update_no_old() {
        let data = MessageBuilder::new(b'U')
            .u32(16384)
            .u8(b'N') // new tuple directly (no old)
            .i16(2)
            .text_col("42")
            .text_col("Bob")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Update(upd) => {
                assert!(upd.old_tuple.is_none());
                assert_eq!(upd.new_tuple.columns[1].as_text(), Some("Bob"));
            }
            _ => panic!("expected Update"),
        }
    }

    // ── Update (with old tuple, REPLICA IDENTITY FULL) ──

    #[test]
    fn test_decode_update_with_old() {
        let data = MessageBuilder::new(b'U')
            .u32(16384)
            .u8(b'O') // old tuple (FULL identity)
            .i16(2) // old: 2 cols
            .text_col("42")
            .text_col("Alice")
            .u8(b'N') // new tuple tag
            .i16(2) // new: 2 cols
            .text_col("42")
            .text_col("Bob")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Update(upd) => {
                let old = upd.old_tuple.expect("'O' tag must yield an old tuple");
                assert_eq!(old.columns[1].as_text(), Some("Alice"));
                assert_eq!(upd.new_tuple.columns[1].as_text(), Some("Bob"));
            }
            _ => panic!("expected Update"),
        }
    }

    #[test]
    fn test_decode_update_unexpected_tag() {
        let data = MessageBuilder::new(b'U').u32(16384).u8(b'Z').build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidData(_))
        ));
    }

    #[test]
    fn test_decode_update_missing_new_tag_after_old() {
        let data = MessageBuilder::new(b'U')
            .u32(16384)
            .u8(b'K')
            .i16(0) // empty old tuple
            .u8(b'X') // should have been 'N'
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidData(_))
        ));
    }

    // ── Delete ──

    #[test]
    fn test_decode_delete_key() {
        let data = MessageBuilder::new(b'D')
            .u32(16384)
            .u8(b'K') // key columns only
            .i16(1)
            .text_col("42")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Delete(del) => {
                assert_eq!(del.relation_id, 16384);
                assert_eq!(del.old_tuple.columns[0].as_text(), Some("42"));
            }
            _ => panic!("expected Delete"),
        }
    }

    #[test]
    fn test_decode_delete_full() {
        let data = MessageBuilder::new(b'D')
            .u32(16384)
            .u8(b'O') // full old row
            .i16(2)
            .text_col("42")
            .text_col("Alice")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Delete(del) => {
                assert_eq!(del.old_tuple.columns.len(), 2);
            }
            _ => panic!("expected Delete"),
        }
    }

    #[test]
    fn test_decode_delete_invalid_tag() {
        let data = MessageBuilder::new(b'D')
            .u32(16384)
            .u8(b'N') // DELETE never carries a new tuple
            .i16(0)
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidData(_))
        ));
    }

    // ── Truncate ──

    #[test]
    fn test_decode_truncate() {
        let data = MessageBuilder::new(b'T')
            .u32(2) // 2 relations
            .u8(1) // CASCADE
            .u32(16384) // first relation
            .u32(16385) // second relation
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Truncate(t) => {
                assert_eq!(t.relation_ids, vec![16384, 16385]);
                assert_eq!(t.options, 1);
            }
            _ => panic!("expected Truncate"),
        }
    }

    #[test]
    fn test_decode_truncate_zero_relations() {
        let data = MessageBuilder::new(b'T').u32(0).u8(0).build();
        match decode_message(&data).unwrap() {
            WalMessage::Truncate(t) => {
                assert!(t.relation_ids.is_empty());
                assert_eq!(t.options, 0);
            }
            _ => panic!("expected Truncate"),
        }
    }

    #[test]
    fn test_decode_truncate_short_payload() {
        // Declares 3 relations but carries only one OID.
        let data = MessageBuilder::new(b'T').u32(3).u8(0).u32(16384).build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::UnexpectedEof { offset: 10, needed: 4 })
        ));
    }

    // ── Origin ──

    #[test]
    fn test_decode_origin() {
        let data = MessageBuilder::new(b'O')
            .u64(0xABCD)
            .cstring("upstream")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Origin(o) => {
                assert_eq!(o.origin_lsn.as_u64(), 0xABCD);
                assert_eq!(o.name, "upstream");
            }
            _ => panic!("expected Origin"),
        }
    }

    #[test]
    fn test_decode_origin_unterminated_name() {
        let data = MessageBuilder::new(b'O')
            .u64(0xABCD)
            .bytes(b"upstream") // missing NUL terminator
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidData(_))
        ));
    }

    // ── Type ──

    #[test]
    fn test_decode_type() {
        let data = MessageBuilder::new(b'Y')
            .u32(12345)
            .cstring("public")
            .cstring("my_enum")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Type(t) => {
                assert_eq!(t.type_id, 12345);
                assert_eq!(t.namespace, "public");
                assert_eq!(t.name, "my_enum");
            }
            _ => panic!("expected Type"),
        }
    }

    #[test]
    fn test_decode_type_invalid_utf8_namespace() {
        // Namespace bytes start right after the 1-byte tag and 4-byte OID.
        let data = MessageBuilder::new(b'Y')
            .u32(12345)
            .bytes(&[0xC3, 0x28, 0x00])
            .cstring("my_enum")
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidUtf8(5))
        ));
    }

    // ── Tuple data with unchanged TOAST column ──

    #[test]
    fn test_decode_insert_with_unchanged() {
        let data = MessageBuilder::new(b'I')
            .u32(16384)
            .u8(b'N')
            .i16(2)
            .text_col("42")
            .unchanged_col()
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Insert(ins) => {
                assert_eq!(ins.new_tuple.columns[0].as_text(), Some("42"));
                assert!(matches!(ins.new_tuple.columns[1], ColumnValue::Unchanged));
            }
            _ => panic!("expected Insert"),
        }
    }

    // ── Error cases ──

    #[test]
    fn test_decode_empty_data() {
        assert!(matches!(
            decode_message(&[]),
            Err(DecoderError::InvalidData(_))
        ));
    }

    #[test]
    fn test_decode_unknown_type() {
        let err = decode_message(&[0xFF]).unwrap_err();
        assert!(matches!(err, DecoderError::UnknownMessageType(0xFF)));
    }

    #[test]
    fn test_decode_truncated_begin() {
        // Begin needs 20 bytes after the type byte; only 4 are given, so the
        // first (8-byte) read at offset 1 fails.
        let data = MessageBuilder::new(b'B').u32(0).build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::UnexpectedEof { offset: 1, needed: 8 })
        ));
    }

    #[test]
    fn test_decode_invalid_insert_tag() {
        let data = MessageBuilder::new(b'I')
            .u32(16384)
            .u8(b'X') // invalid tag
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidData(_))
        ));
    }

    #[test]
    fn test_decode_tuple_negative_column_count() {
        let data = MessageBuilder::new(b'I').u32(16384).u8(b'N').i16(-1).build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidData(_))
        ));
    }

    #[test]
    fn test_decode_tuple_negative_text_length() {
        let data = MessageBuilder::new(b'I')
            .u32(16384)
            .u8(b'N')
            .i16(1)
            .u8(b't')
            .i32(-5)
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidData(_))
        ));
    }

    #[test]
    fn test_decode_tuple_truncated_text() {
        // Header = tag(1) + relid(4) + 'N'(1) + n_cols(2) + 't'(1) + len(4) = 13.
        let data = MessageBuilder::new(b'I')
            .u32(16384)
            .u8(b'N')
            .i16(1)
            .u8(b't')
            .i32(10)
            .bytes(b"abc")
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::UnexpectedEof { offset: 13, needed: 10 })
        ));
    }

    #[test]
    fn test_decode_tuple_invalid_utf8_text() {
        // The text payload starts at offset 13 (see test above).
        let data = MessageBuilder::new(b'I')
            .u32(16384)
            .u8(b'N')
            .i16(1)
            .u8(b't')
            .i32(2)
            .bytes(&[0xFF, 0xFE])
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidUtf8(13))
        ));
    }

    #[test]
    fn test_decode_tuple_unknown_column_type() {
        let data = MessageBuilder::new(b'I')
            .u32(16384)
            .u8(b'N')
            .i16(1)
            .u8(b'x')
            .build();
        assert!(matches!(
            decode_message(&data),
            Err(DecoderError::InvalidData(_))
        ));
    }

    #[test]
    fn test_decoder_error_display() {
        assert_eq!(
            DecoderError::UnknownMessageType(0xAB).to_string(),
            "unknown message type: 0xAB"
        );
        assert_eq!(
            DecoderError::UnexpectedEof {
                offset: 3,
                needed: 8
            }
            .to_string(),
            "unexpected end of data at offset 3, need 8 bytes"
        );
    }

    // ── Timestamp conversion ──

    #[test]
    fn test_pg_timestamp_to_unix_ms() {
        // 2000-01-01 00:00:00 UTC in PG epoch = 0
        // In Unix epoch = 946684800 seconds = 946684800000 ms
        assert_eq!(pg_timestamp_to_unix_ms(0), 946_684_800_000);

        // 2024-01-01 00:00:00 UTC
        // PG epoch: 757382400 seconds = 757382400000000 us
        let pg_us: i64 = 757_382_400_000_000;
        let expected_unix_ms = (pg_us + PG_EPOCH_OFFSET_US) / 1000;
        assert_eq!(pg_timestamp_to_unix_ms(pg_us), expected_unix_ms);
        assert_eq!(pg_timestamp_to_unix_ms(pg_us), 1_704_067_200_000);
    }

    // ── ColumnValue methods ──

    #[test]
    fn test_column_value_as_text() {
        let text = ColumnValue::Text("hello".to_string());
        assert_eq!(text.as_text(), Some("hello"));
        assert!(!text.is_null());

        let null = ColumnValue::Null;
        assert_eq!(null.as_text(), None);
        assert!(null.is_null());

        let unchanged = ColumnValue::Unchanged;
        assert_eq!(unchanged.as_text(), None);
        assert!(!unchanged.is_null());
    }

    // ── Update with key identity ──

    #[test]
    fn test_decode_update_with_key_identity() {
        let data = MessageBuilder::new(b'U')
            .u32(16384)
            .u8(b'K') // key identity
            .i16(1) // old: 1 col (key only)
            .text_col("42")
            .u8(b'N') // new tuple
            .i16(2) // new: 2 cols
            .text_col("42")
            .text_col("Updated")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Update(upd) => {
                let old = upd.old_tuple.expect("'K' tag must yield an old tuple");
                assert_eq!(old.columns.len(), 1);
                assert_eq!(upd.new_tuple.columns.len(), 2);
            }
            _ => panic!("expected Update"),
        }
    }
}