Skip to main content

laminar_connectors/files/
mod.rs

1//! File source and sink connectors.
2
3use std::sync::Arc;
4
5use crate::config::ConnectorInfo;
6use crate::registry::ConnectorRegistry;
7
8pub mod arrow_ipc_codec;
9pub mod config;
10pub mod discovery;
11pub mod manifest;
12pub mod sink;
13pub mod source;
14pub mod text_decoder;
15
16pub use config::{FileFormat, FileSinkConfig, FileSourceConfig, SinkMode};
17pub use manifest::{FileEntry, FileIngestionManifest};
18pub use sink::FileSink;
19pub use source::FileSource;
20pub use text_decoder::TextLineDecoder;
21
22/// Registers the file source connector in the registry.
23///
24/// This is called by `LaminarDB::register_builtin_connectors()` when the
25/// `files` feature is enabled, and makes `connector = 'files'` available
26/// in `CREATE SOURCE` statements.
27pub fn register_file_source(registry: &ConnectorRegistry) {
28    let info = ConnectorInfo {
29        name: "files".to_string(),
30        display_name: "File Source (AutoLoader)".to_string(),
31        version: env!("CARGO_PKG_VERSION").to_string(),
32        is_source: true,
33        is_sink: false,
34        config_keys: vec![],
35    };
36    registry.register_source(
37        "files",
38        info,
39        Arc::new(|registry: Option<&prometheus::Registry>| {
40            Box::new(FileSource::with_registry(registry))
41        }),
42    );
43}
44
45/// Registers the file sink connector in the registry.
46///
47/// Makes `connector = 'files'` available in `CREATE SINK` statements.
48pub fn register_file_sink(registry: &ConnectorRegistry) {
49    let info = ConnectorInfo {
50        name: "files".to_string(),
51        display_name: "File Sink".to_string(),
52        version: env!("CARGO_PKG_VERSION").to_string(),
53        is_source: false,
54        is_sink: true,
55        config_keys: vec![],
56    };
57    registry.register_sink(
58        "files",
59        info,
60        Arc::new(|registry: Option<&prometheus::Registry>| {
61            Box::new(FileSink::with_registry(registry))
62        }),
63    );
64}
65
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConnectorConfig;

    /// Name under which both file connectors are registered.
    const CONNECTOR: &str = "files";

    /// Registering the source lists it and exposes source-only metadata.
    #[test]
    fn test_register_file_source() {
        let registry = ConnectorRegistry::new();
        register_file_source(&registry);

        let expected = CONNECTOR.to_string();
        assert!(registry.list_sources().contains(&expected));

        let info = registry.source_info(CONNECTOR).unwrap();
        assert_eq!(info.name, CONNECTOR);
        assert!(info.is_source);
        assert!(!info.is_sink);
    }

    /// Registering the sink lists it and exposes sink-only metadata.
    #[test]
    fn test_register_file_sink() {
        let registry = ConnectorRegistry::new();
        register_file_sink(&registry);

        let expected = CONNECTOR.to_string();
        assert!(registry.list_sinks().contains(&expected));

        let info = registry.sink_info(CONNECTOR).unwrap();
        assert_eq!(info.name, CONNECTOR);
        assert!(!info.is_source);
        assert!(info.is_sink);
    }

    /// A registered source can be created from a bare config without a
    /// metrics registry.
    #[test]
    fn test_create_source_from_registry() {
        let registry = ConnectorRegistry::new();
        register_file_source(&registry);

        let config = ConnectorConfig::new(CONNECTOR);
        assert!(registry.create_source(&config, None).is_ok());
    }

    /// A registered sink can be created from a bare config without a
    /// metrics registry.
    #[test]
    fn test_create_sink_from_registry() {
        let registry = ConnectorRegistry::new();
        register_file_sink(&registry);

        let config = ConnectorConfig::new(CONNECTOR);
        assert!(registry.create_sink(&config, None).is_ok());
    }
}