// laminar_connectors/otel/server.rs

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use crossfire::{mpsc, MAsyncTx};
use tokio::sync::Notify;
use tonic::{Request, Response, Status};

use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsService;
use opentelemetry_proto::tonic::collector::logs::v1::{
    ExportLogsServiceRequest, ExportLogsServiceResponse,
};
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsService;
use opentelemetry_proto::tonic::collector::metrics::v1::{
    ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService;
use opentelemetry_proto::tonic::collector::trace::v1::{
    ExportTraceServiceRequest, ExportTraceServiceResponse,
};

use super::convert::{logs_request_to_batch, metrics_request_to_batch, trace_request_to_batch};
31
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch
/// (i.e. `duration_since` fails).
#[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
fn now_nanos() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as i64,
        Err(_) => 0,
    }
}
46
47async fn split_and_send(
50 batch: RecordBatch,
51 batch_size: usize,
52 tx: &MAsyncTx<mpsc::Array<RecordBatch>>,
53 notify: &Notify,
54 timeout: Duration,
55) -> Result<usize, Status> {
56 let total = batch.num_rows();
57 if total == 0 {
58 return Ok(0);
59 }
60
61 let mut offset = 0;
62 while offset < total {
63 let len = (total - offset).min(batch_size);
64 let chunk = batch.slice(offset, len);
65
66 match tokio::time::timeout(timeout, tx.send(chunk)).await {
67 Ok(Ok(())) => {
68 notify.notify_one();
69 offset += len;
70 }
71 Ok(Err(_)) => {
72 return Err(Status::unavailable("OTel receiver is shutting down"));
73 }
74 Err(_) => {
75 tracing::warn!(
76 sent = offset,
77 remaining = total - offset,
78 "OTel batch send timed out — downstream backpressure"
79 );
80 return Err(Status::resource_exhausted(
81 "pipeline backpressure: retry with backoff",
82 ));
83 }
84 }
85 }
86
87 Ok(total)
88}
89
/// Shared state for the OTLP gRPC receiver; implements the trace, metrics,
/// and logs collector services over a common Arrow-batch pipeline.
pub struct OtelReceiver {
    /// Bounded channel carrying converted Arrow batches downstream.
    batch_tx: MAsyncTx<mpsc::Array<RecordBatch>>,
    /// Arrow schema handed to the OTLP-to-batch converters.
    schema: SchemaRef,
    /// Notified once per chunk successfully sent on `batch_tx`.
    data_ready: Arc<Notify>,
    /// Total rows forwarded downstream (incremented after each send).
    records_received: Arc<AtomicU64>,
    /// Total export RPCs handled (incremented on entry to each `export`).
    requests_received: Arc<AtomicU64>,
    /// Per-chunk send timeout before reporting backpressure to the client.
    send_timeout: Duration,
    /// Maximum rows per chunk sent downstream.
    batch_size: usize,
}
105
106impl OtelReceiver {
107 pub fn new(
109 batch_tx: MAsyncTx<mpsc::Array<RecordBatch>>,
110 schema: SchemaRef,
111 data_ready: Arc<Notify>,
112 records_received: Arc<AtomicU64>,
113 requests_received: Arc<AtomicU64>,
114 send_timeout: Duration,
115 batch_size: usize,
116 ) -> Self {
117 Self {
118 batch_tx,
119 schema,
120 data_ready,
121 records_received,
122 requests_received,
123 send_timeout,
124 batch_size,
125 }
126 }
127
128 async fn handle_batch(&self, batch: Option<RecordBatch>) -> Result<usize, Status> {
130 let Some(batch) = batch else {
131 return Ok(0);
132 };
133 let n = split_and_send(
134 batch,
135 self.batch_size,
136 &self.batch_tx,
137 &self.data_ready,
138 self.send_timeout,
139 )
140 .await?;
141 #[allow(clippy::cast_possible_truncation)]
142 self.records_received.fetch_add(n as u64, Ordering::Relaxed);
143 Ok(n)
144 }
145}
146
147#[tonic::async_trait]
150impl TraceService for OtelReceiver {
151 async fn export(
152 &self,
153 request: Request<ExportTraceServiceRequest>,
154 ) -> Result<Response<ExportTraceServiceResponse>, Status> {
155 self.requests_received.fetch_add(1, Ordering::Relaxed);
156 let batch = trace_request_to_batch(&request.into_inner(), &self.schema, now_nanos())
157 .map_err(|e| Status::internal(format!("batch conversion failed: {e}")))?;
158 self.handle_batch(batch).await?;
159 Ok(Response::new(ExportTraceServiceResponse {
160 partial_success: None,
161 }))
162 }
163}
164
165#[tonic::async_trait]
166impl MetricsService for OtelReceiver {
167 async fn export(
168 &self,
169 request: Request<ExportMetricsServiceRequest>,
170 ) -> Result<Response<ExportMetricsServiceResponse>, Status> {
171 self.requests_received.fetch_add(1, Ordering::Relaxed);
172 let batch = metrics_request_to_batch(&request.into_inner(), &self.schema, now_nanos())
173 .map_err(|e| Status::internal(format!("batch conversion failed: {e}")))?;
174 self.handle_batch(batch).await?;
175 Ok(Response::new(ExportMetricsServiceResponse {
176 partial_success: None,
177 }))
178 }
179}
180
181#[tonic::async_trait]
182impl LogsService for OtelReceiver {
183 async fn export(
184 &self,
185 request: Request<ExportLogsServiceRequest>,
186 ) -> Result<Response<ExportLogsServiceResponse>, Status> {
187 self.requests_received.fetch_add(1, Ordering::Relaxed);
188 let batch = logs_request_to_batch(&request.into_inner(), &self.schema, now_nanos())
189 .map_err(|e| Status::internal(format!("batch conversion failed: {e}")))?;
190 self.handle_batch(batch).await?;
191 Ok(Response::new(ExportLogsServiceResponse {
192 partial_success: None,
193 }))
194 }
195}