laminar_connectors/files/
mod.rs1use std::sync::Arc;
26
27use crate::config::ConnectorInfo;
28use crate::registry::ConnectorRegistry;
29
30pub mod config;
31pub mod discovery;
32pub mod manifest;
33pub mod sink;
34pub mod source;
35pub mod text_decoder;
36
37pub use config::{FileFormat, FileSinkConfig, FileSourceConfig, SinkMode};
38pub use manifest::{FileEntry, FileIngestionManifest};
39pub use sink::FileSink;
40pub use source::FileSource;
41pub use text_decoder::TextLineDecoder;
42
43pub fn register_file_source(registry: &ConnectorRegistry) {
49 let info = ConnectorInfo {
50 name: "files".to_string(),
51 display_name: "File Source (AutoLoader)".to_string(),
52 version: env!("CARGO_PKG_VERSION").to_string(),
53 is_source: true,
54 is_sink: false,
55 config_keys: vec![],
56 };
57 registry.register_source("files", info, Arc::new(|| Box::new(FileSource::new())));
58}
59
60pub fn register_file_sink(registry: &ConnectorRegistry) {
64 let info = ConnectorInfo {
65 name: "files".to_string(),
66 display_name: "File Sink".to_string(),
67 version: env!("CARGO_PKG_VERSION").to_string(),
68 is_source: false,
69 is_sink: true,
70 config_keys: vec![],
71 };
72 registry.register_sink("files", info, Arc::new(|| Box::new(FileSink::new())));
73}
74
#[cfg(test)]
mod tests {
    use super::*;

    /// After `register_file_source`, the registry lists `"files"` as a source
    /// and its metadata is flagged as source-only.
    #[test]
    fn test_register_file_source() {
        let registry = ConnectorRegistry::new();
        register_file_source(&registry);

        let sources = registry.list_sources();
        assert!(sources.contains(&"files".to_string()));

        let info = registry.source_info("files").unwrap();
        assert_eq!(info.name, "files");
        assert!(info.is_source);
        assert!(!info.is_sink);
    }

    /// After `register_file_sink`, the registry lists `"files"` as a sink
    /// and its metadata is flagged as sink-only.
    #[test]
    fn test_register_file_sink() {
        let registry = ConnectorRegistry::new();
        register_file_sink(&registry);

        let sinks = registry.list_sinks();
        assert!(sinks.contains(&"files".to_string()));

        let info = registry.sink_info("files").unwrap();
        assert_eq!(info.name, "files");
        assert!(!info.is_source);
        assert!(info.is_sink);
    }

    /// A source can be created through the registry from a `"files"` config
    /// once the file source has been registered.
    #[test]
    fn test_create_source_from_registry() {
        let registry = ConnectorRegistry::new();
        register_file_source(&registry);

        let config = crate::config::ConnectorConfig::new("files");
        let source = registry.create_source(&config);
        assert!(source.is_ok());
    }

    /// A sink can be created through the registry from a `"files"` config
    /// once the file sink has been registered.
    #[test]
    fn test_create_sink_from_registry() {
        let registry = ConnectorRegistry::new();
        register_file_sink(&registry);

        let config = crate::config::ConnectorConfig::new("files");
        let sink = registry.create_sink(&config);
        assert!(sink.is_ok());
    }
}