laminar_connectors/files/
mod.rs1use std::sync::Arc;
4
5use crate::config::ConnectorInfo;
6use crate::registry::ConnectorRegistry;
7
8pub mod arrow_ipc_codec;
9pub mod config;
10pub mod discovery;
11pub mod manifest;
12pub mod sink;
13pub mod source;
14pub mod text_decoder;
15
16pub use config::{FileFormat, FileSinkConfig, FileSourceConfig, SinkMode};
17pub use manifest::{FileEntry, FileIngestionManifest};
18pub use sink::FileSink;
19pub use source::FileSource;
20pub use text_decoder::TextLineDecoder;
21
22pub fn register_file_source(registry: &ConnectorRegistry) {
28 let info = ConnectorInfo {
29 name: "files".to_string(),
30 display_name: "File Source (AutoLoader)".to_string(),
31 version: env!("CARGO_PKG_VERSION").to_string(),
32 is_source: true,
33 is_sink: false,
34 config_keys: vec![],
35 };
36 registry.register_source(
37 "files",
38 info,
39 Arc::new(|registry: Option<&prometheus::Registry>| {
40 Box::new(FileSource::with_registry(registry))
41 }),
42 );
43}
44
45pub fn register_file_sink(registry: &ConnectorRegistry) {
49 let info = ConnectorInfo {
50 name: "files".to_string(),
51 display_name: "File Sink".to_string(),
52 version: env!("CARGO_PKG_VERSION").to_string(),
53 is_source: false,
54 is_sink: true,
55 config_keys: vec![],
56 };
57 registry.register_sink(
58 "files",
59 info,
60 Arc::new(|registry: Option<&prometheus::Registry>| {
61 Box::new(FileSink::with_registry(registry))
62 }),
63 );
64}
65
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering the source makes `"files"` show up in the source listing
    /// with a source-only descriptor.
    #[test]
    fn test_register_file_source() {
        let registry = ConnectorRegistry::new();
        register_file_source(&registry);

        let sources = registry.list_sources();
        assert!(sources.contains(&"files".to_string()));

        let info = registry.source_info("files").unwrap();
        assert_eq!(info.name, "files");
        assert!(info.is_source);
        assert!(!info.is_sink);
    }

    /// Registering the sink makes `"files"` show up in the sink listing
    /// with a sink-only descriptor.
    #[test]
    fn test_register_file_sink() {
        let registry = ConnectorRegistry::new();
        register_file_sink(&registry);

        let sinks = registry.list_sinks();
        assert!(sinks.contains(&"files".to_string()));

        let info = registry.sink_info("files").unwrap();
        assert_eq!(info.name, "files");
        assert!(!info.is_source);
        assert!(info.is_sink);
    }

    /// A source can be created through the registry from a `"files"` config
    /// when no Prometheus registry is supplied.
    #[test]
    fn test_create_source_from_registry() {
        let registry = ConnectorRegistry::new();
        register_file_source(&registry);

        let config = crate::config::ConnectorConfig::new("files");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }

    /// A sink can be created through the registry from a `"files"` config
    /// when no Prometheus registry is supplied.
    #[test]
    fn test_create_sink_from_registry() {
        let registry = ConnectorRegistry::new();
        register_file_sink(&registry);

        let config = crate::config::ConnectorConfig::new("files");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }
}