1use crate::config::{ConfigKeySpec, ConnectorConfig};
6use crate::error::ConnectorError;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum OtelSignal {
14 Traces,
16 Metrics,
18 Logs,
20}
21
22impl OtelSignal {
23 pub fn parse(s: &str) -> Result<Self, ConnectorError> {
29 match s.to_lowercase().as_str() {
30 "traces" | "trace" => Ok(Self::Traces),
31 "metrics" | "metric" => Ok(Self::Metrics),
32 "logs" | "log" => Ok(Self::Logs),
33 other => Err(ConnectorError::ConfigurationError(format!(
34 "unknown OTel signal type '{other}': expected traces, metrics, or logs \
35 (create separate sources for each signal type)"
36 ))),
37 }
38 }
39}
40
41#[derive(Debug, Clone)]
43pub struct OtelSourceConfig {
44 pub bind_address: String,
46 pub port: u16,
48 pub signals: OtelSignal,
50 pub batch_size: usize,
52 pub channel_capacity: usize,
54}
55
56impl Default for OtelSourceConfig {
57 fn default() -> Self {
58 Self {
59 bind_address: "0.0.0.0".to_string(),
60 port: 4317,
61 signals: OtelSignal::Traces,
62 batch_size: 1024,
63 channel_capacity: 64,
64 }
65 }
66}
67
68impl OtelSourceConfig {
69 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
75 let mut cfg = Self::default();
76
77 if let Some(addr) = config.get("bind.address").or(config.get("bind_address")) {
78 cfg.bind_address = addr.to_string();
79 }
80
81 if let Some(port_str) = config.get("port") {
82 cfg.port = port_str.parse::<u16>().map_err(|e| {
83 ConnectorError::ConfigurationError(format!("invalid port '{port_str}': {e}"))
84 })?;
85 }
86
87 if let Some(sig) = config.get("signals").or(config.get("signal")) {
88 cfg.signals = OtelSignal::parse(sig)?;
89 }
90
91 if let Some(bs) = config.get("batch_size").or(config.get("batch.size")) {
92 cfg.batch_size = bs.parse::<usize>().map_err(|e| {
93 ConnectorError::ConfigurationError(format!("invalid batch_size '{bs}': {e}"))
94 })?;
95 if cfg.batch_size == 0 {
96 return Err(ConnectorError::ConfigurationError(
97 "batch_size must be > 0".into(),
98 ));
99 }
100 }
101
102 if let Some(cc) = config
103 .get("channel_capacity")
104 .or(config.get("channel.capacity"))
105 {
106 cfg.channel_capacity = cc.parse::<usize>().map_err(|e| {
107 ConnectorError::ConfigurationError(format!("invalid channel_capacity '{cc}': {e}"))
108 })?;
109 if cfg.channel_capacity == 0 {
110 return Err(ConnectorError::ConfigurationError(
111 "channel_capacity must be > 0".into(),
112 ));
113 }
114 }
115
116 Ok(cfg)
117 }
118
119 #[must_use]
121 pub fn socket_addr(&self) -> String {
122 format!("{}:{}", self.bind_address, self.port)
123 }
124}
125
126#[must_use]
128pub fn otel_source_config_keys() -> Vec<ConfigKeySpec> {
129 vec![
130 ConfigKeySpec::optional("port", "gRPC listen port", "4317"),
131 ConfigKeySpec::optional("bind.address", "Listen address", "0.0.0.0"),
132 ConfigKeySpec::optional("signals", "Signal type: traces, metrics, or logs", "traces"),
133 ConfigKeySpec::optional("batch_size", "Max rows per RecordBatch", "1024"),
134 ConfigKeySpec::optional("channel_capacity", "Batch channel capacity", "64"),
135 ]
136}
137
138#[cfg(test)]
139mod tests {
140 use super::*;
141
142 #[test]
143 fn test_default_config() {
144 let cfg = OtelSourceConfig::default();
145 assert_eq!(cfg.port, 4317);
146 assert_eq!(cfg.bind_address, "0.0.0.0");
147 assert_eq!(cfg.signals, OtelSignal::Traces);
148 assert_eq!(cfg.batch_size, 1024);
149 assert_eq!(cfg.channel_capacity, 64);
150 }
151
152 #[test]
153 fn test_from_config_all_fields() {
154 let config = ConnectorConfig::with_properties(
155 "otel",
156 [
157 ("port".to_string(), "4318".to_string()),
158 ("bind.address".to_string(), "127.0.0.1".to_string()),
159 ("signals".to_string(), "metrics".to_string()),
160 ("batch_size".to_string(), "2048".to_string()),
161 ("channel_capacity".to_string(), "128".to_string()),
162 ]
163 .into_iter()
164 .collect(),
165 );
166 let cfg = OtelSourceConfig::from_config(&config).unwrap();
167 assert_eq!(cfg.port, 4318);
168 assert_eq!(cfg.bind_address, "127.0.0.1");
169 assert_eq!(cfg.signals, OtelSignal::Metrics);
170 assert_eq!(cfg.batch_size, 2048);
171 assert_eq!(cfg.channel_capacity, 128);
172 }
173
174 #[test]
175 fn test_from_config_defaults() {
176 let config = ConnectorConfig::new("otel");
177 let cfg = OtelSourceConfig::from_config(&config).unwrap();
178 assert_eq!(cfg.port, 4317);
179 assert_eq!(cfg.signals, OtelSignal::Traces);
180 }
181
182 #[test]
183 fn test_invalid_port() {
184 let config = ConnectorConfig::with_properties(
185 "otel",
186 [("port".to_string(), "not_a_number".to_string())]
187 .into_iter()
188 .collect(),
189 );
190 assert!(OtelSourceConfig::from_config(&config).is_err());
191 }
192
193 #[test]
194 fn test_zero_batch_size() {
195 let config = ConnectorConfig::with_properties(
196 "otel",
197 [("batch_size".to_string(), "0".to_string())]
198 .into_iter()
199 .collect(),
200 );
201 assert!(OtelSourceConfig::from_config(&config).is_err());
202 }
203
204 #[test]
205 fn test_invalid_signal() {
206 let config = ConnectorConfig::with_properties(
207 "otel",
208 [("signals".to_string(), "invalid".to_string())]
209 .into_iter()
210 .collect(),
211 );
212 assert!(OtelSourceConfig::from_config(&config).is_err());
213 }
214
215 #[test]
216 fn test_signal_parse() {
217 assert_eq!(OtelSignal::parse("traces").unwrap(), OtelSignal::Traces);
218 assert_eq!(OtelSignal::parse("TRACE").unwrap(), OtelSignal::Traces);
219 assert_eq!(OtelSignal::parse("metrics").unwrap(), OtelSignal::Metrics);
220 assert_eq!(OtelSignal::parse("logs").unwrap(), OtelSignal::Logs);
221 assert!(OtelSignal::parse("bad").is_err());
222 }
223
224 #[test]
225 fn test_all_signal_rejected() {
226 let err = OtelSignal::parse("all").unwrap_err();
227 assert!(err.to_string().contains("separate sources"));
228 }
229
230 #[test]
231 fn test_socket_addr() {
232 let cfg = OtelSourceConfig::default();
233 assert_eq!(cfg.socket_addr(), "0.0.0.0:4317");
234 }
235}