1use std::sync::Arc;
6
7use arrow_array::builder::{
8 FixedSizeBinaryBuilder, Float64Builder, Int32Builder, Int64Builder, StringBuilder,
9 TimestampNanosecondBuilder, UInt64Builder,
10};
11use arrow_array::RecordBatch;
12use arrow_schema::SchemaRef;
13
14use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
15use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
16use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
17use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
18use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point};
19use opentelemetry_proto::tonic::resource::v1::Resource;
20
21fn fixed_bytes(src: &[u8], len: usize) -> std::borrow::Cow<'_, [u8]> {
24 if src.len() == len {
25 std::borrow::Cow::Borrowed(src)
26 } else {
27 let mut out = vec![0u8; len];
28 let copy_len = src.len().min(len);
29 out[..copy_len].copy_from_slice(&src[..copy_len]);
30 std::borrow::Cow::Owned(out)
31 }
32}
33
34fn is_all_zeros(bytes: &[u8]) -> bool {
36 bytes.iter().all(|&b| b == 0)
37}
38
39fn extract_resource_attr(resource: Option<&Resource>, key: &str) -> Option<String> {
41 resource.and_then(|r| {
42 r.attributes.iter().find_map(|kv| {
43 if kv.key == key {
44 any_value_to_string(kv.value.as_ref())
45 } else {
46 None
47 }
48 })
49 })
50}
51
52fn resource_attributes_json(resource: Option<&Resource>) -> Option<String> {
54 let attrs: Vec<&KeyValue> = resource
55 .map(|r| {
56 r.attributes
57 .iter()
58 .filter(|kv| kv.key != "service.name" && kv.key != "service.version")
59 .collect()
60 })
61 .unwrap_or_default();
62
63 if attrs.is_empty() {
64 return None;
65 }
66
67 Some(key_values_to_json(&attrs))
68}
69
70fn kv_to_json(attrs: &[KeyValue]) -> Option<String> {
72 if attrs.is_empty() {
73 return None;
74 }
75 let refs: Vec<&KeyValue> = attrs.iter().collect();
76 Some(key_values_to_json(&refs))
77}
78
79fn key_values_to_json(kvs: &[&KeyValue]) -> String {
81 let mut buf = String::with_capacity(kvs.len() * 32);
82 buf.push('{');
83 for (i, kv) in kvs.iter().enumerate() {
84 if i > 0 {
85 buf.push(',');
86 }
87 write_json_string(&mut buf, &kv.key);
89 buf.push(':');
90 if let Some(v) = &kv.value {
92 write_any_value_json(&mut buf, v);
93 } else {
94 buf.push_str("null");
95 }
96 }
97 buf.push('}');
98 buf
99}
100
101fn write_json_string(buf: &mut String, s: &str) {
103 buf.push('"');
104 for c in s.chars() {
105 match c {
106 '"' => buf.push_str("\\\""),
107 '\\' => buf.push_str("\\\\"),
108 '\n' => buf.push_str("\\n"),
109 '\r' => buf.push_str("\\r"),
110 '\t' => buf.push_str("\\t"),
111 c if c.is_control() => {
112 use std::fmt::Write;
113 let _ = write!(buf, "\\u{:04x}", c as u32);
114 }
115 c => buf.push(c),
116 }
117 }
118 buf.push('"');
119}
120
121const MAX_JSON_DEPTH: usize = 32;
124
125fn write_any_value_json(buf: &mut String, v: &AnyValue) {
127 write_any_value_json_depth(buf, v, 0);
128}
129
130fn write_any_value_json_depth(buf: &mut String, v: &AnyValue, depth: usize) {
131 use opentelemetry_proto::tonic::common::v1::any_value::Value;
132 if depth >= MAX_JSON_DEPTH {
133 buf.push_str("null");
134 return;
135 }
136 match &v.value {
137 Some(Value::StringValue(s)) => write_json_string(buf, s),
138 Some(Value::BoolValue(b)) => buf.push_str(if *b { "true" } else { "false" }),
139 Some(Value::IntValue(i)) => {
140 use std::fmt::Write;
141 let _ = write!(buf, "{i}");
142 }
143 Some(Value::DoubleValue(d)) => {
144 if d.is_finite() {
145 use std::fmt::Write;
146 let _ = write!(buf, "{d}");
147 } else {
148 buf.push_str("null");
149 }
150 }
151 Some(Value::ArrayValue(arr)) => {
152 buf.push('[');
153 for (i, val) in arr.values.iter().enumerate() {
154 if i > 0 {
155 buf.push(',');
156 }
157 write_any_value_json_depth(buf, val, depth + 1);
158 }
159 buf.push(']');
160 }
161 Some(Value::KvlistValue(kvl)) => {
162 buf.push('{');
163 for (i, kv) in kvl.values.iter().enumerate() {
164 if i > 0 {
165 buf.push(',');
166 }
167 write_json_string(buf, &kv.key);
168 buf.push(':');
169 if let Some(val) = &kv.value {
170 write_any_value_json_depth(buf, val, depth + 1);
171 } else {
172 buf.push_str("null");
173 }
174 }
175 buf.push('}');
176 }
177 Some(Value::BytesValue(b)) => {
178 use base64::Engine;
179 buf.push('"');
180 buf.push_str(&base64::engine::general_purpose::STANDARD.encode(b));
181 buf.push('"');
182 }
183 None => buf.push_str("null"),
184 }
185}
186
187fn any_value_to_string(v: Option<&AnyValue>) -> Option<String> {
189 use opentelemetry_proto::tonic::common::v1::any_value::Value;
190 v.and_then(|av| match &av.value {
191 Some(Value::StringValue(s)) => Some(s.clone()),
192 Some(Value::IntValue(i)) => Some(i.to_string()),
193 Some(Value::BoolValue(b)) => Some(b.to_string()),
194 Some(Value::DoubleValue(d)) => Some(d.to_string()),
195 _ => None,
196 })
197}
198
199#[allow(clippy::too_many_lines)] pub fn trace_request_to_batch(
206 req: &ExportTraceServiceRequest,
207 schema: &SchemaRef,
208 received_at_nanos: i64,
209) -> Result<Option<RecordBatch>, arrow_schema::ArrowError> {
210 let total_spans: usize = req
212 .resource_spans
213 .iter()
214 .flat_map(|rs| &rs.scope_spans)
215 .map(|ss| ss.spans.len())
216 .sum();
217
218 if total_spans == 0 {
219 return Ok(None);
220 }
221
222 let mut trace_ids = FixedSizeBinaryBuilder::with_capacity(total_spans, 16);
224 let mut span_ids = FixedSizeBinaryBuilder::with_capacity(total_spans, 8);
225 let mut parent_span_ids = FixedSizeBinaryBuilder::with_capacity(total_spans, 8);
226 let mut trace_states = StringBuilder::with_capacity(total_spans, total_spans * 8);
227 let mut names = StringBuilder::with_capacity(total_spans, total_spans * 32);
228 let mut kinds = Int32Builder::with_capacity(total_spans);
229 let mut start_times = Int64Builder::with_capacity(total_spans);
230 let mut end_times = Int64Builder::with_capacity(total_spans);
231 let mut durations = Int64Builder::with_capacity(total_spans);
232 let mut status_codes = Int32Builder::with_capacity(total_spans);
233 let mut status_messages = StringBuilder::with_capacity(total_spans, total_spans * 16);
234 let mut res_service_names = StringBuilder::with_capacity(total_spans, total_spans * 16);
235 let mut res_service_versions = StringBuilder::with_capacity(total_spans, total_spans * 8);
236 let mut res_attributes = StringBuilder::with_capacity(total_spans, total_spans * 64);
237 let mut scope_names = StringBuilder::with_capacity(total_spans, total_spans * 16);
238 let mut scope_versions = StringBuilder::with_capacity(total_spans, total_spans * 8);
239 let mut span_attrs = StringBuilder::with_capacity(total_spans, total_spans * 64);
240 let mut events_counts = Int32Builder::with_capacity(total_spans);
241 let mut links_counts = Int32Builder::with_capacity(total_spans);
242 let mut received_at = TimestampNanosecondBuilder::with_capacity(total_spans);
243
244 for resource_spans in &req.resource_spans {
245 let resource = resource_spans.resource.as_ref();
246 let svc_name = extract_resource_attr(resource, "service.name");
247 let svc_version = extract_resource_attr(resource, "service.version");
248 let res_attrs_json = resource_attributes_json(resource);
249
250 for scope_spans in &resource_spans.scope_spans {
251 let scope = scope_spans.scope.as_ref();
252 let scope_n = scope.map(|s| s.name.as_str());
253 let scope_v = scope.map(|s| s.version.as_str());
254
255 for span in &scope_spans.spans {
256 let tid = fixed_bytes(&span.trace_id, 16);
258 trace_ids.append_value(&tid)?;
259
260 let sid = fixed_bytes(&span.span_id, 8);
262 span_ids.append_value(&sid)?;
263
264 let psid = fixed_bytes(&span.parent_span_id, 8);
266 if span.parent_span_id.is_empty() || is_all_zeros(&psid) {
267 parent_span_ids.append_null();
268 } else {
269 parent_span_ids.append_value(&psid)?;
270 }
271
272 if span.trace_state.is_empty() {
274 trace_states.append_null();
275 } else {
276 trace_states.append_value(&span.trace_state);
277 }
278
279 names.append_value(&span.name);
281
282 kinds.append_value(span.kind);
284
285 #[allow(clippy::cast_possible_wrap)]
287 let start_ns = span.start_time_unix_nano as i64;
288 #[allow(clippy::cast_possible_wrap)]
289 let end_ns = span.end_time_unix_nano as i64;
290 start_times.append_value(start_ns);
291 end_times.append_value(end_ns);
292 durations.append_value(end_ns.saturating_sub(start_ns));
293
294 if let Some(status) = &span.status {
296 status_codes.append_value(status.code);
297 if status.message.is_empty() {
298 status_messages.append_null();
299 } else {
300 status_messages.append_value(&status.message);
301 }
302 } else {
303 status_codes.append_value(0); status_messages.append_null();
305 }
306
307 append_context_fields(
309 svc_name.as_deref(),
310 res_attrs_json.as_deref(),
311 scope_n,
312 &span.attributes,
313 received_at_nanos,
314 &mut res_service_names,
315 &mut res_attributes,
316 &mut scope_names,
317 &mut span_attrs,
318 &mut received_at,
319 );
320
321 append_nullable_opt(&mut res_service_versions, svc_version.as_deref());
323 append_nullable_opt(&mut scope_versions, scope_v);
324
325 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
327 let ec = span.events.len() as i32;
328 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
329 let lc = span.links.len() as i32;
330 events_counts.append_value(ec);
331 links_counts.append_value(lc);
332 }
333 }
334 }
335
336 let batch = RecordBatch::try_new(
337 Arc::clone(schema),
338 vec![
339 Arc::new(trace_ids.finish()),
340 Arc::new(span_ids.finish()),
341 Arc::new(parent_span_ids.finish()),
342 Arc::new(trace_states.finish()),
343 Arc::new(names.finish()),
344 Arc::new(kinds.finish()),
345 Arc::new(start_times.finish()),
346 Arc::new(end_times.finish()),
347 Arc::new(durations.finish()),
348 Arc::new(status_codes.finish()),
349 Arc::new(status_messages.finish()),
350 Arc::new(res_service_names.finish()),
351 Arc::new(res_service_versions.finish()),
352 Arc::new(res_attributes.finish()),
353 Arc::new(scope_names.finish()),
354 Arc::new(scope_versions.finish()),
355 Arc::new(span_attrs.finish()),
356 Arc::new(events_counts.finish()),
357 Arc::new(links_counts.finish()),
358 Arc::new(received_at.finish()),
359 ],
360 )?;
361
362 Ok(Some(batch))
363}
364
365fn any_value_to_body_string(v: Option<&AnyValue>) -> Option<String> {
368 use opentelemetry_proto::tonic::common::v1::any_value::Value;
369 v.and_then(|av| match &av.value {
370 Some(Value::StringValue(s)) => Some(s.clone()),
371 Some(Value::IntValue(i)) => Some(i.to_string()),
372 Some(Value::BoolValue(b)) => Some(b.to_string()),
373 Some(Value::DoubleValue(d)) => Some(d.to_string()),
374 Some(_) => {
375 let mut buf = String::new();
376 write_any_value_json(&mut buf, av);
377 Some(buf)
378 }
379 None => None,
380 })
381}
382
383struct MetricBuilders {
386 names: StringBuilder,
387 descs: StringBuilder,
388 units: StringBuilder,
389 types: Int32Builder,
390 timestamps: Int64Builder,
391 value_doubles: Float64Builder,
392 value_ints: Int64Builder,
393 hist_counts: UInt64Builder,
394 hist_sums: Float64Builder,
395 res_svc_names: StringBuilder,
396 res_attrs: StringBuilder,
397 scope_names: StringBuilder,
398 attrs: StringBuilder,
399 received_at: TimestampNanosecondBuilder,
400}
401
402struct MetricContext<'a> {
403 name: &'a str,
404 desc: &'a str,
405 unit: &'a str,
406 metric_type: i32,
407 svc_name: Option<&'a str>,
408 res_json: Option<&'a str>,
409 scope_name: Option<&'a str>,
410 received_at_nanos: i64,
411}
412
413impl MetricBuilders {
414 fn with_capacity(n: usize) -> Self {
415 Self {
416 names: StringBuilder::with_capacity(n, n * 32),
417 descs: StringBuilder::with_capacity(n, n * 32),
418 units: StringBuilder::with_capacity(n, n * 8),
419 types: Int32Builder::with_capacity(n),
420 timestamps: Int64Builder::with_capacity(n),
421 value_doubles: Float64Builder::with_capacity(n),
422 value_ints: Int64Builder::with_capacity(n),
423 hist_counts: UInt64Builder::with_capacity(n),
424 hist_sums: Float64Builder::with_capacity(n),
425 res_svc_names: StringBuilder::with_capacity(n, n * 16),
426 res_attrs: StringBuilder::with_capacity(n, n * 64),
427 scope_names: StringBuilder::with_capacity(n, n * 16),
428 attrs: StringBuilder::with_capacity(n, n * 64),
429 received_at: TimestampNanosecondBuilder::with_capacity(n),
430 }
431 }
432
433 fn append_common(&mut self, ctx: &MetricContext, time_unix_nano: u64, dp_attrs: &[KeyValue]) {
434 self.names.append_value(ctx.name);
435 append_nullable_str(&mut self.descs, ctx.desc);
436 append_nullable_str(&mut self.units, ctx.unit);
437 self.types.append_value(ctx.metric_type);
438 #[allow(clippy::cast_possible_wrap)]
439 self.timestamps.append_value(time_unix_nano as i64);
440
441 append_context_fields(
442 ctx.svc_name,
443 ctx.res_json,
444 ctx.scope_name,
445 dp_attrs,
446 ctx.received_at_nanos,
447 &mut self.res_svc_names,
448 &mut self.res_attrs,
449 &mut self.scope_names,
450 &mut self.attrs,
451 &mut self.received_at,
452 );
453 }
454
455 fn append_number_dp(
456 &mut self,
457 ctx: &MetricContext,
458 time_unix_nano: u64,
459 value: Option<&number_data_point::Value>,
460 dp_attrs: &[KeyValue],
461 ) {
462 self.append_common(ctx, time_unix_nano, dp_attrs);
463 match value {
464 Some(number_data_point::Value::AsDouble(d)) => {
465 self.value_doubles.append_value(*d);
466 self.value_ints.append_null();
467 }
468 Some(number_data_point::Value::AsInt(i)) => {
469 self.value_doubles.append_null();
470 self.value_ints.append_value(*i);
471 }
472 None => {
473 self.value_doubles.append_null();
474 self.value_ints.append_null();
475 }
476 }
477 self.hist_counts.append_null();
478 self.hist_sums.append_null();
479 }
480
481 fn append_histogram_dp(
482 &mut self,
483 ctx: &MetricContext,
484 time_unix_nano: u64,
485 count: u64,
486 sum: Option<f64>,
487 dp_attrs: &[KeyValue],
488 ) {
489 self.append_common(ctx, time_unix_nano, dp_attrs);
490 self.value_doubles.append_null();
491 self.value_ints.append_null();
492 self.hist_counts.append_value(count);
493 match sum {
494 Some(s) => self.hist_sums.append_value(s),
495 None => self.hist_sums.append_null(),
496 }
497 }
498
499 fn finish(mut self, schema: &SchemaRef) -> Result<RecordBatch, arrow_schema::ArrowError> {
500 RecordBatch::try_new(
501 Arc::clone(schema),
502 vec![
503 Arc::new(self.names.finish()),
504 Arc::new(self.descs.finish()),
505 Arc::new(self.units.finish()),
506 Arc::new(self.types.finish()),
507 Arc::new(self.timestamps.finish()),
508 Arc::new(self.value_doubles.finish()),
509 Arc::new(self.value_ints.finish()),
510 Arc::new(self.hist_counts.finish()),
511 Arc::new(self.hist_sums.finish()),
512 Arc::new(self.res_svc_names.finish()),
513 Arc::new(self.res_attrs.finish()),
514 Arc::new(self.scope_names.finish()),
515 Arc::new(self.attrs.finish()),
516 Arc::new(self.received_at.finish()),
517 ],
518 )
519 }
520}
521
522#[allow(clippy::too_many_lines)] pub fn metrics_request_to_batch(
529 req: &ExportMetricsServiceRequest,
530 schema: &SchemaRef,
531 received_at_nanos: i64,
532) -> Result<Option<RecordBatch>, arrow_schema::ArrowError> {
533 let total_points: usize = req
534 .resource_metrics
535 .iter()
536 .flat_map(|rm| &rm.scope_metrics)
537 .flat_map(|sm| &sm.metrics)
538 .map(count_metric_points)
539 .sum();
540
541 if total_points == 0 {
542 return Ok(None);
543 }
544
545 let mut b = MetricBuilders::with_capacity(total_points);
546
547 for rm in &req.resource_metrics {
548 let resource = rm.resource.as_ref();
549 let svc_name = extract_resource_attr(resource, "service.name");
550 let res_json = resource_attributes_json(resource);
551
552 for sm in &rm.scope_metrics {
553 let scope_name = sm.scope.as_ref().map(|s| s.name.as_str());
554
555 for metric in &sm.metrics {
556 let Some(data) = &metric.data else { continue };
557 let base = MetricContext {
558 name: &metric.name,
559 desc: &metric.description,
560 unit: &metric.unit,
561 metric_type: 0,
562 svc_name: svc_name.as_deref(),
563 res_json: res_json.as_deref(),
564 scope_name,
565 received_at_nanos,
566 };
567
568 match data {
569 metric::Data::Gauge(g) => {
570 for dp in &g.data_points {
571 b.append_number_dp(
572 &base,
573 dp.time_unix_nano,
574 dp.value.as_ref(),
575 &dp.attributes,
576 );
577 }
578 }
579 metric::Data::Sum(s) => {
580 for dp in &s.data_points {
581 let ctx = MetricContext {
582 metric_type: 1,
583 ..base
584 };
585 b.append_number_dp(
586 &ctx,
587 dp.time_unix_nano,
588 dp.value.as_ref(),
589 &dp.attributes,
590 );
591 }
592 }
593 metric::Data::Histogram(h) => {
594 for dp in &h.data_points {
595 let ctx = MetricContext {
596 metric_type: 2,
597 ..base
598 };
599 b.append_histogram_dp(
600 &ctx,
601 dp.time_unix_nano,
602 dp.count,
603 dp.sum,
604 &dp.attributes,
605 );
606 }
607 }
608 metric::Data::ExponentialHistogram(eh) => {
609 for dp in &eh.data_points {
610 let ctx = MetricContext {
611 metric_type: 3,
612 ..base
613 };
614 b.append_histogram_dp(
615 &ctx,
616 dp.time_unix_nano,
617 dp.count,
618 dp.sum,
619 &dp.attributes,
620 );
621 }
622 }
623 metric::Data::Summary(s) => {
624 for dp in &s.data_points {
625 let ctx = MetricContext {
626 metric_type: 4,
627 ..base
628 };
629 b.append_histogram_dp(
630 &ctx,
631 dp.time_unix_nano,
632 dp.count,
633 Some(dp.sum),
634 &dp.attributes,
635 );
636 }
637 }
638 }
639 }
640 }
641 }
642
643 Ok(Some(b.finish(schema)?))
644}
645
646fn count_metric_points(m: &opentelemetry_proto::tonic::metrics::v1::Metric) -> usize {
647 match &m.data {
648 Some(metric::Data::Gauge(g)) => g.data_points.len(),
649 Some(metric::Data::Sum(s)) => s.data_points.len(),
650 Some(metric::Data::Histogram(h)) => h.data_points.len(),
651 Some(metric::Data::ExponentialHistogram(eh)) => eh.data_points.len(),
652 Some(metric::Data::Summary(s)) => s.data_points.len(),
653 None => 0,
654 }
655}
656
657#[allow(clippy::too_many_arguments)]
659fn append_context_fields(
660 svc_name: Option<&str>,
661 res_json: Option<&str>,
662 scope_name: Option<&str>,
663 attrs: &[KeyValue],
664 received_at_nanos: i64,
665 res_svc_names: &mut StringBuilder,
666 res_attrs: &mut StringBuilder,
667 scope_names_col: &mut StringBuilder,
668 attrs_col: &mut StringBuilder,
669 received_at: &mut TimestampNanosecondBuilder,
670) {
671 match svc_name {
672 Some(s) => res_svc_names.append_value(s),
673 None => res_svc_names.append_null(),
674 }
675 match res_json {
676 Some(s) => res_attrs.append_value(s),
677 None => res_attrs.append_null(),
678 }
679 match scope_name {
680 Some(s) if !s.is_empty() => scope_names_col.append_value(s),
681 _ => scope_names_col.append_null(),
682 }
683 match kv_to_json(attrs) {
684 Some(s) => attrs_col.append_value(&s),
685 None => attrs_col.append_null(),
686 }
687 received_at.append_value(received_at_nanos);
688}
689
690fn append_nullable_str(builder: &mut StringBuilder, s: &str) {
692 if s.is_empty() {
693 builder.append_null();
694 } else {
695 builder.append_value(s);
696 }
697}
698
699fn append_nullable_opt(builder: &mut StringBuilder, s: Option<&str>) {
701 match s {
702 Some(v) if !v.is_empty() => builder.append_value(v),
703 _ => builder.append_null(),
704 }
705}
706
707pub fn logs_request_to_batch(
715 req: &ExportLogsServiceRequest,
716 schema: &SchemaRef,
717 received_at_nanos: i64,
718) -> Result<Option<RecordBatch>, arrow_schema::ArrowError> {
719 let total_records: usize = req
720 .resource_logs
721 .iter()
722 .flat_map(|rl| &rl.scope_logs)
723 .map(|sl| sl.log_records.len())
724 .sum();
725
726 if total_records == 0 {
727 return Ok(None);
728 }
729
730 let mut ts = Int64Builder::with_capacity(total_records);
731 let mut observed_ts = Int64Builder::with_capacity(total_records);
732 let mut sev_nums = Int32Builder::with_capacity(total_records);
733 let mut sev_texts = StringBuilder::with_capacity(total_records, total_records * 8);
734 let mut bodies = StringBuilder::with_capacity(total_records, total_records * 64);
735 let mut trace_ids = FixedSizeBinaryBuilder::with_capacity(total_records, 16);
736 let mut span_ids = FixedSizeBinaryBuilder::with_capacity(total_records, 8);
737 let mut res_svc_names = StringBuilder::with_capacity(total_records, total_records * 16);
738 let mut res_attrs = StringBuilder::with_capacity(total_records, total_records * 64);
739 let mut scope_names_col = StringBuilder::with_capacity(total_records, total_records * 16);
740 let mut attrs_col = StringBuilder::with_capacity(total_records, total_records * 64);
741 let mut received_at = TimestampNanosecondBuilder::with_capacity(total_records);
742
743 for rl in &req.resource_logs {
744 let resource = rl.resource.as_ref();
745 let svc_name = extract_resource_attr(resource, "service.name");
746 let res_json = resource_attributes_json(resource);
747
748 for sl in &rl.scope_logs {
749 let scope_name = sl.scope.as_ref().map(|s| s.name.as_str());
750
751 for log in &sl.log_records {
752 #[allow(clippy::cast_possible_wrap)]
753 ts.append_value(log.time_unix_nano as i64);
754
755 if log.observed_time_unix_nano == 0 {
756 observed_ts.append_null();
757 } else {
758 #[allow(clippy::cast_possible_wrap)]
759 observed_ts.append_value(log.observed_time_unix_nano as i64);
760 }
761
762 sev_nums.append_value(log.severity_number);
763
764 if log.severity_text.is_empty() {
765 sev_texts.append_null();
766 } else {
767 sev_texts.append_value(&log.severity_text);
768 }
769
770 match any_value_to_body_string(log.body.as_ref()) {
771 Some(s) => bodies.append_value(&s),
772 None => bodies.append_null(),
773 }
774
775 let tid = fixed_bytes(&log.trace_id, 16);
777 if log.trace_id.is_empty() || is_all_zeros(&tid) {
778 trace_ids.append_null();
779 } else {
780 trace_ids.append_value(&tid)?;
781 }
782
783 let sid = fixed_bytes(&log.span_id, 8);
785 if log.span_id.is_empty() || is_all_zeros(&sid) {
786 span_ids.append_null();
787 } else {
788 span_ids.append_value(&sid)?;
789 }
790
791 append_context_fields(
792 svc_name.as_deref(),
793 res_json.as_deref(),
794 scope_name,
795 &log.attributes,
796 received_at_nanos,
797 &mut res_svc_names,
798 &mut res_attrs,
799 &mut scope_names_col,
800 &mut attrs_col,
801 &mut received_at,
802 );
803 }
804 }
805 }
806
807 let batch = RecordBatch::try_new(
808 Arc::clone(schema),
809 vec![
810 Arc::new(ts.finish()),
811 Arc::new(observed_ts.finish()),
812 Arc::new(sev_nums.finish()),
813 Arc::new(sev_texts.finish()),
814 Arc::new(bodies.finish()),
815 Arc::new(trace_ids.finish()),
816 Arc::new(span_ids.finish()),
817 Arc::new(res_svc_names.finish()),
818 Arc::new(res_attrs.finish()),
819 Arc::new(scope_names_col.finish()),
820 Arc::new(attrs_col.finish()),
821 Arc::new(received_at.finish()),
822 ],
823 )?;
824
825 Ok(Some(batch))
826}
827
828#[cfg(test)]
829mod tests {
830 use super::*;
831 use crate::otel::schema::traces_schema;
832 use arrow_array::Array;
833 use arrow_schema::SchemaRef;
834 use opentelemetry_proto::tonic::common::v1::{
835 any_value, AnyValue as ProtoAnyValue, InstrumentationScope,
836 };
837 use opentelemetry_proto::tonic::resource::v1::Resource as ProtoResource;
838 use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status};
839
840 fn schema() -> SchemaRef {
841 traces_schema()
842 }
843
844 fn make_kv(key: &str, val: &str) -> KeyValue {
845 KeyValue {
846 key: key.to_string(),
847 value: Some(ProtoAnyValue {
848 value: Some(any_value::Value::StringValue(val.to_string())),
849 }),
850 }
851 }
852
853 fn make_kv_int(key: &str, val: i64) -> KeyValue {
854 KeyValue {
855 key: key.to_string(),
856 value: Some(ProtoAnyValue {
857 value: Some(any_value::Value::IntValue(val)),
858 }),
859 }
860 }
861
862 fn make_test_request(num_spans: usize) -> ExportTraceServiceRequest {
863 let spans: Vec<Span> = (0..num_spans)
864 .map(|i| {
865 let mut trace_id = vec![0u8; 16];
866 trace_id[15] = (i + 1) as u8;
867 let mut span_id = vec![0u8; 8];
868 span_id[7] = (i + 1) as u8;
869
870 Span {
871 trace_id,
872 span_id,
873 parent_span_id: vec![],
874 trace_state: String::new(),
875 name: format!("span-{i}"),
876 kind: 1, start_time_unix_nano: 1_000_000_000 * (i as u64 + 1),
878 end_time_unix_nano: 1_000_000_000 * (i as u64 + 1) + 500_000,
879 attributes: vec![make_kv("http.method", "GET")],
880 dropped_attributes_count: 0,
881 events: vec![],
882 dropped_events_count: 0,
883 links: vec![],
884 dropped_links_count: 0,
885 status: Some(Status {
886 message: String::new(),
887 code: 0,
888 }),
889 flags: 0,
890 }
891 })
892 .collect();
893
894 ExportTraceServiceRequest {
895 resource_spans: vec![ResourceSpans {
896 resource: Some(ProtoResource {
897 attributes: vec![
898 make_kv("service.name", "test-svc"),
899 make_kv("service.version", "1.0.0"),
900 make_kv("host.name", "test-host"),
901 ],
902 dropped_attributes_count: 0,
903 entity_refs: vec![],
904 }),
905 scope_spans: vec![ScopeSpans {
906 scope: Some(InstrumentationScope {
907 name: "my-lib".to_string(),
908 version: "0.1.0".to_string(),
909 attributes: vec![],
910 dropped_attributes_count: 0,
911 }),
912 spans,
913 schema_url: String::new(),
914 }],
915 schema_url: String::new(),
916 }],
917 }
918 }
919
920 #[test]
921 fn test_empty_request() {
922 let req = ExportTraceServiceRequest {
923 resource_spans: vec![],
924 };
925 let result = trace_request_to_batch(&req, &schema(), 0).unwrap();
926 assert!(result.is_none());
927 }
928
929 #[test]
930 fn test_single_span() {
931 let req = make_test_request(1);
932 let batch = trace_request_to_batch(&req, &schema(), 999)
933 .unwrap()
934 .unwrap();
935 assert_eq!(batch.num_rows(), 1);
936 assert_eq!(batch.num_columns(), 20);
937 }
938
939 #[test]
940 fn test_multi_span() {
941 let req = make_test_request(10);
942 let batch = trace_request_to_batch(&req, &schema(), 999)
943 .unwrap()
944 .unwrap();
945 assert_eq!(batch.num_rows(), 10);
946 }
947
948 #[test]
949 fn test_large_batch() {
950 let req = make_test_request(10_000);
951 let batch = trace_request_to_batch(&req, &schema(), 999)
952 .unwrap()
953 .unwrap();
954 assert_eq!(batch.num_rows(), 10_000);
955 }
956
957 #[test]
958 fn test_schema_matches() {
959 let req = make_test_request(1);
960 let batch = trace_request_to_batch(&req, &schema(), 999)
961 .unwrap()
962 .unwrap();
963 assert_eq!(batch.schema(), traces_schema());
964 }
965
966 #[test]
967 fn test_trace_id_encoding() {
968 let req = make_test_request(1);
969 let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
970
971 let col = batch
972 .column(0)
973 .as_any()
974 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
975 .unwrap();
976 let tid = col.value(0);
977 assert_eq!(tid.len(), 16);
979 assert_eq!(tid[15], 1);
980 }
981
982 #[test]
983 fn test_parent_span_id_null() {
984 let req = make_test_request(1);
985 let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
986
987 let col = batch
988 .column(2)
989 .as_any()
990 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
991 .unwrap();
992 assert!(col.is_null(0)); }
994
995 #[test]
996 fn test_duration_computed() {
997 let req = make_test_request(1);
998 let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
999
1000 let dur_col = batch
1001 .column(8)
1002 .as_any()
1003 .downcast_ref::<arrow_array::Int64Array>()
1004 .unwrap();
1005 assert_eq!(dur_col.value(0), 500_000); }
1007
1008 #[test]
1009 fn test_service_name_promoted() {
1010 let req = make_test_request(1);
1011 let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
1012
1013 let col = batch
1014 .column(11)
1015 .as_any()
1016 .downcast_ref::<arrow_array::StringArray>()
1017 .unwrap();
1018 assert_eq!(col.value(0), "test-svc");
1019 }
1020
1021 #[test]
1022 fn test_resource_attrs_exclude_promoted() {
1023 let req = make_test_request(1);
1024 let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
1025
1026 let col = batch
1027 .column(13)
1028 .as_any()
1029 .downcast_ref::<arrow_array::StringArray>()
1030 .unwrap();
1031 let json = col.value(0);
1032 assert!(json.contains("host.name"));
1034 assert!(!json.contains("service.name"));
1035 assert!(!json.contains("service.version"));
1036 }
1037
1038 #[test]
1039 fn test_attributes_json() {
1040 let req = make_test_request(1);
1041 let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
1042
1043 let col = batch
1044 .column(16)
1045 .as_any()
1046 .downcast_ref::<arrow_array::StringArray>()
1047 .unwrap();
1048 let json = col.value(0);
1049 assert!(json.contains("http.method"));
1050 assert!(json.contains("GET"));
1051 }
1052
1053 #[test]
1054 fn test_scope_fields() {
1055 let req = make_test_request(1);
1056 let batch = trace_request_to_batch(&req, &schema(), 0).unwrap().unwrap();
1057
1058 let name_col = batch
1059 .column(14)
1060 .as_any()
1061 .downcast_ref::<arrow_array::StringArray>()
1062 .unwrap();
1063 assert_eq!(name_col.value(0), "my-lib");
1064
1065 let ver_col = batch
1066 .column(15)
1067 .as_any()
1068 .downcast_ref::<arrow_array::StringArray>()
1069 .unwrap();
1070 assert_eq!(ver_col.value(0), "0.1.0");
1071 }
1072
1073 #[test]
1074 fn test_received_at() {
1075 let req = make_test_request(3);
1076 let batch = trace_request_to_batch(&req, &schema(), 42)
1077 .unwrap()
1078 .unwrap();
1079
1080 let col = batch
1081 .column(19)
1082 .as_any()
1083 .downcast_ref::<arrow_array::TimestampNanosecondArray>()
1084 .unwrap();
1085 for i in 0..3 {
1086 assert_eq!(col.value(i), 42);
1087 }
1088 }
1089
1090 #[test]
1091 fn test_int_attribute_to_string() {
1092 let kv = make_kv_int("retries", 3);
1093 let s = any_value_to_string(kv.value.as_ref());
1094 assert_eq!(s, Some("3".to_string()));
1095 }
1096
1097 #[test]
1098 fn test_json_escaping() {
1099 let mut buf = String::new();
1100 write_json_string(&mut buf, "hello \"world\"\nline2");
1101 assert_eq!(buf, r#""hello \"world\"\nline2""#);
1102 }
1103
1104 use crate::otel::schema::metrics_schema;
1107 use opentelemetry_proto::tonic::metrics::v1::{
1108 Gauge, Histogram, HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics,
1109 ScopeMetrics, Sum,
1110 };
1111
1112 fn metrics_schema_ref() -> SchemaRef {
1113 metrics_schema()
1114 }
1115
1116 fn make_gauge_metric(name: &str, value: f64, ts: u64) -> Metric {
1117 Metric {
1118 name: name.to_string(),
1119 description: "test gauge".to_string(),
1120 unit: "ms".to_string(),
1121 metadata: vec![],
1122 data: Some(metric::Data::Gauge(Gauge {
1123 data_points: vec![NumberDataPoint {
1124 attributes: vec![make_kv("host", "web-1")],
1125 start_time_unix_nano: 0,
1126 time_unix_nano: ts,
1127 exemplars: vec![],
1128 flags: 0,
1129 value: Some(number_data_point::Value::AsDouble(value)),
1130 }],
1131 })),
1132 }
1133 }
1134
1135 fn make_metrics_request(metrics: Vec<Metric>) -> ExportMetricsServiceRequest {
1136 ExportMetricsServiceRequest {
1137 resource_metrics: vec![ResourceMetrics {
1138 resource: Some(ProtoResource {
1139 attributes: vec![make_kv("service.name", "metrics-svc")],
1140 dropped_attributes_count: 0,
1141 entity_refs: vec![],
1142 }),
1143 scope_metrics: vec![ScopeMetrics {
1144 scope: Some(InstrumentationScope {
1145 name: "meter".to_string(),
1146 version: "1.0".to_string(),
1147 attributes: vec![],
1148 dropped_attributes_count: 0,
1149 }),
1150 metrics,
1151 schema_url: String::new(),
1152 }],
1153 schema_url: String::new(),
1154 }],
1155 }
1156 }
1157
1158 #[test]
1159 fn test_metrics_empty_request() {
1160 let req = ExportMetricsServiceRequest {
1161 resource_metrics: vec![],
1162 };
1163 assert!(metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1164 .unwrap()
1165 .is_none());
1166 }
1167
1168 #[test]
1169 fn test_metrics_gauge_double() {
1170 let req = make_metrics_request(vec![make_gauge_metric("cpu.usage", 42.5, 1000)]);
1171 let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 99)
1172 .unwrap()
1173 .unwrap();
1174 assert_eq!(batch.num_rows(), 1);
1175 assert_eq!(batch.num_columns(), 14);
1176
1177 let names = batch
1179 .column(0)
1180 .as_any()
1181 .downcast_ref::<arrow_array::StringArray>()
1182 .unwrap();
1183 assert_eq!(names.value(0), "cpu.usage");
1184
1185 let types = batch
1187 .column(3)
1188 .as_any()
1189 .downcast_ref::<arrow_array::Int32Array>()
1190 .unwrap();
1191 assert_eq!(types.value(0), 0);
1192
1193 let vd = batch
1195 .column(5)
1196 .as_any()
1197 .downcast_ref::<arrow_array::Float64Array>()
1198 .unwrap();
1199 assert!((vd.value(0) - 42.5).abs() < f64::EPSILON);
1200
1201 let vi = batch
1203 .column(6)
1204 .as_any()
1205 .downcast_ref::<arrow_array::Int64Array>()
1206 .unwrap();
1207 assert!(vi.is_null(0));
1208
1209 assert!(batch
1211 .column(7)
1212 .as_any()
1213 .downcast_ref::<arrow_array::UInt64Array>()
1214 .unwrap()
1215 .is_null(0));
1216 }
1217
1218 #[test]
1219 fn test_metrics_sum_int() {
1220 let metric = Metric {
1221 name: "requests".to_string(),
1222 description: String::new(),
1223 unit: "1".to_string(),
1224 metadata: vec![],
1225 data: Some(metric::Data::Sum(Sum {
1226 data_points: vec![NumberDataPoint {
1227 attributes: vec![],
1228 start_time_unix_nano: 0,
1229 time_unix_nano: 2000,
1230 exemplars: vec![],
1231 flags: 0,
1232 value: Some(number_data_point::Value::AsInt(100)),
1233 }],
1234 aggregation_temporality: 2,
1235 is_monotonic: true,
1236 })),
1237 };
1238 let req = make_metrics_request(vec![metric]);
1239 let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1240 .unwrap()
1241 .unwrap();
1242
1243 let types = batch
1245 .column(3)
1246 .as_any()
1247 .downcast_ref::<arrow_array::Int32Array>()
1248 .unwrap();
1249 assert_eq!(types.value(0), 1);
1250
1251 let vi = batch
1253 .column(6)
1254 .as_any()
1255 .downcast_ref::<arrow_array::Int64Array>()
1256 .unwrap();
1257 assert_eq!(vi.value(0), 100);
1258
1259 assert!(batch
1261 .column(5)
1262 .as_any()
1263 .downcast_ref::<arrow_array::Float64Array>()
1264 .unwrap()
1265 .is_null(0));
1266 }
1267
1268 #[test]
1269 fn test_metrics_histogram() {
1270 let metric = Metric {
1271 name: "latency".to_string(),
1272 description: String::new(),
1273 unit: "ms".to_string(),
1274 metadata: vec![],
1275 data: Some(metric::Data::Histogram(Histogram {
1276 data_points: vec![HistogramDataPoint {
1277 attributes: vec![],
1278 start_time_unix_nano: 0,
1279 time_unix_nano: 3000,
1280 count: 42,
1281 sum: Some(1234.5),
1282 bucket_counts: vec![10, 20, 12],
1283 explicit_bounds: vec![10.0, 50.0],
1284 exemplars: vec![],
1285 flags: 0,
1286 min: None,
1287 max: None,
1288 }],
1289 aggregation_temporality: 1,
1290 })),
1291 };
1292 let req = make_metrics_request(vec![metric]);
1293 let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1294 .unwrap()
1295 .unwrap();
1296
1297 let types = batch
1299 .column(3)
1300 .as_any()
1301 .downcast_ref::<arrow_array::Int32Array>()
1302 .unwrap();
1303 assert_eq!(types.value(0), 2);
1304
1305 let hc = batch
1307 .column(7)
1308 .as_any()
1309 .downcast_ref::<arrow_array::UInt64Array>()
1310 .unwrap();
1311 assert_eq!(hc.value(0), 42);
1312
1313 let hs = batch
1315 .column(8)
1316 .as_any()
1317 .downcast_ref::<arrow_array::Float64Array>()
1318 .unwrap();
1319 assert!((hs.value(0) - 1234.5).abs() < f64::EPSILON);
1320 }
1321
1322 #[test]
1323 fn test_metrics_schema_matches() {
1324 let req = make_metrics_request(vec![make_gauge_metric("x", 1.0, 0)]);
1325 let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1326 .unwrap()
1327 .unwrap();
1328 assert_eq!(batch.schema(), metrics_schema());
1329 }
1330
1331 #[test]
1332 fn test_metrics_service_name() {
1333 let req = make_metrics_request(vec![make_gauge_metric("x", 1.0, 0)]);
1334 let batch = metrics_request_to_batch(&req, &metrics_schema_ref(), 0)
1335 .unwrap()
1336 .unwrap();
1337 let col = batch
1338 .column(9)
1339 .as_any()
1340 .downcast_ref::<arrow_array::StringArray>()
1341 .unwrap();
1342 assert_eq!(col.value(0), "metrics-svc");
1343 }
1344
1345 use crate::otel::schema::logs_schema;
1348 use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
1349
1350 fn logs_schema_ref() -> SchemaRef {
1351 logs_schema()
1352 }
1353
1354 fn make_log_record(body: &str, severity: i32, ts: u64) -> LogRecord {
1355 LogRecord {
1356 time_unix_nano: ts,
1357 observed_time_unix_nano: ts + 1000,
1358 severity_number: severity,
1359 severity_text: "INFO".to_string(),
1360 body: Some(ProtoAnyValue {
1361 value: Some(any_value::Value::StringValue(body.to_string())),
1362 }),
1363 attributes: vec![make_kv("env", "prod")],
1364 dropped_attributes_count: 0,
1365 flags: 0,
1366 trace_id: vec![],
1367 span_id: vec![],
1368 event_name: String::new(),
1369 }
1370 }
1371
1372 fn make_logs_request(records: Vec<LogRecord>) -> ExportLogsServiceRequest {
1373 ExportLogsServiceRequest {
1374 resource_logs: vec![ResourceLogs {
1375 resource: Some(ProtoResource {
1376 attributes: vec![make_kv("service.name", "log-svc")],
1377 dropped_attributes_count: 0,
1378 entity_refs: vec![],
1379 }),
1380 scope_logs: vec![ScopeLogs {
1381 scope: Some(InstrumentationScope {
1382 name: "logger".to_string(),
1383 version: "0.1".to_string(),
1384 attributes: vec![],
1385 dropped_attributes_count: 0,
1386 }),
1387 log_records: records,
1388 schema_url: String::new(),
1389 }],
1390 schema_url: String::new(),
1391 }],
1392 }
1393 }
1394
1395 #[test]
1396 fn test_logs_empty_request() {
1397 let req = ExportLogsServiceRequest {
1398 resource_logs: vec![],
1399 };
1400 assert!(logs_request_to_batch(&req, &logs_schema_ref(), 0)
1401 .unwrap()
1402 .is_none());
1403 }
1404
1405 #[test]
1406 fn test_logs_single_record() {
1407 let req = make_logs_request(vec![make_log_record("hello world", 9, 5000)]);
1408 let batch = logs_request_to_batch(&req, &logs_schema_ref(), 77)
1409 .unwrap()
1410 .unwrap();
1411 assert_eq!(batch.num_rows(), 1);
1412 assert_eq!(batch.num_columns(), 12);
1413 }
1414
1415 #[test]
1416 fn test_logs_body_string() {
1417 let req = make_logs_request(vec![make_log_record("test message", 9, 0)]);
1418 let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1419 .unwrap()
1420 .unwrap();
1421 let col = batch
1422 .column(4)
1423 .as_any()
1424 .downcast_ref::<arrow_array::StringArray>()
1425 .unwrap();
1426 assert_eq!(col.value(0), "test message");
1427 }
1428
1429 #[test]
1430 fn test_logs_severity() {
1431 let req = make_logs_request(vec![make_log_record("err", 17, 0)]);
1432 let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1433 .unwrap()
1434 .unwrap();
1435 let sev = batch
1436 .column(2)
1437 .as_any()
1438 .downcast_ref::<arrow_array::Int32Array>()
1439 .unwrap();
1440 assert_eq!(sev.value(0), 17); let text = batch
1442 .column(3)
1443 .as_any()
1444 .downcast_ref::<arrow_array::StringArray>()
1445 .unwrap();
1446 assert_eq!(text.value(0), "INFO");
1447 }
1448
1449 #[test]
1450 fn test_logs_trace_correlation() {
1451 let mut record = make_log_record("with trace", 9, 0);
1452 record.trace_id = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
1453 record.span_id = vec![0, 0, 0, 0, 0, 0, 0, 2];
1454 let req = make_logs_request(vec![record]);
1455 let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1456 .unwrap()
1457 .unwrap();
1458
1459 let tid = batch
1460 .column(5)
1461 .as_any()
1462 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1463 .unwrap();
1464 assert!(!tid.is_null(0));
1465 assert_eq!(tid.value(0)[15], 1);
1466
1467 let sid = batch
1468 .column(6)
1469 .as_any()
1470 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1471 .unwrap();
1472 assert!(!sid.is_null(0));
1473 assert_eq!(sid.value(0)[7], 2);
1474 }
1475
1476 #[test]
1477 fn test_logs_null_trace_ids() {
1478 let req = make_logs_request(vec![make_log_record("no trace", 9, 0)]);
1479 let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1480 .unwrap()
1481 .unwrap();
1482 let tid = batch
1483 .column(5)
1484 .as_any()
1485 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1486 .unwrap();
1487 assert!(tid.is_null(0));
1488 let sid = batch
1489 .column(6)
1490 .as_any()
1491 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1492 .unwrap();
1493 assert!(sid.is_null(0));
1494 }
1495
1496 #[test]
1497 fn test_logs_observed_timestamp() {
1498 let mut record = make_log_record("ts test", 9, 5000);
1499 record.observed_time_unix_nano = 0;
1500 let req = make_logs_request(vec![record]);
1501 let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1502 .unwrap()
1503 .unwrap();
1504 let obs = batch
1505 .column(1)
1506 .as_any()
1507 .downcast_ref::<arrow_array::Int64Array>()
1508 .unwrap();
1509 assert!(obs.is_null(0));
1510 }
1511
1512 #[test]
1513 fn test_logs_schema_matches() {
1514 let req = make_logs_request(vec![make_log_record("x", 9, 0)]);
1515 let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1516 .unwrap()
1517 .unwrap();
1518 assert_eq!(batch.schema(), logs_schema());
1519 }
1520
1521 #[test]
1522 fn test_logs_service_name() {
1523 let req = make_logs_request(vec![make_log_record("x", 9, 0)]);
1524 let batch = logs_request_to_batch(&req, &logs_schema_ref(), 0)
1525 .unwrap()
1526 .unwrap();
1527 let col = batch
1528 .column(7)
1529 .as_any()
1530 .downcast_ref::<arrow_array::StringArray>()
1531 .unwrap();
1532 assert_eq!(col.value(0), "log-svc");
1533 }
1534}