Skip to main content

laminar_core/cluster/control/
catalog_manifest.rs

1//! Cluster-wide catalog manifest: the ordered DDL needed to rebuild a
2//! node's logical catalog (sources, tables, streams, materialized views,
3//! sinks).
4//!
5//! Unlike [`AssignmentSnapshot`](super::AssignmentSnapshot) (one immutable
6//! object per version, CAS-rotated), the catalog manifest is a single
7//! mutable object at `catalog/manifest.json` that the cluster overwrites as
8//! DDL changes. A node joining or restarting reads it and replays any DDL it
9//! lacks locally so its operator graph exists before the pipeline starts —
10//! the data sharded across vnodes then flows into the rebuilt views.
11//!
12//! Entries are stored in creation order so a dependent view (one selecting
13//! from another) is replayed after its dependency.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use object_store::path::Path as OsPath;
19use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
20use serde::{Deserialize, Serialize};
21
/// Object-store key of the single, overwritten-in-place catalog manifest.
const MANIFEST_PATH: &str = "catalog/manifest.json";
23
24/// One catalog object's defining DDL.
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26pub struct CatalogManifestEntry {
27    /// Object name (source/sink/stream/view/table identifier).
28    pub name: String,
29    /// The exact DDL text that created it, replayed verbatim on restore.
30    pub ddl: String,
31}
32
33/// The full ordered set of DDL defining a cluster's catalog.
34#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
35pub struct CatalogManifest {
36    /// Monotonic version, bumped on each save. Diagnostic only — the
37    /// manifest is overwritten in place, not CAS-rotated.
38    pub version: u64,
39    /// Wall-clock of the last save, millis since epoch.
40    pub updated_at_ms: i64,
41    /// DDL entries in creation (dependency-safe) order.
42    pub entries: Vec<CatalogManifestEntry>,
43}
44
/// I/O wrapper for the [`CatalogManifest`] on an object store.
///
/// Reads and writes the one manifest object at `MANIFEST_PATH` through the
/// wrapped store.
pub struct CatalogManifestStore {
    /// Shared handle to the object store that holds the manifest.
    store: Arc<dyn ObjectStore>,
}
49
50impl std::fmt::Debug for CatalogManifestStore {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("CatalogManifestStore")
53            .finish_non_exhaustive()
54    }
55}
56
57/// Errors loading or saving a [`CatalogManifest`].
58#[derive(Debug, thiserror::Error)]
59pub enum CatalogManifestError {
60    /// Underlying object store I/O failure.
61    #[error("object store I/O: {0}")]
62    Io(String),
63    /// JSON de/serialization failure.
64    #[error("JSON: {0}")]
65    Json(#[from] serde_json::Error),
66}
67
68impl CatalogManifestStore {
69    /// Wrap a pre-constructed object store.
70    #[must_use]
71    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
72        Self { store }
73    }
74
75    fn path() -> OsPath {
76        OsPath::from(MANIFEST_PATH)
77    }
78
79    /// Load the current catalog manifest; `Ok(None)` on a fresh cluster.
80    ///
81    /// # Errors
82    /// Object-store I/O or JSON decode failure.
83    pub async fn load(&self) -> Result<Option<CatalogManifest>, CatalogManifestError> {
84        match self.store.get(&Self::path()).await {
85            Ok(res) => {
86                let bytes = res
87                    .bytes()
88                    .await
89                    .map_err(|e| CatalogManifestError::Io(e.to_string()))?;
90                Ok(Some(serde_json::from_slice(&bytes)?))
91            }
92            Err(object_store::Error::NotFound { .. }) => Ok(None),
93            Err(e) => Err(CatalogManifestError::Io(e.to_string())),
94        }
95    }
96
97    /// Overwrite the catalog manifest. Last writer wins — the leader (or any
98    /// node executing catalog DDL) publishes the latest full catalog.
99    ///
100    /// # Errors
101    /// Object-store I/O or JSON encode failure.
102    pub async fn save(&self, manifest: &CatalogManifest) -> Result<(), CatalogManifestError> {
103        let bytes = serde_json::to_vec_pretty(manifest)?;
104        self.store
105            .put(&Self::path(), PutPayload::from(Bytes::from(bytes)))
106            .await
107            .map_err(|e| CatalogManifestError::Io(e.to_string()))?;
108        Ok(())
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Build a manifest store backed by the local filesystem rooted at `dir`.
    fn store_in(dir: &std::path::Path) -> CatalogManifestStore {
        let local = LocalFileSystem::new_with_prefix(dir).unwrap();
        CatalogManifestStore::new(Arc::new(local))
    }

    /// Shorthand for a manifest entry built from borrowed strings.
    fn entry(name: &str, ddl: &str) -> CatalogManifestEntry {
        CatalogManifestEntry {
            name: name.to_owned(),
            ddl: ddl.to_owned(),
        }
    }

    /// Loading from an empty store reports "no manifest" rather than an error.
    #[tokio::test]
    async fn load_missing_returns_none() {
        let dir = tempdir().unwrap();
        let loaded = store_in(dir.path()).load().await.unwrap();
        assert!(loaded.is_none());
    }

    /// A saved manifest round-trips with its entries in the same order.
    #[tokio::test]
    async fn save_then_load_preserves_order() {
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());
        let expected = CatalogManifest {
            version: 3,
            updated_at_ms: 123,
            entries: vec![
                entry("src", "CREATE SOURCE src (k BIGINT)"),
                entry("mv1", "CREATE MATERIALIZED VIEW mv1 AS SELECT k FROM src"),
                entry("mv2", "CREATE MATERIALIZED VIEW mv2 AS SELECT k FROM mv1"),
            ],
        };

        store.save(&expected).await.unwrap();
        let loaded = store.load().await.unwrap().unwrap();

        assert_eq!(loaded, expected);
        // Order is dependency-safe: src before mv1 before mv2.
        let first = loaded.entries.first().map(|e| e.name.as_str());
        let last = loaded.entries.last().map(|e| e.name.as_str());
        assert_eq!(first, Some("src"));
        assert_eq!(last, Some("mv2"));
    }

    /// A second save replaces the first; only the latest manifest is loaded.
    #[tokio::test]
    async fn save_overwrites_in_place() {
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());

        let v1 = CatalogManifest {
            version: 1,
            updated_at_ms: 1,
            entries: vec![entry("a", "CREATE SOURCE a (k BIGINT)")],
        };
        store.save(&v1).await.unwrap();

        let v2 = CatalogManifest {
            version: 2,
            updated_at_ms: 2,
            entries: vec![
                entry("a", "CREATE SOURCE a (k BIGINT)"),
                entry("b", "CREATE SOURCE b (k BIGINT)"),
            ],
        };
        store.save(&v2).await.unwrap();

        let loaded = store.load().await.unwrap().unwrap();
        assert_eq!(loaded, v2);
    }
}