// laminar_connectors/files/manifest.rs
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};

use crate::checkpoint::SourceCheckpoint;

/// Metadata recorded for a single file tracked by the ingestion manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileEntry {
    /// File size in bytes as observed at ingestion; compared by
    /// `FileIngestionManifest::size_changed` to detect modified files.
    pub size: u64,
    /// Timestamp when the file was first discovered.
    /// NOTE(review): presumably milliseconds since the Unix epoch, matching
    /// `ingested_at` — confirm against the code that populates it.
    pub discovered_at: u64,
    /// Milliseconds since the Unix epoch when ingestion completed; drives
    /// age-based eviction in `FileIngestionManifest::maybe_evict`.
    pub ingested_at: u64,
}
24
/// Tracks which files have already been ingested, so re-discovered paths are
/// not processed twice. Recently ingested files are kept exactly in `active`;
/// older, evicted files are remembered approximately via an xor filter
/// (false positives possible, no false negatives for hashes present when the
/// filter was last rebuilt).
#[derive(Debug)]
pub struct FileIngestionManifest {
    // Exact tier: currently tracked files, keyed by path.
    active: HashMap<String, FileEntry>,
    // FNV-1a hashes of evicted paths; the source data for `bloom` and the
    // form persisted to checkpoints.
    bloom_hashes: Vec<u64>,
    // Approximate-membership filter built from `bloom_hashes`; `None` until
    // there are enough hashes to be worth building (see `rebuild_bloom`).
    bloom: Option<xorf::Xor8>,
}
35
36impl FileIngestionManifest {
37 #[must_use]
39 pub fn new() -> Self {
40 Self {
41 active: HashMap::new(),
42 bloom_hashes: Vec::new(),
43 bloom: None,
44 }
45 }
46
47 pub fn contains(&self, path: &str) -> bool {
49 if self.active.contains_key(path) {
50 return true;
51 }
52 if let Some(bloom) = &self.bloom {
53 return xorf::Filter::contains(bloom, &hash_path(path));
54 }
55 if !self.bloom_hashes.is_empty() {
57 let h = hash_path(path);
58 return self.bloom_hashes.contains(&h);
59 }
60 false
61 }
62
63 pub fn size_changed(&self, path: &str, current_size: u64) -> bool {
67 self.active
68 .get(path)
69 .is_some_and(|entry| entry.size != current_size)
70 }
71
72 pub fn insert(&mut self, path: String, entry: FileEntry) {
74 self.active.insert(path, entry);
75 }
76
77 #[must_use]
79 pub fn active_count(&self) -> usize {
80 self.active.len()
81 }
82
83 pub fn active_entries(&self) -> impl Iterator<Item = (&str, &FileEntry)> {
85 self.active.iter().map(|(k, v)| (k.as_str(), v))
86 }
87
88 #[must_use]
94 pub fn snapshot_for_dedup(&self) -> Self {
95 Self {
96 active: self.active.clone(),
97 bloom_hashes: self.bloom_hashes.clone(),
98 bloom: if self.bloom_hashes.len() >= 2 {
99 Some(xorf::Xor8::from(self.bloom_hashes.as_slice()))
100 } else {
101 None
102 },
103 }
104 }
105
106 pub fn maybe_evict(&mut self, max_count: usize, max_age_ms: u64) {
110 use std::collections::HashSet;
111
112 let now_ms = now_millis();
113
114 let mut to_evict: HashSet<String> = self
116 .active
117 .iter()
118 .filter(|(_, entry)| now_ms.saturating_sub(entry.ingested_at) > max_age_ms)
119 .map(|(path, _)| path.clone())
120 .collect();
121
122 let remaining = self.active.len().saturating_sub(to_evict.len());
124 if remaining > max_count {
125 let mut by_age: Vec<_> = self
126 .active
127 .iter()
128 .filter(|(path, _)| !to_evict.contains(path.as_str()))
129 .map(|(path, entry)| (path.clone(), entry.ingested_at))
130 .collect();
131 by_age.sort_by_key(|(_, ts)| *ts);
132
133 let excess = remaining - max_count;
134 for (path, _) in by_age.into_iter().take(excess) {
135 to_evict.insert(path);
136 }
137 }
138
139 if to_evict.is_empty() {
140 return;
141 }
142
143 for path in &to_evict {
145 if self.active.remove(path).is_some() {
146 self.bloom_hashes.push(hash_path(path));
147 }
148 }
149
150 const MAX_BLOOM_HASHES: usize = 1_000_000;
154 if self.bloom_hashes.len() > MAX_BLOOM_HASHES {
155 let drop_count = self.bloom_hashes.len() / 2;
156 self.bloom_hashes.drain(..drop_count);
157 }
158
159 self.rebuild_bloom();
160 }
161
162 pub fn to_checkpoint(&self, checkpoint: &mut SourceCheckpoint) {
164 let active_json = serde_json::to_string(&self.active).unwrap_or_else(|_| "{}".to_string());
165 checkpoint.set_offset("manifest", &active_json);
166
167 if !self.bloom_hashes.is_empty() {
168 let hashes_json =
169 serde_json::to_string(&self.bloom_hashes).unwrap_or_else(|_| "[]".to_string());
170 checkpoint.set_offset("manifest_bloom_hashes", &hashes_json);
171 }
172 }
173
174 pub fn from_checkpoint(checkpoint: &SourceCheckpoint) -> Result<Self, serde_json::Error> {
180 let active: HashMap<String, FileEntry> = match checkpoint.get_offset("manifest") {
181 Some(json) => serde_json::from_str(json)?,
182 None => HashMap::new(),
183 };
184
185 let bloom_hashes: Vec<u64> = match checkpoint.get_offset("manifest_bloom_hashes") {
186 Some(json) => serde_json::from_str(json)?,
187 None => Vec::new(),
188 };
189
190 let mut manifest = Self {
191 active,
192 bloom_hashes,
193 bloom: None,
194 };
195 manifest.rebuild_bloom();
196 Ok(manifest)
197 }
198
199 fn rebuild_bloom(&mut self) {
201 if self.bloom_hashes.is_empty() {
202 self.bloom = None;
203 return;
204 }
205 if self.bloom_hashes.len() < 2 {
207 self.bloom = None;
210 return;
211 }
212 self.bloom = Some(xorf::Xor8::from(self.bloom_hashes.as_slice()));
213 }
214}
215
216impl Default for FileIngestionManifest {
217 fn default() -> Self {
218 Self::new()
219 }
220}
221
/// FNV-1a (64-bit) hash of `path`, used as the key stored in the xor filter.
fn hash_path(path: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0100_0000_01b3;
    path.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
232
#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    // Wall-clock milliseconds since the Unix epoch. A clock set before 1970
    // degrades to 0 instead of panicking; the truncating `as` cast only
    // matters for timestamps beyond u64 millisecond range (~584M years).
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
240
#[cfg(test)]
mod tests {
    use super::*;

    /// Convenience constructor: discovery timestamp precedes ingestion
    /// by 100ms.
    fn entry(size: u64, ingested_at: u64) -> FileEntry {
        FileEntry {
            size,
            discovered_at: ingested_at.saturating_sub(100),
            ingested_at,
        }
    }

    #[test]
    fn test_insert_and_contains() {
        let mut manifest = FileIngestionManifest::new();
        assert!(!manifest.contains("a.csv"));

        manifest.insert("a.csv".into(), entry(100, 1000));

        assert!(manifest.contains("a.csv"));
        assert!(!manifest.contains("b.csv"));
    }

    #[test]
    fn test_size_changed() {
        let mut manifest = FileIngestionManifest::new();
        manifest.insert("a.csv".into(), entry(100, 1000));

        assert!(!manifest.size_changed("a.csv", 100));
        assert!(manifest.size_changed("a.csv", 200));
        assert!(!manifest.size_changed("unknown.csv", 100));
    }

    #[test]
    fn test_eviction_by_count() {
        let mut manifest = FileIngestionManifest::new();
        for idx in 0..20 {
            manifest.insert(format!("file_{idx}.csv"), entry(100, 1000 + idx));
        }
        assert_eq!(manifest.active_count(), 20);

        // Age limit effectively disabled; only the count cap applies.
        manifest.maybe_evict(10, u64::MAX);

        assert_eq!(manifest.active_count(), 10);
        assert_eq!(manifest.bloom_hashes.len(), 10);
    }

    #[test]
    fn test_evicted_files_detected_by_bloom() {
        let mut manifest = FileIngestionManifest::new();
        for idx in 0..20 {
            manifest.insert(format!("file_{idx}.csv"), entry(100, 1000 + idx));
        }
        manifest.maybe_evict(10, u64::MAX);

        // All 20 paths are still reported: 10 exactly via the active map,
        // 10 approximately via the filter (no false negatives).
        let reported = (0..20)
            .filter(|idx| manifest.contains(&format!("file_{idx}.csv")))
            .count();
        assert_eq!(reported, 20);

        assert!(!manifest.contains("completely_unknown_file_xyz.csv"));
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let mut manifest = FileIngestionManifest::new();
        manifest.insert("a.csv".into(), entry(100, 1000));
        manifest.insert("b.csv".into(), entry(200, 2000));

        let mut cp = SourceCheckpoint::new(1);
        manifest.to_checkpoint(&mut cp);

        let restored = FileIngestionManifest::from_checkpoint(&cp).unwrap();
        assert_eq!(restored.active_count(), 2);
        assert!(restored.contains("a.csv"));
        assert!(restored.contains("b.csv"));
    }

    #[test]
    fn test_checkpoint_roundtrip_with_bloom() {
        let mut manifest = FileIngestionManifest::new();
        for idx in 0..20 {
            manifest.insert(format!("file_{idx}.csv"), entry(100, 1000 + idx));
        }
        manifest.maybe_evict(5, u64::MAX);

        let mut cp = SourceCheckpoint::new(1);
        manifest.to_checkpoint(&mut cp);

        let restored = FileIngestionManifest::from_checkpoint(&cp).unwrap();
        assert_eq!(restored.active_count(), 5);
        assert!(!restored.bloom_hashes.is_empty());
        // Evicted hashes survive the roundtrip, so every path is still seen.
        let reported = (0..20)
            .filter(|idx| restored.contains(&format!("file_{idx}.csv")))
            .count();
        assert_eq!(reported, 20);
    }

    #[test]
    fn test_empty_manifest_checkpoint() {
        let manifest = FileIngestionManifest::new();
        let mut cp = SourceCheckpoint::new(0);
        manifest.to_checkpoint(&mut cp);

        let restored = FileIngestionManifest::from_checkpoint(&cp).unwrap();
        assert_eq!(restored.active_count(), 0);
    }
}