1use super::lsn::Lsn;
13use super::types::PgColumn;
14
/// Microseconds added to a PostgreSQL timestamp before it is scaled down to
/// Unix milliseconds (see [`pg_timestamp_to_unix_ms`]).
///
/// 946_684_800 seconds is 2000-01-01T00:00:00Z expressed as Unix time, so this
/// is the distance between the Unix epoch and the PostgreSQL epoch.
const PG_EPOCH_OFFSET_US: i64 = 946_684_800_000_000;
18
/// One decoded replication message.
///
/// There is one variant per message-type byte that [`decode_message`]
/// recognises; the byte is noted on each variant.
#[derive(Debug, Clone, PartialEq)]
pub enum WalMessage {
    /// Type byte `B`.
    Begin(BeginMessage),
    /// Type byte `C`.
    Commit(CommitMessage),
    /// Type byte `R`.
    Relation(RelationMessage),
    /// Type byte `I`.
    Insert(InsertMessage),
    /// Type byte `U`.
    Update(UpdateMessage),
    /// Type byte `D`.
    Delete(DeleteMessage),
    /// Type byte `T`.
    Truncate(TruncateMessage),
    /// Type byte `O`.
    Origin(OriginMessage),
    /// Type byte `Y`.
    Type(TypeMessage),
}
41
/// Payload of a `B` (begin) message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BeginMessage {
    /// First field of the message: a 64-bit value wrapped in [`Lsn`].
    pub final_lsn: Lsn,
    /// Wire timestamp after conversion by [`pg_timestamp_to_unix_ms`]
    /// (Unix milliseconds).
    pub commit_ts_ms: i64,
    /// Last field of the message: an unsigned 32-bit value
    /// (presumably the transaction id — confirm against the protocol docs).
    pub xid: u32,
}
52
/// Payload of a `C` (commit) message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitMessage {
    /// Raw flags byte; stored as received and not interpreted by this module.
    pub flags: u8,
    /// First 64-bit value after the flags byte, wrapped in [`Lsn`].
    pub commit_lsn: Lsn,
    /// Second 64-bit value after the flags byte, wrapped in [`Lsn`].
    pub end_lsn: Lsn,
    /// Wire timestamp after conversion by [`pg_timestamp_to_unix_ms`]
    /// (Unix milliseconds).
    pub commit_ts_ms: i64,
}
65
/// Payload of an `R` (relation) message describing a table and its columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelationMessage {
    /// Unsigned 32-bit relation identifier; the row messages carry the same
    /// kind of id in their `relation_id` field.
    pub relation_id: u32,
    /// First NUL-terminated string of the message.
    pub namespace: String,
    /// Second NUL-terminated string of the message.
    pub name: String,
    /// Raw replica-identity byte, stored as received (the tests use `b'd'`).
    pub replica_identity: u8,
    /// One [`PgColumn`] per column entry, in wire order.
    pub columns: Vec<PgColumn>,
}
80
/// Payload of an `I` (insert) message.
#[derive(Debug, Clone, PartialEq)]
pub struct InsertMessage {
    /// Identifier of the relation the row belongs to.
    pub relation_id: u32,
    /// The tuple that followed the mandatory `N` tag.
    pub new_tuple: TupleData,
}
89
/// Payload of a `U` (update) message.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMessage {
    /// Identifier of the relation the row belongs to.
    pub relation_id: u32,
    /// Tuple sent under a `K` or `O` tag ahead of the new tuple; `None` when
    /// the message starts directly with the `N` tag.
    pub old_tuple: Option<TupleData>,
    /// The tuple that followed the `N` tag.
    pub new_tuple: TupleData,
}
100
/// Payload of a `D` (delete) message.
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteMessage {
    /// Identifier of the relation the row belongs to.
    pub relation_id: u32,
    /// The tuple that followed the `K` or `O` tag.
    pub old_tuple: TupleData,
}
109
/// Payload of a `T` (truncate) message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TruncateMessage {
    /// Relation identifiers in wire order.
    pub relation_ids: Vec<u32>,
    /// Raw options byte; stored as received and not interpreted by this module.
    pub options: u8,
}
118
/// Payload of an `O` (origin) message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OriginMessage {
    /// Leading 64-bit value of the message, wrapped in [`Lsn`].
    pub origin_lsn: Lsn,
    /// NUL-terminated string that follows the LSN.
    pub name: String,
}
127
/// Payload of a `Y` (type) message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TypeMessage {
    /// Unsigned 32-bit type identifier.
    pub type_id: u32,
    /// First NUL-terminated string of the message.
    pub namespace: String,
    /// Second NUL-terminated string of the message.
    pub name: String,
}
138
/// The column values of one row as carried by insert, update and delete
/// messages.
#[derive(Debug, Clone, PartialEq)]
pub struct TupleData {
    /// One entry per column, in wire order.
    pub columns: Vec<ColumnValue>,
}
145
/// A single column slot inside a [`TupleData`].
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnValue {
    /// Column marker `n`: the value is NULL.
    Null,
    /// Column marker `u`: no value was transmitted for this column.
    Unchanged,
    /// Column marker `t`: the value in its UTF-8 text form.
    Text(String),
}

