1use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use arrow_array::RecordBatch;
12use arrow_schema::SchemaRef;
13use async_trait::async_trait;
14use crossfire::{mpsc, AsyncRx, TryRecvError};
15use tokio::net::TcpListener;
16use tokio::sync::{watch, Notify};
17use tokio::task::JoinHandle;
18use tonic::transport::server::TcpIncoming;
19
20use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
21use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
22use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
23
24use crate::checkpoint::SourceCheckpoint;
25use crate::config::{ConnectorConfig, ConnectorState};
26use crate::connector::{SourceBatch, SourceConnector};
27use crate::error::ConnectorError;
28use crate::health::HealthStatus;
29use crate::metrics::ConnectorMetrics;
30
31use super::config::{OtelSignal, OtelSourceConfig};
32use super::schema::{logs_schema, metrics_schema, traces_schema};
33use super::server::OtelReceiver;
34
/// Source connector that receives OpenTelemetry data over gRPC and hands it
/// to the pipeline as Arrow record batches.
///
/// The gRPC server is started by `open` and stopped by `close`; batches
/// travel from the server task to `poll_batch` through a bounded channel.
pub struct OtelSource {
    /// Connector-specific configuration; defaulted in `new` and replaced in `open`.
    config: OtelSourceConfig,
    /// Arrow schema reported by `schema()`; reassigned from the signal type in
    /// `open` and `discover_schema`.
    schema: SchemaRef,
    /// Current lifecycle state of the connector.
    state: ConnectorState,
    /// Receiving half of the batch channel; `None` until `open` and after `close`.
    batch_rx: Option<AsyncRx<mpsc::Array<RecordBatch>>>,
    /// Notifier shared with the receiver and exposed through `data_ready_notify`.
    data_ready: Arc<Notify>,
    /// Join handle of the spawned gRPC server task, if one was started.
    server_handle: Option<JoinHandle<()>>,
    /// Sender used by `close` to signal shutdown to the server task.
    shutdown_tx: Option<watch::Sender<bool>>,
    /// Record counter shared with the receiver (presumably incremented there —
    /// confirm in `OtelReceiver`); read for checkpoints and metrics.
    records_received: Arc<AtomicU64>,
    /// Request counter shared with the receiver (presumably incremented there —
    /// confirm in `OtelReceiver`); read for checkpoints and metrics.
    requests_received: Arc<AtomicU64>,
    /// Incremented each time `poll_batch` returns a non-empty batch.
    checkpoint_seq: u64,
}
53
54impl OtelSource {
55 #[must_use]
57 pub fn new(schema: SchemaRef, _registry: Option<&prometheus::Registry>) -> Self {
58 Self {
59 config: OtelSourceConfig::default(),
60 schema,
61 state: ConnectorState::Created,
62 batch_rx: None,
63 data_ready: Arc::new(Notify::new()),
64 server_handle: None,
65 shutdown_tx: None,
66 records_received: Arc::new(AtomicU64::new(0)),
67 requests_received: Arc::new(AtomicU64::new(0)),
68 checkpoint_seq: 0,
69 }
70 }
71}
72
73#[async_trait]
74impl SourceConnector for OtelSource {
75 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
76 self.state = ConnectorState::Initializing;
77 self.config = OtelSourceConfig::from_config(config)?;
78
79 self.schema = match self.config.signals {
80 OtelSignal::Traces => traces_schema(),
81 OtelSignal::Metrics => metrics_schema(),
82 OtelSignal::Logs => logs_schema(),
83 };
84
85 let (batch_tx, batch_rx) = mpsc::bounded_async::<RecordBatch>(self.config.channel_capacity);
86 self.batch_rx = Some(batch_rx);
87
88 let addr = self.config.socket_addr();
89
90 let listener = TcpListener::bind(&addr)
93 .await
94 .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to bind {addr}: {e}")))?;
95 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
96
97 let send_timeout = Duration::from_secs(5);
98 let (shutdown_tx, shutdown_rx) = watch::channel(false);
99 self.shutdown_tx = Some(shutdown_tx);
100
101 let receiver = OtelReceiver::new(
102 batch_tx,
103 Arc::clone(&self.schema),
104 Arc::clone(&self.data_ready),
105 Arc::clone(&self.records_received),
106 Arc::clone(&self.requests_received),
107 send_timeout,
108 self.config.batch_size,
109 );
110
111 let server_handle = match self.config.signals {
113 OtelSignal::Traces => {
114 spawn_grpc_server(TraceServiceServer::new(receiver), incoming, shutdown_rx)
115 }
116 OtelSignal::Metrics => {
117 spawn_grpc_server(MetricsServiceServer::new(receiver), incoming, shutdown_rx)
118 }
119 OtelSignal::Logs => {
120 spawn_grpc_server(LogsServiceServer::new(receiver), incoming, shutdown_rx)
121 }
122 };
123
124 self.server_handle = Some(server_handle);
125 self.state = ConnectorState::Running;
126
127 tracing::info!(
128 %addr,
129 signals = ?self.config.signals,
130 batch_size = self.config.batch_size,
131 "OTel source connector started"
132 );
133
134 Ok(())
135 }
136
137 async fn poll_batch(
138 &mut self,
139 max_records: usize,
140 ) -> Result<Option<SourceBatch>, ConnectorError> {
141 let rx = self.batch_rx.as_ref().ok_or(ConnectorError::InvalidState {
142 expected: "Running".into(),
143 actual: format!("{}", self.state),
144 })?;
145
146 let mut total_rows = 0usize;
147 let mut batches: Vec<RecordBatch> = Vec::new();
148 let mut disconnected = false;
149
150 loop {
151 match rx.try_recv() {
152 Ok(batch) => {
153 total_rows += batch.num_rows();
154 batches.push(batch);
155 if total_rows >= max_records {
156 break;
157 }
158 }
159 Err(TryRecvError::Empty) => break,
160 Err(TryRecvError::Disconnected) => {
161 self.state = ConnectorState::Closed;
162 disconnected = true;
163 break;
164 }
165 }
166 }
167
168 if batches.is_empty() {
169 return if disconnected {
170 Err(ConnectorError::Closed)
171 } else {
172 Ok(None)
173 };
174 }
175
176 self.checkpoint_seq += 1;
177
178 if batches.len() == 1 {
179 return Ok(Some(SourceBatch::new(batches.into_iter().next().unwrap())));
180 }
181
182 let schema = batches[0].schema();
183 let combined =
184 arrow_select::concat::concat_batches(&schema, batches.iter()).map_err(|e| {
185 ConnectorError::ReadError(format!("failed to concatenate OTel batches: {e}"))
186 })?;
187
188 Ok(Some(SourceBatch::new(combined)))
189 }
190
191 async fn discover_schema(&mut self, properties: &std::collections::HashMap<String, String>) {
192 let sig = properties
193 .get("signals")
194 .or_else(|| properties.get("signal"));
195 if let Some(sig) = sig {
196 match OtelSignal::parse(sig) {
197 Ok(signal) => {
198 self.schema = match signal {
199 OtelSignal::Traces => traces_schema(),
200 OtelSignal::Metrics => metrics_schema(),
201 OtelSignal::Logs => logs_schema(),
202 };
203 }
204 Err(e) => {
205 tracing::warn!(%e, "invalid OTel signal type, using default traces schema");
206 }
207 }
208 }
209 }
210
211 fn schema(&self) -> SchemaRef {
212 Arc::clone(&self.schema)
213 }
214
215 fn checkpoint(&self) -> SourceCheckpoint {
216 let mut cp = SourceCheckpoint::new(self.checkpoint_seq);
217 cp.set_offset(
218 "records_received",
219 self.records_received.load(Ordering::Relaxed).to_string(),
220 );
221 cp.set_offset(
222 "requests_received",
223 self.requests_received.load(Ordering::Relaxed).to_string(),
224 );
225 cp.set_metadata("connector", "otel");
226 cp.set_metadata("signals", format!("{:?}", self.config.signals));
227 cp
228 }
229
    /// Accepts a checkpoint without restoring anything from it.
    ///
    /// The checkpoint argument is ignored; the method only logs a warning that
    /// a data gap is expected and always returns `Ok(())`.
    async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        tracing::warn!(
            "OTel source restore: push-only transport, data gap expected since last checkpoint"
        );
        Ok(())
    }
