Skip to main content

laminar_connectors/files/
mod.rs

1//! AutoLoader-style file source and sink connectors.
2//!
3//! Watches local directories or cloud storage for new files, infers and
4//! evolves schemas automatically, and checkpoints exactly which files have
5//! been processed.
6//!
7//! # Formats
8//!
9//! - CSV (via [`CsvDecoder`](crate::schema::CsvDecoder))
10//! - JSON Lines (via [`JsonDecoder`](crate::schema::JsonDecoder))
11//! - Plain text (via [`TextLineDecoder`](crate::files::text_decoder::TextLineDecoder))
12//! - Apache Parquet (via [`ParquetDecoder`](crate::schema::parquet::ParquetDecoder))
13//!
14//! # Example DDL
15//!
16//! ```sql
17//! CREATE SOURCE logs (ts BIGINT, level VARCHAR, message VARCHAR)
18//! WITH (
19//!     connector = 'files',
20//!     path = '/data/logs/*.csv',
21//!     format = 'csv'
22//! );
23//! ```
24
25use std::sync::Arc;
26
27use crate::config::ConnectorInfo;
28use crate::registry::ConnectorRegistry;
29
30pub mod config;
31pub mod discovery;
32pub mod manifest;
33pub mod sink;
34pub mod source;
35pub mod text_decoder;
36
37pub use config::{FileFormat, FileSinkConfig, FileSourceConfig, SinkMode};
38pub use manifest::{FileEntry, FileIngestionManifest};
39pub use sink::FileSink;
40pub use source::FileSource;
41pub use text_decoder::TextLineDecoder;
42
43/// Registers the file source connector in the registry.
44///
45/// This is called by `LaminarDB::register_builtin_connectors()` when the
46/// `files` feature is enabled, and makes `connector = 'files'` available
47/// in `CREATE SOURCE` statements.
48pub fn register_file_source(registry: &ConnectorRegistry) {
49    let info = ConnectorInfo {
50        name: "files".to_string(),
51        display_name: "File Source (AutoLoader)".to_string(),
52        version: env!("CARGO_PKG_VERSION").to_string(),
53        is_source: true,
54        is_sink: false,
55        config_keys: vec![],
56    };
57    registry.register_source("files", info, Arc::new(|| Box::new(FileSource::new())));
58}
59
60/// Registers the file sink connector in the registry.
61///
62/// Makes `connector = 'files'` available in `CREATE SINK` statements.
63pub fn register_file_sink(registry: &ConnectorRegistry) {
64    let info = ConnectorInfo {
65        name: "files".to_string(),
66        display_name: "File Sink".to_string(),
67        version: env!("CARGO_PKG_VERSION").to_string(),
68        is_source: false,
69        is_sink: true,
70        config_keys: vec![],
71    };
72    registry.register_sink("files", info, Arc::new(|| Box::new(FileSink::new())));
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConnectorConfig;

    /// Key under which both file connectors are registered.
    const FILES: &str = "files";

    /// The source registration is listed and carries source-only metadata.
    #[test]
    fn test_register_file_source() {
        let reg = ConnectorRegistry::new();
        register_file_source(&reg);

        let expected_name = FILES.to_string();
        assert!(reg.list_sources().contains(&expected_name));

        let meta = reg.source_info(FILES).unwrap();
        assert_eq!(meta.name, FILES);
        assert!(meta.is_source);
        assert!(!meta.is_sink);
    }

    /// The sink registration is listed and carries sink-only metadata.
    #[test]
    fn test_register_file_sink() {
        let reg = ConnectorRegistry::new();
        register_file_sink(&reg);

        let expected_name = FILES.to_string();
        assert!(reg.list_sinks().contains(&expected_name));

        let meta = reg.sink_info(FILES).unwrap();
        assert_eq!(meta.name, FILES);
        assert!(!meta.is_source);
        assert!(meta.is_sink);
    }

    /// A registered source can be instantiated from a `files` connector config.
    #[test]
    fn test_create_source_from_registry() {
        let reg = ConnectorRegistry::new();
        register_file_source(&reg);

        let cfg = ConnectorConfig::new(FILES);
        assert!(reg.create_source(&cfg).is_ok());
    }

    /// A registered sink can be instantiated from a `files` connector config.
    #[test]
    fn test_create_sink_from_registry() {
        let reg = ConnectorRegistry::new();
        register_file_sink(&reg);

        let cfg = ConnectorConfig::new(FILES);
        assert!(reg.create_sink(&cfg).is_ok());
    }
}