Skip to main content

laminar_connectors/otel/
source.rs

//! `SourceConnector` implementation for the OTel OTLP/gRPC receiver.
//!
//! Spawns a tonic gRPC server that accepts OTLP export RPCs for traces,
//! metrics, or logs (one signal type per source), converts them to Arrow
//! `RecordBatch`es, and delivers them via `poll_batch()`.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use arrow_array::RecordBatch;
12use arrow_schema::SchemaRef;
13use async_trait::async_trait;
14use crossfire::{mpsc, AsyncRx, TryRecvError};
15use tokio::net::TcpListener;
16use tokio::sync::{watch, Notify};
17use tokio::task::JoinHandle;
18use tonic::transport::server::TcpIncoming;
19
20use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
21use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
22use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
23
24use crate::checkpoint::SourceCheckpoint;
25use crate::config::{ConnectorConfig, ConnectorState};
26use crate::connector::{SourceBatch, SourceConnector};
27use crate::error::ConnectorError;
28
29use super::config::{OtelSignal, OtelSourceConfig};
30use super::schema::{logs_schema, metrics_schema, traces_schema};
31use super::server::OtelReceiver;
32
33/// OTel OTLP/gRPC source connector.
34///
35/// Binds a gRPC server on the configured port and receives telemetry
36/// data from `OpenTelemetry` exporters and collectors. Each source
37/// handles exactly one signal type (traces, metrics, or logs).
38pub struct OtelSource {
39    config: OtelSourceConfig,
40    schema: SchemaRef,
41    state: ConnectorState,
42    batch_rx: Option<AsyncRx<mpsc::Array<RecordBatch>>>,
43    data_ready: Arc<Notify>,
44    server_handle: Option<JoinHandle<()>>,
45    shutdown_tx: Option<watch::Sender<bool>>,
46    /// Monotonic counter of records received (spans, data points, or log records).
47    records_received: Arc<AtomicU64>,
48    requests_received: Arc<AtomicU64>,
49    checkpoint_seq: u64,
50}
51
52impl OtelSource {
53    /// Create a new OTel source with the given default schema.
54    #[must_use]
55    pub fn new(schema: SchemaRef, _registry: Option<&prometheus::Registry>) -> Self {
56        Self {
57            config: OtelSourceConfig::default(),
58            schema,
59            state: ConnectorState::Created,
60            batch_rx: None,
61            data_ready: Arc::new(Notify::new()),
62            server_handle: None,
63            shutdown_tx: None,
64            records_received: Arc::new(AtomicU64::new(0)),
65            requests_received: Arc::new(AtomicU64::new(0)),
66            checkpoint_seq: 0,
67        }
68    }
69}
70
71#[async_trait]
72impl SourceConnector for OtelSource {
73    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
74        self.state = ConnectorState::Initializing;
75        self.config = OtelSourceConfig::from_config(config)?;
76
77        self.schema = match self.config.signals {
78            OtelSignal::Traces => traces_schema(),
79            OtelSignal::Metrics => metrics_schema(),
80            OtelSignal::Logs => logs_schema(),
81        };
82
83        let (batch_tx, batch_rx) = mpsc::bounded_async::<RecordBatch>(self.config.channel_capacity);
84        self.batch_rx = Some(batch_rx);
85
86        let addr = self.config.socket_addr();
87
88        // Bind the TCP listener here so port conflicts fail open(),
89        // not silently inside the background task.
90        let listener = TcpListener::bind(&addr)
91            .await
92            .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to bind {addr}: {e}")))?;
93        let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
94
95        let send_timeout = Duration::from_secs(5);
96        let (shutdown_tx, shutdown_rx) = watch::channel(false);
97        self.shutdown_tx = Some(shutdown_tx);
98
99        let receiver = OtelReceiver::new(
100            batch_tx,
101            Arc::clone(&self.schema),
102            Arc::clone(&self.data_ready),
103            Arc::clone(&self.records_received),
104            Arc::clone(&self.requests_received),
105            send_timeout,
106            self.config.batch_size,
107        );
108
109        // Spawn signal-specific gRPC server on the already-bound listener
110        let server_handle = match self.config.signals {
111            OtelSignal::Traces => {
112                spawn_grpc_server(TraceServiceServer::new(receiver), incoming, shutdown_rx)
113            }
114            OtelSignal::Metrics => {
115                spawn_grpc_server(MetricsServiceServer::new(receiver), incoming, shutdown_rx)
116            }
117            OtelSignal::Logs => {
118                spawn_grpc_server(LogsServiceServer::new(receiver), incoming, shutdown_rx)
119            }
120        };
121
122        self.server_handle = Some(server_handle);
123        self.state = ConnectorState::Running;
124
125        tracing::info!(
126            %addr,
127            signals = ?self.config.signals,
128            batch_size = self.config.batch_size,
129            "OTel source connector started"
130        );
131
132        Ok(())
133    }
134
135    async fn poll_batch(
136        &mut self,
137        max_records: usize,
138    ) -> Result<Option<SourceBatch>, ConnectorError> {
139        let rx = self.batch_rx.as_ref().ok_or(ConnectorError::InvalidState {
140            expected: "Running".into(),
141            actual: format!("{}", self.state),
142        })?;
143
144        let mut total_rows = 0usize;
145        let mut batches: Vec<RecordBatch> = Vec::new();
146        let mut disconnected = false;
147
148        loop {
149            match rx.try_recv() {
150                Ok(batch) => {
151                    total_rows += batch.num_rows();
152                    batches.push(batch);
153                    if total_rows >= max_records {
154                        break;
155                    }
156                }
157                Err(TryRecvError::Empty) => break,
158                Err(TryRecvError::Disconnected) => {
159                    self.state = ConnectorState::Closed;
160                    disconnected = true;
161                    break;
162                }
163            }
164        }
165
166        if batches.is_empty() {
167            return if disconnected {
168                Err(ConnectorError::Closed)
169            } else {
170                Ok(None)
171            };
172        }
173
174        self.checkpoint_seq += 1;
175
176        if batches.len() == 1 {
177            return Ok(Some(SourceBatch::new(batches.into_iter().next().unwrap())));
178        }
179
180        let schema = batches[0].schema();
181        let combined =
182            arrow_select::concat::concat_batches(&schema, batches.iter()).map_err(|e| {
183                ConnectorError::ReadError(format!("failed to concatenate OTel batches: {e}"))
184            })?;
185
186        Ok(Some(SourceBatch::new(combined)))
187    }
188
189    async fn discover_schema(
190        &mut self,
191        properties: &std::collections::HashMap<String, String>,
192    ) -> Result<(), ConnectorError> {
193        let Some(sig) = properties
194            .get("signals")
195            .or_else(|| properties.get("signal"))
196        else {
197            return Ok(());
198        };
199        let signal = OtelSignal::parse(sig).map_err(|e| {
200            ConnectorError::ConfigurationError(format!("invalid OTel signal '{sig}': {e}"))
201        })?;
202        self.schema = match signal {
203            OtelSignal::Traces => traces_schema(),
204            OtelSignal::Metrics => metrics_schema(),
205            OtelSignal::Logs => logs_schema(),
206        };
207        Ok(())
208    }
209
210    fn schema(&self) -> SchemaRef {
211        Arc::clone(&self.schema)
212    }
213
214    fn checkpoint(&self) -> SourceCheckpoint {
215        let mut cp = SourceCheckpoint::new(self.checkpoint_seq);
216        cp.set_offset(
217            "records_received",
218            self.records_received.load(Ordering::Relaxed).to_string(),
219        );
220        cp.set_offset(
221            "requests_received",
222            self.requests_received.load(Ordering::Relaxed).to_string(),
223        );
224        cp.set_metadata("connector", "otel");
225        cp.set_metadata("signals", format!("{:?}", self.config.signals));
226        cp
227    }
228
229    async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
230        tracing::warn!(
231            "OTel source restore: push-only transport, data gap expected since last checkpoint"
232        );
233        Ok(())
234    }
235
236    async fn close(&mut self) -> Result<(), ConnectorError> {
237        tracing::info!("OTel source connector shutting down");
238
239        if let Some(tx) = self.shutdown_tx.take() {
240            let _ = tx.send(true);
241        }
242
243        self.batch_rx.take();
244
245        if let Some(handle) = self.server_handle.take() {
246            if tokio::time::timeout(Duration::from_secs(5), handle)
247                .await
248                .is_err()
249            {
250                tracing::warn!("OTel gRPC server did not shut down within 5s");
251            }
252        }
253
254        self.state = ConnectorState::Closed;
255        Ok(())
256    }
257
258    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
259        Some(Arc::clone(&self.data_ready))
260    }
261
262    fn supports_replay(&self) -> bool {
263        false
264    }
265}
266
267/// Spawn a tonic gRPC server on a pre-bound listener with graceful shutdown.
268fn spawn_grpc_server<S>(
269    svc: S,
270    incoming: TcpIncoming,
271    mut shutdown_rx: watch::Receiver<bool>,
272) -> JoinHandle<()>
273where
274    S: tonic::codegen::Service<
275            tonic::codegen::http::Request<tonic::body::Body>,
276            Response = tonic::codegen::http::Response<tonic::body::Body>,
277            Error = std::convert::Infallible,
278        > + tonic::server::NamedService
279        + Clone
280        + Send
281        + Sync
282        + 'static,
283    S::Future: Send + 'static,
284{
285    tokio::spawn(async move {
286        if let Err(e) = tonic::transport::Server::builder()
287            .add_service(svc)
288            .serve_with_incoming_shutdown(incoming, async move {
289                let _ = shutdown_rx.wait_for(|&v| v).await;
290            })
291            .await
292        {
293            tracing::error!(error = %e, "OTel gRPC server exited with error");
294        }
295    })
296}
297
298impl std::fmt::Debug for OtelSource {
299    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300        f.debug_struct("OtelSource")
301            .field("state", &self.state)
302            .field("config", &self.config)
303            .field(
304                "records_received",
305                &self.records_received.load(Ordering::Relaxed),
306            )
307            .field(
308                "requests_received",
309                &self.requests_received.load(Ordering::Relaxed),
310            )
311            .field("checkpoint_seq", &self.checkpoint_seq)
312            .field(
313                "server_running",
314                &self.server_handle.as_ref().map(|h| !h.is_finished()),
315            )
316            .field("has_shutdown_tx", &self.shutdown_tx.is_some())
317            .finish_non_exhaustive()
318    }
319}