Skip to main content

laminar_connectors/otel/
source.rs

1//! `SourceConnector` implementation for the OTel OTLP/gRPC receiver.
2//!
3//! Spawns a tonic gRPC server that accepts OTLP export RPCs for traces,
4//! metrics, or logs (one signal type per source), converts to Arrow
5//! `RecordBatch`, and delivers them via `poll_batch()`.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use arrow_array::RecordBatch;
12use arrow_schema::SchemaRef;
13use async_trait::async_trait;
14use crossfire::{mpsc, AsyncRx, TryRecvError};
15use tokio::net::TcpListener;
16use tokio::sync::{watch, Notify};
17use tokio::task::JoinHandle;
18use tonic::transport::server::TcpIncoming;
19
20use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
21use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
22use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
23
24use crate::checkpoint::SourceCheckpoint;
25use crate::config::{ConnectorConfig, ConnectorState};
26use crate::connector::{SourceBatch, SourceConnector};
27use crate::error::ConnectorError;
28use crate::health::HealthStatus;
29use crate::metrics::ConnectorMetrics;
30
31use super::config::{OtelSignal, OtelSourceConfig};
32use super::schema::{logs_schema, metrics_schema, traces_schema};
33use super::server::OtelReceiver;
34
35/// OTel OTLP/gRPC source connector.
36///
37/// Binds a gRPC server on the configured port and receives telemetry
38/// data from `OpenTelemetry` exporters and collectors. Each source
39/// handles exactly one signal type (traces, metrics, or logs).
40pub struct OtelSource {
41    config: OtelSourceConfig,
42    schema: SchemaRef,
43    state: ConnectorState,
44    batch_rx: Option<AsyncRx<mpsc::Array<RecordBatch>>>,
45    data_ready: Arc<Notify>,
46    server_handle: Option<JoinHandle<()>>,
47    shutdown_tx: Option<watch::Sender<bool>>,
48    /// Monotonic counter of records received (spans, data points, or log records).
49    records_received: Arc<AtomicU64>,
50    requests_received: Arc<AtomicU64>,
51    checkpoint_seq: u64,
52}
53
54impl OtelSource {
55    /// Create a new OTel source with the given default schema.
56    #[must_use]
57    pub fn new(schema: SchemaRef, _registry: Option<&prometheus::Registry>) -> Self {
58        Self {
59            config: OtelSourceConfig::default(),
60            schema,
61            state: ConnectorState::Created,
62            batch_rx: None,
63            data_ready: Arc::new(Notify::new()),
64            server_handle: None,
65            shutdown_tx: None,
66            records_received: Arc::new(AtomicU64::new(0)),
67            requests_received: Arc::new(AtomicU64::new(0)),
68            checkpoint_seq: 0,
69        }
70    }
71}
72
73#[async_trait]
74impl SourceConnector for OtelSource {
75    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
76        self.state = ConnectorState::Initializing;
77        self.config = OtelSourceConfig::from_config(config)?;
78
79        self.schema = match self.config.signals {
80            OtelSignal::Traces => traces_schema(),
81            OtelSignal::Metrics => metrics_schema(),
82            OtelSignal::Logs => logs_schema(),
83        };
84
85        let (batch_tx, batch_rx) = mpsc::bounded_async::<RecordBatch>(self.config.channel_capacity);
86        self.batch_rx = Some(batch_rx);
87
88        let addr = self.config.socket_addr();
89
90        // Bind the TCP listener here so port conflicts fail open(),
91        // not silently inside the background task.
92        let listener = TcpListener::bind(&addr)
93            .await
94            .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to bind {addr}: {e}")))?;
95        let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
96
97        let send_timeout = Duration::from_secs(5);
98        let (shutdown_tx, shutdown_rx) = watch::channel(false);
99        self.shutdown_tx = Some(shutdown_tx);
100
101        let receiver = OtelReceiver::new(
102            batch_tx,
103            Arc::clone(&self.schema),
104            Arc::clone(&self.data_ready),
105            Arc::clone(&self.records_received),
106            Arc::clone(&self.requests_received),
107            send_timeout,
108            self.config.batch_size,
109        );
110
111        // Spawn signal-specific gRPC server on the already-bound listener
112        let server_handle = match self.config.signals {
113            OtelSignal::Traces => {
114                spawn_grpc_server(TraceServiceServer::new(receiver), incoming, shutdown_rx)
115            }
116            OtelSignal::Metrics => {
117                spawn_grpc_server(MetricsServiceServer::new(receiver), incoming, shutdown_rx)
118            }
119            OtelSignal::Logs => {
120                spawn_grpc_server(LogsServiceServer::new(receiver), incoming, shutdown_rx)
121            }
122        };
123
124        self.server_handle = Some(server_handle);
125        self.state = ConnectorState::Running;
126
127        tracing::info!(
128            %addr,
129            signals = ?self.config.signals,
130            batch_size = self.config.batch_size,
131            "OTel source connector started"
132        );
133
134        Ok(())
135    }
136
137    async fn poll_batch(
138        &mut self,
139        max_records: usize,
140    ) -> Result<Option<SourceBatch>, ConnectorError> {
141        let rx = self.batch_rx.as_ref().ok_or(ConnectorError::InvalidState {
142            expected: "Running".into(),
143            actual: format!("{}", self.state),
144        })?;
145
146        let mut total_rows = 0usize;
147        let mut batches: Vec<RecordBatch> = Vec::new();
148        let mut disconnected = false;
149
150        loop {
151            match rx.try_recv() {
152                Ok(batch) => {
153                    total_rows += batch.num_rows();
154                    batches.push(batch);
155                    if total_rows >= max_records {
156                        break;
157                    }
158                }
159                Err(TryRecvError::Empty) => break,
160                Err(TryRecvError::Disconnected) => {
161                    self.state = ConnectorState::Closed;
162                    disconnected = true;
163                    break;
164                }
165            }
166        }
167
168        if batches.is_empty() {
169            return if disconnected {
170                Err(ConnectorError::Closed)
171            } else {
172                Ok(None)
173            };
174        }
175
176        self.checkpoint_seq += 1;
177
178        if batches.len() == 1 {
179            return Ok(Some(SourceBatch::new(batches.into_iter().next().unwrap())));
180        }
181
182        let schema = batches[0].schema();
183        let combined =
184            arrow_select::concat::concat_batches(&schema, batches.iter()).map_err(|e| {
185                ConnectorError::ReadError(format!("failed to concatenate OTel batches: {e}"))
186            })?;
187
188        Ok(Some(SourceBatch::new(combined)))
189    }
190
191    async fn discover_schema(&mut self, properties: &std::collections::HashMap<String, String>) {
192        let sig = properties
193            .get("signals")
194            .or_else(|| properties.get("signal"));
195        if let Some(sig) = sig {
196            match OtelSignal::parse(sig) {
197                Ok(signal) => {
198                    self.schema = match signal {
199                        OtelSignal::Traces => traces_schema(),
200                        OtelSignal::Metrics => metrics_schema(),
201                        OtelSignal::Logs => logs_schema(),
202                    };
203                }
204                Err(e) => {
205                    tracing::warn!(%e, "invalid OTel signal type, using default traces schema");
206                }
207            }
208        }
209    }
210
211    fn schema(&self) -> SchemaRef {
212        Arc::clone(&self.schema)
213    }
214
215    fn checkpoint(&self) -> SourceCheckpoint {
216        let mut cp = SourceCheckpoint::new(self.checkpoint_seq);
217        cp.set_offset(
218            "records_received",
219            self.records_received.load(Ordering::Relaxed).to_string(),
220        );
221        cp.set_offset(
222            "requests_received",
223            self.requests_received.load(Ordering::Relaxed).to_string(),
224        );
225        cp.set_metadata("connector", "otel");
226        cp.set_metadata("signals", format!("{:?}", self.config.signals));
227        cp
228    }
229
230    async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
231        tracing::warn!(
232            "OTel source restore: push-only transport, data gap expected since last checkpoint"
233        );
234        Ok(())
235    }
236
237    fn health_check(&self) -> HealthStatus {
238        match self.state {
239            ConnectorState::Running => {
240                if self
241                    .server_handle
242                    .as_ref()
243                    .is_some_and(|h| !h.is_finished())
244                {
245                    HealthStatus::Healthy
246                } else {
247                    HealthStatus::Unhealthy("gRPC server task exited".into())
248                }
249            }
250            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
251            ConnectorState::Paused | ConnectorState::Recovering => {
252                HealthStatus::Degraded(format!("{}", self.state))
253            }
254            ConnectorState::Closed | ConnectorState::Failed => {
255                HealthStatus::Unhealthy(format!("{}", self.state))
256            }
257        }
258    }
259
260    #[allow(clippy::cast_precision_loss)]
261    fn metrics(&self) -> ConnectorMetrics {
262        ConnectorMetrics {
263            records_total: self.records_received.load(Ordering::Relaxed),
264            bytes_total: 0,
265            errors_total: 0,
266            lag: 0,
267            custom: vec![
268                (
269                    "requests_received".into(),
270                    self.requests_received.load(Ordering::Relaxed) as f64,
271                ),
272                ("checkpoint_seq".into(), self.checkpoint_seq as f64),
273            ],
274        }
275    }
276
277    async fn close(&mut self) -> Result<(), ConnectorError> {
278        tracing::info!("OTel source connector shutting down");
279
280        if let Some(tx) = self.shutdown_tx.take() {
281            let _ = tx.send(true);
282        }
283
284        self.batch_rx.take();
285
286        if let Some(handle) = self.server_handle.take() {
287            if tokio::time::timeout(Duration::from_secs(5), handle)
288                .await
289                .is_err()
290            {
291                tracing::warn!("OTel gRPC server did not shut down within 5s");
292            }
293        }
294
295        self.state = ConnectorState::Closed;
296        Ok(())
297    }
298
299    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
300        Some(Arc::clone(&self.data_ready))
301    }
302
303    fn supports_replay(&self) -> bool {
304        false
305    }
306}
307
308/// Spawn a tonic gRPC server on a pre-bound listener with graceful shutdown.
309fn spawn_grpc_server<S>(
310    svc: S,
311    incoming: TcpIncoming,
312    mut shutdown_rx: watch::Receiver<bool>,
313) -> JoinHandle<()>
314where
315    S: tonic::codegen::Service<
316            tonic::codegen::http::Request<tonic::body::Body>,
317            Response = tonic::codegen::http::Response<tonic::body::Body>,
318            Error = std::convert::Infallible,
319        > + tonic::server::NamedService
320        + Clone
321        + Send
322        + Sync
323        + 'static,
324    S::Future: Send + 'static,
325{
326    tokio::spawn(async move {
327        if let Err(e) = tonic::transport::Server::builder()
328            .add_service(svc)
329            .serve_with_incoming_shutdown(incoming, async move {
330                let _ = shutdown_rx.wait_for(|&v| v).await;
331            })
332            .await
333        {
334            tracing::error!(error = %e, "OTel gRPC server exited with error");
335        }
336    })
337}
338
339impl std::fmt::Debug for OtelSource {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        f.debug_struct("OtelSource")
342            .field("state", &self.state)
343            .field("config", &self.config)
344            .field(
345                "records_received",
346                &self.records_received.load(Ordering::Relaxed),
347            )
348            .field(
349                "requests_received",
350                &self.requests_received.load(Ordering::Relaxed),
351            )
352            .field("checkpoint_seq", &self.checkpoint_seq)
353            .field(
354                "server_running",
355                &self.server_handle.as_ref().map(|h| !h.is_finished()),
356            )
357            .field("has_shutdown_tx", &self.shutdown_tx.is_some())
358            .finish_non_exhaustive()
359    }
360}