Skip to main content

laminar_connectors/otel/
server.rs

1//! Tonic gRPC server implementing OTLP trace, metrics, and logs services.
2//!
3//! Each receiver converts its respective protobuf request to Arrow
4//! `RecordBatch`, splits oversized batches, and forwards them through
5//! a bounded channel to the `OtelSource` connector.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use arrow_array::RecordBatch;
12use arrow_schema::SchemaRef;
13use crossfire::{mpsc, MAsyncTx};
14use tokio::sync::Notify;
15use tonic::{Request, Response, Status};
16
17use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsService;
18use opentelemetry_proto::tonic::collector::logs::v1::{
19    ExportLogsServiceRequest, ExportLogsServiceResponse,
20};
21use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsService;
22use opentelemetry_proto::tonic::collector::metrics::v1::{
23    ExportMetricsServiceRequest, ExportMetricsServiceResponse,
24};
25use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService;
26use opentelemetry_proto::tonic::collector::trace::v1::{
27    ExportTraceServiceRequest, ExportTraceServiceResponse,
28};
29
30use super::convert::{logs_request_to_batch, metrics_request_to_batch, trace_request_to_batch};
31
32// ── Shared helpers ──
33
34/// Current wall-clock time as nanoseconds since Unix epoch.
35fn now_nanos() -> i64 {
36    std::time::SystemTime::now()
37        .duration_since(std::time::UNIX_EPOCH)
38        .map_or(0, |d| {
39            // Nanos from epoch fits in i64 until ~2262.
40            #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
41            {
42                d.as_nanos() as i64
43            }
44        })
45}
46
47/// Split a batch into chunks of at most `batch_size` rows and send each
48/// through the channel. Returns the total number of rows sent.
49async fn split_and_send(
50    batch: RecordBatch,
51    batch_size: usize,
52    tx: &MAsyncTx<mpsc::Array<RecordBatch>>,
53    notify: &Notify,
54    timeout: Duration,
55) -> Result<usize, Status> {
56    let total = batch.num_rows();
57    if total == 0 {
58        return Ok(0);
59    }
60
61    let mut offset = 0;
62    while offset < total {
63        let len = (total - offset).min(batch_size);
64        let chunk = batch.slice(offset, len);
65
66        match tokio::time::timeout(timeout, tx.send(chunk)).await {
67            Ok(Ok(())) => {
68                notify.notify_one();
69                offset += len;
70            }
71            Ok(Err(_)) => {
72                return Err(Status::unavailable("OTel receiver is shutting down"));
73            }
74            Err(_) => {
75                tracing::warn!(
76                    sent = offset,
77                    remaining = total - offset,
78                    "OTel batch send timed out — downstream backpressure"
79                );
80                return Err(Status::resource_exhausted(
81                    "pipeline backpressure: retry with backoff",
82                ));
83            }
84        }
85    }
86
87    Ok(total)
88}
89
90// ── Receiver struct (shared across all signal types) ──
91
92/// OTLP receiver that converts protobuf to Arrow and pushes to a channel.
93///
94/// One instance is created per signal type; the tonic service trait
95/// (`TraceService`, `MetricsService`, `LogsService`) is impl'd below.
/// OTLP receiver that converts protobuf to Arrow and pushes to a channel.
///
/// One instance is created per signal type; the tonic service trait
/// (`TraceService`, `MetricsService`, `LogsService`) is impl'd below.
pub struct OtelReceiver {
    // Bounded channel carrying converted Arrow batches to the `OtelSource`.
    batch_tx: MAsyncTx<mpsc::Array<RecordBatch>>,
    // Arrow schema handed to the protobuf→RecordBatch converters.
    schema: SchemaRef,
    // Signaled once per chunk successfully sent, so the source wakes up.
    data_ready: Arc<Notify>,
    // Shared counter of rows forwarded downstream.
    records_received: Arc<AtomicU64>,
    // Shared counter of gRPC export requests handled.
    requests_received: Arc<AtomicU64>,
    // Per-chunk send deadline before reporting backpressure to the client.
    send_timeout: Duration,
    // Maximum rows per chunk pushed through the channel.
    batch_size: usize,
}
105
106impl OtelReceiver {
107    /// Create a new receiver.
108    pub fn new(
109        batch_tx: MAsyncTx<mpsc::Array<RecordBatch>>,
110        schema: SchemaRef,
111        data_ready: Arc<Notify>,
112        records_received: Arc<AtomicU64>,
113        requests_received: Arc<AtomicU64>,
114        send_timeout: Duration,
115        batch_size: usize,
116    ) -> Self {
117        Self {
118            batch_tx,
119            schema,
120            data_ready,
121            records_received,
122            requests_received,
123            send_timeout,
124            batch_size,
125        }
126    }
127
128    /// Common export path: convert → split → send → count.
129    async fn handle_batch(&self, batch: Option<RecordBatch>) -> Result<usize, Status> {
130        let Some(batch) = batch else {
131            return Ok(0);
132        };
133        let n = split_and_send(
134            batch,
135            self.batch_size,
136            &self.batch_tx,
137            &self.data_ready,
138            self.send_timeout,
139        )
140        .await?;
141        #[allow(clippy::cast_possible_truncation)]
142        self.records_received.fetch_add(n as u64, Ordering::Relaxed);
143        Ok(n)
144    }
145}
146
147// ── Trait impls ──
148
149#[tonic::async_trait]
150impl TraceService for OtelReceiver {
151    async fn export(
152        &self,
153        request: Request<ExportTraceServiceRequest>,
154    ) -> Result<Response<ExportTraceServiceResponse>, Status> {
155        self.requests_received.fetch_add(1, Ordering::Relaxed);
156        let batch = trace_request_to_batch(&request.into_inner(), &self.schema, now_nanos())
157            .map_err(|e| Status::internal(format!("batch conversion failed: {e}")))?;
158        self.handle_batch(batch).await?;
159        Ok(Response::new(ExportTraceServiceResponse {
160            partial_success: None,
161        }))
162    }
163}
164
165#[tonic::async_trait]
166impl MetricsService for OtelReceiver {
167    async fn export(
168        &self,
169        request: Request<ExportMetricsServiceRequest>,
170    ) -> Result<Response<ExportMetricsServiceResponse>, Status> {
171        self.requests_received.fetch_add(1, Ordering::Relaxed);
172        let batch = metrics_request_to_batch(&request.into_inner(), &self.schema, now_nanos())
173            .map_err(|e| Status::internal(format!("batch conversion failed: {e}")))?;
174        self.handle_batch(batch).await?;
175        Ok(Response::new(ExportMetricsServiceResponse {
176            partial_success: None,
177        }))
178    }
179}
180
181#[tonic::async_trait]
182impl LogsService for OtelReceiver {
183    async fn export(
184        &self,
185        request: Request<ExportLogsServiceRequest>,
186    ) -> Result<Response<ExportLogsServiceResponse>, Status> {
187        self.requests_received.fetch_add(1, Ordering::Relaxed);
188        let batch = logs_request_to_batch(&request.into_inner(), &self.schema, now_nanos())
189            .map_err(|e| Status::internal(format!("batch conversion failed: {e}")))?;
190        self.handle_batch(batch).await?;
191        Ok(Response::new(ExportLogsServiceResponse {
192            partial_success: None,
193        }))
194    }
195}