laminar_core/cluster/control/
catalog_manifest.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use object_store::path::Path as OsPath;
19use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
20use serde::{Deserialize, Serialize};
21
22const MANIFEST_PATH: &str = "catalog/manifest.json";
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26pub struct CatalogManifestEntry {
27 pub name: String,
29 pub ddl: String,
31}
32
33#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
35pub struct CatalogManifest {
36 pub version: u64,
39 pub updated_at_ms: i64,
41 pub entries: Vec<CatalogManifestEntry>,
43}
44
45pub struct CatalogManifestStore {
47 store: Arc<dyn ObjectStore>,
48}
49
50impl std::fmt::Debug for CatalogManifestStore {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("CatalogManifestStore")
53 .finish_non_exhaustive()
54 }
55}
56
57#[derive(Debug, thiserror::Error)]
59pub enum CatalogManifestError {
60 #[error("object store I/O: {0}")]
62 Io(String),
63 #[error("JSON: {0}")]
65 Json(#[from] serde_json::Error),
66}
67
68impl CatalogManifestStore {
69 #[must_use]
71 pub fn new(store: Arc<dyn ObjectStore>) -> Self {
72 Self { store }
73 }
74
75 fn path() -> OsPath {
76 OsPath::from(MANIFEST_PATH)
77 }
78
79 pub async fn load(&self) -> Result<Option<CatalogManifest>, CatalogManifestError> {
84 match self.store.get(&Self::path()).await {
85 Ok(res) => {
86 let bytes = res
87 .bytes()
88 .await
89 .map_err(|e| CatalogManifestError::Io(e.to_string()))?;
90 Ok(Some(serde_json::from_slice(&bytes)?))
91 }
92 Err(object_store::Error::NotFound { .. }) => Ok(None),
93 Err(e) => Err(CatalogManifestError::Io(e.to_string())),
94 }
95 }
96
97 pub async fn save(&self, manifest: &CatalogManifest) -> Result<(), CatalogManifestError> {
103 let bytes = serde_json::to_vec_pretty(manifest)?;
104 self.store
105 .put(&Self::path(), PutPayload::from(Bytes::from(bytes)))
106 .await
107 .map_err(|e| CatalogManifestError::Io(e.to_string()))?;
108 Ok(())
109 }
110}
111
112#[cfg(test)]
113mod tests {
114 use super::*;
115 use object_store::local::LocalFileSystem;
116 use tempfile::tempdir;
117
118 fn store_in(dir: &std::path::Path) -> CatalogManifestStore {
119 let fs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new_with_prefix(dir).unwrap());
120 CatalogManifestStore::new(fs)
121 }
122
123 fn entry(name: &str, ddl: &str) -> CatalogManifestEntry {
124 CatalogManifestEntry {
125 name: name.to_string(),
126 ddl: ddl.to_string(),
127 }
128 }
129
130 #[tokio::test]
131 async fn load_missing_returns_none() {
132 let dir = tempdir().unwrap();
133 let s = store_in(dir.path());
134 assert!(s.load().await.unwrap().is_none());
135 }
136
137 #[tokio::test]
138 async fn save_then_load_preserves_order() {
139 let dir = tempdir().unwrap();
140 let s = store_in(dir.path());
141 let manifest = CatalogManifest {
142 version: 3,
143 updated_at_ms: 123,
144 entries: vec![
145 entry("src", "CREATE SOURCE src (k BIGINT)"),
146 entry("mv1", "CREATE MATERIALIZED VIEW mv1 AS SELECT k FROM src"),
147 entry("mv2", "CREATE MATERIALIZED VIEW mv2 AS SELECT k FROM mv1"),
148 ],
149 };
150 s.save(&manifest).await.unwrap();
151 let loaded = s.load().await.unwrap().unwrap();
152 assert_eq!(loaded, manifest);
153 assert_eq!(loaded.entries[0].name, "src");
155 assert_eq!(loaded.entries[2].name, "mv2");
156 }
157
158 #[tokio::test]
159 async fn save_overwrites_in_place() {
160 let dir = tempdir().unwrap();
161 let s = store_in(dir.path());
162 s.save(&CatalogManifest {
163 version: 1,
164 updated_at_ms: 1,
165 entries: vec![entry("a", "CREATE SOURCE a (k BIGINT)")],
166 })
167 .await
168 .unwrap();
169 let v2 = CatalogManifest {
170 version: 2,
171 updated_at_ms: 2,
172 entries: vec![
173 entry("a", "CREATE SOURCE a (k BIGINT)"),
174 entry("b", "CREATE SOURCE b (k BIGINT)"),
175 ],
176 };
177 s.save(&v2).await.unwrap();
178 assert_eq!(s.load().await.unwrap().unwrap(), v2);
179 }
180}