Skip to main content

laminar_connectors/otel/
convert.rs

1//! Protobuf-to-Arrow conversion for OTel traces, metrics, and logs.
2//!
3//! Flattens the nested protobuf hierarchies into flat Arrow `RecordBatch` rows.
4
5use std::sync::Arc;
6
7use arrow_array::builder::{
8    FixedSizeBinaryBuilder, Float64Builder, Int32Builder, Int64Builder, StringBuilder,
9    TimestampNanosecondBuilder, UInt64Builder,
10};
11use arrow_array::RecordBatch;
12use arrow_schema::SchemaRef;
13
14use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
15use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
16use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
17use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
18use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point};
19use opentelemetry_proto::tonic::resource::v1::Resource;
20
/// Normalize `src` to exactly `len` bytes.
///
/// A slice that already has the requested length is borrowed unchanged, so the
/// common case allocates nothing. Otherwise a new buffer is returned: shorter
/// input is right-padded with zero bytes, longer input is cut off after `len`
/// bytes.
fn fixed_bytes(src: &[u8], len: usize) -> std::borrow::Cow<'_, [u8]> {
    use std::borrow::Cow;

    if src.len() == len {
        return Cow::Borrowed(src);
    }

    // Keep at most `len` leading bytes, then zero-fill up to `len`.
    let keep = src.len().min(len);
    let mut sized = Vec::with_capacity(len);
    sized.extend_from_slice(&src[..keep]);
    sized.resize(len, 0);
    Cow::Owned(sized)
}
33
/// Reports whether `bytes` contains no non-zero byte.
///
/// An empty slice therefore counts as all-zero.
fn is_all_zeros(bytes: &[u8]) -> bool {
    !bytes.iter().any(|&byte| byte != 0)
}
38
39/// Extract a string attribute value from resource attributes by key.
40fn extract_resource_attr(resource: Option<&Resource>, key: &str) -> Option<String> {
41    resource.and_then(|r| {
42        r.attributes.iter().find_map(|kv| {
43            if kv.key == key {
44                any_value_to_string(kv.value.as_ref())
45            } else {
46                None
47            }
48        })
49    })
50}
51
52/// Serialize resource attributes to JSON, excluding promoted fields.
53fn resource_attributes_json(resource: Option<&Resource>) -> Option<String> {
54    let attrs: Vec<&KeyValue> = resource
55        .map(|r| {
56            r.attributes
57                .iter()
58                .filter(|kv| kv.key != "service.name" && kv.key != "service.version")
59                .collect()
60        })
61        .unwrap_or_default();
62
63    if attrs.is_empty() {
64        return None;
65    }
66
67    Some(key_values_to_json(&attrs))
68}
69
70/// Serialize a `KeyValue` slice to a JSON object string, or `None` if empty.
71fn kv_to_json(attrs: &[KeyValue]) -> Option<String> {
72    if attrs.is_empty() {
73        return None;
74    }
75    let refs: Vec<&KeyValue> = attrs.iter().collect();
76    Some(key_values_to_json(&refs))
77}
78
79/// Convert a list of `KeyValue` to a JSON object string.
80fn key_values_to_json(kvs: &[&KeyValue]) -> String {
81    let mut buf = String::with_capacity(kvs.len() * 32);
82    buf.push('{');
83    for (i, kv) in kvs.iter().enumerate() {
84        if i > 0 {
85            buf.push(',');
86        }
87        // Key is always a JSON string
88        write_json_string(&mut buf, &kv.key);
89        buf.push(':');
90        // Value
91        if let Some(v) = &kv.value {
92            write_any_value_json(&mut buf, v);
93        } else {
94            buf.push_str("null");
95        }
96    }
97    buf.push('}');
98    buf
99}
100
/// Append `s` to `buf` as a double-quoted JSON string literal.
///
/// `"`, `\`, newline, carriage return and tab use their two-character escapes;
/// every other control character is written as a `\uXXXX` escape. All remaining
/// characters are copied through unchanged.
fn write_json_string(buf: &mut String, s: &str) {
    use std::fmt::Write;

    buf.push('"');
    for ch in s.chars() {
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };

        if let Some(escape) = short_escape {
            buf.push_str(escape);
        } else if ch.is_control() {
            // Writing into a `String` cannot fail, so the result is ignored.
            let _ = write!(buf, "\\u{:04x}", ch as u32);
        } else {
            buf.push(ch);
        }
    }
    buf.push('"');
}
120
121/// Maximum nesting depth for `AnyValue` JSON serialization.
122/// Protects against stack overflow from malicious OTLP payloads.
123const MAX_JSON_DEPTH: usize = 32;
124
125/// Write an `AnyValue` as JSON with bounded recursion depth.
126fn write_any_value_json(buf: &mut String, v: &AnyValue) {
127    write_any_value_json_depth(buf, v, 0);
128}
129
130fn write_any_value_json_depth(buf: &mut String, v: &AnyValue, depth: usize) {
131    use opentelemetry_proto::tonic::common::v1::any_value::Value;
132    if depth >= MAX_JSON_DEPTH {
133        buf.push_str("null");
134        return;
135    }
136    match &v.value {
137        Some(Value::StringValue(s)) => write_json_string(buf, s),
138        Some(Value::BoolValue(b)) => buf.push_str(if *b { "true" } else { "false" }),
139        Some(Value::IntValue(i)) => {
140            use std::fmt::Write;
141            let _ = write!(buf, "{i}");
142        }
143        Some(Value::DoubleValue(d)) => {
144            if d.is_finite() {
145                use std::fmt::Write;
146                let _ = write!(buf, "{d}");
147            } else {
148                buf.push_str("null");
149            }
150        }
151        Some(Value::ArrayValue(arr)) => {
152            buf.push('[');
153            for (i, val) in arr.values.iter().enumerate() {
154                if i > 0 {
155                    buf.push(',');
156                }
157                write_any_value_json_depth(buf, val, depth + 1);
158            }
159            buf.push(']');
160        }
161        Some(Value::KvlistValue(kvl)) => {
162            buf.push('{');
163            for (i, kv) in kvl.values.iter().enumerate() {
164                if i > 0 {
165                    buf.push(',');
166                }
167                write_json_string(buf, &kv.key);
168                buf.push(':');
169                if let Some(val) = &kv.value {
170                    write_any_value_json_depth(buf, val, depth + 1);
171                } else {
172                    buf.push_str("null");
173                }
174            }
175            buf.push('}');
176        }
177        Some(Value::BytesValue(b)) => {
178            use base64::Engine;
179            buf.push('"');
180            buf.push_str(&base64::engine::general_purpose::STANDARD.encode(b));
181            buf.push('"');
182        }
183        None => buf.push_str("null"),
184    }
185}
186
187/// Convert an `AnyValue` to a `String` (for promoted fields like service.name).
188fn any_value_to_string(v: Option<&AnyValue>) -> Option<String> {
189    use opentelemetry_proto::tonic::common::v1::any_value::Value;
190    v.and_then(|av| match &av.value {
191        Some(Value::StringValue(s)) => Some(s.clone()),
192        Some(Value::IntValue(i)) => Some(i.to_string()),
193        Some(Value::BoolValue(b)) => Some(b.to_string()),
194        Some(Value::DoubleValue(d)) => Some(d.to_string()),
195        _ => None,
196    })
197}
198
199/// Convert an `ExportTraceServiceRequest` into a `RecordBatch`, or `None` if empty.
200///
201/// # Errors
202///
203/// Returns `ArrowError` if column construction fails.
204#[allow(clippy::too_many_lines)] // 20 columns = inherently long
205pub fn trace_request_to_batch(
206    req: &ExportTraceServiceRequest,
207    schema: &SchemaRef,
208    received_at_nanos: i64,
209) -> Result<Option<RecordBatch>, arrow_schema::ArrowError> {
210    // Count total spans for capacity hints
211    let total_spans: usize = req
212        .resource_spans
213        .iter()
214        .flat_map(|rs| &rs.scope_spans)
215        .map(|ss| ss.spans.len())
216        .sum();
217
218    if total_spans == 0 {
219        return Ok(None);
220    }
221
222    // Column builders
223    let mut trace_ids = FixedSizeBinaryBuilder::with_capacity(total_spans, 16);
224    let mut span_ids = FixedSizeBinaryBuilder::with_capacity(total_spans, 8);
225    let mut parent_span_ids = FixedSizeBinaryBuilder::with_capacity(total_spans, 8);
226    let mut trace_states = StringBuilder::with_capacity(total_spans, total_spans * 8);
227    let mut names = StringBuilder::with_capacity(total_spans, total_spans * 32);
228    let mut kinds = Int32Builder::with_capacity(total_spans);
229    let mut start_times = Int64Builder::with_capacity(total_spans);
230    let mut end_times = Int64Builder::with_capacity(total_spans);
231    let mut durations = Int64Builder::with_capacity(total_spans);
232    let mut status_codes = Int32Builder::with_capacity(total_spans);
233    let mut status_messages = StringBuilder::with_capacity(total_spans, total_spans * 16);
234    let mut res_service_names = StringBuilder::with_capacity(total_spans, total_spans * 16);
235    let mut res_service_versions = StringBuilder::with_capacity(total_spans, total_spans * 8);
236    let mut res_attributes = StringBuilder::with_capacity(total_spans, total_spans * 64);
237    let mut scope_names = StringBuilder::with_capacity(total_spans, total_spans * 16);
238    let mut scope_versions = StringBuilder::with_capacity(total_spans, total_spans * 8);
239    let mut span_attrs = StringBuilder::with_capacity(total_spans, total_spans * 64);
240    let mut events_counts = Int32Builder::with_capacity(total_spans);
241    let mut links_counts = Int32Builder::with_capacity(total_spans);
242    let mut received_at = TimestampNanosecondBuilder::with_capacity(total_spans);
243
244    for resource_spans in &req.resource_spans {
245        let resource = resource_spans.resource.as_ref();
246        let svc_name = extract_resource_attr(resource, "service.name");
247        let svc_version = extract_resource_attr(resource, "service.version");
248        let res_attrs_json = resource_attributes_json(resource);
249
250        for scope_spans in &resource_spans.scope_spans {
251            let scope = scope_spans.scope.as_ref();
252            let scope_n = scope.map(|s| s.name.as_str());
253            let scope_v = scope.map(|s| s.version.as_str());
254
255            for span in &scope_spans.spans {
256                // trace_id: ensure exactly 16 bytes
257                let tid = fixed_bytes(&span.trace_id, 16);
258                trace_ids.append_value(&tid)?;
259
260                // span_id: ensure exactly 8 bytes
261                let sid = fixed_bytes(&span.span_id, 8);
262                span_ids.append_value(&sid)?;
263
264                // parent_span_id: null if all zeros or empty
265                let psid = fixed_bytes(&span.parent_span_id, 8);
266                if span.parent_span_id.is_empty() || is_all_zeros(&psid) {
267                    parent_span_ids.append_null();
268                } else {
269                    parent_span_ids.append_value(&psid)?;
270                }
271
272                // trace_state
273                if span.trace_state.is_empty() {
274                    trace_states.append_null();
275                } else {
276                    trace_states.append_value(&span.trace_state);
277                }
278
279                // name
280                names.append_value(&span.name);
281
282                // kind (SpanKind enum as i32)
283                kinds.append_value(span.kind);
284
285                // timestamps
286                #[allow(clippy::cast_possible_wrap)]
287                let start_ns = span.start_time_unix_nano as i64;
288                #[allow(clippy::cast_possible_wrap)]
289                let end_ns = span.end_time_unix_nano as i64;
290                start_times.append_value(start_ns);
291                end_times.append_value(end_ns);
292                durations.append_value(end_ns.saturating_sub(start_ns));
293
294                // status
295                if let Some(status) = &span.status {
296                    status_codes.append_value(status.code);
297                    if status.message.is_empty() {
298                        status_messages.append_null();
299                    } else {
300                        status_messages.append_value(&status.message);
301                    }
302                } else {
303                    status_codes.append_value(0); // STATUS_CODE_UNSET
304                    status_messages.append_null();
305                }
306
307                // resource/scope/attributes (shared helper)
308                append_context_fields(
309                    svc_name.as_deref(),
310                    res_attrs_json.as_deref(),
311                    scope_n,
312                    &span.attributes,
313                    received_at_nanos,
314                    &mut res_service_names,
315                    &mut res_attributes,
316                    &mut scope_names,
317                    &mut span_attrs,
318                    &mut received_at,
319                );
320
321                // trace-specific extra columns: service_version, scope_version
322                append_nullable_opt(&mut res_service_versions, svc_version.as_deref());
323                append_nullable_opt(&mut scope_versions, scope_v);
324
325                // counts
326                #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
327                let ec = span.events.len() as i32;
328                #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
329                let lc = span.links.len() as i32;
330                events_counts.append_value(ec);
331                links_counts.append_value(lc);
332            }
333        }
334    }
335
336    let batch = RecordBatch::try_new(
337        Arc::clone(schema),
338        vec![
339            Arc::new(trace_ids.finish()),
340            Arc::new(span_ids.finish()),
341            Arc::new(parent_span_ids.finish()),
342            Arc::new(trace_states.finish()),
343            Arc::new(names.finish()),
344            Arc::new(kinds.finish()),
345            Arc::new(start_times.finish()),
346            Arc::new(end_times.finish()),
347            Arc::new(durations.finish()),
348            Arc::new(status_codes.finish()),
349            Arc::new(status_messages.finish()),
350            Arc::new(res_service_names.finish()),
351            Arc::new(res_service_versions.finish()),
352            Arc::new(res_attributes.finish()),
353            Arc::new(scope_names.finish()),
354            Arc::new(scope_versions.finish()),
355            Arc::new(span_attrs.finish()),
356            Arc::new(events_counts.finish()),
357            Arc::new(links_counts.finish()),
358            Arc::new(received_at.finish()),
359        ],
360    )?;
361
362    Ok(Some(batch))
363}
364
365/// Convert an `AnyValue` to a string for log body. Unlike `any_value_to_string`,
366/// this handles complex types (Array, `KvList`, Bytes) by falling back to JSON.
367fn any_value_to_body_string(v: Option<&AnyValue>) -> Option<String> {
368    use opentelemetry_proto::tonic::common::v1::any_value::Value;
369    v.and_then(|av| match &av.value {
370        Some(Value::StringValue(s)) => Some(s.clone()),
371        Some(Value::IntValue(i)) => Some(i.to_string()),
372        Some(Value::BoolValue(b)) => Some(b.to_string()),
373        Some(Value::DoubleValue(d)) => Some(d.to_string()),
374        Some(_) => {
375            let mut buf = String::new();
376            write_any_value_json(&mut buf, av);
377            Some(buf)
378        }
379        None => None,
380    })
381}
382
383// ── Metrics conversion ──
384
385struct MetricBuilders {
386    names: StringBuilder,
387    descs: StringBuilder,
388    units: StringBuilder,
389    types: Int32Builder,
390    timestamps: Int64Builder,
391    value_doubles: Float64Builder,
392    value_ints: Int64Builder,
393    hist_counts: UInt64Builder,
394    hist_sums: Float64Builder,
395    res_svc_names: StringBuilder,
396    res_attrs: StringBuilder,
397    scope_names: StringBuilder,
398    attrs: StringBuilder,
399    received_at: TimestampNanosecondBuilder,
400}
401
402struct MetricContext<'a> {
403    name: &'a str,
404    desc: &'a str,
405    unit: &'a str,
406    metric_type: i32,
407    svc_name: Option<&'a str>,
408    res_json: Option<&'a str>,
409    scope_name: Option<&'a str>,
410    received_at_nanos: i64,
411}
412
413impl MetricBuilders {
414    fn with_capacity(n: usize) -> Self {
415        Self {
416            names: StringBuilder::with_capacity(n, n * 32),
417            descs: StringBuilder::with_capacity(n, n * 32),
418            units: StringBuilder::with_capacity(n, n * 8),
419            types: Int32Builder::with_capacity(n),
420            timestamps: Int64Builder::with_capacity(n),
421            value_doubles: Float64Builder::with_capacity(n),
422            value_ints: Int64Builder::with_capacity(n),
423            hist_counts: UInt64Builder::with_capacity(n),
424            hist_sums: Float64Builder::with_capacity(n),
425            res_svc_names: StringBuilder::with_capacity(n, n * 16),
426            res_attrs: StringBuilder::with_capacity(n, n * 64),
427            scope_names: StringBuilder::with_capacity(n, n * 16),
428            attrs: StringBuilder::with_capacity(n, n * 64),
429            received_at: TimestampNanosecondBuilder::with_capacity(n),
430        }
431    }
432
433    fn append_common(&mut self, ctx: &MetricContext, time_unix_nano: u64, dp_attrs: &[KeyValue]) {
434        self.names.append_value(ctx.name);
435        append_nullable_str(&mut self.descs, ctx.desc);
436        append_nullable_str(&mut self.units, ctx.unit);
437        self.types.append_value(ctx.metric_type);
438        #[allow(clippy::cast_possible_wrap)]
439        self.timestamps.append_value(time_unix_nano as i64);
440
441        append_context_fields(
442            ctx.svc_name,
443            ctx.res_json,
444            ctx.scope_name,
445            dp_attrs,
446            ctx.received_at_nanos,
447            &mut self.res_svc_names,
448            &mut self.res_attrs,
449            &mut self.scope_names,
450            &mut self.attrs,
451            &mut self.received_at,
452        );
453    }
454
455    fn append_number_dp(
456        &mut self,
457        ctx: &MetricContext,
458        time_unix_nano: u64,
459        value: Option<&number_data_point::Value>,
460        dp_attrs: &[KeyValue],
461    ) {
462        self.append_common(ctx, time_unix_nano, dp_attrs);
463        match value {
464            Some(number_data_point::Value::AsDouble(d)) => {
465                self.value_doubles.append_value(*d);
466                self.value_ints.append_null();
467            }
468            Some(number_data_point::Value::AsInt(i)) => {
469                self.value_doubles.append_null();
470                self.value_ints.append_value(*i);
471            }
472            None => {
473                self.value_doubles.append_null();
474                self.value_ints.append_null();
475            }
476        }
477        self.hist_counts.append_null();
478        self.hist_sums.append_null();
479    }
480
481    fn append_histogram_dp(
482        &mut self,
483        ctx: &MetricContext,
484        time_unix_nano: u64,
485        count: u64,
486        sum: Option<f64>,
487        dp_attrs: &[KeyValue],
488    ) {
489        self.append_common(ctx, time_unix_nano, dp_attrs);
490        self.value_doubles.append_null();
491        self.value_ints.append_null();
492        self.hist_counts.append_value(count);
493        match sum {
494            Some(s) => self.hist_sums.append_value(s),
495            None => self.hist_sums.append_null(),
496        }
497    }
498
499    fn finish(mut self, schema: &SchemaRef) -> Result<RecordBatch, arrow_schema::ArrowError> {
500        RecordBatch::try_new(
501            Arc::clone(schema),
502            vec![
503                Arc::new(self.names.finish()),
504                Arc::new(self.descs.finish()),
505                Arc::new(self.units.finish()),
506                Arc::new(self.types.finish()),
507                Arc::new(self.timestamps.finish()),
508                Arc::new(self.value_doubles.finish()),
509                Arc::new(self.value_ints.finish()),
510                Arc::new(self.hist_counts.finish()),
511                Arc::new(self.hist_sums.finish()),
512                Arc::new(self.res_svc_names.finish()),
513                Arc::new(self.res_attrs.finish()),
514                Arc::new(self.scope_names.finish()),
515                Arc::new(self.attrs.finish()),
516                Arc::new(self.received_at.finish()),
517            ],
518        )
519    }
520}
521
522/// Convert an `ExportMetricsServiceRequest` into an Arrow `RecordBatch`.
523///
524/// # Errors
525///
526/// Returns `ArrowError` if column construction fails.
527#[allow(clippy::too_many_lines)] // 5 metric types × dispatch loop; builders already extracted
528pub fn metrics_request_to_batch(
529    req: &ExportMetricsServiceRequest,
530    schema: &SchemaRef,
531    received_at_nanos: i64,
532) -> Result<Option<RecordBatch>, arrow_schema::ArrowError> {
533    let total_points: usize = req
534        .resource_metrics
535        .iter()
536        .flat_map(|rm| &rm.scope_metrics)
537        .flat_map(|sm| &sm.metrics)
538        .map(count_metric_points)
539        .sum();
540
541    if total_points == 0 {
542        return Ok(None);
543    }
544
545    let mut b = MetricBuilders::with_capacity(total_points);
546
547    for rm in &req.resource_metrics {
548        let resource = rm.resource.as_ref();
549        let svc_name = extract_resource_attr(resource, "service.name");
550        let res_json = resource_attributes_json(resource);
551
552        for sm in &rm.scope_metrics {
553            let scope_name = sm.scope.as_ref().map(|s| s.name.as_str());
554
555            for metric in &sm.metrics {
556                let Some(data) = &metric.data else { continue };
557                let base = MetricContext {
558                    name: &metric.name,
559                    desc: &metric.description,
560                    unit: &metric.unit,
561                    metric_type: 0,
562                    svc_name: svc_name.as_deref(),
563                    res_json: res_json.as_deref(),
564                    scope_name,
565                    received_at_nanos,
566                };
567
568                match data {
569                    metric::Data::Gauge(g) => {
570                        for dp in &g.data_points {
571                            b.append_number_dp(
572                                &base,
573                                dp.time_unix_nano,
574                                dp.value.as_ref(),
575                                &dp.attributes,
576                            );
577                        }
578                    }
579                    metric::Data::Sum(s) => {
580                        for dp in &s.data_points {
581                            let ctx = MetricContext {
582                                metric_type: 1,
583                                ..base
584                            };
585                            b.append_number_dp(
586                                &ctx,
587                                dp.time_unix_nano,
588                                dp.value.as_ref(),
589                                &dp.attributes,
590                            );
591                        }
592                    }
593                    metric::Data::Histogram(h) => {
594                        for dp in &h.data_points {
595                            let ctx = MetricContext {
596                                metric_type: 2,
597                                ..base
598                            };
599                            b.append_histogram_dp(
600                                &ctx,
601                                dp.time_unix_nano,
602                                dp.count,
603                                dp.sum,
604                                &dp.attributes,
605                            );
606                        }
607                    }
608                    metric::Data::ExponentialHistogram(eh) => {
609                        for dp in &eh.data_points {
610                            let ctx = MetricContext {
611                                metric_type: 3,
612                                ..base
613                            };
614                            b.append_histogram_dp(
615                                &ctx,
616                                dp.time_unix_nano,
617                                dp.count,
618                                dp.sum,
619                                &dp.attributes,
620                            );
621                        }
622                    }
623                    metric::Data::Summary(s) => {
624                        for dp in &s.data_points {
625                            let ctx = MetricContext {
626                                metric_type: 4,
627                                ..base
628                            };
629                            b.append_histogram_dp(
630                                &ctx,
631                                dp.time_unix_nano,
632                                dp.count,
633                                Some(dp.sum),
634                                &dp.attributes,
635                            );
636                        }
637                    }
638                }
639            }
640        }
641    }
642
643    Ok(Some(b.finish(schema)?))
644}
645
646fn count_metric_points(m: &opentelemetry_proto::tonic::metrics::v1::Metric) -> usize {
647    match &m.data {
648        Some(metric::Data::Gauge(g)) => g.data_points.len(),
649        Some(metric::Data::Sum(s)) => s.data_points.len(),
650        Some(metric::Data::Histogram(h)) => h.data_points.len(),
651        Some(metric::Data::ExponentialHistogram(eh)) => eh.data_points.len(),
652        Some(metric::Data::Summary(s)) => s.data_points.len(),
653        None => 0,
654    }
655}
656
657/// Append resource/scope/attribute/`received_at` fields (shared across all metric types).
658#[allow(clippy::too_many_arguments)]
659fn append_context_fields(
660    svc_name: Option<&str>,
661    res_json: Option<&str>,
662    scope_name: Option<&str>,
663    attrs: &[KeyValue],
664    received_at_nanos: i64,
665    res_svc_names: &mut StringBuilder,
666    res_attrs: &mut StringBuilder,
667    scope_names_col: &mut StringBuilder,
668    attrs_col: &mut StringBuilder,
669    received_at: &mut TimestampNanosecondBuilder,
670) {
671    match svc_name {
672        Some(s) => res_svc_names.append_value(s),
673        None => res_svc_names.append_null(),
674    }
675    match res_json {
676        Some(s) => res_attrs.append_value(s),
677        None => res_attrs.append_null(),
678    }
679    match scope_name {
680        Some(s) if !s.is_empty() => scope_names_col.append_value(s),
681        _ => scope_names_col.append_null(),
682    }
683    match kv_to_json(attrs) {
684        Some(s) => attrs_col.append_value(&s),
685        None => attrs_col.append_null(),
686    }
687    received_at.append_value(received_at_nanos);
688}
689
690/// Append a string value as non-null if non-empty, null otherwise.
691fn append_nullable_str(builder: &mut StringBuilder, s: &str) {
692    if s.is_empty() {
693        builder.append_null();
694    } else {
695        builder.append_value(s);
696    }
697}
698
699/// Append an `Option<&str>` — `Some(non-empty)` → value, else null.
700fn append_nullable_opt(builder: &mut StringBuilder, s: Option<&str>) {
701    match s {
702        Some(v) if !v.is_empty() => builder.append_value(v),
703        _ => builder.append_null(),
704    }
705}
706
707// ── Logs conversion ──
708
709/// Convert an `ExportLogsServiceRequest` into an Arrow `RecordBatch`.
710///
711/// # Errors
712///
713/// Returns `ArrowError` if column construction fails.
714pub fn logs_request_to_batch(
715    req: &ExportLogsServiceRequest,
716    schema: &SchemaRef,
717    received_at_nanos: i64,
718) -> Result<Option<RecordBatch>, arrow_schema::ArrowError> {
719    let total_records: usize = req
720        .resource_logs
721        .iter()
722        .flat_map(|rl| &rl.scope_logs)
723        .map(|sl| sl.log_records.len())
724        .sum();
725
726    if total_records == 0 {
727        return Ok(None);
728    }
729
730    let mut ts = Int64Builder::with_capacity(total_records);
731    let mut observed_ts = Int64Builder::with_capacity(total_records);
732    let mut sev_nums = Int32Builder::with_capacity(total_records);
733    let mut sev_texts = StringBuilder::with_capacity(total_records, total_records * 8);
734    let mut bodies = StringBuilder::with_capacity(total_records, total_records * 64);
735    let mut trace_ids = FixedSizeBinaryBuilder::with_capacity(total_records, 16);
736    let mut span_ids = FixedSizeBinaryBuilder::with_capacity(total_records, 8);
737    let mut res_svc_names = StringBuilder::with_capacity(total_records, total_records * 16);
738    let mut res_attrs = StringBuilder::with_capacity(total_records, total_records * 64);
739    let mut scope_names_col = StringBuilder::with_capacity(total_records, total_records * 16);
740    let mut attrs_col = StringBuilder::with_capacity(total_records, total_records * 64);
741    let mut received_at = TimestampNanosecondBuilder::with_capacity(total_records);
742
743    for rl in &req.resource_logs {
744        let resource = rl.resource.as_ref();
745        let svc_name = extract_resource_attr(resource, "service.name");
746        let res_json = resource_attributes_json(resource);
747
748        for sl in &rl.scope_logs {
749            let scope_name = sl.scope.as_ref().map(|s| s.name.as_str());
750
751            for log in &sl.log_records {
752                #[allow(clippy::cast_possible_wrap)]
753                ts.append_value(log.time_unix_nano as i64);
754
755                if log.observed_time_unix_nano == 0 {
756                    observed_ts.append_null();
757                } else {
758                    #[allow(clippy::cast_possible_wrap)]
759                    observed_ts.append_value(log.observed_time_unix_nano as i64);
760                }
761
762                sev_nums.append_value(log.severity_number);
763
764                if log.severity_text.is_empty() {
765                    sev_texts.append_null();
766                } else {
767                    sev_texts.append_value(&log.severity_text);
768                }
769
770                match any_value_to_body_string(log.body.as_ref()) {
771                    Some(s) => bodies.append_value(&s),
772                    None => bodies.append_null(),
773                }
774
775                // trace_id: null if empty or all-zeros
776                let tid = fixed_bytes(&log.trace_id, 16);
777                if log.trace_id.is_empty() || is_all_zeros(&tid) {
778                    trace_ids.append_null();
779                } else {
780                    trace_ids.append_value(&tid)?;
781                }
782
783                // span_id: null if empty or all-zeros
784                let sid = fixed_bytes(&log.span_id, 8);
785                if log.span_id.is_empty() || is_all_zeros(&sid) {
786                    span_ids.append_null();
787                } else {
788                    span_ids.append_value(&sid)?;
789                }
790
791                append_context_fields(
792                    svc_name.as_deref(),
793                    res_json.as_deref(),
794                    scope_name,
795                    &log.attributes,
796                    received_at_nanos,
797                    &mut res_svc_names,
798                    &mut res_attrs,
799                    &mut scope_names_col,
800                    &mut attrs_col,
801                    &mut received_at,
802                );
803            }
804        }
805    }
806
807    let batch = RecordBatch::try_new(
808        Arc::clone(schema),
809        vec![
810            Arc::new(ts.finish()),
811            Arc::new(observed_ts.finish()),
812            Arc::new(sev_nums.finish()),
813            Arc::new(sev_texts.finish()),
814            Arc::new(bodies.finish()),
815            Arc::new(trace_ids.finish()),
816            Arc::new(span_ids.finish()),
817            Arc::new(res_svc_names.finish()),
818            Arc::new(res_attrs.finish()),
819            Arc::new(scope_names_col.finish()),
820            Arc::new(attrs_col.finish()),
821            Arc::new(received_at.finish()),
822        ],
823    )?;
824
825    Ok(Some(batch))
826}
827
828#[cfg(test)]
829mod tests {
830    use super::*;
831    use crate::otel::schema::traces_schema;
832    use arrow_array::Array;
833    use arrow_schema::SchemaRef;
834    use opentelemetry_proto::tonic::common::v1::{
835        any_value, AnyValue as ProtoAnyValue, InstrumentationScope,
836    };
837    use opentelemetry_proto::tonic::resource::v1::Resource as ProtoResource;
838    use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status};
839
840    fn schema() -> SchemaRef {
841        traces_schema()
842    }
843
844    fn make_kv(key: &str, val: &str) -> KeyValue {
845        KeyValue {
846            key: key.to_string(),
847            value: Some(ProtoAnyValue {
848                value: Some(any_value::Value::StringValue(val.to_string())),
849            }),
850        }
851    }
852
853    fn make_kv_int(key: &str, val: i64) -> KeyValue {
854        KeyValue {
855            key: key.to_string(),
856            value: Some(ProtoAnyValue {
857                value: Some(any_value::Value::IntValue(val)),
858            }),
859        }
860    }
861
862    fn make_test_request(num_spans: usize) -> ExportTraceServiceRequest {
863        let spans: Vec<Span> = (0..num_spans)
864            .map(|i| {
865                let mut trace_id = vec![0u8; 16];
866                trace_id[15] = (i + 1) as u8;
867                let mut span_id = vec![0u8; 8];
868                span_id[7] = (i + 1) as u8;
869
870                Span {
871                    trace_id,
872                    span_id,
873                    parent_span_id: vec![],
874                    trace_state: String::new(),
875                    name: format!("span-{i}"),
876                    kind: 1, // INTERNAL
877                    start_time_unix_nano: 1_000_000_000 * (i as u64 + 1),
878                    end_time_unix_nano: 1_000_000_000 * (i as u64 + 1) + 500_000,
879                    attributes: vec![make_kv("http.method", "GET")],
880                    dropped_attributes_count: 0,
881                    events: vec![],
882                    dropped_events_count: 0,
883                    links: vec![],
884                    dropped_links_count: 0,
885                    status: Some(Status {
886                        message: String::new(),
887                        code: 0,
888                    }),
889                    flags: 0,
890                }
891            })
892            .collect();
893
894        ExportTraceServiceRequest {
895            resource_spans: vec![ResourceSpans {
896                resource: Some(ProtoResource {
897                    attributes: vec![
898                        make_kv("service.name", "test-svc"),
899                        make_kv("service.version", "1.0.0"),
900                        make_kv("host.name", "test-host"),
901                    ],
902                    dropped_attributes_count: 0,
903                    entity_refs: vec![],
904                }),
905                scope_spans: vec![ScopeSpans {
906                    scope: Some(InstrumentationScope {
907                        name: "my-lib".to_string(),
908                        version: "0.1.0".to_string(),
909                        attributes: vec![],
910                        dropped_attributes_count: 0,
911                    }),
912                    spans,
913                    schema_url: String::new(),
914                }],
915                schema_url: String::new(),
916            }],
917        }
918    }
919
920    #[test]
921    fn test_empty_request() {
922        let req = ExportTraceServiceRequest {
923            resource_spans: vec![],
924        };
925        let result = trace_request_to_batch(&req, &schema(), 0).unwrap();
926        assert!(result.is_none());
927    }
928
929    #[test]
930    fn test_single_span() {
931        let req = make_test_request(1);
932        let batch = trace_request_to_batch(&req, &schema(), 999)
933            .unwrap()
934            .unwrap();
935        assert_eq!(batch.num_rows(), 1);
936        assert_eq!(batch.num_columns(), 20);
937    }
938
939    #[test]
940    fn test_multi_span() {
941        let req = make_test_request(10);
942        let batch = trace_request_to_batch(&req, &schema(), 999)
943            .unwrap()
944            .unwrap();
945        assert_eq!(batch.num_rows(), 10);
946    }
947
948    #[test]
949    fn test_large_batch() {
950        let req = make_test_request(10_000);
951        let batch = trace_request_to_batch(&req, &schema(), 999)
952            .unwrap()
953            .unwrap();
954        assert_eq!(batch.num_rows(), 10_000);
955    }
956
957    #[test]
958    fn test_schema_matches() {
959        let req = make_test_request(1);
960        let batch = trace_request_to_batch(&req, &schema(), 999)
961            .unwrap()
962            .unwrap();
963        assert_eq!(batch.schema(), traces_schema());
964    }
965
966    #[test]
967    fn test_trace_id_encoding() {
968        let req = make_test_request(1);
969        let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
970
971        let col = batch
972            .column(0)
973            .as_any()
974            .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
975            .unwrap();
976        let tid = col.value(0);
977        // First span: trace_id[15] = 1
978        assert_eq!(tid.len(), 16);
979        assert_eq!(tid[15], 1);
980    }
981
982    #[test]
983    fn test_parent_span_id_null() {
984        let req = make_test_request(1);
985        let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
986
987        let col = batch
988            .column(2)
989            .as_any()
990            .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
991            .unwrap();
992        assert!(col.is_null(0)); // empty parent => null
993    }
994
995    #[test]
996    fn test_duration_computed() {
997        let req = make_test_request(1);
998        let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
999
1000        let dur_col = batch
1001            .column(8)
1002            .as_any()
1003            .downcast_ref::<arrow_array::Int64Array>()
1004            .unwrap();
1005        assert_eq!(dur_col.value(0), 500_000); // end - start
1006    }
1007
1008    #[test]
1009    fn test_service_name_promoted() {
1010        let req = make_test_request(1);
1011        let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
1012
1013        let col = batch
1014            .column(11)
1015            .as_any()
1016            .downcast_ref::<arrow_array::StringArray>()
1017            .unwrap();
1018        assert_eq!(col.value(0), "test-svc");
1019    }
1020
1021    #[test]
1022    fn test_resource_attrs_exclude_promoted() {
1023        let req = make_test_request(1);
1024        let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
1025
1026        let col = batch
1027            .column(13)
1028            .as_any()
1029            .downcast_ref::<arrow_array::StringArray>()
1030            .unwrap();
1031        let json = col.value(0);
1032        // Should contain host.name but NOT service.name/service.version
1033        assert!(json.contains("host.name"));
1034        assert!(!json.contains("service.name"));
1035        assert!(!json.contains("service.version"));
1036    }
1037
1038    #[test]
1039    fn test_attributes_json() {
1040        let req = make_test_request(1);
1041        let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
1042
1043        let col = batch
1044            .column(16)
1045            .as_any()
1046            .downcast_ref::<arrow_array::StringArray>()
1047            .unwrap();
1048        let json = col.value(0);
1049        assert!(json.contains("http.method"));
1050        assert!(json.contains("GET"));
1051    }
1052
1053    #[test]
1054    fn test_scope_fields() {
1055        let req = make_test_request(1);
1056        let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
1057
1058        let name_col = batch
1059            .column(14)
1060            .as_any()
1061            .downcast_ref::<arrow_array::StringArray>()
1062            .unwrap();
1063        assert_eq!(name_col.value(0), "my-lib");
1064
1065        let ver_col = batch
1066            .column(15)
1067            .as_any()
1068            .downcast_ref::<arrow_array::StringArray>()
1069            .unwrap();
1070        assert_eq!(ver_col.value(0), "0.1.0");
1071    }
1072
1073    #[test]
1074    fn test_received_at() {
1075        let req = make_test_request(3);
1076        let batch = trace_request_to_batch(&req, &schema(), 42)
1077            .unwrap()
1078            .unwrap();
1079
1080        let col = batch
1081            .column(19)
1082            .as_any()
1083            .downcast_ref::<arrow_array::TimestampNanosecondArray>()
1084            .unwrap();
1085        for i in 0..3 {
1086            assert_eq!(col.value(i), 42);
1087        }
1088    }
1089
1090    #[test]
1091    fn test_int_attribute_to_string() {
1092        let kv = make_kv_int("retries", 3);
1093        let s = any_value_to_string(kv.value.as_ref());
1094        assert_eq!(s, Some("3".to_string()));
1095    }
1096
1097    #[test]
1098    fn test_json_escaping() {
1099        let mut buf = String::new();
1100        write_json_string(&mut buf, "hello \"world\"\nline2");
1101        assert_eq!(buf, r#""hello \"world\"\nline2""#);
1102    }
1103
1104    // ── Metrics tests ──
1105
1106    use crate::otel::schema::metrics_schema;
1107    use opentelemetry_proto::tonic::metrics::v1::{
1108        Gauge, Histogram, HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics,
1109        ScopeMetrics, Sum,
1110    };
1111
1112    fn metrics_schema_ref() -> SchemaRef {
1113        metrics_schema()
1114    }
1115
1116    fn make_gauge_metric(name: &str, value: f64, ts: u64) -> Metric {
1117        Metric {
1118            name: name.to_string(),
1119            description: "test gauge".to_string(),
1120            unit: "ms".to_string(),
1121            metadata: vec![],
1122            data: Some(metric::Data::Gauge(Gauge {
1123                data_points: vec![NumberDataPoint {
1124                    attributes: vec![make_kv("host", "web-1")],
1125                    start_time_unix_nano: 0,
1126                    time_unix_nano: ts,
1127                    exemplars: vec![],
1128                    flags: 0,
1129                    value: Some(number_data_point::Value::AsDouble(value)),
1130                }],
1131            })),
1132        }
1133    }
1134
1135    fn make_metrics_request(metrics: Vec<Metric>) -> ExportMetricsServiceRequest {
1136        ExportMetricsServiceRequest {
1137            resource_metrics: vec![ResourceMetrics {
1138                resource: Some(ProtoResource {
1139                    attributes: vec![make_kv("service.name", "metrics-svc")],
1140                    dropped_attributes_count: 0,
1141                    entity_refs: vec![],
1142                }),
1143                scope_metrics: vec![ScopeMetrics {
1144                    scope: Some(InstrumentationScope {
1145                        name: "meter".to_string(),
1146                        version: "1.0".to_string(),
1147                        attributes: vec![],
1148                        dropped_attributes_count: 0,
1149                    }),
1150                    metrics,
1151                    schema_url: String::new(),
1152                }],
1153                schema_url: String::new(),
1154            }],
1155        }
1156    }
1157
1158    #[test]
1159    fn test_metrics_empty_request() {
1160        let req = ExportMetricsServiceRequest {
1161            resource_metrics: vec![],
1162        };
1163        assert!(metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1164            .unwrap()
1165            .is_none());
1166    }
1167
1168    #[test]
1169    fn test_metrics_gauge_double() {
1170        let req = make_metrics_request(vec![make_gauge_metric("cpu.usage", 42.5, 1000)]);
1171        let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 99)
1172            .unwrap()
1173            .unwrap();
1174        assert_eq!(batch.num_rows(), 1);
1175        assert_eq!(batch.num_columns(), 14);
1176
1177        // metric_name
1178        let names = batch
1179            .column(0)
1180            .as_any()
1181            .downcast_ref::<arrow_array::StringArray>()
1182            .unwrap();
1183        assert_eq!(names.value(0), "cpu.usage");
1184
1185        // metric_type = 0 (gauge)
1186        let types = batch
1187            .column(3)
1188            .as_any()
1189            .downcast_ref::<arrow_array::Int32Array>()
1190            .unwrap();
1191        assert_eq!(types.value(0), 0);
1192
1193        // value_double
1194        let vd = batch
1195            .column(5)
1196            .as_any()
1197            .downcast_ref::<arrow_array::Float64Array>()
1198            .unwrap();
1199        assert!((vd.value(0) - 42.5).abs() < f64::EPSILON);
1200
1201        // value_int is null
1202        let vi = batch
1203            .column(6)
1204            .as_any()
1205            .downcast_ref::<arrow_array::Int64Array>()
1206            .unwrap();
1207        assert!(vi.is_null(0));
1208
1209        // histogram fields null
1210        assert!(batch
1211            .column(7)
1212            .as_any()
1213            .downcast_ref::<arrow_array::UInt64Array>()
1214            .unwrap()
1215            .is_null(0));
1216    }
1217
1218    #[test]
1219    fn test_metrics_sum_int() {
1220        let metric = Metric {
1221            name: "requests".to_string(),
1222            description: String::new(),
1223            unit: "1".to_string(),
1224            metadata: vec![],
1225            data: Some(metric::Data::Sum(Sum {
1226                data_points: vec![NumberDataPoint {
1227                    attributes: vec![],
1228                    start_time_unix_nano: 0,
1229                    time_unix_nano: 2000,
1230                    exemplars: vec![],
1231                    flags: 0,
1232                    value: Some(number_data_point::Value::AsInt(100)),
1233                }],
1234                aggregation_temporality: 2,
1235                is_monotonic: true,
1236            })),
1237        };
1238        let req = make_metrics_request(vec![metric]);
1239        let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1240            .unwrap()
1241            .unwrap();
1242
1243        // metric_type = 1 (sum)
1244        let types = batch
1245            .column(3)
1246            .as_any()
1247            .downcast_ref::<arrow_array::Int32Array>()
1248            .unwrap();
1249        assert_eq!(types.value(0), 1);
1250
1251        // value_int = 100
1252        let vi = batch
1253            .column(6)
1254            .as_any()
1255            .downcast_ref::<arrow_array::Int64Array>()
1256            .unwrap();
1257        assert_eq!(vi.value(0), 100);
1258
1259        // value_double is null
1260        assert!(batch
1261            .column(5)
1262            .as_any()
1263            .downcast_ref::<arrow_array::Float64Array>()
1264            .unwrap()
1265            .is_null(0));
1266    }
1267
1268    #[test]
1269    fn test_metrics_histogram() {
1270        let metric = Metric {
1271            name: "latency".to_string(),
1272            description: String::new(),
1273            unit: "ms".to_string(),
1274            metadata: vec![],
1275            data: Some(metric::Data::Histogram(Histogram {
1276                data_points: vec![HistogramDataPoint {
1277                    attributes: vec![],
1278                    start_time_unix_nano: 0,
1279                    time_unix_nano: 3000,
1280                    count: 42,
1281                    sum: Some(1234.5),
1282                    bucket_counts: vec![10, 20, 12],
1283                    explicit_bounds: vec![10.0, 50.0],
1284                    exemplars: vec![],
1285                    flags: 0,
1286                    min: None,
1287                    max: None,
1288                }],
1289                aggregation_temporality: 1,
1290            })),
1291        };
1292        let req = make_metrics_request(vec![metric]);
1293        let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1294            .unwrap()
1295            .unwrap();
1296
1297        // metric_type = 2 (histogram)
1298        let types = batch
1299            .column(3)
1300            .as_any()
1301            .downcast_ref::<arrow_array::Int32Array>()
1302            .unwrap();
1303        assert_eq!(types.value(0), 2);
1304
1305        // histogram_count = 42
1306        let hc = batch
1307            .column(7)
1308            .as_any()
1309            .downcast_ref::<arrow_array::UInt64Array>()
1310            .unwrap();
1311        assert_eq!(hc.value(0), 42);
1312
1313        // histogram_sum = 1234.5
1314        let hs = batch
1315            .column(8)
1316            .as_any()
1317            .downcast_ref::<arrow_array::Float64Array>()
1318            .unwrap();
1319        assert!((hs.value(0) - 1234.5).abs() < f64::EPSILON);
1320    }
1321
1322    #[test]
1323    fn test_metrics_schema_matches() {
1324        let req = make_metrics_request(vec![make_gauge_metric("x", 1.0, 0)]);
1325        let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1326            .unwrap()
1327            .unwrap();
1328        assert_eq!(batch.schema(), metrics_schema());
1329    }
1330
1331    #[test]
1332    fn test_metrics_service_name() {
1333        let req = make_metrics_request(vec![make_gauge_metric("x", 1.0, 0)]);
1334        let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1335            .unwrap()
1336            .unwrap();
1337        let col = batch
1338            .column(9)
1339            .as_any()
1340            .downcast_ref::<arrow_array::StringArray>()
1341            .unwrap();
1342        assert_eq!(col.value(0), "metrics-svc");
1343    }
1344
1345    // ── Logs tests ──
1346
1347    use crate::otel::schema::logs_schema;
1348    use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
1349
1350    fn logs_schema_ref() -> SchemaRef {
1351        logs_schema()
1352    }
1353
1354    fn make_log_record(body: &str, severity: i32, ts: u64) -> LogRecord {
1355        LogRecord {
1356            time_unix_nano: ts,
1357            observed_time_unix_nano: ts + 1000,
1358            severity_number: severity,
1359            severity_text: "INFO".to_string(),
1360            body: Some(ProtoAnyValue {
1361                value: Some(any_value::Value::StringValue(body.to_string())),
1362            }),
1363            attributes: vec![make_kv("env", "prod")],
1364            dropped_attributes_count: 0,
1365            flags: 0,
1366            trace_id: vec![],
1367            span_id: vec![],
1368            event_name: String::new(),
1369        }
1370    }
1371
1372    fn make_logs_request(records: Vec<LogRecord>) -> ExportLogsServiceRequest {
1373        ExportLogsServiceRequest {
1374            resource_logs: vec![ResourceLogs {
1375                resource: Some(ProtoResource {
1376                    attributes: vec![make_kv("service.name", "log-svc")],
1377                    dropped_attributes_count: 0,
1378                    entity_refs: vec![],
1379                }),
1380                scope_logs: vec![ScopeLogs {
1381                    scope: Some(InstrumentationScope {
1382                        name: "logger".to_string(),
1383                        version: "0.1".to_string(),
1384                        attributes: vec![],
1385                        dropped_attributes_count: 0,
1386                    }),
1387                    log_records: records,
1388                    schema_url: String::new(),
1389                }],
1390                schema_url: String::new(),
1391            }],
1392        }
1393    }
1394
1395    #[test]
1396    fn test_logs_empty_request() {
1397        let req = ExportLogsServiceRequest {
1398            resource_logs: vec![],
1399        };
1400        assert!(logs_request_to_batch(&req, &logs_schema_ref(), 0)
1401            .unwrap()
1402            .is_none());
1403    }
1404
1405    #[test]
1406    fn test_logs_single_record() {
1407        let req = make_logs_request(vec![make_log_record("hello world", 9, 5000)]);
1408        let batch = logs_request_to_batch(&req, &logs_schema_ref(), 77)
1409            .unwrap()
1410            .unwrap();
1411        assert_eq!(batch.num_rows(), 1);
1412        assert_eq!(batch.num_columns(), 12);
1413    }
1414
1415    #[test]
1416    fn test_logs_body_string() {
1417        let req = make_logs_request(vec![make_log_record("test message", 9, 0)]);
1418        let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1419            .unwrap()
1420            .unwrap();
1421        let col = batch
1422            .column(4)
1423            .as_any()
1424            .downcast_ref::<arrow_array::StringArray>()
1425            .unwrap();
1426        assert_eq!(col.value(0), "test message");
1427    }
1428
1429    #[test]
1430    fn test_logs_severity() {
1431        let req = make_logs_request(vec![make_log_record("err", 17, 0)]);
1432        let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1433            .unwrap()
1434            .unwrap();
1435        let sev = batch
1436            .column(2)
1437            .as_any()
1438            .downcast_ref::<arrow_array::Int32Array>()
1439            .unwrap();
1440        assert_eq!(sev.value(0), 17); // ERROR
1441        let text = batch
1442            .column(3)
1443            .as_any()
1444            .downcast_ref::<arrow_array::StringArray>()
1445            .unwrap();
1446        assert_eq!(text.value(0), "INFO");
1447    }
1448
1449    #[test]
1450    fn test_logs_trace_correlation() {
1451        let mut record = make_log_record("with trace", 9, 0);
1452        record.trace_id = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
1453        record.span_id = vec![0, 0, 0, 0, 0, 0, 0, 2];
1454        let req = make_logs_request(vec![record]);
1455        let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1456            .unwrap()
1457            .unwrap();
1458
1459        let tid = batch
1460            .column(5)
1461            .as_any()
1462            .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1463            .unwrap();
1464        assert!(!tid.is_null(0));
1465        assert_eq!(tid.value(0)[15], 1);
1466
1467        let sid = batch
1468            .column(6)
1469            .as_any()
1470            .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1471            .unwrap();
1472        assert!(!sid.is_null(0));
1473        assert_eq!(sid.value(0)[7], 2);
1474    }
1475
1476    #[test]
1477    fn test_logs_null_trace_ids() {
1478        let req = make_logs_request(vec![make_log_record("no trace", 9, 0)]);
1479        let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1480            .unwrap()
1481            .unwrap();
1482        let tid = batch
1483            .column(5)
1484            .as_any()
1485            .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1486            .unwrap();
1487        assert!(tid.is_null(0));
1488        let sid = batch
1489            .column(6)
1490            .as_any()
1491            .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1492            .unwrap();
1493        assert!(sid.is_null(0));
1494    }
1495
1496    #[test]
1497    fn test_logs_observed_timestamp() {
1498        let mut record = make_log_record("ts test", 9, 5000);
1499        record.observed_time_unix_nano = 0;
1500        let req = make_logs_request(vec![record]);
1501        let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1502            .unwrap()
1503            .unwrap();
1504        let obs = batch
1505            .column(1)
1506            .as_any()
1507            .downcast_ref::<arrow_array::Int64Array>()
1508            .unwrap();
1509        assert!(obs.is_null(0));
1510    }
1511
1512    #[test]
1513    fn test_logs_schema_matches() {
1514        let req = make_logs_request(vec![make_log_record("x", 9, 0)]);
1515        let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1516            .unwrap()
1517            .unwrap();
1518        assert_eq!(batch.schema(), logs_schema());
1519    }
1520
1521    #[test]
1522    fn test_logs_service_name() {
1523        let req = make_logs_request(vec![make_log_record("x", 9, 0)]);
1524        let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1525            .unwrap()
1526            .unwrap();
1527        let col = batch
1528            .column(7)
1529            .as_any()
1530            .downcast_ref::<arrow_array::StringArray>()
1531            .unwrap();
1532        assert_eq!(col.value(0), "log-svc");
1533    }
1534}