impl ColumnValue {
    /// Borrows the text payload, or yields `None` for `Null` and `Unchanged`.
    #[must_use]
    pub fn as_text(&self) -> Option<&str> {
        if let ColumnValue::Text(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }

    /// Reports whether this slot is `Null`. `Unchanged` is *not* treated as
    /// null.
    #[must_use]
    pub fn is_null(&self) -> bool {
        match self {
            ColumnValue::Null => true,
            ColumnValue::Unchanged | ColumnValue::Text(_) => false,
        }
    }
}
173
/// Everything that can go wrong while decoding a replication message.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DecoderError {
    /// A read asked for more bytes than the buffer still holds.
    #[error("unexpected end of data at offset {offset}, need {needed} bytes")]
    UnexpectedEof {
        /// Cursor position at the time of the failed read.
        offset: usize,
        /// Number of bytes the read required.
        needed: usize,
    },

    /// The leading message-type byte is not one the decoder handles.
    #[error("unknown message type: 0x{0:02X}")]
    UnknownMessageType(u8),

    /// The bytes were present but structurally wrong (bad tag, negative
    /// length, unterminated string, empty message, ...).
    #[error("invalid data: {0}")]
    InvalidData(String),

    /// A string field was not valid UTF-8; the payload is the offset at
    /// which that string starts.
    #[error("invalid UTF-8 at offset {0}")]
    InvalidUtf8(usize),
}
198
199struct Cursor<'a> {
201 data: &'a [u8],
202 pos: usize,
203}
204
205impl<'a> Cursor<'a> {
206 fn new(data: &'a [u8]) -> Self {
207 Self { data, pos: 0 }
208 }
209
210 fn remaining(&self) -> usize {
211 self.data.len().saturating_sub(self.pos)
212 }
213
214 fn read_u8(&mut self) -> Result<u8, DecoderError> {
215 if self.pos >= self.data.len() {
216 return Err(DecoderError::UnexpectedEof {
217 offset: self.pos,
218 needed: 1,
219 });
220 }
221 let val = self.data[self.pos];
222 self.pos += 1;
223 Ok(val)
224 }
225
226 fn read_i16(&mut self) -> Result<i16, DecoderError> {
227 self.check_remaining(2)?;
228 let val = i16::from_be_bytes([self.data[self.pos], self.data[self.pos + 1]]);
229 self.pos += 2;
230 Ok(val)
231 }
232
233 fn read_i32(&mut self) -> Result<i32, DecoderError> {
234 self.check_remaining(4)?;
235 let bytes: [u8; 4] = self.data[self.pos..self.pos + 4].try_into().map_err(|_| {
236 DecoderError::UnexpectedEof {
237 offset: self.pos,
238 needed: 4,
239 }
240 })?;
241 let val = i32::from_be_bytes(bytes);
242 self.pos += 4;
243 Ok(val)
244 }
245
246 fn read_u32(&mut self) -> Result<u32, DecoderError> {
247 self.check_remaining(4)?;
248 let bytes: [u8; 4] = self.data[self.pos..self.pos + 4].try_into().map_err(|_| {
249 DecoderError::UnexpectedEof {
250 offset: self.pos,
251 needed: 4,
252 }
253 })?;
254 let val = u32::from_be_bytes(bytes);
255 self.pos += 4;
256 Ok(val)
257 }
258
259 fn read_i64(&mut self) -> Result<i64, DecoderError> {
260 self.check_remaining(8)?;
261 let bytes: [u8; 8] = self.data[self.pos..self.pos + 8].try_into().map_err(|_| {
262 DecoderError::UnexpectedEof {
263 offset: self.pos,
264 needed: 8,
265 }
266 })?;
267 let val = i64::from_be_bytes(bytes);
268 self.pos += 8;
269 Ok(val)
270 }
271
272 fn read_u64(&mut self) -> Result<u64, DecoderError> {
273 self.check_remaining(8)?;
274 let bytes: [u8; 8] = self.data[self.pos..self.pos + 8].try_into().map_err(|_| {
275 DecoderError::UnexpectedEof {
276 offset: self.pos,
277 needed: 8,
278 }
279 })?;
280 let val = u64::from_be_bytes(bytes);
281 self.pos += 8;
282 Ok(val)
283 }
284
285 fn read_cstring(&mut self) -> Result<String, DecoderError> {
287 let start = self.pos;
288 let nul_pos = self.data[self.pos..]
289 .iter()
290 .position(|&b| b == 0)
291 .ok_or(DecoderError::InvalidData("unterminated string".to_string()))?;
292
293 let s = std::str::from_utf8(&self.data[self.pos..self.pos + nul_pos])
294 .map_err(|_| DecoderError::InvalidUtf8(start))?;
295
296 self.pos += nul_pos + 1; Ok(s.to_string())
298 }
299
300 fn read_bytes(&mut self, len: usize) -> Result<&'a [u8], DecoderError> {
301 self.check_remaining(len)?;
302 let slice = &self.data[self.pos..self.pos + len];
303 self.pos += len;
304 Ok(slice)
305 }
306
307 fn check_remaining(&self, needed: usize) -> Result<(), DecoderError> {
308 if self.remaining() < needed {
309 return Err(DecoderError::UnexpectedEof {
310 offset: self.pos,
311 needed,
312 });
313 }
314 Ok(())
315 }
316}
317
318#[must_use]
321pub fn pg_timestamp_to_unix_ms(pg_us: i64) -> i64 {
322 (pg_us + PG_EPOCH_OFFSET_US) / 1000
323}
324
325pub fn decode_message(data: &[u8]) -> Result<WalMessage, DecoderError> {
332 if data.is_empty() {
333 return Err(DecoderError::InvalidData("empty message".to_string()));
334 }
335
336 let mut cur = Cursor::new(data);
337 let msg_type = cur.read_u8()?;
338
339 match msg_type {
340 b'B' => decode_begin(&mut cur),
341 b'C' => decode_commit(&mut cur),
342 b'R' => decode_relation(&mut cur),
343 b'I' => decode_insert(&mut cur),
344 b'U' => decode_update(&mut cur),
345 b'D' => decode_delete(&mut cur),
346 b'T' => decode_truncate(&mut cur),
347 b'O' => decode_origin(&mut cur),
348 b'Y' => decode_type(&mut cur),
349 _ => Err(DecoderError::UnknownMessageType(msg_type)),
350 }
351}
352
353fn decode_begin(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
354 let final_lsn = Lsn::new(cur.read_u64()?);
355 let commit_ts_us = cur.read_i64()?;
356 let xid = cur.read_u32()?;
357 Ok(WalMessage::Begin(BeginMessage {
358 final_lsn,
359 commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
360 xid,
361 }))
362}
363
364fn decode_commit(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
365 let flags = cur.read_u8()?;
366 let commit_lsn = Lsn::new(cur.read_u64()?);
367 let end_lsn = Lsn::new(cur.read_u64()?);
368 let commit_ts_us = cur.read_i64()?;
369 Ok(WalMessage::Commit(CommitMessage {
370 flags,
371 commit_lsn,
372 end_lsn,
373 commit_ts_ms: pg_timestamp_to_unix_ms(commit_ts_us),
374 }))
375}
376
377fn decode_relation(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
378 let relation_id = cur.read_u32()?;
379 let namespace = cur.read_cstring()?;
380 let name = cur.read_cstring()?;
381 let replica_identity = cur.read_u8()?;
382 let n_cols_raw = cur.read_i16()?;
383 let n_cols = usize::try_from(n_cols_raw)
384 .map_err(|_| DecoderError::InvalidData(format!("negative column count: {n_cols_raw}")))?;
385
386 let mut columns = Vec::with_capacity(n_cols);
387 for _ in 0..n_cols {
388 let flags = cur.read_u8()?;
389 let col_name = cur.read_cstring()?;
390 let type_oid = cur.read_u32()?;
391 let type_modifier = cur.read_i32()?;
392 columns.push(PgColumn::new(
393 col_name,
394 type_oid,
395 type_modifier,
396 flags & 1 != 0,
397 ));
398 }
399
400 Ok(WalMessage::Relation(RelationMessage {
401 relation_id,
402 namespace,
403 name,
404 replica_identity,
405 columns,
406 }))
407}
408
409fn decode_insert(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
410 let relation_id = cur.read_u32()?;
411 let tag = cur.read_u8()?;
412 if tag != b'N' {
413 return Err(DecoderError::InvalidData(format!(
414 "expected 'N' tag in INSERT, got 0x{tag:02X}"
415 )));
416 }
417 let new_tuple = decode_tuple_data(cur)?;
418 Ok(WalMessage::Insert(InsertMessage {
419 relation_id,
420 new_tuple,
421 }))
422}
423
424fn decode_update(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
425 let relation_id = cur.read_u32()?;
426 let tag = cur.read_u8()?;
427
428 let (old_tuple, new_tuple) = match tag {
429 b'N' => (None, decode_tuple_data(cur)?),
431 b'K' | b'O' => {
433 let old = decode_tuple_data(cur)?;
434 let new_tag = cur.read_u8()?;
435 if new_tag != b'N' {
436 return Err(DecoderError::InvalidData(format!(
437 "expected 'N' tag after old tuple in UPDATE, got 0x{new_tag:02X}"
438 )));
439 }
440 let new = decode_tuple_data(cur)?;
441 (Some(old), new)
442 }
443 _ => {
444 return Err(DecoderError::InvalidData(format!(
445 "unexpected tag in UPDATE: 0x{tag:02X}"
446 )));
447 }
448 };
449
450 Ok(WalMessage::Update(UpdateMessage {
451 relation_id,
452 old_tuple,
453 new_tuple,
454 }))
455}
456
457fn decode_delete(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
458 let relation_id = cur.read_u32()?;
459 let tag = cur.read_u8()?;
460 if tag != b'K' && tag != b'O' {
461 return Err(DecoderError::InvalidData(format!(
462 "expected 'K' or 'O' tag in DELETE, got 0x{tag:02X}"
463 )));
464 }
465 let old_tuple = decode_tuple_data(cur)?;
466 Ok(WalMessage::Delete(DeleteMessage {
467 relation_id,
468 old_tuple,
469 }))
470}
471
472fn decode_truncate(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
473 let n_relations = cur.read_u32()? as usize;
474 let options = cur.read_u8()?;
475 let mut relation_ids = Vec::with_capacity(n_relations);
476 for _ in 0..n_relations {
477 relation_ids.push(cur.read_u32()?);
478 }
479 Ok(WalMessage::Truncate(TruncateMessage {
480 relation_ids,
481 options,
482 }))
483}
484
485fn decode_origin(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
486 let origin_lsn = Lsn::new(cur.read_u64()?);
487 let name = cur.read_cstring()?;
488 Ok(WalMessage::Origin(OriginMessage { origin_lsn, name }))
489}
490
491fn decode_type(cur: &mut Cursor<'_>) -> Result<WalMessage, DecoderError> {
492 let type_id = cur.read_u32()?;
493 let namespace = cur.read_cstring()?;
494 let name = cur.read_cstring()?;
495 Ok(WalMessage::Type(TypeMessage {
496 type_id,
497 namespace,
498 name,
499 }))
500}
501
502fn decode_tuple_data(cur: &mut Cursor<'_>) -> Result<TupleData, DecoderError> {
503 let n_cols_raw = cur.read_i16()?;
504 let n_cols = usize::try_from(n_cols_raw)
505 .map_err(|_| DecoderError::InvalidData(format!("negative column count: {n_cols_raw}")))?;
506 let mut columns = Vec::with_capacity(n_cols);
507
508 for _ in 0..n_cols {
509 let col_type = cur.read_u8()?;
510 match col_type {
511 b'n' => columns.push(ColumnValue::Null),
512 b'u' => columns.push(ColumnValue::Unchanged),
513 b't' => {
514 let len_raw = cur.read_i32()?;
515 let len = usize::try_from(len_raw).map_err(|_| {
516 DecoderError::InvalidData(format!("negative text length: {len_raw}"))
517 })?;
518 let data = cur.read_bytes(len)?;
519 let text = std::str::from_utf8(data)
520 .map_err(|_| DecoderError::InvalidUtf8(cur.pos - len))?;
521 columns.push(ColumnValue::Text(text.to_string()));
522 }
523 _ => {
524 return Err(DecoderError::InvalidData(format!(
525 "unknown column type: 0x{col_type:02X}"
526 )));
527 }
528 }
529 }
530
531 Ok(TupleData { columns })
532}
533
#[cfg(test)]
mod tests {
    use super::*;

