laminar_connectors/files/
mod.rs1use std::sync::Arc;
4
5use crate::config::ConnectorInfo;
6use crate::registry::ConnectorRegistry;
7
8pub mod arrow_ipc_codec;
9pub mod config;
10pub mod discovery;
11pub mod manifest;
12pub mod sink;
13pub mod source;
14pub mod text_decoder;
15
16pub use config::{FileFormat, FileSinkConfig, FileSourceConfig, SinkMode};
17pub use manifest::{FileEntry, FileIngestionManifest};
18pub use sink::FileSink;
19pub use source::FileSource;
20pub use text_decoder::TextLineDecoder;
21
22pub fn register_file_source(registry: &ConnectorRegistry) {
28 use crate::config::ConfigKeySpec;
29 let info = ConnectorInfo {
30 name: "files".to_string(),
31 display_name: "File Source (AutoLoader)".to_string(),
32 version: env!("CARGO_PKG_VERSION").to_string(),
33 is_source: true,
34 is_sink: false,
35 config_keys: vec![
36 ConfigKeySpec::required("path", "Directory path, glob pattern, or cloud storage URL"),
37 ConfigKeySpec::optional(
38 "format",
39 "Data format (csv, tsv, json, jsonl, text, txt, parquet, arrow)",
40 "auto-detect",
41 ),
42 ConfigKeySpec::optional(
43 "glob_pattern",
44 "Optional glob pattern to filter files by name",
45 "*",
46 ),
47 ],
48 };
49 registry.register_source(
50 "files",
51 info,
52 Arc::new(|registry: Option<&prometheus::Registry>| {
53 Box::new(FileSource::with_registry(registry))
54 }),
55 );
56}
57
58pub fn register_file_sink(registry: &ConnectorRegistry) {
62 use crate::config::ConfigKeySpec;
63 let info = ConnectorInfo {
64 name: "files".to_string(),
65 display_name: "File Sink".to_string(),
66 version: env!("CARGO_PKG_VERSION").to_string(),
67 is_source: false,
68 is_sink: true,
69 config_keys: vec![
70 ConfigKeySpec::required("path", "Output directory path"),
71 ConfigKeySpec::required("format", "Output format (csv, json, text, parquet, arrow)"),
72 ConfigKeySpec::optional("mode", "Write mode (append, rolling)", "rolling"),
73 ConfigKeySpec::optional("prefix", "File name prefix for rolling mode", "part"),
74 ],
75 };
76 registry.register_sink(
77 "files",
78 info,
79 Arc::new(|registry: Option<&prometheus::Registry>| {
80 Box::new(FileSink::with_registry(registry))
81 }),
82 );
83}
84
#[cfg(test)]
mod tests {
    //! Registration smoke tests for the file source and sink connectors.

    use super::*;

    /// After registration, the source is listed under "files" and its
    /// metadata marks it as a source only.
    #[test]
    fn test_register_file_source() {
        let registry = ConnectorRegistry::new();
        register_file_source(&registry);

        let sources = registry.list_sources();
        assert!(sources.contains(&"files".to_string()));

        let info = registry.source_info("files").unwrap();
        assert_eq!(info.name, "files");
        assert!(info.is_source);
        assert!(!info.is_sink);
    }

    /// After registration, the sink is listed under "files" and its
    /// metadata marks it as a sink only.
    #[test]
    fn test_register_file_sink() {
        let registry = ConnectorRegistry::new();
        register_file_sink(&registry);

        let sinks = registry.list_sinks();
        assert!(sinks.contains(&"files".to_string()));

        let info = registry.sink_info("files").unwrap();
        assert_eq!(info.name, "files");
        assert!(!info.is_source);
        assert!(info.is_sink);
    }

    /// The registered source factory can be invoked through the registry
    /// without a Prometheus registry.
    #[test]
    fn test_create_source_from_registry() {
        let registry = ConnectorRegistry::new();
        register_file_source(&registry);

        let config = crate::config::ConnectorConfig::new("files");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }

    /// The registered sink factory can be invoked through the registry
    /// without a Prometheus registry.
    #[test]
    fn test_create_sink_from_registry() {
        let registry = ConnectorRegistry::new();
        register_file_sink(&registry);

        let config = crate::config::ConnectorConfig::new("files");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }
}