1use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use arrow_array::RecordBatch;
12use arrow_schema::SchemaRef;
13use async_trait::async_trait;
14use crossfire::{mpsc, AsyncRx, TryRecvError};
15use tokio::net::TcpListener;
16use tokio::sync::{watch, Notify};
17use tokio::task::JoinHandle;
18use tonic::transport::server::TcpIncoming;
19
20use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
21use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
22use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
23
24use crate::checkpoint::SourceCheckpoint;
25use crate::config::{ConnectorConfig, ConnectorState};
26use crate::connector::{SourceBatch, SourceConnector};
27use crate::error::ConnectorError;
28
29use super::config::{OtelSignal, OtelSourceConfig};
30use super::schema::{logs_schema, metrics_schema, traces_schema};
31use super::server::OtelReceiver;
32
/// Source connector that receives OpenTelemetry data over gRPC and exposes it as Arrow
/// record batches.
///
/// `open` binds a TCP listener and spawns a gRPC server task; `poll_batch` drains the
/// batches that the server-side receiver has queued on an in-process channel.
pub struct OtelSource {
    /// Connector settings. Starts as `OtelSourceConfig::default()` and is replaced in `open`.
    config: OtelSourceConfig,
    /// Arrow schema returned by `schema()`; `open` and `discover_schema` set it from the
    /// selected signal.
    schema: SchemaRef,
    /// Current lifecycle state of the connector.
    state: ConnectorState,
    /// Receiving end of the bounded batch channel created in `open`; cleared by `close`.
    batch_rx: Option<AsyncRx<mpsc::Array<RecordBatch>>>,
    /// Notifier shared with the gRPC receiver and handed out by `data_ready_notify`.
    /// NOTE(review): presumably signalled when a batch is enqueued; confirm in `OtelReceiver`.
    data_ready: Arc<Notify>,
    /// Join handle of the spawned gRPC server task, present between `open` and `close`.
    server_handle: Option<JoinHandle<()>>,
    /// Sender side of the shutdown watch channel; `close` sends `true` on it.
    shutdown_tx: Option<watch::Sender<bool>>,
    /// Counter shared with the gRPC receiver; reported in checkpoints and `Debug` output.
    records_received: Arc<AtomicU64>,
    /// Counter shared with the gRPC receiver; reported in checkpoints and `Debug` output.
    requests_received: Arc<AtomicU64>,
    /// Incremented each time `poll_batch` returns data; passed to `SourceCheckpoint::new`.
    checkpoint_seq: u64,
}
51
52impl OtelSource {
53 #[must_use]
55 pub fn new(schema: SchemaRef, _registry: Option<&prometheus::Registry>) -> Self {
56 Self {
57 config: OtelSourceConfig::default(),
58 schema,
59 state: ConnectorState::Created,
60 batch_rx: None,
61 data_ready: Arc::new(Notify::new()),
62 server_handle: None,
63 shutdown_tx: None,
64 records_received: Arc::new(AtomicU64::new(0)),
65 requests_received: Arc::new(AtomicU64::new(0)),
66 checkpoint_seq: 0,
67 }
68 }
69}
70
71#[async_trait]
72impl SourceConnector for OtelSource {
73 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
74 self.state = ConnectorState::Initializing;
75 self.config = OtelSourceConfig::from_config(config)?;
76
77 self.schema = match self.config.signals {
78 OtelSignal::Traces => traces_schema(),
79 OtelSignal::Metrics => metrics_schema(),
80 OtelSignal::Logs => logs_schema(),
81 };
82
83 let (batch_tx, batch_rx) = mpsc::bounded_async::<RecordBatch>(self.config.channel_capacity);
84 self.batch_rx = Some(batch_rx);
85
86 let addr = self.config.socket_addr();
87
88 let listener = TcpListener::bind(&addr)
91 .await
92 .map_err(|e| ConnectorError::ConnectionFailed(format!("failed to bind {addr}: {e}")))?;
93 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
94
95 let send_timeout = Duration::from_secs(5);
96 let (shutdown_tx, shutdown_rx) = watch::channel(false);
97 self.shutdown_tx = Some(shutdown_tx);
98
99 let receiver = OtelReceiver::new(
100 batch_tx,
101 Arc::clone(&self.schema),
102 Arc::clone(&self.data_ready),
103 Arc::clone(&self.records_received),
104 Arc::clone(&self.requests_received),
105 send_timeout,
106 self.config.batch_size,
107 );
108
109 let server_handle = match self.config.signals {
111 OtelSignal::Traces => {
112 spawn_grpc_server(TraceServiceServer::new(receiver), incoming, shutdown_rx)
113 }
114 OtelSignal::Metrics => {
115 spawn_grpc_server(MetricsServiceServer::new(receiver), incoming, shutdown_rx)
116 }
117 OtelSignal::Logs => {
118 spawn_grpc_server(LogsServiceServer::new(receiver), incoming, shutdown_rx)
119 }
120 };
121
122 self.server_handle = Some(server_handle);
123 self.state = ConnectorState::Running;
124
125 tracing::info!(
126 %addr,
127 signals = ?self.config.signals,
128 batch_size = self.config.batch_size,
129 "OTel source connector started"
130 );
131
132 Ok(())
133 }
134
135 async fn poll_batch(
136 &mut self,
137 max_records: usize,
138 ) -> Result<Option<SourceBatch>, ConnectorError> {
139 let rx = self.batch_rx.as_ref().ok_or(ConnectorError::InvalidState {
140 expected: "Running".into(),
141 actual: format!("{}", self.state),
142 })?;
143
144 let mut total_rows = 0usize;
145 let mut batches: Vec<RecordBatch> = Vec::new();
146 let mut disconnected = false;
147
148 loop {
149 match rx.try_recv() {
150 Ok(batch) => {
151 total_rows += batch.num_rows();
152 batches.push(batch);
153 if total_rows >= max_records {
154 break;
155 }
156 }
157 Err(TryRecvError::Empty) => break,
158 Err(TryRecvError::Disconnected) => {
159 self.state = ConnectorState::Closed;
160 disconnected = true;
161 break;
162 }
163 }
164 }
165
166 if batches.is_empty() {
167 return if disconnected {
168 Err(ConnectorError::Closed)
169 } else {
170 Ok(None)
171 };
172 }
173
174 self.checkpoint_seq += 1;
175
176 if batches.len() == 1 {
177 return Ok(Some(SourceBatch::new(batches.into_iter().next().unwrap())));
178 }
179
180 let schema = batches[0].schema();
181 let combined =
182 arrow_select::concat::concat_batches(&schema, batches.iter()).map_err(|e| {
183 ConnectorError::ReadError(format!("failed to concatenate OTel batches: {e}"))
184 })?;
185
186 Ok(Some(SourceBatch::new(combined)))
187 }
188
189 async fn discover_schema(
190 &mut self,
191 properties: &std::collections::HashMap<String, String>,
192 ) -> Result<(), ConnectorError> {
193 let Some(sig) = properties
194 .get("signals")
195 .or_else(|| properties.get("signal"))
196 else {
197 return Ok(());
198 };
199 let signal = OtelSignal::parse(sig).map_err(|e| {
200 ConnectorError::ConfigurationError(format!("invalid OTel signal '{sig}': {e}"))
201 })?;
202 self.schema = match signal {
203 OtelSignal::Traces => traces_schema(),
204 OtelSignal::Metrics => metrics_schema(),
205 OtelSignal::Logs => logs_schema(),
206 };
207 Ok(())
208 }
209
210 fn schema(&self) -> SchemaRef {
211 Arc::clone(&self.schema)
212 }
213
214 fn checkpoint(&self) -> SourceCheckpoint {
215 let mut cp = SourceCheckpoint::new(self.checkpoint_seq);
216 cp.set_offset(
217 "records_received",
218 self.records_received.load(Ordering::Relaxed).to_string(),
219 );
220 cp.set_offset(
221 "requests_received",
222 self.requests_received.load(Ordering::Relaxed).to_string(),
223 );
224 cp.set_metadata("connector", "otel");
225 cp.set_metadata("signals", format!("{:?}", self.config.signals));
226 cp
227 }
228
    /// Accepts a checkpoint without applying it.
    ///
    /// The checkpoint argument is ignored: the method only logs a warning that a data gap
    /// is expected and then returns `Ok(())`.
    async fn restore(&mut self, _checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        tracing::warn!(
            "OTel source restore: push-only transport, data gap expected since last checkpoint"
        );
        Ok(())
    }
