// laminar_storage/per_core_wal/manager.rs

//! Per-core WAL manager for coordinating multiple core writers.
2
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use laminar_core::storage_io::IoCompletion;

use super::entry::PerCoreWalEntry;
use super::error::PerCoreWalError;
use super::reader::PerCoreWalReader;
use super::writer::CoreWalWriter;
13
/// Configuration for per-core WAL.
#[derive(Debug, Clone)]
pub struct PerCoreWalConfig {
    /// Base directory for WAL segments.
    pub base_dir: PathBuf,
    /// Number of cores (determines number of segments).
    pub num_cores: usize,
    /// Segment file name pattern (default: "wal-{core_id}.log").
    pub segment_pattern: String,
}

impl PerCoreWalConfig {
    /// Creates a new per-core WAL configuration with the default
    /// segment pattern `"wal-{core_id}.log"`.
    #[must_use]
    pub fn new(base_dir: &Path, num_cores: usize) -> Self {
        Self {
            base_dir: base_dir.to_path_buf(),
            num_cores,
            segment_pattern: "wal-{core_id}.log".to_string(),
        }
    }

    /// Sets a custom segment file pattern.
    ///
    /// The pattern must contain `{core_id}`, which is replaced with the
    /// core ID when building segment paths. Without the placeholder every
    /// core would resolve to the *same* file, silently corrupting the WAL,
    /// so the requirement is checked in debug builds.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `pattern` does not contain `{core_id}`.
    #[must_use]
    pub fn with_segment_pattern(mut self, pattern: &str) -> Self {
        debug_assert!(
            pattern.contains("{core_id}"),
            "segment pattern must contain `{{core_id}}`: {pattern}"
        );
        self.segment_pattern = pattern.to_string();
        self
    }

