Skip to main content

laminar_connectors/files/
mod.rs

1//! File source and sink connectors.
2
3use std::sync::Arc;
4
5use crate::config::ConnectorInfo;
6use crate::registry::ConnectorRegistry;
7
8pub mod arrow_ipc_codec;
9pub mod config;
10pub mod discovery;
11pub mod manifest;
12pub mod sink;
13pub mod source;
14pub mod text_decoder;
15
16pub use config::{FileFormat, FileSinkConfig, FileSourceConfig, SinkMode};
17pub use manifest::{FileEntry, FileIngestionManifest};
18pub use sink::FileSink;
19pub use source::FileSource;
20pub use text_decoder::TextLineDecoder;
21
22/// Registers the file source connector in the registry.
23///
24/// This is called by `LaminarDB::register_builtin_connectors()` when the
25/// `files` feature is enabled, and makes `connector = 'files'` available
26/// in `CREATE SOURCE` statements.
27pub fn register_file_source(registry: &ConnectorRegistry) {
28    use crate::config::ConfigKeySpec;
29    let info = ConnectorInfo {
30        name: "files".to_string(),
31        display_name: "File Source (AutoLoader)".to_string(),
32        version: env!("CARGO_PKG_VERSION").to_string(),
33        is_source: true,
34        is_sink: false,
35        config_keys: vec![
36            ConfigKeySpec::required("path", "Directory path, glob pattern, or cloud storage URL"),
37            ConfigKeySpec::optional(
38                "format",
39                "Data format (csv, tsv, json, jsonl, text, txt, parquet, arrow)",
40                "auto-detect",
41            ),
42            ConfigKeySpec::optional(
43                "glob_pattern",
44                "Optional glob pattern to filter files by name",
45                "*",
46            ),
47        ],
48    };
49    registry.register_source(
50        "files",
51        info,
52        Arc::new(|registry: Option<&prometheus::Registry>| {
53            Box::new(FileSource::with_registry(registry))
54        }),
55    );
56}
57
58/// Registers the file sink connector in the registry.
59///
60/// Makes `connector = 'files'` available in `CREATE SINK` statements.
61pub fn register_file_sink(registry: &ConnectorRegistry) {
62    use crate::config::ConfigKeySpec;
63    let info = ConnectorInfo {
64        name: "files".to_string(),
65        display_name: "File Sink".to_string(),
66        version: env!("CARGO_PKG_VERSION").to_string(),
67        is_source: false,
68        is_sink: true,
69        config_keys: vec![
70            ConfigKeySpec::required("path", "Output directory path"),
71            ConfigKeySpec::required("format", "Output format (csv, json, text, parquet, arrow)"),
72            ConfigKeySpec::optional("mode", "Write mode (append, rolling)", "rolling"),
73            ConfigKeySpec::optional("prefix", "File name prefix for rolling mode", "part"),
74        ],
75    };
76    registry.register_sink(
77        "files",
78        info,
79        Arc::new(|registry: Option<&prometheus::Registry>| {
80            Box::new(FileSink::with_registry(registry))
81        }),
82    );
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConnectorConfig;

    /// Name under which both file connectors are registered.
    const FILES: &str = "files";

    /// The source registration is listed and carries source-only metadata.
    #[test]
    fn test_register_file_source() {
        let reg = ConnectorRegistry::new();
        register_file_source(&reg);

        let listed = reg.list_sources().iter().any(|name| name == FILES);
        assert!(listed);

        let info = reg
            .source_info(FILES)
            .expect("`files` source info should be available after registration");
        assert_eq!(info.name, FILES);
        assert!(info.is_source);
        assert!(!info.is_sink);
    }

    /// The sink registration is listed and carries sink-only metadata.
    #[test]
    fn test_register_file_sink() {
        let reg = ConnectorRegistry::new();
        register_file_sink(&reg);

        let listed = reg.list_sinks().iter().any(|name| name == FILES);
        assert!(listed);

        let info = reg
            .sink_info(FILES)
            .expect("`files` sink info should be available after registration");
        assert_eq!(info.name, FILES);
        assert!(!info.is_source);
        assert!(info.is_sink);
    }

    /// A source can be instantiated from a bare `files` config.
    #[test]
    fn test_create_source_from_registry() {
        let reg = ConnectorRegistry::new();
        register_file_source(&reg);

        let config = ConnectorConfig::new(FILES);
        assert!(reg.create_source(&config, None).is_ok());
    }

    /// A sink can be instantiated from a bare `files` config.
    #[test]
    fn test_create_sink_from_registry() {
        let reg = ConnectorRegistry::new();
        register_file_sink(&reg);

        let config = ConnectorConfig::new(FILES);
        assert!(reg.create_sink(&config, None).is_ok());
    }
}