Skip to main content

laminar_connectors/otel/
config.rs

1//! OTel source connector configuration.
2//!
3//! Parses and validates configuration for the OTLP/gRPC receiver.
4
5use crate::config::{ConfigKeySpec, ConnectorConfig};
6use crate::error::ConnectorError;
7
8/// Which OTel signal type to receive.
9///
10/// Each source handles exactly one signal type. Create separate sources
11/// for different signals (each on its own port).
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum OtelSignal {
14    /// Trace spans.
15    Traces,
16    /// Metric data points.
17    Metrics,
18    /// Log records.
19    Logs,
20}
21
22impl OtelSignal {
23    /// Parse from a string value (case-insensitive).
24    ///
25    /// # Errors
26    ///
27    /// Returns `ConnectorError::ConfigurationError` for unknown signal types.
28    pub fn parse(s: &str) -> Result<Self, ConnectorError> {
29        match s.to_lowercase().as_str() {
30            "traces" | "trace" => Ok(Self::Traces),
31            "metrics" | "metric" => Ok(Self::Metrics),
32            "logs" | "log" => Ok(Self::Logs),
33            other => Err(ConnectorError::ConfigurationError(format!(
34                "unknown OTel signal type '{other}': expected traces, metrics, or logs \
35                 (create separate sources for each signal type)"
36            ))),
37        }
38    }
39}
40
41/// OTel source connector configuration.
42#[derive(Debug, Clone)]
43pub struct OtelSourceConfig {
44    /// gRPC bind address.
45    pub bind_address: String,
46    /// gRPC listen port.
47    pub port: u16,
48    /// Which signal type to receive (one per source).
49    pub signals: OtelSignal,
50    /// Max rows per batch.
51    pub batch_size: usize,
52    /// Bounded channel capacity.
53    pub channel_capacity: usize,
54}
55
56impl Default for OtelSourceConfig {
57    fn default() -> Self {
58        Self {
59            bind_address: "0.0.0.0".to_string(),
60            port: 4317,
61            signals: OtelSignal::Traces,
62            batch_size: 1024,
63            channel_capacity: 64,
64        }
65    }
66}
67
68impl OtelSourceConfig {
69    /// Parse configuration from a `ConnectorConfig`.
70    ///
71    /// # Errors
72    ///
73    /// Returns `ConnectorError::ConfigurationError` if any value is invalid.
74    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
75        let mut cfg = Self::default();
76
77        if let Some(addr) = config.get("bind.address").or(config.get("bind_address")) {
78            cfg.bind_address = addr.to_string();
79        }
80
81        if let Some(port_str) = config.get("port") {
82            cfg.port = port_str.parse::<u16>().map_err(|e| {
83                ConnectorError::ConfigurationError(format!("invalid port '{port_str}': {e}"))
84            })?;
85        }
86
87        if let Some(sig) = config.get("signals").or(config.get("signal")) {
88            cfg.signals = OtelSignal::parse(sig)?;
89        }
90
91        if let Some(bs) = config.get("batch_size").or(config.get("batch.size")) {
92            cfg.batch_size = bs.parse::<usize>().map_err(|e| {
93                ConnectorError::ConfigurationError(format!("invalid batch_size '{bs}': {e}"))
94            })?;
95            if cfg.batch_size == 0 {
96                return Err(ConnectorError::ConfigurationError(
97                    "batch_size must be > 0".into(),
98                ));
99            }
100        }
101
102        if let Some(cc) = config
103            .get("channel_capacity")
104            .or(config.get("channel.capacity"))
105        {
106            cfg.channel_capacity = cc.parse::<usize>().map_err(|e| {
107                ConnectorError::ConfigurationError(format!("invalid channel_capacity '{cc}': {e}"))
108            })?;
109            if cfg.channel_capacity == 0 {
110                return Err(ConnectorError::ConfigurationError(
111                    "channel_capacity must be > 0".into(),
112                ));
113            }
114        }
115
116        Ok(cfg)
117    }
118
119    /// Full socket address for binding.
120    #[must_use]
121    pub fn socket_addr(&self) -> String {
122        format!("{}:{}", self.bind_address, self.port)
123    }
124}
125
126/// Configuration keys accepted by the OTel source connector.
127#[must_use]
128pub fn otel_source_config_keys() -> Vec<ConfigKeySpec> {
129    vec![
130        ConfigKeySpec::optional("port", "gRPC listen port", "4317"),
131        ConfigKeySpec::optional("bind.address", "Listen address", "0.0.0.0"),
132        ConfigKeySpec::optional("signals", "Signal type: traces, metrics, or logs", "traces"),
133        ConfigKeySpec::optional("batch_size", "Max rows per RecordBatch", "1024"),
134        ConfigKeySpec::optional("channel_capacity", "Batch channel capacity", "64"),
135    ]
136}
137
138#[cfg(test)]
139mod tests {
140    use super::*;
141
142    #[test]
143    fn test_default_config() {
144        let cfg = OtelSourceConfig::default();
145        assert_eq!(cfg.port, 4317);
146        assert_eq!(cfg.bind_address, "0.0.0.0");
147        assert_eq!(cfg.signals, OtelSignal::Traces);
148        assert_eq!(cfg.batch_size, 1024);
149        assert_eq!(cfg.channel_capacity, 64);
150    }
151
152    #[test]
153    fn test_from_config_all_fields() {
154        let config = ConnectorConfig::with_properties(
155            "otel",
156            [
157                ("port".to_string(), "4318".to_string()),
158                ("bind.address".to_string(), "127.0.0.1".to_string()),
159                ("signals".to_string(), "metrics".to_string()),
160                ("batch_size".to_string(), "2048".to_string()),
161                ("channel_capacity".to_string(), "128".to_string()),
162            ]
163            .into_iter()
164            .collect(),
165        );
166        let cfg = OtelSourceConfig::from_config(&config).unwrap();
167        assert_eq!(cfg.port, 4318);
168        assert_eq!(cfg.bind_address, "127.0.0.1");
169        assert_eq!(cfg.signals, OtelSignal::Metrics);
170        assert_eq!(cfg.batch_size, 2048);
171        assert_eq!(cfg.channel_capacity, 128);
172    }
173
174    #[test]
175    fn test_from_config_defaults() {
176        let config = ConnectorConfig::new("otel");
177        let cfg = OtelSourceConfig::from_config(&config).unwrap();
178        assert_eq!(cfg.port, 4317);
179        assert_eq!(cfg.signals, OtelSignal::Traces);
180    }
181
182    #[test]
183    fn test_invalid_port() {
184        let config = ConnectorConfig::with_properties(
185            "otel",
186            [("port".to_string(), "not_a_number".to_string())]
187                .into_iter()
188                .collect(),
189        );
190        assert!(OtelSourceConfig::from_config(&config).is_err());
191    }
192
193    #[test]
194    fn test_zero_batch_size() {
195        let config = ConnectorConfig::with_properties(
196            "otel",
197            [("batch_size".to_string(), "0".to_string())]
198                .into_iter()
199                .collect(),
200        );
201        assert!(OtelSourceConfig::from_config(&config).is_err());
202    }
203
204    #[test]
205    fn test_invalid_signal() {
206        let config = ConnectorConfig::with_properties(
207            "otel",
208            [("signals".to_string(), "invalid".to_string())]
209                .into_iter()
210                .collect(),
211        );
212        assert!(OtelSourceConfig::from_config(&config).is_err());
213    }
214
215    #[test]
216    fn test_signal_parse() {
217        assert_eq!(OtelSignal::parse("traces").unwrap(), OtelSignal::Traces);
218        assert_eq!(OtelSignal::parse("TRACE").unwrap(), OtelSignal::Traces);
219        assert_eq!(OtelSignal::parse("metrics").unwrap(), OtelSignal::Metrics);
220        assert_eq!(OtelSignal::parse("logs").unwrap(), OtelSignal::Logs);
221        assert!(OtelSignal::parse("bad").is_err());
222    }
223
224    #[test]
225    fn test_all_signal_rejected() {
226        let err = OtelSignal::parse("all").unwrap_err();
227        assert!(err.to_string().contains("separate sources"));
228    }
229
230    #[test]
231    fn test_socket_addr() {
232        let cfg = OtelSourceConfig::default();
233        assert_eq!(cfg.socket_addr(), "0.0.0.0:4317");
234    }
235}