    /// Returns the path to a segment file for a given core.
    #[must_use]
    pub fn segment_path(&self, core_id: usize) -> PathBuf {
        let filename = self
            .segment_pattern
            .replace("{core_id}", &core_id.to_string());
        self.base_dir.join(filename)
    }
}
54
/// Per-core WAL manager.
///
/// Coordinates all core WAL writers and provides epoch management.
/// Each core has its own writer, eliminating cross-core synchronization on writes.
pub struct PerCoreWalManager {
    /// Configuration (base directory, core count, segment naming).
    config: PerCoreWalConfig,
    /// Per-core writers (index = `core_id`).
    writers: Vec<CoreWalWriter>,
    /// Global epoch counter (shared, atomic); handed out via `epoch_ref()`.
    global_epoch: Arc<AtomicU64>,
}
67
68impl PerCoreWalManager {
69    /// Creates a new per-core WAL manager.
70    ///
71    /// Creates segment files for all cores if they don't exist.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if directory creation or writer initialization fails.
76    pub fn new(config: PerCoreWalConfig) -> Result<Self, PerCoreWalError> {
77        // Ensure base directory exists
78        std::fs::create_dir_all(&config.base_dir)?;
79
80        // Create writers for each core
81        let mut writers = Vec::with_capacity(config.num_cores);
82        for core_id in 0..config.num_cores {
83            let path = config.segment_path(core_id);
84            let writer = CoreWalWriter::new(core_id, &path)?;
85            writers.push(writer);
86        }
87
88        Ok(Self {
89            config,
90            writers,
91            global_epoch: Arc::new(AtomicU64::new(0)),
92        })
93    }
94
95    /// Opens an existing per-core WAL manager.
96    ///
97    /// All segment files must exist.
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if any segment file doesn't exist or can't be opened.
102    pub fn open(config: PerCoreWalConfig) -> Result<Self, PerCoreWalError> {
103        let mut writers = Vec::with_capacity(config.num_cores);
104        let mut max_epoch = 0u64;
105
106        for core_id in 0..config.num_cores {
107            let path = config.segment_path(core_id);
108            if !path.exists() {
109                return Err(PerCoreWalError::SegmentNotFound { core_id, path });
110            }
111
112            // Single pass: read all valid entries, tracking max epoch and
113            // the last valid position. Avoids reading the file twice.
114            let mut reader = PerCoreWalReader::open(core_id, &path)?;
115            let mut valid_end = 0u64;
116            loop {
117                let pos_before = reader.position();
118                match reader.read_next()? {
119                    super::reader::WalReadResult::Entry(entry) => {
120                        max_epoch = max_epoch.max(entry.epoch);
121                        valid_end = reader.position();
122                    }
123                    super::reader::WalReadResult::Eof => break,
124                    super::reader::WalReadResult::TornWrite { .. }
125                    | super::reader::WalReadResult::ChecksumMismatch { .. }
126                    | super::reader::WalReadResult::Corrupted { .. } => {
127                        valid_end = pos_before;
128                        break;
129                    }
130                }
131            }
132
133            let writer = CoreWalWriter::open_at(core_id, &path, valid_end)?;
134            writers.push(writer);
135        }
136
137        Ok(Self {
138            config,
139            writers,
140            global_epoch: Arc::new(AtomicU64::new(max_epoch)),
141        })
142    }
143
144    /// Returns the configuration.
145    #[must_use]
146    pub fn config(&self) -> &PerCoreWalConfig {
147        &self.config
148    }
149
150    /// Returns the number of cores.
151    #[must_use]
152    pub fn num_cores(&self) -> usize {
153        self.config.num_cores
154    }
155
156    /// Returns the current global epoch.
157    #[must_use]
158    pub fn epoch(&self) -> u64 {
159        self.global_epoch.load(Ordering::Acquire)
160    }
161
162    /// Returns a shared reference to the global epoch counter.
163    #[must_use]
164    pub fn epoch_ref(&self) -> Arc<AtomicU64> {
165        Arc::clone(&self.global_epoch)
166    }
167
168    /// Gets the writer for a specific core.
169    ///
170    /// # Panics
171    ///
172    /// Panics if `core_id >= num_cores`.
173    #[must_use]
174    pub fn writer(&mut self, core_id: usize) -> &mut CoreWalWriter {
175        &mut self.writers[core_id]
176    }
177
178    /// Gets the writer for a specific core, checking bounds.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if `core_id >= num_cores`.
183    pub fn writer_checked(
184        &mut self,
185        core_id: usize,
186    ) -> Result<&mut CoreWalWriter, PerCoreWalError> {
187        if core_id >= self.config.num_cores {
188            return Err(PerCoreWalError::InvalidCoreId {
189                core_id,
190                max_core_id: self.config.num_cores - 1,
191            });
192        }
193        Ok(&mut self.writers[core_id])
194    }
195
196    /// Advances the global epoch and returns the new epoch.
197    #[must_use]
198    pub fn advance_epoch(&self) -> u64 {
199        self.global_epoch.fetch_add(1, Ordering::AcqRel) + 1
200    }
201
202    /// Sets the epoch on all writers.
203    pub fn set_epoch_all(&mut self, epoch: u64) {
204        for writer in &mut self.writers {
205            writer.set_epoch(epoch);
206        }
207    }
208
209    /// Syncs all segment files.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if any sync fails.
214    pub fn sync_all(&mut self) -> Result<(), PerCoreWalError> {
215        for writer in &mut self.writers {
216            writer.sync()?;
217        }
218        Ok(())
219    }
220
221    /// Returns the current write positions of all writers (includes un-synced data).
222    ///
223    /// For checkpoint manifests, prefer [`synced_positions()`](Self::synced_positions)
224    /// which only reports durable data.
225    #[must_use]
226    pub fn positions(&self) -> Vec<u64> {
227        self.writers.iter().map(CoreWalWriter::position).collect()
228    }
229
230    /// Returns the last synced (durable) positions of all writers.
231    ///
232    /// Only data up to these positions is guaranteed to survive a crash.
233    /// Use this for checkpoint manifests.
234    #[must_use]
235    pub fn synced_positions(&self) -> Vec<u64> {
236        self.writers
237            .iter()
238            .map(CoreWalWriter::synced_position)
239            .collect()
240    }
241
242    /// Truncates all segments at the specified positions.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if any truncation fails.
247    pub fn truncate_all(&mut self, positions: &[u64]) -> Result<(), PerCoreWalError> {
248        if positions.len() != self.config.num_cores {
249            return Err(PerCoreWalError::InvalidCoreId {
250                core_id: positions.len(),
251                max_core_id: self.config.num_cores - 1,
252            });
253        }
254
255        for (core_id, &position) in positions.iter().enumerate() {
256            self.writers[core_id].truncate(position)?;
257        }
258        Ok(())
259    }
260
261    /// Resets (truncates to zero) all segments.
262    ///
263    /// Used after successful checkpoint.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if any reset fails.
268    pub fn reset_all(&mut self) -> Result<(), PerCoreWalError> {
269        for writer in &mut self.writers {
270            writer.reset()?;
271        }
272        Ok(())
273    }
274
275    /// Merges entries from all segments, sorted by (epoch, timestamp).
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if reading any segment fails.
280    pub fn merge_segments(&self) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
281        let mut entries = Vec::new();
282
283        for core_id in 0..self.config.num_cores {
284            let path = self.config.segment_path(core_id);
285            if path.exists() {
286                let mut reader = PerCoreWalReader::open(core_id, &path)?;
287                entries.extend(reader.read_all()?);
288            }
289        }
290
291        // Sort by (epoch, timestamp_ns, core_id, sequence)
292        entries.sort();
293
294        Ok(entries)
295    }
296
297    /// Merges entries from all segments up to a specific epoch.
298    ///
299    /// # Errors
300    ///
301    /// Returns an error if reading any segment fails.
302    pub fn merge_segments_up_to_epoch(
303        &self,
304        max_epoch: u64,
305    ) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
306        let mut entries = Vec::new();
307
308        for core_id in 0..self.config.num_cores {
309            let path = self.config.segment_path(core_id);
310            if path.exists() {
311                let mut reader = PerCoreWalReader::open(core_id, &path)?;
312                entries.extend(reader.read_up_to_epoch(max_epoch)?);
313            }
314        }
315
316        // Sort by (epoch, timestamp_ns, core_id, sequence)
317        entries.sort();
318
319        Ok(entries)
320    }
321
322    /// Writes an epoch barrier to all segments.
323    ///
324    /// Used during checkpoint to mark epoch boundaries.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if any write fails.
329    pub fn write_epoch_barrier_all(&mut self) -> Result<(), PerCoreWalError> {
330        for writer in &mut self.writers {
331            writer.append_epoch_barrier()?;
332        }
333        Ok(())
334    }
335
336    /// Check I/O completions against all writers' pending sync tokens.
337    ///
338    /// Call this after polling `StorageIo::poll_completions`. Each writer
339    /// that has a pending sync token matching a completion will update its
340    /// `synced_position`.
341    ///
342    /// Returns the number of writers whose syncs completed successfully.
343    ///
344    /// # Errors
345    ///
346    /// Returns the first `PerCoreWalError` encountered if any writer's
347    /// sync failed.
348    pub fn check_all_completions(
349        &mut self,
350        completions: &[IoCompletion],
351    ) -> Result<usize, PerCoreWalError> {
352        if completions.is_empty() {
353            return Ok(0);
354        }
355        let mut count = 0;
356        let mut first_err: Option<PerCoreWalError> = None;
357        for writer in &mut self.writers {
358            match writer.check_completions(completions) {
359                Ok(true) => count += 1,
360                Ok(false) => {}
361                Err(e) => {
362                    if first_err.is_none() {
363                        first_err = Some(e);
364                    }
365                }
366            }
367        }
368        match first_err {
369            Some(e) => Err(e),
370            None => Ok(count),
371        }
372    }
373
374    /// Returns `true` if any writer has a pending sync.
375    #[must_use]
376    pub fn any_sync_pending(&self) -> bool {
377        self.writers.iter().any(CoreWalWriter::is_sync_pending)
378    }
379
380    /// Returns total size of all segments.
381    #[must_use]
382    pub fn total_size(&self) -> u64 {
383        self.writers.iter().map(CoreWalWriter::position).sum()
384    }
385
386    /// Returns segment path for a core.
387    #[must_use]
388    pub fn segment_path(&self, core_id: usize) -> PathBuf {
389        self.config.segment_path(core_id)
390    }
391}
392
393impl std::fmt::Debug for PerCoreWalManager {
394    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395        f.debug_struct("PerCoreWalManager")
396            .field("config", &self.config)
397            .field("num_cores", &self.config.num_cores)
398            .field("epoch", &self.epoch())
399            .field("total_size", &self.total_size())
400            .finish_non_exhaustive()
401    }
402}
403
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a fresh manager over a temp dir; the TempDir must be kept
    /// alive for the duration of the test or the segments disappear.
    fn setup_manager(num_cores: usize) -> (PerCoreWalManager, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = PerCoreWalConfig::new(temp_dir.path(), num_cores);
        let manager = PerCoreWalManager::new(config).unwrap();
        (manager, temp_dir)
    }

    #[test]
    fn test_manager_creation() {
        let (manager, _temp_dir) = setup_manager(4);
        assert_eq!(manager.num_cores(), 4);
        assert_eq!(manager.epoch(), 0);
    }

    #[test]
    fn test_writer_access() {
        let (mut manager, _temp_dir) = setup_manager(4);

        let writer = manager.writer(0);
        assert_eq!(writer.core_id(), 0);

        let writer = manager.writer(3);
        assert_eq!(writer.core_id(), 3);
    }

    #[test]
    fn test_writer_checked_invalid() {
        let (mut manager, _temp_dir) = setup_manager(4);

        // core_id 5 is out of range for 4 cores.
        let result = manager.writer_checked(5);
        assert!(matches!(result, Err(PerCoreWalError::InvalidCoreId { .. })));
    }

    #[test]
    fn test_advance_epoch() {
        let (manager, _temp_dir) = setup_manager(4);

        assert_eq!(manager.epoch(), 0);
        let new_epoch = manager.advance_epoch();
        assert_eq!(new_epoch, 1);
        assert_eq!(manager.epoch(), 1);

        let new_epoch = manager.advance_epoch();
        assert_eq!(new_epoch, 2);
    }

    #[test]
    fn test_set_epoch_all() {
        let (mut manager, _temp_dir) = setup_manager(4);

        manager.set_epoch_all(5);

        for core_id in 0..4 {
            assert_eq!(manager.writer(core_id).epoch(), 5);
        }
    }

    #[test]
    fn test_parallel_writes() {
        let (mut manager, _temp_dir) = setup_manager(4);
        manager.set_epoch_all(1);

        // Write to different cores
        manager.writer(0).append_put(b"key0", b"value0").unwrap();
        manager.writer(1).append_put(b"key1", b"value1").unwrap();
        manager.writer(2).append_put(b"key2", b"value2").unwrap();
        manager.writer(3).append_put(b"key3", b"value3").unwrap();

        manager.sync_all().unwrap();

        // Verify positions: every segment advanced past zero.
        let positions = manager.positions();
        assert!(positions[0] > 0);
        assert!(positions[1] > 0);
        assert!(positions[2] > 0);
        assert!(positions[3] > 0);
    }

    #[test]
    fn test_merge_segments() {
        let (mut manager, _temp_dir) = setup_manager(2);

        // Epoch 1 writes
        manager.set_epoch_all(1);
        manager.writer(0).append_put(b"key0a", b"value0a").unwrap();
        manager.writer(1).append_put(b"key1a", b"value1a").unwrap();

        // Epoch 2 writes
        manager.set_epoch_all(2);
        manager.writer(0).append_put(b"key0b", b"value0b").unwrap();
        manager.writer(1).append_put(b"key1b", b"value1b").unwrap();

        manager.sync_all().unwrap();

        // Merge and verify ordering: epoch is the primary sort key.
        let entries = manager.merge_segments().unwrap();
        assert_eq!(entries.len(), 4);

        // Epoch 1 entries should come first
        assert_eq!(entries[0].epoch, 1);
        assert_eq!(entries[1].epoch, 1);
        // Epoch 2 entries next
        assert_eq!(entries[2].epoch, 2);
        assert_eq!(entries[3].epoch, 2);
    }

    #[test]
    fn test_merge_segments_up_to_epoch() {
        let (mut manager, _temp_dir) = setup_manager(2);

        manager.set_epoch_all(1);
        manager.writer(0).append_put(b"key0a", b"value0a").unwrap();

        manager.set_epoch_all(2);
        manager.writer(0).append_put(b"key0b", b"value0b").unwrap();

        manager.set_epoch_all(3);
        manager.writer(0).append_put(b"key0c", b"value0c").unwrap();

        manager.sync_all().unwrap();

        // The epoch-3 entry must be filtered out.
        let entries = manager.merge_segments_up_to_epoch(2).unwrap();
        assert_eq!(entries.len(), 2); // Only epochs 1 and 2
    }

    #[test]
    fn test_reset_all() {
        let (mut manager, _temp_dir) = setup_manager(2);

        manager.writer(0).append_put(b"key0", b"value0").unwrap();
        manager.writer(1).append_put(b"key1", b"value1").unwrap();
        manager.sync_all().unwrap();

        assert!(manager.total_size() > 0);

        manager.reset_all().unwrap();

        assert_eq!(manager.total_size(), 0);
        assert_eq!(manager.positions(), vec![0, 0]);
    }

    #[test]
    fn test_truncate_all() {
        let (mut manager, _temp_dir) = setup_manager(2);

        // Remember position after core 0's first entry; truncating back to it
        // should discard the second entry.
        manager.writer(0).append_put(b"key0", b"value0").unwrap();
        let pos0 = manager.writer(0).position();

        manager.writer(0).append_put(b"key0b", b"value0b").unwrap();
        manager.writer(1).append_put(b"key1", b"value1").unwrap();

        manager.sync_all().unwrap();

        manager.truncate_all(&[pos0, 0]).unwrap();

        let positions = manager.positions();
        assert_eq!(positions[0], pos0);
        assert_eq!(positions[1], 0);
    }

    #[test]
    fn test_write_epoch_barrier_all() {
        let (mut manager, _temp_dir) = setup_manager(2);
        manager.set_epoch_all(1);

        manager.write_epoch_barrier_all().unwrap();
        manager.sync_all().unwrap();

        // One barrier entry per core, all of the EpochBarrier variant.
        let entries = manager.merge_segments().unwrap();
        assert_eq!(entries.len(), 2);
        for entry in entries {
            assert!(matches!(
                entry.operation,
                super::super::entry::WalOperation::EpochBarrier { .. }
            ));
        }
    }

    #[test]
    fn test_open_existing() {
        let temp_dir = TempDir::new().unwrap();
        let config = PerCoreWalConfig::new(temp_dir.path(), 2);

        // Create and write
        {
            let mut manager = PerCoreWalManager::new(config.clone()).unwrap();
            manager.set_epoch_all(5);
            manager.writer(0).append_put(b"key0", b"value0").unwrap();
            manager.writer(1).append_put(b"key1", b"value1").unwrap();
            manager.sync_all().unwrap();
        }

        // Reopen: epoch must be recovered from the segment entries.
        let manager = PerCoreWalManager::open(config).unwrap();
        assert_eq!(manager.epoch(), 5);

        let entries = manager.merge_segments().unwrap();
        assert_eq!(entries.len(), 2);
    }

    #[test]
    fn test_segment_path() {
        let config = PerCoreWalConfig::new(Path::new("/data/wal"), 4);
        assert_eq!(config.segment_path(0), PathBuf::from("/data/wal/wal-0.log"));
        assert_eq!(config.segment_path(3), PathBuf::from("/data/wal/wal-3.log"));

        let custom = config.with_segment_pattern("segment-{core_id}.wal");
        assert_eq!(
            custom.segment_path(1),
            PathBuf::from("/data/wal/segment-1.wal")
        );
    }

    #[test]
    fn test_debug_format() {
        let (manager, _temp_dir) = setup_manager(4);
        let debug_str = format!("{manager:?}");
        assert!(debug_str.contains("PerCoreWalManager"));
        assert!(debug_str.contains("num_cores"));
    }
}
629}