    /// Fluent builder that assembles a raw message buffer field by field.
    ///
    /// All integers are appended big-endian, mirroring what the decoder's
    /// cursor reads.
    struct MessageBuilder {
        buf: Vec<u8>,
    }

    impl MessageBuilder {
        /// Starts a message whose first byte is `msg_type`.
        fn new(msg_type: u8) -> Self {
            Self {
                buf: vec![msg_type],
            }
        }

        fn u8(mut self, v: u8) -> Self {
            self.buf.push(v);
            self
        }

        fn i16(mut self, v: i16) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        fn i32(mut self, v: i32) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        fn u32(mut self, v: u32) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        fn i64(mut self, v: i64) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        fn u64(mut self, v: u64) -> Self {
            self.buf.extend_from_slice(&v.to_be_bytes());
            self
        }

        /// Appends `s` followed by a NUL terminator.
        fn cstring(mut self, s: &str) -> Self {
            self.buf.extend_from_slice(s.as_bytes());
            self.buf.push(0);
            self
        }

        /// Appends a `t` column: marker, i32 byte length, then the bytes.
        fn text_col(mut self, s: &str) -> Self {
            let len = i32::try_from(s.len()).expect("test string length fits in i32");
            self.buf.push(b't');
            self.buf.extend_from_slice(&len.to_be_bytes());
            self.buf.extend_from_slice(s.as_bytes());
            self
        }

