Skip to main content

laminar_connectors/files/
manifest.rs

1//! File ingestion manifest with bounded retention.
2//!
3//! Tracks which files have been ingested across checkpoints. Uses a two-tier
4//! approach: an active `HashMap` for exact deduplication of recent files, and
5//! a Bloom filter ([`xorf::Xor8`]) for probabilistic dedup of evicted entries.
6
7use std::collections::HashMap;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use serde::{Deserialize, Serialize};
11
12use crate::checkpoint::SourceCheckpoint;
13
/// Metadata for a single ingested file.
///
/// Serialized to JSON inside checkpoints (see
/// `FileIngestionManifest::to_checkpoint`) so ingestion state survives
/// restarts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileEntry {
    /// File size in bytes at ingestion time. Compared against the current
    /// size by `FileIngestionManifest::size_changed` to detect rewrites.
    pub size: u64,
    /// Unix timestamp (ms) when the file was discovered.
    pub discovered_at: u64,
    /// Unix timestamp (ms) when the file was ingested. Drives age-based
    /// eviction in `FileIngestionManifest::maybe_evict`.
    pub ingested_at: u64,
}
24
/// Tracks ingested files with bounded retention.
///
/// Two-tier dedup: `active` gives exact answers for recent files; entries
/// evicted from it are remembered only as hashes, compiled into an `Xor8`
/// filter for probabilistic (small false-positive rate) answers.
#[derive(Debug)]
pub struct FileIngestionManifest {
    /// Active entries — exact deduplication keyed by file path.
    active: HashMap<String, FileEntry>,
    /// Hashes of evicted file paths, kept so the filter can be rebuilt
    /// (and persisted to checkpoints) after further evictions.
    bloom_hashes: Vec<u64>,
    /// Compiled Bloom filter for probabilistic dedup of old files.
    /// `None` until enough evicted hashes exist to build one.
    bloom: Option<xorf::Xor8>,
}
35
36impl FileIngestionManifest {
37    /// Creates an empty manifest.
38    #[must_use]
39    pub fn new() -> Self {
40        Self {
41            active: HashMap::new(),
42            bloom_hashes: Vec::new(),
43            bloom: None,
44        }
45    }
46
47    /// Returns `true` if the path has been ingested (exact or probabilistic match).
48    pub fn contains(&self, path: &str) -> bool {
49        if self.active.contains_key(path) {
50            return true;
51        }
52        if let Some(bloom) = &self.bloom {
53            return xorf::Filter::contains(bloom, &hash_path(path));
54        }
55        // Fallback: linear scan for small bloom_hashes (< 2 entries can't build Xor8).
56        if !self.bloom_hashes.is_empty() {
57            let h = hash_path(path);
58            return self.bloom_hashes.contains(&h);
59        }
60        false
61    }
62
63    /// Checks whether a file was previously ingested with a different size.
64    ///
65    /// Returns `true` if the path is known AND the recorded size differs.
66    pub fn size_changed(&self, path: &str, current_size: u64) -> bool {
67        self.active
68            .get(path)
69            .is_some_and(|entry| entry.size != current_size)
70    }
71
72    /// Inserts a file entry into the active set.
73    pub fn insert(&mut self, path: String, entry: FileEntry) {
74        self.active.insert(path, entry);
75    }
76
77    /// Returns the number of active entries.
78    #[must_use]
79    pub fn active_count(&self) -> usize {
80        self.active.len()
81    }
82
83    /// Returns an iterator over active entries for introspection.
84    pub fn active_entries(&self) -> impl Iterator<Item = (&str, &FileEntry)> {
85        self.active.iter().map(|(k, v)| (k.as_str(), v))
86    }
87
88    /// Creates a read-only snapshot for the discovery engine's dedup checks.
89    ///
90    /// The snapshot contains the same active entries and bloom filter, but
91    /// will not receive future inserts. This is intentional — the source's
92    /// `poll_batch` does a belt-and-suspenders dedup check anyway.
93    #[must_use]
94    pub fn snapshot_for_dedup(&self) -> Self {
95        Self {
96            active: self.active.clone(),
97            bloom_hashes: self.bloom_hashes.clone(),
98            bloom: if self.bloom_hashes.len() >= 2 {
99                Some(xorf::Xor8::from(self.bloom_hashes.as_slice()))
100            } else {
101                None
102            },
103        }
104    }
105
106    /// Evicts entries that exceed the count cap or age limit.
107    ///
108    /// Evicted entries are added to the Bloom filter for probabilistic dedup.
109    pub fn maybe_evict(&mut self, max_count: usize, max_age_ms: u64) {
110        use std::collections::HashSet;
111
112        let now_ms = now_millis();
113
114        // Collect paths to evict (by age) — use HashSet for O(1) lookups.
115        let mut to_evict: HashSet<String> = self
116            .active
117            .iter()
118            .filter(|(_, entry)| now_ms.saturating_sub(entry.ingested_at) > max_age_ms)
119            .map(|(path, _)| path.clone())
120            .collect();
121
122        // If still over count cap, evict oldest by ingested_at.
123        let remaining = self.active.len().saturating_sub(to_evict.len());
124        if remaining > max_count {
125            let mut by_age: Vec<_> = self
126                .active
127                .iter()
128                .filter(|(path, _)| !to_evict.contains(path.as_str()))
129                .map(|(path, entry)| (path.clone(), entry.ingested_at))
130                .collect();
131            by_age.sort_by_key(|(_, ts)| *ts);
132
133            let excess = remaining - max_count;
134            for (path, _) in by_age.into_iter().take(excess) {
135                to_evict.insert(path);
136            }
137        }
138
139        if to_evict.is_empty() {
140            return;
141        }
142
143        // Evict and record hashes.
144        for path in &to_evict {
145            if self.active.remove(path).is_some() {
146                self.bloom_hashes.push(hash_path(path));
147            }
148        }
149
150        // Cap bloom_hashes to prevent unbounded growth. When over 1M entries,
151        // drop the oldest half. This increases FPR for very old files but
152        // those are unlikely to reappear.
153        const MAX_BLOOM_HASHES: usize = 1_000_000;
154        if self.bloom_hashes.len() > MAX_BLOOM_HASHES {
155            let drop_count = self.bloom_hashes.len() / 2;
156            self.bloom_hashes.drain(..drop_count);
157        }
158
159        self.rebuild_bloom();
160    }
161
162    /// Serializes the manifest into `SourceCheckpoint` fields.
163    pub fn to_checkpoint(&self, checkpoint: &mut SourceCheckpoint) {
164        let active_json = serde_json::to_string(&self.active).unwrap_or_else(|_| "{}".to_string());
165        checkpoint.set_offset("manifest", &active_json);
166
167        if !self.bloom_hashes.is_empty() {
168            let hashes_json =
169                serde_json::to_string(&self.bloom_hashes).unwrap_or_else(|_| "[]".to_string());
170            checkpoint.set_offset("manifest_bloom_hashes", &hashes_json);
171        }
172    }
173
174    /// Restores the manifest from a checkpoint.
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if deserialization fails.
179    pub fn from_checkpoint(checkpoint: &SourceCheckpoint) -> Result<Self, serde_json::Error> {
180        let active: HashMap<String, FileEntry> = match checkpoint.get_offset("manifest") {
181            Some(json) => serde_json::from_str(json)?,
182            None => HashMap::new(),
183        };
184
185        let bloom_hashes: Vec<u64> = match checkpoint.get_offset("manifest_bloom_hashes") {
186            Some(json) => serde_json::from_str(json)?,
187            None => Vec::new(),
188        };
189
190        let mut manifest = Self {
191            active,
192            bloom_hashes,
193            bloom: None,
194        };
195        manifest.rebuild_bloom();
196        Ok(manifest)
197    }
198
199    /// Rebuilds the Bloom filter from accumulated hashes.
200    fn rebuild_bloom(&mut self) {
201        if self.bloom_hashes.is_empty() {
202            self.bloom = None;
203            return;
204        }
205        // xorf::Xor8 requires at least 2 keys to construct.
206        if self.bloom_hashes.len() < 2 {
207            // With only 1 hash we fall back to linear scan via contains().
208            // This is fine — it only happens transiently during the first eviction.
209            self.bloom = None;
210            return;
211        }
212        self.bloom = Some(xorf::Xor8::from(self.bloom_hashes.as_slice()));
213    }
214}
215
216impl Default for FileIngestionManifest {
217    fn default() -> Self {
218        Self::new()
219    }
220}
221
/// Stable hash for a file path (used for Bloom filter keys).
///
/// 64-bit FNV-1a: deterministic across platforms and process restarts,
/// which matters because these hashes are persisted in checkpoints.
fn hash_path(path: &str) -> u64 {
    const FNV64_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV64_PRIME: u64 = 0x0100_0000_01b3;
    path.bytes().fold(FNV64_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV64_PRIME)
    })
}
232
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // u128 -> u64 truncation cannot occur for ~584 million years.
    since_epoch.as_millis() as u64
}
240
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `FileEntry` of `size` bytes, discovered 100 ms before ingestion.
    fn make_entry(size: u64, ingested_at: u64) -> FileEntry {
        FileEntry {
            size,
            discovered_at: ingested_at.saturating_sub(100),
            ingested_at,
        }
    }

    #[test]
    fn test_insert_and_contains() {
        let mut manifest = FileIngestionManifest::new();
        assert!(!manifest.contains("a.csv"));

        manifest.insert("a.csv".into(), make_entry(100, 1000));

        assert!(manifest.contains("a.csv"));
        assert!(!manifest.contains("b.csv"));
    }

    #[test]
    fn test_size_changed() {
        let mut manifest = FileIngestionManifest::new();
        manifest.insert("a.csv".into(), make_entry(100, 1000));

        // Same size: unchanged. Different size: changed. Unknown path: false.
        assert!(!manifest.size_changed("a.csv", 100));
        assert!(manifest.size_changed("a.csv", 200));
        assert!(!manifest.size_changed("unknown.csv", 100));
    }

    #[test]
    fn test_eviction_by_count() {
        let mut manifest = FileIngestionManifest::new();
        for idx in 0..20 {
            manifest.insert(format!("file_{idx}.csv"), make_entry(100, 1000 + idx));
        }
        assert_eq!(manifest.active_count(), 20);

        // Cap at 10 entries; age limit effectively disabled.
        manifest.maybe_evict(10, u64::MAX);

        assert_eq!(manifest.active_count(), 10);
        assert_eq!(manifest.bloom_hashes.len(), 10);
    }

    #[test]
    fn test_evicted_files_detected_by_bloom() {
        let mut manifest = FileIngestionManifest::new();
        for idx in 0..20 {
            manifest.insert(format!("file_{idx}.csv"), make_entry(100, 1000 + idx));
        }
        manifest.maybe_evict(10, u64::MAX);

        // All 20 should still be reported: 10 exact matches from the active
        // set plus 10 probabilistic matches via the filter.
        let found = (0..20)
            .filter(|idx| manifest.contains(&format!("file_{idx}.csv")))
            .count();
        assert_eq!(found, 20);

        // Unknown file should almost certainly not match (Xor8 FPR ~0.3%).
        assert!(!manifest.contains("completely_unknown_file_xyz.csv"));
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let mut manifest = FileIngestionManifest::new();
        manifest.insert("a.csv".into(), make_entry(100, 1000));
        manifest.insert("b.csv".into(), make_entry(200, 2000));

        let mut checkpoint = SourceCheckpoint::new(1);
        manifest.to_checkpoint(&mut checkpoint);

        let restored = FileIngestionManifest::from_checkpoint(&checkpoint).unwrap();
        assert_eq!(restored.active_count(), 2);
        assert!(restored.contains("a.csv"));
        assert!(restored.contains("b.csv"));
    }

    #[test]
    fn test_checkpoint_roundtrip_with_bloom() {
        let mut manifest = FileIngestionManifest::new();
        for idx in 0..20 {
            manifest.insert(format!("file_{idx}.csv"), make_entry(100, 1000 + idx));
        }
        manifest.maybe_evict(5, u64::MAX);

        let mut checkpoint = SourceCheckpoint::new(1);
        manifest.to_checkpoint(&mut checkpoint);

        let restored = FileIngestionManifest::from_checkpoint(&checkpoint).unwrap();
        assert_eq!(restored.active_count(), 5);
        assert!(!restored.bloom_hashes.is_empty());

        // Evicted files should still be detected via the rebuilt filter.
        let found = (0..20)
            .filter(|idx| restored.contains(&format!("file_{idx}.csv")))
            .count();
        assert_eq!(found, 20);
    }

    #[test]
    fn test_empty_manifest_checkpoint() {
        let manifest = FileIngestionManifest::new();
        let mut checkpoint = SourceCheckpoint::new(0);
        manifest.to_checkpoint(&mut checkpoint);

        let restored = FileIngestionManifest::from_checkpoint(&checkpoint).unwrap();
        assert_eq!(restored.active_count(), 0);
    }
}