Skip to main content

laminar_connectors/otel/
schema.rs

1//! Arrow schemas for OTel signal types.
2//!
3//! Each schema flattens the nested OTLP protobuf hierarchy
4//! (Resource → Scope → Span/DataPoint/LogRecord) into flat columns.
5
6use std::sync::Arc;
7
8use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
9
10/// Trace span schema.
11#[must_use]
12pub fn traces_schema() -> SchemaRef {
13    Arc::new(Schema::new(vec![
14        Field::new("trace_id", DataType::FixedSizeBinary(16), false),
15        Field::new("span_id", DataType::FixedSizeBinary(8), false),
16        Field::new("parent_span_id", DataType::FixedSizeBinary(8), true),
17        Field::new("trace_state", DataType::Utf8, true),
18        Field::new("name", DataType::Utf8, false),
19        Field::new("kind", DataType::Int32, false),
20        Field::new("start_time_unix_nano", DataType::Int64, false),
21        Field::new("end_time_unix_nano", DataType::Int64, false),
22        Field::new("duration_ns", DataType::Int64, false),
23        Field::new("status_code", DataType::Int32, false),
24        Field::new("status_message", DataType::Utf8, true),
25        Field::new("resource_service_name", DataType::Utf8, true),
26        Field::new("resource_service_version", DataType::Utf8, true),
27        Field::new("resource_attributes", DataType::Utf8, true),
28        Field::new("scope_name", DataType::Utf8, true),
29        Field::new("scope_version", DataType::Utf8, true),
30        Field::new("attributes", DataType::Utf8, true),
31        Field::new("events_count", DataType::Int32, false),
32        Field::new("links_count", DataType::Int32, false),
33        Field::new(
34            "_laminar_received_at",
35            DataType::Timestamp(TimeUnit::Nanosecond, None),
36            false,
37        ),
38    ]))
39}
40
41/// Metric data point schema.
42#[must_use]
43pub fn metrics_schema() -> SchemaRef {
44    Arc::new(Schema::new(vec![
45        Field::new("metric_name", DataType::Utf8, false),
46        Field::new("metric_description", DataType::Utf8, true),
47        Field::new("metric_unit", DataType::Utf8, true),
48        Field::new("metric_type", DataType::Int32, false),
49        Field::new("timestamp_unix_nano", DataType::Int64, false),
50        Field::new("value_double", DataType::Float64, true),
51        Field::new("value_int", DataType::Int64, true),
52        Field::new("histogram_count", DataType::UInt64, true),
53        Field::new("histogram_sum", DataType::Float64, true),
54        Field::new("resource_service_name", DataType::Utf8, true),
55        Field::new("resource_attributes", DataType::Utf8, true),
56        Field::new("scope_name", DataType::Utf8, true),
57        Field::new("attributes", DataType::Utf8, true),
58        Field::new(
59            "_laminar_received_at",
60            DataType::Timestamp(TimeUnit::Nanosecond, None),
61            false,
62        ),
63    ]))
64}
65
66/// Log record schema.
67#[must_use]
68pub fn logs_schema() -> SchemaRef {
69    Arc::new(Schema::new(vec![
70        Field::new("timestamp_unix_nano", DataType::Int64, false),
71        Field::new("observed_timestamp_unix_nano", DataType::Int64, true),
72        Field::new("severity_number", DataType::Int32, false),
73        Field::new("severity_text", DataType::Utf8, true),
74        Field::new("body_string", DataType::Utf8, true),
75        Field::new("trace_id", DataType::FixedSizeBinary(16), true),
76        Field::new("span_id", DataType::FixedSizeBinary(8), true),
77        Field::new("resource_service_name", DataType::Utf8, true),
78        Field::new("resource_attributes", DataType::Utf8, true),
79        Field::new("scope_name", DataType::Utf8, true),
80        Field::new("attributes", DataType::Utf8, true),
81        Field::new(
82            "_laminar_received_at",
83            DataType::Timestamp(TimeUnit::Nanosecond, None),
84            false,
85        ),
86    ]))
87}
88
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// Column names of `schema`, in declaration order.
    fn column_names(schema: &Schema) -> Vec<&str> {
        schema.fields().iter().map(|f| f.name().as_str()).collect()
    }

    #[test]
    fn test_traces_schema_field_count() {
        let schema = traces_schema();
        assert_eq!(schema.fields().len(), 20);
    }

    #[test]
    fn test_traces_schema_trace_id_type() {
        let schema = traces_schema();
        let field = schema.field_with_name("trace_id").unwrap();
        assert_eq!(*field.data_type(), DataType::FixedSizeBinary(16));
        assert!(!field.is_nullable());
    }

    #[test]
    fn test_traces_schema_span_id_type() {
        let schema = traces_schema();
        let field = schema.field_with_name("span_id").unwrap();
        assert_eq!(*field.data_type(), DataType::FixedSizeBinary(8));
        assert!(!field.is_nullable());
    }

    #[test]
    fn test_traces_schema_parent_nullable() {
        let schema = traces_schema();
        let field = schema.field_with_name("parent_span_id").unwrap();
        assert!(field.is_nullable());
    }

    #[test]
    fn test_traces_schema_timestamps() {
        let schema = traces_schema();
        for col in ["start_time_unix_nano", "end_time_unix_nano", "duration_ns"] {
            let field = schema.field_with_name(col).unwrap();
            assert_eq!(*field.data_type(), DataType::Int64, "column {col}");
        }
        let received_at = schema.field_with_name("_laminar_received_at").unwrap();
        assert_eq!(
            *received_at.data_type(),
            DataType::Timestamp(TimeUnit::Nanosecond, None)
        );
    }

    // Pins the exact column order: a count-only check cannot catch renames
    // or reorderings, which would break positional RecordBatch construction.
    #[test]
    fn test_traces_schema_column_order() {
        let schema = traces_schema();
        assert_eq!(
            column_names(&schema),
            [
                "trace_id",
                "span_id",
                "parent_span_id",
                "trace_state",
                "name",
                "kind",
                "start_time_unix_nano",
                "end_time_unix_nano",
                "duration_ns",
                "status_code",
                "status_message",
                "resource_service_name",
                "resource_service_version",
                "resource_attributes",
                "scope_name",
                "scope_version",
                "attributes",
                "events_count",
                "links_count",
                "_laminar_received_at",
            ]
        );
    }

    #[test]
    fn test_metrics_schema_field_count() {
        let schema = metrics_schema();
        assert_eq!(schema.fields().len(), 14);
    }

    #[test]
    fn test_metrics_schema_column_order() {
        let schema = metrics_schema();
        assert_eq!(
            column_names(&schema),
            [
                "metric_name",
                "metric_description",
                "metric_unit",
                "metric_type",
                "timestamp_unix_nano",
                "value_double",
                "value_int",
                "histogram_count",
                "histogram_sum",
                "resource_service_name",
                "resource_attributes",
                "scope_name",
                "attributes",
                "_laminar_received_at",
            ]
        );
    }

    #[test]
    fn test_metrics_schema_value_columns() {
        let schema = metrics_schema();
        for (col, expected) in [
            ("value_double", DataType::Float64),
            ("value_int", DataType::Int64),
            ("histogram_count", DataType::UInt64),
            ("histogram_sum", DataType::Float64),
        ] {
            let field = schema.field_with_name(col).unwrap();
            assert_eq!(*field.data_type(), expected, "column {col}");
            assert!(field.is_nullable(), "column {col}");
        }
        let name = schema.field_with_name("metric_name").unwrap();
        assert!(!name.is_nullable());
    }

    #[test]
    fn test_logs_schema_field_count() {
        let schema = logs_schema();
        assert_eq!(schema.fields().len(), 12);
    }

    #[test]
    fn test_logs_schema_column_order() {
        let schema = logs_schema();
        assert_eq!(
            column_names(&schema),
            [
                "timestamp_unix_nano",
                "observed_timestamp_unix_nano",
                "severity_number",
                "severity_text",
                "body_string",
                "trace_id",
                "span_id",
                "resource_service_name",
                "resource_attributes",
                "scope_name",
                "attributes",
                "_laminar_received_at",
            ]
        );
    }

    #[test]
    fn test_logs_schema_trace_correlation() {
        let schema = logs_schema();
        let trace_id = schema.field_with_name("trace_id").unwrap();
        assert_eq!(*trace_id.data_type(), DataType::FixedSizeBinary(16));
        assert!(trace_id.is_nullable());
        let span_id = schema.field_with_name("span_id").unwrap();
        assert_eq!(*span_id.data_type(), DataType::FixedSizeBinary(8));
        assert!(span_id.is_nullable());
    }

    // All three signal schemas carry the same non-null ingestion timestamp.
    #[test]
    fn test_all_schemas_share_received_at() {
        for schema in [traces_schema(), metrics_schema(), logs_schema()] {
            let field = schema.field_with_name("_laminar_received_at").unwrap();
            assert_eq!(
                *field.data_type(),
                DataType::Timestamp(TimeUnit::Nanosecond, None)
            );
            assert!(!field.is_nullable());
        }
    }

    // `Schema::field_with_name` returns the first match, so a duplicated
    // column name would silently shadow the second one in the checks above.
    #[test]
    fn test_all_schemas_have_unique_column_names() {
        for schema in [traces_schema(), metrics_schema(), logs_schema()] {
            let names = column_names(&schema);
            let unique: HashSet<&str> = names.iter().copied().collect();
            assert_eq!(unique.len(), names.len(), "duplicate column in {names:?}");
        }
    }
}