235
236 async fn close(&mut self) -> Result<(), ConnectorError> {
237 tracing::info!("OTel source connector shutting down");
238
239 if let Some(tx) = self.shutdown_tx.take() {
240 let _ = tx.send(true);
241 }
242
243 self.batch_rx.take();
244
245 if let Some(handle) = self.server_handle.take() {
246 if tokio::time::timeout(Duration::from_secs(5), handle)
247 .await
248 .is_err()
249 {
250 tracing::warn!("OTel gRPC server did not shut down within 5s");
251 }
252 }
253
254 self.state = ConnectorState::Closed;
255 Ok(())
256 }
257
258 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
259 Some(Arc::clone(&self.data_ready))
260 }
261
    /// Always returns `false`.
    // NOTE(review): presumably because the transport is push-only (see the warning logged
    // by `restore`); confirm against the `SourceConnector` contract.
    fn supports_replay(&self) -> bool {
        false
    }
265}
266
267fn spawn_grpc_server<S>(
269 svc: S,
270 incoming: TcpIncoming,
271 mut shutdown_rx: watch::Receiver<bool>,
272) -> JoinHandle<()>
273where
274 S: tonic::codegen::Service<
275 tonic::codegen::http::Request<tonic::body::Body>,
276 Response = tonic::codegen::http::Response<tonic::body::Body>,
277 Error = std::convert::Infallible,
278 > + tonic::server::NamedService
279 + Clone
280 + Send
281 + Sync
282 + 'static,
283 S::Future: Send + 'static,
284{
285 tokio::spawn(async move {
286 if let Err(e) = tonic::transport::Server::builder()
287 .add_service(svc)
288 .serve_with_incoming_shutdown(incoming, async move {
289 let _ = shutdown_rx.wait_for(|&v| v).await;
290 })
291 .await
292 {
293 tracing::error!(error = %e, "OTel gRPC server exited with error");
294 }
295 })
296}
297
298impl std::fmt::Debug for OtelSource {
299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 f.debug_struct("OtelSource")
301 .field("state", &self.state)
302 .field("config", &self.config)
303 .field(
304 "records_received",
305 &self.records_received.load(Ordering::Relaxed),
306 )
307 .field(
308 "requests_received",
309 &self.requests_received.load(Ordering::Relaxed),
310 )
311 .field("checkpoint_seq", &self.checkpoint_seq)
312 .field(
313 "server_running",
314 &self.server_handle.as_ref().map(|h| !h.is_finished()),
315 )
316 .field("has_shutdown_tx", &self.shutdown_tx.is_some())
317 .finish_non_exhaustive()
318 }
319}