1use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use foyer::{Cache, CacheBuilder};
11
12use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
13use reqwest::Client;
14use serde::{Deserialize, Serialize};
15
16use crate::error::{ConnectorError, SerdeError};
17use crate::kafka::config::{CompatibilityLevel, SrAuth};
18
/// Schema formats a registry entry can declare.
///
/// Parsed case-insensitively via `FromStr` and rendered in upper case via `Display`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaType {
    /// Rendered as `"AVRO"`.
    Avro,
    /// Rendered as `"PROTOBUF"`.
    Protobuf,
    /// Rendered as `"JSON"`.
    Json,
}
29
30impl std::str::FromStr for SchemaType {
31 type Err = ConnectorError;
32
33 fn from_str(s: &str) -> Result<Self, Self::Err> {
34 match s.to_uppercase().as_str() {
35 "AVRO" => Ok(SchemaType::Avro),
36 "PROTOBUF" => Ok(SchemaType::Protobuf),
37 "JSON" => Ok(SchemaType::Json),
38 other => Err(ConnectorError::ConfigurationError(format!(
39 "unknown schema type: '{other}'"
40 ))),
41 }
42 }
43}
44
45impl std::fmt::Display for SchemaType {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 match self {
48 SchemaType::Avro => write!(f, "AVRO"),
49 SchemaType::Protobuf => write!(f, "PROTOBUF"),
50 SchemaType::Json => write!(f, "JSON"),
51 }
52 }
53}
54
/// Cache settings for `SchemaRegistryClient`.
#[derive(Debug, Clone)]
pub struct SchemaRegistryCacheConfig {
    /// Capacity handed to the builder of the schema-ID cache.
    pub max_entries: usize,
    /// Maximum age of a cached schema before a lookup treats it as expired;
    /// `None` disables expiry.
    pub ttl: Option<Duration>,
}
63
64impl Default for SchemaRegistryCacheConfig {
65 fn default() -> Self {
66 Self {
67 max_entries: 1000,
68 ttl: Some(Duration::from_secs(3600)),
69 }
70 }
71}
72
/// A schema fetched from or registered with the registry, plus its Arrow form.
#[derive(Debug, Clone)]
pub struct CachedSchema {
    /// Registry schema ID.
    pub id: i32,
    /// Subject version; `0` when the schema was fetched by ID or just registered,
    /// because those code paths do not learn the version.
    pub version: i32,
    /// Schema format reported by the registry (or supplied by the caller on register).
    pub schema_type: SchemaType,
    /// Schema definition text.
    pub schema_str: String,
    /// Arrow schema derived from `schema_str` by `avro_to_arrow_schema`.
    pub arrow_schema: SchemaRef,
    /// Creation / cache-insertion time, compared against the configured TTL on lookup.
    inserted_at: Instant,
}
89
/// Outcome of a registry compatibility check.
#[derive(Debug, Clone)]
pub struct CompatibilityResult {
    /// Value of the registry's `is_compatible` flag.
    pub is_compatible: bool,
    /// Messages returned alongside the flag; empty when the registry sent none.
    pub messages: Vec<String>,
}
98
/// HTTP client for a schema registry with in-memory caches for schemas.
///
/// `Debug` is implemented by hand and prints only whether credentials are set.
pub struct SchemaRegistryClient {
    /// HTTP client used for every registry request.
    client: Client,
    /// Registry base URL with trailing `/` characters removed.
    base_url: String,
    /// Optional credentials, sent as HTTP basic auth.
    auth: Option<SrAuth>,
    /// Schemas keyed by registry schema ID.
    cache: Cache<i32, CachedSchema>,
    /// Last schema fetched as "latest" or registered, keyed by subject name.
    subject_cache: Cache<String, CachedSchema>,
    /// Settings used to size `cache` and to expire its entries.
    cache_config: SchemaRegistryCacheConfig,
}
114
/// Response body of `GET {base_url}/schemas/ids/{id}`.
#[derive(Deserialize)]
struct SchemaByIdResponse {
    /// Schema definition text.
    schema: String,
    /// Value of the `schemaType` key; `"AVRO"` when the key is absent.
    #[serde(default = "default_schema_type")]
    #[serde(rename = "schemaType")]
    schema_type: String,
}
124
/// Response body of `GET {base_url}/subjects/{subject}/versions/{version|latest}`.
#[derive(Deserialize)]
struct SchemaVersionResponse {
    /// Registry schema ID.
    id: i32,
    /// Version of the schema under the subject.
    version: i32,
    /// Schema definition text.
    schema: String,
    /// Value of the `schemaType` key; `"AVRO"` when the key is absent.
    #[serde(default = "default_schema_type")]
    #[serde(rename = "schemaType")]
    schema_type: String,
    /// Subject name echoed by the registry; deserialized but never read.
    #[allow(dead_code)]
    subject: Option<String>,
}
136
/// Response body of the compatibility-check endpoint.
#[derive(Deserialize)]
struct CompatibilityResponse {
    /// Whether the registry judged the schema compatible.
    is_compatible: bool,
    /// Optional explanatory messages; defaults to empty when absent.
    #[serde(default)]
    messages: Vec<String>,
}
143
/// Response body of `GET {base_url}/config/{subject}`.
#[derive(Deserialize)]
struct ConfigResponse {
    /// Value of the `compatibilityLevel` key, parsed later into `CompatibilityLevel`.
    #[serde(rename = "compatibilityLevel")]
    compatibility_level: String,
}
149
/// Request body sent to the compatibility-check endpoint.
#[derive(Serialize)]
struct CompatibilityRequest {
    /// Candidate schema definition text.
    schema: String,
    /// Serialized under the `schemaType` key.
    #[serde(rename = "schemaType")]
    schema_type: String,
}
156
/// Request body of `PUT {base_url}/config/{subject}`.
#[derive(Serialize)]
struct ConfigUpdateRequest {
    /// Compatibility level name, taken from `CompatibilityLevel::as_str`.
    compatibility: String,
}
161
/// Request body of `POST {base_url}/subjects/{subject}/versions`.
#[derive(Serialize)]
struct RegisterSchemaRequest {
    /// Schema definition text to register.
    schema: String,
    /// Upper-case schema type name, serialized under the `schemaType` key.
    #[serde(rename = "schemaType")]
    schema_type: String,
}
168
/// Response body returned after registering a schema.
#[derive(Deserialize)]
struct RegisterSchemaResponse {
    /// Registry schema ID assigned to (or already held by) the schema.
    id: i32,
}
173
/// Serde default for `schemaType` fields: responses that omit the key are
/// treated as `"AVRO"`.
fn default_schema_type() -> String {
    String::from("AVRO")
}
177
178impl SchemaRegistryClient {
179 #[must_use]
181 pub fn new(base_url: impl Into<String>, auth: Option<SrAuth>) -> Self {
182 Self::with_cache_config(base_url, auth, SchemaRegistryCacheConfig::default())
183 }
184
185 pub fn with_tls(
191 base_url: impl Into<String>,
192 auth: Option<SrAuth>,
193 ca_cert_path: &str,
194 ) -> Result<Self, ConnectorError> {
195 Self::with_tls_mtls(base_url, auth, ca_cert_path, None, None)
196 }
197
198 pub fn with_tls_mtls(
205 base_url: impl Into<String>,
206 auth: Option<SrAuth>,
207 ca_cert_path: &str,
208 client_cert_path: Option<&str>,
209 client_key_path: Option<&str>,
210 ) -> Result<Self, ConnectorError> {
211 let pem = std::fs::read(ca_cert_path).map_err(|e| {
212 ConnectorError::ConfigurationError(format!(
213 "failed to read SR CA cert at '{ca_cert_path}': {e}"
214 ))
215 })?;
216 let cert = reqwest::tls::Certificate::from_pem(&pem).map_err(|e| {
217 ConnectorError::ConfigurationError(format!(
218 "invalid PEM CA cert at '{ca_cert_path}': {e}"
219 ))
220 })?;
221
222 let mut builder = Client::builder().add_root_certificate(cert);
223
224 if client_cert_path.is_some() != client_key_path.is_some() {
225 return Err(ConnectorError::ConfigurationError(
226 "mTLS requires both client cert and key — only one was provided".into(),
227 ));
228 }
229 if let (Some(cert_path), Some(key_path)) = (client_cert_path, client_key_path) {
230 let mut identity_pem = std::fs::read(cert_path).map_err(|e| {
231 ConnectorError::ConfigurationError(format!(
232 "failed to read SR client cert at '{cert_path}': {e}"
233 ))
234 })?;
235 let key_pem = std::fs::read(key_path).map_err(|e| {
236 ConnectorError::ConfigurationError(format!(
237 "failed to read SR client key at '{key_path}': {e}"
238 ))
239 })?;
240 identity_pem.extend_from_slice(&key_pem);
242 let identity = reqwest::tls::Identity::from_pem(&identity_pem).map_err(|e| {
243 ConnectorError::ConfigurationError(format!("invalid client cert/key PEM: {e}"))
244 })?;
245 builder = builder.identity(identity);
246 }
247
248 let client = builder.build().map_err(|e| {
249 ConnectorError::ConfigurationError(format!("failed to build TLS client: {e}"))
250 })?;
251
252 let cache_config = SchemaRegistryCacheConfig::default();
253 let cache = CacheBuilder::new(cache_config.max_entries)
254 .with_shards(4)
255 .build();
256 let subject_cache = CacheBuilder::new(256).with_shards(4).build();
257 Ok(Self {
258 client,
259 base_url: base_url.into().trim_end_matches('/').to_string(),
260 auth,
261 cache,
262 subject_cache,
263 cache_config,
264 })
265 }
266
267 #[must_use]
269 pub fn with_cache_config(
270 base_url: impl Into<String>,
271 auth: Option<SrAuth>,
272 cache_config: SchemaRegistryCacheConfig,
273 ) -> Self {
274 let cache = CacheBuilder::new(cache_config.max_entries)
275 .with_shards(4)
276 .build();
277 let subject_cache = CacheBuilder::new(256).with_shards(4).build();
279 Self {
280 client: Client::new(),
281 base_url: base_url.into().trim_end_matches('/').to_string(),
282 auth,
283 cache,
284 subject_cache,
285 cache_config,
286 }
287 }
288
    /// Returns the registry base URL, with trailing `/` characters already removed.
    #[must_use]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }
294
    /// Returns `true` when the client was constructed with credentials.
    #[must_use]
    pub fn has_auth(&self) -> bool {
        self.auth.is_some()
    }
300
    /// Returns the cache configuration this client was built with.
    #[must_use]
    pub fn cache_config(&self) -> &SchemaRegistryCacheConfig {
        &self.cache_config
    }
306
307 fn cache_insert(&self, id: i32, mut schema: CachedSchema) {
311 schema.inserted_at = Instant::now();
312 self.cache.insert(id, schema);
313 }
314
315 fn cache_get(&self, id: i32) -> Option<CachedSchema> {
320 let entry = self.cache.get(&id)?;
321 let schema = entry.value();
322 if let Some(ttl) = self.cache_config.ttl {
323 if schema.inserted_at.elapsed() > ttl {
324 drop(entry);
325 self.cache.remove(&id);
326 return None;
327 }
328 }
329 Some(schema.clone())
331 }
332
333 pub async fn get_schema_by_id(&self, id: i32) -> Result<CachedSchema, ConnectorError> {
342 if let Some(cached) = self.cache_get(id) {
343 return Ok(cached);
344 }
345
346 let url = format!("{}/schemas/ids/{}", self.base_url, id);
347 let resp: SchemaByIdResponse = self.get_json(&url).await?;
348
349 let schema_type: SchemaType = resp.schema_type.parse()?;
350 let arrow_schema = avro_to_arrow_schema(&resp.schema)?;
351
352 let cached = CachedSchema {
353 id,
354 version: 0, schema_type,
356 schema_str: resp.schema,
357 arrow_schema,
358 inserted_at: Instant::now(),
359 };
360 self.cache_insert(id, cached.clone());
361 Ok(cached)
362 }
363
364 pub async fn get_latest_schema(&self, subject: &str) -> Result<CachedSchema, ConnectorError> {
370 let url = format!("{}/subjects/{}/versions/latest", self.base_url, subject);
371 let resp: SchemaVersionResponse = self.get_json(&url).await?;
372
373 let schema_type: SchemaType = resp.schema_type.parse()?;
374 let arrow_schema = avro_to_arrow_schema(&resp.schema)?;
375
376 let cached = CachedSchema {
377 id: resp.id,
378 version: resp.version,
379 schema_type,
380 schema_str: resp.schema,
381 arrow_schema,
382 inserted_at: Instant::now(),
383 };
384
385 self.cache_insert(resp.id, cached.clone());
386 self.subject_cache
387 .insert(subject.to_string(), cached.clone());
388 Ok(cached)
389 }
390
391 pub async fn get_schema_version(
397 &self,
398 subject: &str,
399 version: i32,
400 ) -> Result<CachedSchema, ConnectorError> {
401 let url = format!(
402 "{}/subjects/{}/versions/{}",
403 self.base_url, subject, version
404 );
405 let resp: SchemaVersionResponse = self.get_json(&url).await?;
406
407 let schema_type: SchemaType = resp.schema_type.parse()?;
408 let arrow_schema = avro_to_arrow_schema(&resp.schema)?;
409
410 let cached = CachedSchema {
411 id: resp.id,
412 version: resp.version,
413 schema_type,
414 schema_str: resp.schema,
415 arrow_schema,
416 inserted_at: Instant::now(),
417 };
418 self.cache_insert(resp.id, cached.clone());
419 Ok(cached)
420 }
421
422 pub async fn check_compatibility(
428 &self,
429 subject: &str,
430 schema_str: &str,
431 ) -> Result<CompatibilityResult, ConnectorError> {
432 let url = format!(
433 "{}/compatibility/subjects/{}/versions/latest",
434 self.base_url, subject
435 );
436
437 let body = CompatibilityRequest {
438 schema: schema_str.to_string(),
439 schema_type: "AVRO".to_string(),
440 };
441
442 let mut req = self.client.post(&url).json(&body);
443 if let Some(ref auth) = self.auth {
444 req = req.basic_auth(&auth.username, Some(&auth.password));
445 }
446
447 let resp = req
448 .send()
449 .await
450 .map_err(|e| ConnectorError::ConnectionFailed(format!("schema registry: {e}")))?;
451
452 if !resp.status().is_success() {
453 let status = resp.status();
454 let text = resp.text().await.unwrap_or_default();
455 return Err(ConnectorError::ConnectionFailed(format!(
456 "schema registry compatibility check failed: {status} {text}"
457 )));
458 }
459
460 let result: CompatibilityResponse = resp.json().await.map_err(|e| {
461 ConnectorError::Internal(format!("failed to parse compatibility response: {e}"))
462 })?;
463
464 Ok(CompatibilityResult {
465 is_compatible: result.is_compatible,
466 messages: result.messages,
467 })
468 }
469
470 pub async fn get_compatibility_level(
476 &self,
477 subject: &str,
478 ) -> Result<CompatibilityLevel, ConnectorError> {
479 let url = format!("{}/config/{}", self.base_url, subject);
480 let resp: ConfigResponse = self.get_json(&url).await?;
481 resp.compatibility_level.parse()
482 }
483
484 pub async fn set_compatibility_level(
490 &self,
491 subject: &str,
492 level: CompatibilityLevel,
493 ) -> Result<(), ConnectorError> {
494 let url = format!("{}/config/{}", self.base_url, subject);
495 let body = ConfigUpdateRequest {
496 compatibility: level.as_str().to_string(),
497 };
498
499 let mut req = self.client.put(&url).json(&body);
500 if let Some(ref auth) = self.auth {
501 req = req.basic_auth(&auth.username, Some(&auth.password));
502 }
503
504 let resp = req
505 .send()
506 .await
507 .map_err(|e| ConnectorError::ConnectionFailed(format!("schema registry: {e}")))?;
508
509 if !resp.status().is_success() {
510 let status = resp.status();
511 let text = resp.text().await.unwrap_or_default();
512 return Err(ConnectorError::ConnectionFailed(format!(
513 "schema registry config update failed: {status} {text}"
514 )));
515 }
516
517 Ok(())
518 }
519
    /// Resolves a schema ID to its schema by delegating to `get_schema_by_id`.
    ///
    /// NOTE(review): presumably `id` is the schema ID taken from a Confluent-framed
    /// message header — confirm against callers.
    ///
    /// # Errors
    /// Propagates every error of `get_schema_by_id`.
    pub async fn resolve_confluent_id(&self, id: i32) -> Result<CachedSchema, ConnectorError> {
        self.get_schema_by_id(id).await
    }
531
532 pub async fn register_schema(
542 &self,
543 subject: &str,
544 schema_str: &str,
545 schema_type: SchemaType,
546 ) -> Result<i32, ConnectorError> {
547 if let Some(entry) = self.subject_cache.get(subject) {
549 if entry.value().schema_str == schema_str {
550 return Ok(entry.value().id);
551 }
552 }
553
554 let url = format!("{}/subjects/{}/versions", self.base_url, subject);
555 let body = RegisterSchemaRequest {
556 schema: schema_str.to_string(),
557 schema_type: schema_type.to_string(),
558 };
559
560 let mut req = self.client.post(&url).json(&body);
561 if let Some(ref auth) = self.auth {
562 req = req.basic_auth(&auth.username, Some(&auth.password));
563 }
564
565 let resp = req
566 .send()
567 .await
568 .map_err(|e| ConnectorError::ConnectionFailed(format!("schema registry: {e}")))?;
569
570 if !resp.status().is_success() {
571 let status = resp.status();
572 let text = resp.text().await.unwrap_or_default();
573 return Err(ConnectorError::ConnectionFailed(format!(
574 "schema registry register failed: {status} {text}"
575 )));
576 }
577
578 let result: RegisterSchemaResponse = resp.json().await.map_err(|e| {
579 ConnectorError::Internal(format!("failed to parse register schema response: {e}"))
580 })?;
581
582 let arrow_schema = avro_to_arrow_schema(schema_str)?;
583 let cached = CachedSchema {
584 id: result.id,
585 version: 0,
586 schema_type,
587 schema_str: schema_str.to_string(),
588 arrow_schema,
589 inserted_at: Instant::now(),
590 };
591 self.cache_insert(result.id, cached.clone());
592 self.subject_cache.insert(subject.to_string(), cached);
593
594 Ok(result.id)
595 }
596
597 pub async fn validate_and_register_schema(
608 &self,
609 subject: &str,
610 schema_str: &str,
611 schema_type: SchemaType,
612 ) -> Result<i32, ConnectorError> {
613 match self.check_compatibility(subject, schema_str).await {
615 Ok(result) => {
616 if !result.is_compatible {
617 let message = if result.messages.is_empty() {
618 "new schema is not compatible with existing version".to_string()
619 } else {
620 result.messages.join("; ")
621 };
622 return Err(ConnectorError::Serde(SerdeError::SchemaIncompatible {
623 subject: subject.to_string(),
624 message,
625 }));
626 }
627 }
628 Err(ConnectorError::ConnectionFailed(msg)) if msg.contains("404") => {
629 }
631 Err(e) => return Err(e),
632 }
633
634 self.register_schema(subject, schema_str, schema_type).await
635 }
636
637 #[must_use]
639 pub fn is_cached(&self, id: i32) -> bool {
640 self.cache.contains(&id)
641 }
642
    /// Returns the usage reported by the schema-ID cache.
    ///
    /// NOTE(review): presumably this equals the number of cached schemas (a freshly
    /// built client reports `0`) — confirm against the cache's weighting.
    #[must_use]
    pub fn cache_size(&self) -> usize {
        self.cache.usage()
    }
648
649 async fn get_json<T: serde::de::DeserializeOwned>(
654 &self,
655 url: &str,
656 ) -> Result<T, ConnectorError> {
657 let backoffs = [
658 std::time::Duration::from_millis(100),
659 std::time::Duration::from_millis(500),
660 ];
661 let mut last_err = None;
662
663 for (attempt, backoff) in std::iter::once(&std::time::Duration::ZERO)
664 .chain(backoffs.iter())
665 .enumerate()
666 {
667 if attempt > 0 {
668 tokio::time::sleep(*backoff).await;
669 }
670
671 let mut req = self.client.get(url);
672 if let Some(ref auth) = self.auth {
673 req = req.basic_auth(&auth.username, Some(&auth.password));
674 }
675
676 let resp = match req.send().await {
677 Ok(r) => r,
678 Err(e) => {
679 tracing::warn!(
680 attempt = attempt + 1,
681 error = %e,
682 "schema registry request failed, retrying"
683 );
684 last_err = Some(ConnectorError::ConnectionFailed(format!(
685 "schema registry: {e}"
686 )));
687 continue;
688 }
689 };
690
691 let status = resp.status();
692 if status.is_success() {
693 return resp.json::<T>().await.map_err(|e| {
694 ConnectorError::Internal(format!(
695 "failed to parse schema registry response: {e}"
696 ))
697 });
698 }
699
700 if status.is_client_error() {
702 let text = resp.text().await.unwrap_or_default();
703 return Err(ConnectorError::ConnectionFailed(format!(
704 "schema registry client error: {status} {text}"
705 )));
706 }
707
708 let text = resp.text().await.unwrap_or_default();
710 tracing::warn!(
711 attempt = attempt + 1,
712 status = %status,
713 "schema registry server error, retrying"
714 );
715 last_err = Some(ConnectorError::ConnectionFailed(format!(
716 "schema registry request failed: {status} {text}"
717 )));
718 }
719
720 Err(last_err.unwrap_or_else(|| {
721 ConnectorError::ConnectionFailed("schema registry: all retries exhausted".into())
722 }))
723 }
724}
725
726impl std::fmt::Debug for SchemaRegistryClient {
727 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
728 f.debug_struct("SchemaRegistryClient")
729 .field("base_url", &self.base_url)
730 .field("has_auth", &self.auth.is_some())
731 .field("cached_schemas", &self.cache.usage())
732 .field("cached_subjects", &self.subject_cache.usage())
733 .finish_non_exhaustive()
734 }
735}
736
737pub fn avro_to_arrow_schema(avro_schema_str: &str) -> Result<SchemaRef, ConnectorError> {
746 let avro: serde_json::Value = serde_json::from_str(avro_schema_str)
747 .map_err(|e| ConnectorError::SchemaMismatch(format!("invalid Avro schema JSON: {e}")))?;
748
749 let fields_val = avro.get("fields").ok_or_else(|| {
750 ConnectorError::SchemaMismatch("Avro schema missing 'fields' array".into())
751 })?;
752
753 let fields_arr = fields_val.as_array().ok_or_else(|| {
754 ConnectorError::SchemaMismatch("Avro schema 'fields' is not an array".into())
755 })?;
756
757 let mut arrow_fields = Vec::with_capacity(fields_arr.len());
758 for field in fields_arr {
759 let name = field
760 .get("name")
761 .and_then(|v| v.as_str())
762 .ok_or_else(|| ConnectorError::SchemaMismatch("Avro field missing 'name'".into()))?;
763
764 let (data_type, nullable) = parse_avro_type(field.get("type").ok_or_else(|| {
765 ConnectorError::SchemaMismatch(format!("Avro field '{name}' missing 'type'"))
766 })?)?;
767
768 arrow_fields.push(Field::new(name, data_type, nullable));
769 }
770
771 Ok(Arc::new(Schema::new(arrow_fields)))
772}
773
774#[allow(clippy::too_many_lines)]
776fn parse_avro_type(avro_type: &serde_json::Value) -> Result<(DataType, bool), ConnectorError> {
777 match avro_type {
778 serde_json::Value::String(s) => Ok((avro_primitive_to_arrow(s)?, false)),
779 serde_json::Value::Array(union) => {
780 let non_null: Vec<_> = union
782 .iter()
783 .filter(|v| v.as_str() != Some("null"))
784 .collect();
785 let nullable = union.iter().any(|v| v.as_str() == Some("null"));
786
787 if non_null.len() == 1 {
788 let (dt, _) = parse_avro_type(non_null[0])?;
789 Ok((dt, nullable))
790 } else {
791 Ok((DataType::Utf8, nullable))
793 }
794 }
795 serde_json::Value::Object(obj) => {
796 if let Some(logical) = obj.get("logicalType").and_then(|v| v.as_str()) {
798 return match logical {
799 "timestamp-millis" | "timestamp-micros" => Ok((DataType::Int64, false)),
800 "date" => Ok((DataType::Int32, false)),
801 "decimal" => Ok((DataType::Float64, false)),
802 _ => Ok((DataType::Utf8, false)),
803 };
804 }
805
806 let type_str = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
807 match type_str {
808 "array" => {
809 let items = obj.get("items").ok_or_else(|| {
810 ConnectorError::SchemaMismatch("Avro array type missing 'items'".into())
811 })?;
812 let (item_type, _) = parse_avro_type(items)?;
813 Ok((
814 DataType::List(Arc::new(Field::new("item", item_type, true))),
815 false,
816 ))
817 }
818 "map" => {
819 let values = obj.get("values").ok_or_else(|| {
820 ConnectorError::SchemaMismatch("Avro map type missing 'values'".into())
821 })?;
822 let (value_type, _) = parse_avro_type(values)?;
823 Ok((
824 DataType::Map(
825 Arc::new(Field::new(
826 "entries",
827 DataType::Struct(Fields::from(vec![
828 Field::new("key", DataType::Utf8, false),
829 Field::new("value", value_type, true),
830 ])),
831 false,
832 )),
833 false,
834 ),
835 false,
836 ))
837 }
838 "record" => {
839 let fields_val = obj.get("fields").ok_or_else(|| {
840 ConnectorError::SchemaMismatch("Avro nested record missing 'fields'".into())
841 })?;
842 let fields_arr = fields_val.as_array().ok_or_else(|| {
843 ConnectorError::SchemaMismatch(
844 "Avro nested record 'fields' is not an array".into(),
845 )
846 })?;
847 let mut arrow_fields = Vec::with_capacity(fields_arr.len());
848 for f in fields_arr {
849 let name = f.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
850 ConnectorError::SchemaMismatch(
851 "Avro nested record field missing 'name'".into(),
852 )
853 })?;
854 let f_type = f.get("type").ok_or_else(|| {
855 ConnectorError::SchemaMismatch(format!(
856 "Avro nested field '{name}' missing 'type'"
857 ))
858 })?;
859 let (dt, nullable) = parse_avro_type(f_type)?;
860 arrow_fields.push(Field::new(name, dt, nullable));
861 }
862 Ok((DataType::Struct(Fields::from(arrow_fields)), false))
863 }
864 "enum" => Ok((
865 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
866 false,
867 )),
868 "fixed" => {
869 let size = obj
870 .get("size")
871 .and_then(serde_json::Value::as_u64)
872 .ok_or_else(|| {
873 ConnectorError::SchemaMismatch("Avro fixed type missing 'size'".into())
874 })?;
875 #[allow(clippy::cast_possible_truncation)]
876 Ok((DataType::FixedSizeBinary(size as i32), false))
877 }
878 _ => Ok((DataType::Utf8, false)),
879 }
880 }
881 _ => Err(ConnectorError::SchemaMismatch(format!(
882 "unsupported Avro type: {avro_type}"
883 ))),
884 }
885}
886
887fn avro_primitive_to_arrow(avro_type: &str) -> Result<DataType, ConnectorError> {
889 match avro_type {
890 "null" => Ok(DataType::Null),
891 "boolean" => Ok(DataType::Boolean),
892 "int" => Ok(DataType::Int32),
893 "long" => Ok(DataType::Int64),
894 "float" => Ok(DataType::Float32),
895 "double" => Ok(DataType::Float64),
896 "bytes" => Ok(DataType::Binary),
897 "string" => Ok(DataType::Utf8),
898 other => Err(ConnectorError::SchemaMismatch(format!(
899 "unsupported Avro primitive type: '{other}'"
900 ))),
901 }
902}
903
904pub fn arrow_to_avro_schema(schema: &SchemaRef, record_name: &str) -> Result<String, SerdeError> {
913 let mut fields = Vec::with_capacity(schema.fields().len());
914
915 for field in schema.fields() {
916 let avro_type = arrow_to_avro_type(field.data_type())?;
917
918 let field_type = if field.is_nullable() {
919 serde_json::json!(["null", avro_type])
920 } else {
921 avro_type
922 };
923
924 fields.push(serde_json::json!({
925 "name": field.name(),
926 "type": field_type,
927 }));
928 }
929
930 let schema = serde_json::json!({
931 "type": "record",
932 "name": record_name,
933 "fields": fields,
934 });
935
936 serde_json::to_string(&schema)
937 .map_err(|e| SerdeError::MalformedInput(format!("failed to serialize Avro schema: {e}")))
938}
939
940fn arrow_to_avro_type(data_type: &DataType) -> Result<serde_json::Value, SerdeError> {
942 match data_type {
943 DataType::Null => Ok(serde_json::json!("null")),
944 DataType::Boolean => Ok(serde_json::json!("boolean")),
945 DataType::Int8
946 | DataType::Int16
947 | DataType::Int32
948 | DataType::UInt8
949 | DataType::UInt16
950 | DataType::UInt32 => Ok(serde_json::json!("int")),
951 DataType::Int64 | DataType::UInt64 => Ok(serde_json::json!("long")),
952 DataType::Float32 => Ok(serde_json::json!("float")),
953 DataType::Float64 => Ok(serde_json::json!("double")),
954 DataType::Utf8 | DataType::LargeUtf8 => Ok(serde_json::json!("string")),
955 DataType::Binary | DataType::LargeBinary => Ok(serde_json::json!("bytes")),
956 DataType::List(item_field) => {
957 let items = arrow_to_avro_type(item_field.data_type())?;
958 Ok(serde_json::json!({
959 "type": "array",
960 "items": items,
961 }))
962 }
963 DataType::Map(entries_field, _) => {
964 if let DataType::Struct(fields) = entries_field.data_type() {
966 let value_field = fields.iter().find(|f| f.name() == "value").ok_or_else(|| {
967 SerdeError::UnsupportedFormat(
968 "Arrow Map missing 'value' field in entries struct".into(),
969 )
970 })?;
971 let values = arrow_to_avro_type(value_field.data_type())?;
972 Ok(serde_json::json!({
973 "type": "map",
974 "values": values,
975 }))
976 } else {
977 Err(SerdeError::UnsupportedFormat(
978 "Arrow Map entries field is not a Struct".into(),
979 ))
980 }
981 }
982 DataType::Struct(fields) => {
983 let mut avro_fields = Vec::with_capacity(fields.len());
984 for field in fields {
985 let avro_type = arrow_to_avro_type(field.data_type())?;
986 let field_type = if field.is_nullable() {
987 serde_json::json!(["null", avro_type])
988 } else {
989 avro_type
990 };
991 avro_fields.push(serde_json::json!({
992 "name": field.name(),
993 "type": field_type,
994 }));
995 }
996 Ok(serde_json::json!({
997 "type": "record",
998 "name": "nested",
999 "fields": avro_fields,
1000 }))
1001 }
1002 DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::Utf8 => {
1003 Ok(serde_json::json!({
1004 "type": "enum",
1005 "name": "enum_field",
1006 "symbols": [],
1007 }))
1008 }
1009 DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
1010 Ok(serde_json::json!({"type": "long", "logicalType": "timestamp-millis"}))
1011 }
1012 DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
1013 Ok(serde_json::json!({"type": "long", "logicalType": "timestamp-micros"}))
1014 }
1015 DataType::Date32 => Ok(serde_json::json!({"type": "int", "logicalType": "date"})),
1016 DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
1017 Ok(serde_json::json!({"type": "int", "logicalType": "time-millis"}))
1018 }
1019 DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
1020 Ok(serde_json::json!({"type": "long", "logicalType": "time-micros"}))
1021 }
1022 DataType::FixedSizeBinary(size) => Ok(serde_json::json!({
1023 "type": "fixed",
1024 "name": "fixed_field",
1025 "size": size,
1026 })),
1027 other => Err(SerdeError::UnsupportedFormat(format!(
1028 "no Avro equivalent for Arrow type: {other}"
1029 ))),
1030 }
1031}
1032
1033#[cfg(test)]
1034mod tests {
1035 use super::*;
1036
1037 #[test]
1038 fn test_avro_to_arrow_simple_record() {
1039 let avro = r#"{
1040 "type": "record",
1041 "name": "test",
1042 "fields": [
1043 {"name": "id", "type": "long"},
1044 {"name": "name", "type": "string"},
1045 {"name": "active", "type": "boolean"}
1046 ]
1047 }"#;
1048
1049 let schema = avro_to_arrow_schema(avro).unwrap();
1050 assert_eq!(schema.fields().len(), 3);
1051 assert_eq!(schema.field(0).name(), "id");
1052 assert_eq!(schema.field(0).data_type(), &DataType::Int64);
1053 assert!(!schema.field(0).is_nullable());
1054 assert_eq!(schema.field(1).name(), "name");
1055 assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
1056 assert_eq!(schema.field(2).name(), "active");
1057 assert_eq!(schema.field(2).data_type(), &DataType::Boolean);
1058 }
1059
1060 #[test]
1061 fn test_avro_to_arrow_nullable_union() {
1062 let avro = r#"{
1063 "type": "record",
1064 "name": "test",
1065 "fields": [
1066 {"name": "id", "type": "long"},
1067 {"name": "email", "type": ["null", "string"]}
1068 ]
1069 }"#;
1070
1071 let schema = avro_to_arrow_schema(avro).unwrap();
1072 assert_eq!(schema.fields().len(), 2);
1073 assert!(!schema.field(0).is_nullable());
1074 assert!(schema.field(1).is_nullable());
1075 assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
1076 }
1077
1078 #[test]
1079 fn test_avro_to_arrow_all_primitives() {
1080 let avro = r#"{
1081 "type": "record",
1082 "name": "test",
1083 "fields": [
1084 {"name": "b", "type": "boolean"},
1085 {"name": "i", "type": "int"},
1086 {"name": "l", "type": "long"},
1087 {"name": "f", "type": "float"},
1088 {"name": "d", "type": "double"},
1089 {"name": "s", "type": "string"},
1090 {"name": "raw", "type": "bytes"}
1091 ]
1092 }"#;
1093
1094 let schema = avro_to_arrow_schema(avro).unwrap();
1095 assert_eq!(schema.field(0).data_type(), &DataType::Boolean);
1096 assert_eq!(schema.field(1).data_type(), &DataType::Int32);
1097 assert_eq!(schema.field(2).data_type(), &DataType::Int64);
1098 assert_eq!(schema.field(3).data_type(), &DataType::Float32);
1099 assert_eq!(schema.field(4).data_type(), &DataType::Float64);
1100 assert_eq!(schema.field(5).data_type(), &DataType::Utf8);
1101 assert_eq!(schema.field(6).data_type(), &DataType::Binary);
1102 }
1103
1104 #[test]
1105 fn test_avro_to_arrow_invalid_json() {
1106 assert!(avro_to_arrow_schema("not json").is_err());
1107 }
1108
1109 #[test]
1110 fn test_avro_to_arrow_missing_fields() {
1111 let avro = r#"{"type": "record", "name": "test"}"#;
1112 assert!(avro_to_arrow_schema(avro).is_err());
1113 }
1114
1115 #[test]
1116 fn test_schema_type_parsing() {
1117 assert_eq!("AVRO".parse::<SchemaType>().unwrap(), SchemaType::Avro);
1118 assert_eq!(
1119 "PROTOBUF".parse::<SchemaType>().unwrap(),
1120 SchemaType::Protobuf
1121 );
1122 assert_eq!("JSON".parse::<SchemaType>().unwrap(), SchemaType::Json);
1123 assert!("UNKNOWN".parse::<SchemaType>().is_err());
1124 }
1125
1126 #[test]
1127 fn test_schema_type_display() {
1128 assert_eq!(SchemaType::Avro.to_string(), "AVRO");
1129 assert_eq!(SchemaType::Protobuf.to_string(), "PROTOBUF");
1130 assert_eq!(SchemaType::Json.to_string(), "JSON");
1131 }
1132
1133 #[test]
1134 fn test_client_creation() {
1135 let client = SchemaRegistryClient::new("http://localhost:8081", None);
1136 assert_eq!(client.base_url(), "http://localhost:8081");
1137 assert!(!client.has_auth());
1138 assert_eq!(client.cache_size(), 0);
1139 }
1140
1141 #[test]
1142 fn test_client_with_auth() {
1143 let auth = SrAuth {
1144 username: "user".into(),
1145 password: "pass".into(),
1146 };
1147 let client = SchemaRegistryClient::new("http://localhost:8081", Some(auth));
1148 assert!(client.has_auth());
1149 }
1150
1151 #[test]
1152 fn test_client_trailing_slash_stripped() {
1153 let client = SchemaRegistryClient::new("http://localhost:8081/", None);
1154 assert_eq!(client.base_url(), "http://localhost:8081");
1155 }
1156
1157 #[test]
1158 fn test_arrow_to_avro_schema_simple() {
1159 let schema = Arc::new(Schema::new(vec![
1160 Field::new("id", DataType::Int64, false),
1161 Field::new("name", DataType::Utf8, false),
1162 ]));
1163
1164 let avro_str = arrow_to_avro_schema(&schema, "test_record").unwrap();
1165 let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();
1166
1167 assert_eq!(avro["type"], "record");
1168 assert_eq!(avro["name"], "test_record");
1169
1170 let fields = avro["fields"].as_array().unwrap();
1171 assert_eq!(fields.len(), 2);
1172 assert_eq!(fields[0]["name"], "id");
1173 assert_eq!(fields[0]["type"], "long");
1174 assert_eq!(fields[1]["name"], "name");
1175 assert_eq!(fields[1]["type"], "string");
1176 }
1177
1178 #[test]
1179 fn test_arrow_to_avro_schema_nullable() {
1180 let schema = Arc::new(Schema::new(vec![
1181 Field::new("id", DataType::Int64, false),
1182 Field::new("email", DataType::Utf8, true),
1183 ]));
1184
1185 let avro_str = arrow_to_avro_schema(&schema, "record").unwrap();
1186 let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();
1187
1188 let fields = avro["fields"].as_array().unwrap();
1189 assert_eq!(fields[0]["type"], "long");
1191 let union = fields[1]["type"].as_array().unwrap();
1193 assert_eq!(union.len(), 2);
1194 assert_eq!(union[0], "null");
1195 assert_eq!(union[1], "string");
1196 }
1197
1198 #[test]
1199 fn test_arrow_to_avro_all_primitives() {
1200 let schema = Arc::new(Schema::new(vec![
1201 Field::new("b", DataType::Boolean, false),
1202 Field::new("i32", DataType::Int32, false),
1203 Field::new("i64", DataType::Int64, false),
1204 Field::new("f32", DataType::Float32, false),
1205 Field::new("f64", DataType::Float64, false),
1206 Field::new("s", DataType::Utf8, false),
1207 Field::new("bin", DataType::Binary, false),
1208 ]));
1209
1210 let avro_str = arrow_to_avro_schema(&schema, "all_types").unwrap();
1211 let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();
1212 let fields = avro["fields"].as_array().unwrap();
1213
1214 assert_eq!(fields[0]["type"], "boolean");
1215 assert_eq!(fields[1]["type"], "int");
1216 assert_eq!(fields[2]["type"], "long");
1217 assert_eq!(fields[3]["type"], "float");
1218 assert_eq!(fields[4]["type"], "double");
1219 assert_eq!(fields[5]["type"], "string");
1220 assert_eq!(fields[6]["type"], "bytes");
1221 }
1222
/// Arrow -> Avro -> Arrow must preserve every field's name, data type and
/// nullability.
///
/// The comparison is done field-by-field against the original schema so that
/// a conversion which renames a field or flips the nullability of any column
/// (not just the first two) is caught. Field metadata is intentionally not
/// compared.
#[test]
fn test_arrow_to_avro_roundtrip() {
    let original = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("active", DataType::Boolean, false),
    ]));

    let avro_str = arrow_to_avro_schema(&original, "roundtrip").unwrap();
    let recovered = avro_to_arrow_schema(&avro_str).unwrap();

    // Check the count first: `zip` below would silently stop at the shorter side.
    assert_eq!(recovered.fields().len(), original.fields().len());

    for (expected, actual) in original.fields().iter().zip(recovered.fields().iter()) {
        assert_eq!(actual.name(), expected.name());
        assert_eq!(
            actual.data_type(),
            expected.data_type(),
            "data type of `{}`",
            expected.name()
        );
        assert_eq!(
            actual.is_nullable(),
            expected.is_nullable(),
            "nullability of `{}`",
            expected.name()
        );
    }
}
1241
/// An Avro `array` of `string` converts to an Arrow `List` whose element
/// type is `Utf8`.
#[test]
fn test_avro_to_arrow_array_type() {
    let avro_json = r#"{
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "tags", "type": {"type": "array", "items": "string"}}
        ]
    }"#;

    let arrow = avro_to_arrow_schema(avro_json).unwrap();
    assert_eq!(arrow.fields().len(), 1);

    let tags_type = arrow.field(0).data_type();
    if let DataType::List(element) = tags_type {
        assert_eq!(element.data_type(), &DataType::Utf8);
    } else {
        panic!("expected List, got {tags_type:?}");
    }
}
1263
/// An Avro `map` with `long` values converts to an Arrow `Map` whose entries
/// are a two-field struct: `key: Utf8` and `value: Int64`.
#[test]
fn test_avro_to_arrow_map_type() {
    let avro_json = r#"{
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "metadata", "type": {"type": "map", "values": "long"}}
        ]
    }"#;

    let arrow = avro_to_arrow_schema(avro_json).unwrap();
    assert_eq!(arrow.fields().len(), 1);

    // Peel the type one layer at a time: Map -> entries field -> Struct.
    let entries = match arrow.field(0).data_type() {
        DataType::Map(entries, _) => entries,
        other => panic!("expected Map, got {other:?}"),
    };
    let entry_fields = match entries.data_type() {
        DataType::Struct(inner) => inner,
        _ => panic!("expected Struct entries"),
    };

    assert_eq!(entry_fields.len(), 2);
    assert_eq!(entry_fields[0].name(), "key");
    assert_eq!(entry_fields[0].data_type(), &DataType::Utf8);
    assert_eq!(entry_fields[1].name(), "value");
    assert_eq!(entry_fields[1].data_type(), &DataType::Int64);
}
1291
/// A record nested inside a record field converts to an Arrow `Struct`
/// carrying the nested record's fields in declaration order.
#[test]
fn test_avro_to_arrow_nested_record() {
    let avro_json = r#"{
        "type": "record",
        "name": "test",
        "fields": [
            {
                "name": "address",
                "type": {
                    "type": "record",
                    "name": "Address",
                    "fields": [
                        {"name": "street", "type": "string"},
                        {"name": "zip", "type": "int"}
                    ]
                }
            }
        ]
    }"#;

    let arrow = avro_to_arrow_schema(avro_json).unwrap();
    assert_eq!(arrow.fields().len(), 1);

    let address_fields = match arrow.field(0).data_type() {
        DataType::Struct(inner) => inner,
        other => panic!("expected Struct, got {other:?}"),
    };

    assert_eq!(address_fields.len(), 2);
    assert_eq!(address_fields[0].name(), "street");
    assert_eq!(address_fields[0].data_type(), &DataType::Utf8);
    assert_eq!(address_fields[1].name(), "zip");
    assert_eq!(address_fields[1].data_type(), &DataType::Int32);
}
1325
/// An Avro `enum` converts to an Arrow `Dictionary` with `Int32` keys and
/// `Utf8` values.
#[test]
fn test_avro_to_arrow_enum_type() {
    let avro_json = r#"{
        "type": "record",
        "name": "test",
        "fields": [
            {
                "name": "status",
                "type": {
                    "type": "enum",
                    "name": "Status",
                    "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
                }
            }
        ]
    }"#;

    let arrow = avro_to_arrow_schema(avro_json).unwrap();
    assert_eq!(arrow.fields().len(), 1);

    let status_type = arrow.field(0).data_type();
    if let DataType::Dictionary(index_type, symbol_type) = status_type {
        assert_eq!(index_type.as_ref(), &DataType::Int32);
        assert_eq!(symbol_type.as_ref(), &DataType::Utf8);
    } else {
        panic!("expected Dictionary, got {status_type:?}");
    }
}
1353
/// An Avro `fixed` of size 16 converts to Arrow `FixedSizeBinary(16)`.
#[test]
fn test_avro_to_arrow_fixed_type() {
    let avro_json = r#"{
        "type": "record",
        "name": "test",
        "fields": [
            {
                "name": "uuid",
                "type": {"type": "fixed", "name": "uuid", "size": 16}
            }
        ]
    }"#;

    let arrow = avro_to_arrow_schema(avro_json).unwrap();
    assert_eq!(arrow.fields().len(), 1);

    let uuid_type = arrow.field(0).data_type();
    assert_eq!(uuid_type, &DataType::FixedSizeBinary(16));
}
1371
/// A `["null", array]` union converts to a nullable Arrow field whose type
/// is still a `List`.
#[test]
fn test_avro_to_arrow_nullable_complex_in_union() {
    let avro_json = r#"{
        "type": "record",
        "name": "test",
        "fields": [
            {
                "name": "tags",
                "type": ["null", {"type": "array", "items": "string"}]
            }
        ]
    }"#;

    let arrow = avro_to_arrow_schema(avro_json).unwrap();
    let tags = arrow.field(0);

    assert!(tags.is_nullable());
    assert!(matches!(tags.data_type(), DataType::List(_)));
}
1389
/// An Avro `array` declaration without `items` must be rejected.
#[test]
fn test_avro_array_missing_items() {
    let malformed = r#"{
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "bad", "type": {"type": "array"}}
        ]
    }"#;

    let outcome = avro_to_arrow_schema(malformed);
    assert!(outcome.is_err());
}
1401
/// An Avro `map` declaration without `values` must be rejected.
#[test]
fn test_avro_map_missing_values() {
    let malformed = r#"{
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "bad", "type": {"type": "map"}}
        ]
    }"#;

    let outcome = avro_to_arrow_schema(malformed);
    assert!(outcome.is_err());
}
1413
/// An Arrow `List<Utf8>` field is emitted as an Avro `array` with `string`
/// items.
#[test]
fn test_arrow_to_avro_array_type() {
    let element = Arc::new(Field::new("item", DataType::Utf8, true));
    let tags = Field::new("tags", DataType::List(element), false);
    let arrow = Arc::new(Schema::new(vec![tags]));

    let rendered = arrow_to_avro_schema(&arrow, "test").unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&rendered).unwrap();

    // Inspect the type object of the only field.
    let tags_type = &parsed["fields"][0]["type"];
    assert_eq!(tags_type["type"], "array");
    assert_eq!(tags_type["items"], "string");
}
1428
/// An Arrow `Map` with `Utf8` keys and `Int64` values is emitted as an Avro
/// `map` whose `values` type is `long`.
#[test]
fn test_arrow_to_avro_map_type() {
    // Build the Map type inside-out: entry struct -> entries field -> map field.
    let entry_struct = DataType::Struct(Fields::from(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Int64, true),
    ]));
    let entries = Arc::new(Field::new("entries", entry_struct, false));
    let metadata = Field::new("metadata", DataType::Map(entries, false), false);
    let arrow = Arc::new(Schema::new(vec![metadata]));

    let rendered = arrow_to_avro_schema(&arrow, "test").unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&rendered).unwrap();

    let metadata_type = &parsed["fields"][0]["type"];
    assert_eq!(metadata_type["type"], "map");
    assert_eq!(metadata_type["values"], "long");
}
1453
/// An Arrow `Struct` field is emitted as a nested Avro `record` whose fields
/// keep their names, order and primitive types.
#[test]
fn test_arrow_to_avro_struct_type() {
    let address_type = DataType::Struct(Fields::from(vec![
        Field::new("street", DataType::Utf8, false),
        Field::new("zip", DataType::Int32, false),
    ]));
    let arrow = Arc::new(Schema::new(vec![Field::new("address", address_type, false)]));

    let rendered = arrow_to_avro_schema(&arrow, "test").unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&rendered).unwrap();

    let record = &parsed["fields"][0]["type"];
    assert_eq!(record["type"], "record");

    let members = record["fields"].as_array().unwrap();
    assert_eq!(members.len(), 2);
    assert_eq!(members[0]["name"], "street");
    assert_eq!(members[0]["type"], "string");
    assert_eq!(members[1]["name"], "zip");
    assert_eq!(members[1]["type"], "int");
}
1476
/// An Arrow `FixedSizeBinary(16)` field is emitted as an Avro `fixed` type
/// with `size` 16.
#[test]
fn test_arrow_to_avro_fixed_type() {
    let uuid = Field::new("uuid", DataType::FixedSizeBinary(16), false);
    let arrow = Arc::new(Schema::new(vec![uuid]));

    let rendered = arrow_to_avro_schema(&arrow, "test").unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&rendered).unwrap();

    let uuid_type = &parsed["fields"][0]["type"];
    assert_eq!(uuid_type["type"], "fixed");
    assert_eq!(uuid_type["size"], 16);
}
1491
1492 fn make_cached_schema(id: i32) -> CachedSchema {
1495 CachedSchema {
1496 id,
1497 version: 1,
1498 schema_type: SchemaType::Avro,
1499 schema_str: format!(
1500 r#"{{"type":"record","name":"t{id}","fields":[{{"name":"x","type":"int"}}]}}"#
1501 ),
1502 arrow_schema: Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])),
1503 inserted_at: Instant::now(),
1504 }
1505 }
1506
/// The default cache configuration holds 1000 entries with a one-hour TTL.
#[test]
fn test_cache_config_defaults() {
    let SchemaRegistryCacheConfig { max_entries, ttl } = SchemaRegistryCacheConfig::default();

    assert_eq!(max_entries, 1000);
    assert_eq!(ttl, Some(Duration::from_secs(3600)));
}
1513
/// Inserting past `max_entries` must not grow the cache beyond the cap, and
/// the most recently inserted entry must remain retrievable.
#[test]
fn test_cache_lru_eviction() {
    let client = SchemaRegistryClient::with_cache_config(
        "http://localhost:8081",
        None,
        SchemaRegistryCacheConfig {
            max_entries: 3,
            ttl: None,
        },
    );

    // Fill the cache exactly to capacity.
    for id in 1..=3 {
        client.cache_insert(id, make_cached_schema(id));
    }
    assert_eq!(client.cache_size(), 3);

    // One more insert has to push something out rather than grow the cache.
    client.cache_insert(4, make_cached_schema(4));
    assert!(client.cache_size() <= 3);
    assert!(client.cache_get(4).is_some());
}
1534
/// An entry is readable right after insertion and is no longer returned once
/// the configured TTL (50 ms) has elapsed.
#[test]
fn test_cache_ttl_expiration() {
    let client = SchemaRegistryClient::with_cache_config(
        "http://localhost:8081",
        None,
        SchemaRegistryCacheConfig {
            max_entries: 100,
            ttl: Some(Duration::from_millis(50)),
        },
    );

    client.cache_insert(1, make_cached_schema(1));
    let fresh = client.cache_get(1);
    assert!(fresh.is_some());

    // Sleep slightly longer than the TTL so the entry is past its deadline.
    std::thread::sleep(Duration::from_millis(60));
    let stale = client.cache_get(1);
    assert!(stale.is_none());
}
1551
/// With `ttl: None`, an inserted entry can be read back.
///
/// NOTE(review): the lookup happens immediately after the insert, so this
/// does not demonstrate that entries survive over time.
#[test]
fn test_cache_no_ttl() {
    let client = SchemaRegistryClient::with_cache_config(
        "http://localhost:8081",
        None,
        SchemaRegistryCacheConfig {
            max_entries: 100,
            ttl: None,
        },
    );

    client.cache_insert(1, make_cached_schema(1));

    let hit = client.cache_get(1);
    assert!(hit.is_some());
}
1564
/// Re-inserting an id that is already cached replaces the entry instead of
/// adding a second one: the cache size stays at 2.
#[test]
fn test_cache_replace_existing_id() {
    let client = SchemaRegistryClient::with_cache_config(
        "http://localhost:8081",
        None,
        SchemaRegistryCacheConfig {
            max_entries: 10,
            ttl: None,
        },
    );

    for id in 1..=2 {
        client.cache_insert(id, make_cached_schema(id));
    }
    assert_eq!(client.cache_size(), 2);

    // Same key again: the entry count must not change.
    client.cache_insert(1, make_cached_schema(1));
    assert_eq!(client.cache_size(), 2);
}
1581
/// A `SerdeError::SchemaIncompatible` converts into `ConnectorError::Serde`
/// and the rendered message includes the subject name.
#[test]
fn test_schema_incompatible_error_via_serde() {
    let serde_err = SerdeError::SchemaIncompatible {
        subject: "orders-value".into(),
        message: "READER_FIELD_MISSING_DEFAULT_VALUE: field 'new_field'".into(),
    };

    let conn_err: ConnectorError = serde_err.into();

    let is_wrapped_incompatible = matches!(
        conn_err,
        ConnectorError::Serde(SerdeError::SchemaIncompatible { .. })
    );
    assert!(is_wrapped_incompatible);

    let rendered = conn_err.to_string();
    assert!(rendered.contains("orders-value"));
}
1595
/// Constructs a client without authentication and lets it drop.
///
/// NOTE(review): despite the name, nothing here references a
/// `validate_and_register` method; the test only shows that
/// `SchemaRegistryClient::new` can be called. Consider referencing the method
/// itself so a rename or removal breaks this test.
#[test]
fn test_validate_and_register_method_exists() {
    // The underscore-prefixed binding keeps the client alive until the end of
    // the scope without using it.
    let _client = SchemaRegistryClient::new("http://localhost:8081", None);
}
1603
/// Avro -> Arrow -> Avro -> Arrow keeps an array field as `List` and a map
/// field as `Map` on both Arrow sides of the trip.
#[test]
fn test_complex_type_roundtrip() {
    let avro_json = r#"{
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "tags", "type": {"type": "array", "items": "string"}},
            {"name": "metadata", "type": {"type": "map", "values": "long"}}
        ]
    }"#;

    // First pass: Avro text to Arrow.
    let first_pass = avro_to_arrow_schema(avro_json).unwrap();
    let tags_type = first_pass.field(0).data_type();
    let metadata_type = first_pass.field(1).data_type();
    assert!(matches!(tags_type, DataType::List(_)));
    assert!(matches!(metadata_type, DataType::Map(_, _)));

    // Second pass: back to Avro text, then to Arrow again.
    let regenerated = arrow_to_avro_schema(&first_pass, "test").unwrap();
    let second_pass = avro_to_arrow_schema(&regenerated).unwrap();

    let tags_again = second_pass.field(0).data_type();
    let metadata_again = second_pass.field(1).data_type();
    assert!(matches!(tags_again, DataType::List(_)));
    assert!(matches!(metadata_again, DataType::Map(_, _)));
}
1635}