        /// Appends an `n` (null) column marker.
        fn null_col(mut self) -> Self {
            self.buf.push(b'n');
            self
        }

        /// Appends a `u` (unchanged) column marker.
        fn unchanged_col(mut self) -> Self {
            self.buf.push(b'u');
            self
        }

        fn build(self) -> Vec<u8> {
            self.buf
        }
    }

    #[test]
    fn test_decode_begin() {
        let pg_ts_us: i64 = 757_382_400_000_000;
        let data = MessageBuilder::new(b'B')
            .u64(0x1234_ABCD)
            .i64(pg_ts_us)
            .u32(42)
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Begin(b) => {
                assert_eq!(b.final_lsn.as_u64(), 0x1234_ABCD);
                assert_eq!(b.xid, 42);
                assert_eq!(b.commit_ts_ms, (pg_ts_us + PG_EPOCH_OFFSET_US) / 1000);
            }
            _ => panic!("expected Begin"),
        }
    }

    #[test]
    fn test_decode_commit() {
        let pg_ts_us: i64 = 757_382_400_000_000;
        let data = MessageBuilder::new(b'C')
            .u8(0)
            .u64(0x100)
            .u64(0x200)
            .i64(pg_ts_us)
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Commit(c) => {
                assert_eq!(c.flags, 0);
                assert_eq!(c.commit_lsn.as_u64(), 0x100);
                assert_eq!(c.end_lsn.as_u64(), 0x200);
                // The timestamp is converted exactly like in the Begin message.
                assert_eq!(c.commit_ts_ms, (pg_ts_us + PG_EPOCH_OFFSET_US) / 1000);
            }
            _ => panic!("expected Commit"),
        }
    }

    #[test]
    fn test_decode_relation() {
        let data = MessageBuilder::new(b'R')
            .u32(16384)
            .cstring("public")
            .cstring("users")
            .u8(b'd')
            .i16(2)
            // Column 1: key flag set.
            .u8(1)
            .cstring("id")
            .u32(20)
            .i32(-1)
            // Column 2: no flags.
            .u8(0)
            .cstring("name")
            .u32(25)
            .i32(-1)
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Relation(r) => {
                assert_eq!(r.relation_id, 16384);
                assert_eq!(r.namespace, "public");
                assert_eq!(r.name, "users");
                assert_eq!(r.replica_identity, b'd');
                assert_eq!(r.columns.len(), 2);
                assert_eq!(r.columns[0].name, "id");
                assert!(r.columns[0].is_key);
                assert_eq!(r.columns[0].type_oid, 20);
                assert_eq!(r.columns[1].name, "name");
                assert!(!r.columns[1].is_key);
            }
            _ => panic!("expected Relation"),
        }
    }

    #[test]
    fn test_decode_insert() {
        let data = MessageBuilder::new(b'I')
            .u32(16384)
            .u8(b'N')
            .i16(3)
            .text_col("42")
            .text_col("Alice")
            .null_col()
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Insert(ins) => {
                assert_eq!(ins.relation_id, 16384);
                assert_eq!(ins.new_tuple.columns.len(), 3);
                assert_eq!(ins.new_tuple.columns[0].as_text(), Some("42"));
                assert_eq!(ins.new_tuple.columns[1].as_text(), Some("Alice"));
                assert!(ins.new_tuple.columns[2].is_null());
            }
            _ => panic!("expected Insert"),
        }
    }

    #[test]
    fn test_decode_update_no_old() {
        let data = MessageBuilder::new(b'U')
            .u32(16384)
            .u8(b'N')
            .i16(2)
            .text_col("42")
            .text_col("Bob")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Update(upd) => {
                assert!(upd.old_tuple.is_none());
                assert_eq!(upd.new_tuple.columns[1].as_text(), Some("Bob"));
            }
            _ => panic!("expected Update"),
        }
    }

    #[test]
    fn test_decode_update_with_old() {
        let data = MessageBuilder::new(b'U')
            .u32(16384)
            .u8(b'O')
            .i16(2)
            .text_col("42")
            .text_col("Alice")
            .u8(b'N')
            .i16(2)
            .text_col("42")
            .text_col("Bob")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Update(upd) => {
                assert!(upd.old_tuple.is_some());
                let old = upd.old_tuple.unwrap();
                assert_eq!(old.columns[1].as_text(), Some("Alice"));
                assert_eq!(upd.new_tuple.columns[1].as_text(), Some("Bob"));
            }
            _ => panic!("expected Update"),
        }
    }

    #[test]
    fn test_decode_delete_key() {
        let data = MessageBuilder::new(b'D')
            .u32(16384)
            .u8(b'K')
            .i16(1)
            .text_col("42")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Delete(del) => {
                assert_eq!(del.relation_id, 16384);
                assert_eq!(del.old_tuple.columns[0].as_text(), Some("42"));
            }
            _ => panic!("expected Delete"),
        }
    }

    #[test]
    fn test_decode_delete_full() {
        let data = MessageBuilder::new(b'D')
            .u32(16384)
            .u8(b'O')
            .i16(2)
            .text_col("42")
            .text_col("Alice")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Delete(del) => {
                assert_eq!(del.old_tuple.columns.len(), 2);
            }
            _ => panic!("expected Delete"),
        }
    }

    #[test]
    fn test_decode_truncate() {
        let data = MessageBuilder::new(b'T')
            .u32(2)
            .u8(1)
            .u32(16384)
            .u32(16385)
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Truncate(t) => {
                assert_eq!(t.relation_ids, vec![16384, 16385]);
                assert_eq!(t.options, 1);
            }
            _ => panic!("expected Truncate"),
        }
    }

    #[test]
    fn test_decode_origin() {
        let data = MessageBuilder::new(b'O')
            .u64(0xABCD)
            .cstring("upstream")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Origin(o) => {
                assert_eq!(o.origin_lsn.as_u64(), 0xABCD);
                assert_eq!(o.name, "upstream");
            }
            _ => panic!("expected Origin"),
        }
    }

    #[test]
    fn test_decode_type() {
        let data = MessageBuilder::new(b'Y')
            .u32(12345)
            .cstring("public")
            .cstring("my_enum")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Type(t) => {
                assert_eq!(t.type_id, 12345);
                assert_eq!(t.namespace, "public");
                assert_eq!(t.name, "my_enum");
            }
            _ => panic!("expected Type"),
        }
    }

    #[test]
    fn test_decode_insert_with_unchanged() {
        let data = MessageBuilder::new(b'I')
            .u32(16384)
            .u8(b'N')
            .i16(2)
            .text_col("42")
            .unchanged_col()
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Insert(ins) => {
                assert_eq!(ins.new_tuple.columns[0].as_text(), Some("42"));
                assert!(matches!(ins.new_tuple.columns[1], ColumnValue::Unchanged));
            }
            _ => panic!("expected Insert"),
        }
    }

    #[test]
    fn test_decode_empty_data() {
        assert!(decode_message(&[]).is_err());
    }

    #[test]
    fn test_decode_unknown_type() {
        let err = decode_message(&[0xFF]).unwrap_err();
        assert!(matches!(err, DecoderError::UnknownMessageType(0xFF)));
    }

    #[test]
    fn test_decode_truncated_begin() {
        // Only four body bytes: far too short for a Begin message.
        let data = MessageBuilder::new(b'B').u32(0).build();
        assert!(decode_message(&data).is_err());
    }

    #[test]
    fn test_decode_invalid_insert_tag() {
        let data = MessageBuilder::new(b'I').u32(16384).u8(b'X').build();
        assert!(decode_message(&data).is_err());
    }

    #[test]
    fn test_pg_timestamp_to_unix_ms() {
        // PostgreSQL time zero maps onto the epoch offset expressed in ms.
        assert_eq!(pg_timestamp_to_unix_ms(0), 946_684_800_000);

        let pg_us: i64 = 757_382_400_000_000;
        let expected_unix_ms = (pg_us + PG_EPOCH_OFFSET_US) / 1000;
        assert_eq!(pg_timestamp_to_unix_ms(pg_us), expected_unix_ms);
    }

    #[test]
    fn test_column_value_as_text() {
        let text = ColumnValue::Text("hello".to_string());
        assert_eq!(text.as_text(), Some("hello"));
        assert!(!text.is_null());

        let null = ColumnValue::Null;
        assert_eq!(null.as_text(), None);
        assert!(null.is_null());

        let unchanged = ColumnValue::Unchanged;
        assert_eq!(unchanged.as_text(), None);
        assert!(!unchanged.is_null());
    }

    #[test]
    fn test_decode_update_with_key_identity() {
        let data = MessageBuilder::new(b'U')
            .u32(16384)
            .u8(b'K')
            .i16(1)
            .text_col("42")
            .u8(b'N')
            .i16(2)
            .text_col("42")
            .text_col("Updated")
            .build();

        let msg = decode_message(&data).unwrap();
        match msg {
            WalMessage::Update(upd) => {
                let old = upd.old_tuple.unwrap();
                assert_eq!(old.columns.len(), 1);
                assert_eq!(upd.new_tuple.columns.len(), 2);
            }
            _ => panic!("expected Update"),
        }
    }
}