236
237 fn health_check(&self) -> HealthStatus {
238 match self.state {
239 ConnectorState::Running => {
240 if self
241 .server_handle
242 .as_ref()
243 .is_some_and(|h| !h.is_finished())
244 {
245 HealthStatus::Healthy
246 } else {
247 HealthStatus::Unhealthy("gRPC server task exited".into())
248 }
249 }
250 ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
251 ConnectorState::Paused | ConnectorState::Recovering => {
252 HealthStatus::Degraded(format!("{}", self.state))
253 }
254 ConnectorState::Closed | ConnectorState::Failed => {
255 HealthStatus::Unhealthy(format!("{}", self.state))
256 }
257 }
258 }
259
260 #[allow(clippy::cast_precision_loss)]
261 fn metrics(&self) -> ConnectorMetrics {
262 ConnectorMetrics {
263 records_total: self.records_received.load(Ordering::Relaxed),
264 bytes_total: 0,
265 errors_total: 0,
266 lag: 0,
267 custom: vec![
268 (
269 "requests_received".into(),
270 self.requests_received.load(Ordering::Relaxed) as f64,
271 ),
272 ("checkpoint_seq".into(), self.checkpoint_seq as f64),
273 ],
274 }
275 }
276
277 async fn close(&mut self) -> Result<(), ConnectorError> {
278 tracing::info!("OTel source connector shutting down");
279
280 if let Some(tx) = self.shutdown_tx.take() {
281 let _ = tx.send(true);
282 }
283
284 self.batch_rx.take();
285
286 if let Some(handle) = self.server_handle.take() {
287 if tokio::time::timeout(Duration::from_secs(5), handle)
288 .await
289 .is_err()
290 {
291 tracing::warn!("OTel gRPC server did not shut down within 5s");
292 }
293 }
294
295 self.state = ConnectorState::Closed;
296 Ok(())
297 }
298
299 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
300 Some(Arc::clone(&self.data_ready))
301 }
302
    /// Always returns `false`: this source reports that it cannot replay data.
    fn supports_replay(&self) -> bool {
        false
    }
306}
307
308fn spawn_grpc_server<S>(
310 svc: S,
311 incoming: TcpIncoming,
312 mut shutdown_rx: watch::Receiver<bool>,
313) -> JoinHandle<()>
314where
315 S: tonic::codegen::Service<
316 tonic::codegen::http::Request<tonic::body::Body>,
317 Response = tonic::codegen::http::Response<tonic::body::Body>,
318 Error = std::convert::Infallible,
319 > + tonic::server::NamedService
320 + Clone
321 + Send
322 + Sync
323 + 'static,
324 S::Future: Send + 'static,
325{
326 tokio::spawn(async move {
327 if let Err(e) = tonic::transport::Server::builder()
328 .add_service(svc)
329 .serve_with_incoming_shutdown(incoming, async move {
330 let _ = shutdown_rx.wait_for(|&v| v).await;
331 })
332 .await
333 {
334 tracing::error!(error = %e, "OTel gRPC server exited with error");
335 }
336 })
337}
338
339impl std::fmt::Debug for OtelSource {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 f.debug_struct("OtelSource")
342 .field("state", &self.state)
343 .field("config", &self.config)
344 .field(
345 "records_received",
346 &self.records_received.load(Ordering::Relaxed),
347 )
348 .field(
349 "requests_received",
350 &self.requests_received.load(Ordering::Relaxed),
351 )
352 .field("checkpoint_seq", &self.checkpoint_seq)
353 .field(
354 "server_running",
355 &self.server_handle.as_ref().map(|h| !h.is_finished()),
356 )
357 .field("has_shutdown_tx", &self.shutdown_tx.is_some())
358 .finish_non_exhaustive()
359 }
360}