Skip to main content

laminar_connectors/kafka/
schema_registry.rs

1//! Confluent Schema Registry client.
2//!
3//! [`SchemaRegistryClient`] provides a lightweight async REST client for
4//! the Confluent Schema Registry API, with in-memory caching, arrow
5//! schema conversion, and compatibility checking.
6
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use foyer::{Cache, CacheBuilder};
11
12use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
13use reqwest::Client;
14use serde::{Deserialize, Serialize};
15
16use crate::error::{ConnectorError, SerdeError};
17use crate::kafka::config::{CompatibilityLevel, SrAuth};
18
/// Kind of schema stored in the Schema Registry (its `schemaType`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SchemaType {
    /// An Apache Avro schema.
    Avro,
    /// A Protocol Buffers schema.
    Protobuf,
    /// A JSON Schema document.
    Json,
}
29
30impl std::str::FromStr for SchemaType {
31    type Err = ConnectorError;
32
33    fn from_str(s: &str) -> Result<Self, Self::Err> {
34        match s.to_uppercase().as_str() {
35            "AVRO" => Ok(SchemaType::Avro),
36            "PROTOBUF" => Ok(SchemaType::Protobuf),
37            "JSON" => Ok(SchemaType::Json),
38            other => Err(ConnectorError::ConfigurationError(format!(
39                "unknown schema type: '{other}'"
40            ))),
41        }
42    }
43}
44
45impl std::fmt::Display for SchemaType {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        match self {
48            SchemaType::Avro => write!(f, "AVRO"),
49            SchemaType::Protobuf => write!(f, "PROTOBUF"),
50            SchemaType::Json => write!(f, "JSON"),
51        }
52    }
53}
54
/// Tuning knobs for the Schema Registry client's schema-ID cache.
#[derive(Debug, Clone)]
pub struct SchemaRegistryCacheConfig {
    /// Capacity of the schema-ID cache. Default: 1000.
    pub max_entries: usize,
    /// How long an entry stays valid after insertion; `None` disables
    /// expiry. Default: 1 hour.
    pub ttl: Option<Duration>,
}

impl Default for SchemaRegistryCacheConfig {
    fn default() -> Self {
        const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
        Self {
            max_entries: 1000,
            ttl: Some(ONE_HOUR),
        }
    }
}
72
73/// A cached schema entry from the Schema Registry.
74#[derive(Debug, Clone)]
75pub struct CachedSchema {
76    /// Schema Registry schema ID.
77    pub id: i32,
78    /// Schema version within its subject.
79    pub version: i32,
80    /// The schema type.
81    pub schema_type: SchemaType,
82    /// Raw schema string (e.g., Avro JSON).
83    pub schema_str: String,
84    /// Derived Arrow schema for `RecordBatch` construction.
85    pub arrow_schema: SchemaRef,
86    /// When this entry was inserted or last accessed.
87    inserted_at: Instant,
88}
89
/// Outcome of asking the registry whether a schema is compatible.
#[derive(Clone, Debug)]
pub struct CompatibilityResult {
    /// `true` when the registry judged the schema compatible.
    pub is_compatible: bool,
    /// Reasons reported by the registry for an incompatibility; may be empty.
    pub messages: Vec<String>,
}
98
99/// Async client for the Confluent Schema Registry REST API.
100///
101/// Provides schema lookup by ID and subject, caching with LRU eviction
102/// and TTL, compatibility checking, and Avro-to-Arrow schema conversion.
103pub struct SchemaRegistryClient {
104    client: Client,
105    base_url: String,
106    auth: Option<SrAuth>,
107    /// Cache by schema ID (foyer LRU with S3-FIFO eviction).
108    cache: Cache<i32, CachedSchema>,
109    /// Cache by subject name (latest version).
110    subject_cache: Cache<String, CachedSchema>,
111    /// Cache configuration.
112    cache_config: SchemaRegistryCacheConfig,
113}
114
115// -- Schema Registry REST API response types --
116
117#[derive(Deserialize)]
118struct SchemaByIdResponse {
119    schema: String,
120    #[serde(default = "default_schema_type")]
121    #[serde(rename = "schemaType")]
122    schema_type: String,
123}
124
125#[derive(Deserialize)]
126struct SchemaVersionResponse {
127    id: i32,
128    version: i32,
129    schema: String,
130    #[serde(default = "default_schema_type")]
131    #[serde(rename = "schemaType")]
132    schema_type: String,
133    #[allow(dead_code)]
134    subject: Option<String>,
135}
136
137#[derive(Deserialize)]
138struct CompatibilityResponse {
139    is_compatible: bool,
140    #[serde(default)]
141    messages: Vec<String>,
142}
143
144#[derive(Deserialize)]
145struct ConfigResponse {
146    #[serde(rename = "compatibilityLevel")]
147    compatibility_level: String,
148}
149
150#[derive(Serialize)]
151struct CompatibilityRequest {
152    schema: String,
153    #[serde(rename = "schemaType")]
154    schema_type: String,
155}
156
157#[derive(Serialize)]
158struct ConfigUpdateRequest {
159    compatibility: String,
160}
161
162#[derive(Serialize)]
163struct RegisterSchemaRequest {
164    schema: String,
165    #[serde(rename = "schemaType")]
166    schema_type: String,
167}
168
169#[derive(Deserialize)]
170struct RegisterSchemaResponse {
171    id: i32,
172}
173
/// Serde default for `schemaType`: the registry omits it for Avro schemas.
fn default_schema_type() -> String {
    String::from("AVRO")
}
177
178impl SchemaRegistryClient {
179    /// Creates a new Schema Registry client with default cache config.
180    #[must_use]
181    pub fn new(base_url: impl Into<String>, auth: Option<SrAuth>) -> Self {
182        Self::with_cache_config(base_url, auth, SchemaRegistryCacheConfig::default())
183    }
184
185    /// Creates a TLS client (CA cert only). Delegates to [`Self::with_tls_mtls`].
186    ///
187    /// # Errors
188    ///
189    /// Returns `ConnectorError::ConfigurationError` if the CA cert cannot be read.
190    pub fn with_tls(
191        base_url: impl Into<String>,
192        auth: Option<SrAuth>,
193        ca_cert_path: &str,
194    ) -> Result<Self, ConnectorError> {
195        Self::with_tls_mtls(base_url, auth, ca_cert_path, None, None)
196    }
197
198    /// Creates a client with full TLS/mTLS support.
199    ///
200    /// # Errors
201    ///
202    /// Returns `ConnectorError::ConfigurationError` if any cert/key file
203    /// cannot be read or parsed.
204    pub fn with_tls_mtls(
205        base_url: impl Into<String>,
206        auth: Option<SrAuth>,
207        ca_cert_path: &str,
208        client_cert_path: Option<&str>,
209        client_key_path: Option<&str>,
210    ) -> Result<Self, ConnectorError> {
211        let pem = std::fs::read(ca_cert_path).map_err(|e| {
212            ConnectorError::ConfigurationError(format!(
213                "failed to read SR CA cert at '{ca_cert_path}': {e}"
214            ))
215        })?;
216        let cert = reqwest::tls::Certificate::from_pem(&pem).map_err(|e| {
217            ConnectorError::ConfigurationError(format!(
218                "invalid PEM CA cert at '{ca_cert_path}': {e}"
219            ))
220        })?;
221
222        let mut builder = Client::builder().add_root_certificate(cert);
223
224        if client_cert_path.is_some() != client_key_path.is_some() {
225            return Err(ConnectorError::ConfigurationError(
226                "mTLS requires both client cert and key — only one was provided".into(),
227            ));
228        }
229        if let (Some(cert_path), Some(key_path)) = (client_cert_path, client_key_path) {
230            let mut identity_pem = std::fs::read(cert_path).map_err(|e| {
231                ConnectorError::ConfigurationError(format!(
232                    "failed to read SR client cert at '{cert_path}': {e}"
233                ))
234            })?;
235            let key_pem = std::fs::read(key_path).map_err(|e| {
236                ConnectorError::ConfigurationError(format!(
237                    "failed to read SR client key at '{key_path}': {e}"
238                ))
239            })?;
240            // reqwest Identity expects cert + key concatenated in PEM format.
241            identity_pem.extend_from_slice(&key_pem);
242            let identity = reqwest::tls::Identity::from_pem(&identity_pem).map_err(|e| {
243                ConnectorError::ConfigurationError(format!("invalid client cert/key PEM: {e}"))
244            })?;
245            builder = builder.identity(identity);
246        }
247
248        let client = builder.build().map_err(|e| {
249            ConnectorError::ConfigurationError(format!("failed to build TLS client: {e}"))
250        })?;
251
252        let cache_config = SchemaRegistryCacheConfig::default();
253        let cache = CacheBuilder::new(cache_config.max_entries)
254            .with_shards(4)
255            .build();
256        let subject_cache = CacheBuilder::new(256).with_shards(4).build();
257        Ok(Self {
258            client,
259            base_url: base_url.into().trim_end_matches('/').to_string(),
260            auth,
261            cache,
262            subject_cache,
263            cache_config,
264        })
265    }
266
267    /// Creates a new Schema Registry client with custom cache config.
268    #[must_use]
269    pub fn with_cache_config(
270        base_url: impl Into<String>,
271        auth: Option<SrAuth>,
272        cache_config: SchemaRegistryCacheConfig,
273    ) -> Self {
274        let cache = CacheBuilder::new(cache_config.max_entries)
275            .with_shards(4)
276            .build();
277        // Subject cache is small — one entry per subject
278        let subject_cache = CacheBuilder::new(256).with_shards(4).build();
279        Self {
280            client: Client::new(),
281            base_url: base_url.into().trim_end_matches('/').to_string(),
282            auth,
283            cache,
284            subject_cache,
285            cache_config,
286        }
287    }
288
289    /// Returns the base URL of the Schema Registry.
290    #[must_use]
291    pub fn base_url(&self) -> &str {
292        &self.base_url
293    }
294
295    /// Returns `true` if authentication is configured.
296    #[must_use]
297    pub fn has_auth(&self) -> bool {
298        self.auth.is_some()
299    }
300
301    /// Returns the cache configuration.
302    #[must_use]
303    pub fn cache_config(&self) -> &SchemaRegistryCacheConfig {
304        &self.cache_config
305    }
306
307    /// Inserts a schema into the cache.
308    ///
309    /// foyer handles LRU eviction internally with S3-FIFO.
310    fn cache_insert(&self, id: i32, mut schema: CachedSchema) {
311        schema.inserted_at = Instant::now();
312        self.cache.insert(id, schema);
313    }
314
315    /// Gets from cache, returning `None` if expired.
316    ///
317    /// TTL is checked lazily on access — expired entries are removed
318    /// and treated as cache misses.
319    fn cache_get(&self, id: i32) -> Option<CachedSchema> {
320        let entry = self.cache.get(&id)?;
321        let schema = entry.value();
322        if let Some(ttl) = self.cache_config.ttl {
323            if schema.inserted_at.elapsed() > ttl {
324                drop(entry);
325                self.cache.remove(&id);
326                return None;
327            }
328        }
329        // foyer's get() already promotes the entry in the eviction policy
330        Some(schema.clone())
331    }
332
333    /// Fetches a schema by its global ID.
334    ///
335    /// Results are cached for subsequent lookups.
336    ///
337    /// # Errors
338    ///
339    /// Returns `ConnectorError` if the HTTP request fails or the schema
340    /// cannot be parsed.
341    pub async fn get_schema_by_id(&self, id: i32) -> Result<CachedSchema, ConnectorError> {
342        if let Some(cached) = self.cache_get(id) {
343            return Ok(cached);
344        }
345
346        let url = format!("{}/schemas/ids/{}", self.base_url, id);
347        let resp: SchemaByIdResponse = self.get_json(&url).await?;
348
349        let schema_type: SchemaType = resp.schema_type.parse()?;
350        let arrow_schema = avro_to_arrow_schema(&resp.schema)?;
351
352        let cached = CachedSchema {
353            id,
354            version: 0, // not available from this endpoint
355            schema_type,
356            schema_str: resp.schema,
357            arrow_schema,
358            inserted_at: Instant::now(),
359        };
360        self.cache_insert(id, cached.clone());
361        Ok(cached)
362    }
363
364    /// Fetches the latest schema version for a subject.
365    ///
366    /// # Errors
367    ///
368    /// Returns `ConnectorError` if the HTTP request fails.
369    pub async fn get_latest_schema(&self, subject: &str) -> Result<CachedSchema, ConnectorError> {
370        let url = format!("{}/subjects/{}/versions/latest", self.base_url, subject);
371        let resp: SchemaVersionResponse = self.get_json(&url).await?;
372
373        let schema_type: SchemaType = resp.schema_type.parse()?;
374        let arrow_schema = avro_to_arrow_schema(&resp.schema)?;
375
376        let cached = CachedSchema {
377            id: resp.id,
378            version: resp.version,
379            schema_type,
380            schema_str: resp.schema,
381            arrow_schema,
382            inserted_at: Instant::now(),
383        };
384
385        self.cache_insert(resp.id, cached.clone());
386        self.subject_cache
387            .insert(subject.to_string(), cached.clone());
388        Ok(cached)
389    }
390
391    /// Fetches a specific schema version for a subject.
392    ///
393    /// # Errors
394    ///
395    /// Returns `ConnectorError` if the HTTP request fails.
396    pub async fn get_schema_version(
397        &self,
398        subject: &str,
399        version: i32,
400    ) -> Result<CachedSchema, ConnectorError> {
401        let url = format!(
402            "{}/subjects/{}/versions/{}",
403            self.base_url, subject, version
404        );
405        let resp: SchemaVersionResponse = self.get_json(&url).await?;
406
407        let schema_type: SchemaType = resp.schema_type.parse()?;
408        let arrow_schema = avro_to_arrow_schema(&resp.schema)?;
409
410        let cached = CachedSchema {
411            id: resp.id,
412            version: resp.version,
413            schema_type,
414            schema_str: resp.schema,
415            arrow_schema,
416            inserted_at: Instant::now(),
417        };
418        self.cache_insert(resp.id, cached.clone());
419        Ok(cached)
420    }
421
422    /// Checks compatibility of a schema against the latest version.
423    ///
424    /// # Errors
425    ///
426    /// Returns `ConnectorError` if the HTTP request fails.
427    pub async fn check_compatibility(
428        &self,
429        subject: &str,
430        schema_str: &str,
431    ) -> Result<CompatibilityResult, ConnectorError> {
432        let url = format!(
433            "{}/compatibility/subjects/{}/versions/latest",
434            self.base_url, subject
435        );
436
437        let body = CompatibilityRequest {
438            schema: schema_str.to_string(),
439            schema_type: "AVRO".to_string(),
440        };
441
442        let mut req = self.client.post(&url).json(&body);
443        if let Some(ref auth) = self.auth {
444            req = req.basic_auth(&auth.username, Some(&auth.password));
445        }
446
447        let resp = req
448            .send()
449            .await
450            .map_err(|e| ConnectorError::ConnectionFailed(format!("schema registry: {e}")))?;
451
452        if !resp.status().is_success() {
453            let status = resp.status();
454            let text = resp.text().await.unwrap_or_default();
455            return Err(ConnectorError::ConnectionFailed(format!(
456                "schema registry compatibility check failed: {status} {text}"
457            )));
458        }
459
460        let result: CompatibilityResponse = resp.json().await.map_err(|e| {
461            ConnectorError::Internal(format!("failed to parse compatibility response: {e}"))
462        })?;
463
464        Ok(CompatibilityResult {
465            is_compatible: result.is_compatible,
466            messages: result.messages,
467        })
468    }
469
470    /// Gets the compatibility level for a subject.
471    ///
472    /// # Errors
473    ///
474    /// Returns `ConnectorError` if the HTTP request fails.
475    pub async fn get_compatibility_level(
476        &self,
477        subject: &str,
478    ) -> Result<CompatibilityLevel, ConnectorError> {
479        let url = format!("{}/config/{}", self.base_url, subject);
480        let resp: ConfigResponse = self.get_json(&url).await?;
481        resp.compatibility_level.parse()
482    }
483
484    /// Sets the compatibility level for a subject.
485    ///
486    /// # Errors
487    ///
488    /// Returns `ConnectorError` if the HTTP request fails.
489    pub async fn set_compatibility_level(
490        &self,
491        subject: &str,
492        level: CompatibilityLevel,
493    ) -> Result<(), ConnectorError> {
494        let url = format!("{}/config/{}", self.base_url, subject);
495        let body = ConfigUpdateRequest {
496            compatibility: level.as_str().to_string(),
497        };
498
499        let mut req = self.client.put(&url).json(&body);
500        if let Some(ref auth) = self.auth {
501            req = req.basic_auth(&auth.username, Some(&auth.password));
502        }
503
504        let resp = req
505            .send()
506            .await
507            .map_err(|e| ConnectorError::ConnectionFailed(format!("schema registry: {e}")))?;
508
509        if !resp.status().is_success() {
510            let status = resp.status();
511            let text = resp.text().await.unwrap_or_default();
512            return Err(ConnectorError::ConnectionFailed(format!(
513                "schema registry config update failed: {status} {text}"
514            )));
515        }
516
517        Ok(())
518    }
519
520    /// Resolves a Confluent schema ID, returning from cache if available.
521    ///
522    /// This is the hot-path method called during Avro deserialization to
523    /// look up schemas by the 4-byte ID in the Confluent wire format.
524    ///
525    /// # Errors
526    ///
527    /// Returns `ConnectorError` if the schema cannot be fetched.
528    pub async fn resolve_confluent_id(&self, id: i32) -> Result<CachedSchema, ConnectorError> {
529        self.get_schema_by_id(id).await
530    }
531
532    /// Registers a schema with the Schema Registry under the given subject.
533    ///
534    /// Returns the schema ID assigned by the registry. Caches the result
535    /// so subsequent calls with the same subject return immediately.
536    ///
537    /// # Errors
538    ///
539    /// Returns `ConnectorError` if the HTTP request fails or the response
540    /// is malformed.
541    pub async fn register_schema(
542        &self,
543        subject: &str,
544        schema_str: &str,
545        schema_type: SchemaType,
546    ) -> Result<i32, ConnectorError> {
547        // Check subject cache — only return cached ID if schema hasn't changed.
548        if let Some(entry) = self.subject_cache.get(subject) {
549            if entry.value().schema_str == schema_str {
550                return Ok(entry.value().id);
551            }
552        }
553
554        let url = format!("{}/subjects/{}/versions", self.base_url, subject);
555        let body = RegisterSchemaRequest {
556            schema: schema_str.to_string(),
557            schema_type: schema_type.to_string(),
558        };
559
560        let mut req = self.client.post(&url).json(&body);
561        if let Some(ref auth) = self.auth {
562            req = req.basic_auth(&auth.username, Some(&auth.password));
563        }
564
565        let resp = req
566            .send()
567            .await
568            .map_err(|e| ConnectorError::ConnectionFailed(format!("schema registry: {e}")))?;
569
570        if !resp.status().is_success() {
571            let status = resp.status();
572            let text = resp.text().await.unwrap_or_default();
573            return Err(ConnectorError::ConnectionFailed(format!(
574                "schema registry register failed: {status} {text}"
575            )));
576        }
577
578        let result: RegisterSchemaResponse = resp.json().await.map_err(|e| {
579            ConnectorError::Internal(format!("failed to parse register schema response: {e}"))
580        })?;
581
582        let arrow_schema = avro_to_arrow_schema(schema_str)?;
583        let cached = CachedSchema {
584            id: result.id,
585            version: 0,
586            schema_type,
587            schema_str: schema_str.to_string(),
588            arrow_schema,
589            inserted_at: Instant::now(),
590        };
591        self.cache_insert(result.id, cached.clone());
592        self.subject_cache.insert(subject.to_string(), cached);
593
594        Ok(result.id)
595    }
596
597    /// Validates compatibility and registers a schema in one step.
598    ///
599    /// If the subject already has schemas registered, checks compatibility
600    /// first. Returns `SerdeError::SchemaIncompatible` if the new schema
601    /// is not compatible with the existing versions.
602    ///
603    /// # Errors
604    ///
605    /// Returns `ConnectorError::Serde(SchemaIncompatible)` if incompatible,
606    /// or `ConnectorError` for HTTP/network errors.
607    pub async fn validate_and_register_schema(
608        &self,
609        subject: &str,
610        schema_str: &str,
611        schema_type: SchemaType,
612    ) -> Result<i32, ConnectorError> {
613        // Check compatibility first (404 means no existing schema — OK to proceed).
614        match self.check_compatibility(subject, schema_str).await {
615            Ok(result) => {
616                if !result.is_compatible {
617                    let message = if result.messages.is_empty() {
618                        "new schema is not compatible with existing version".to_string()
619                    } else {
620                        result.messages.join("; ")
621                    };
622                    return Err(ConnectorError::Serde(SerdeError::SchemaIncompatible {
623                        subject: subject.to_string(),
624                        message,
625                    }));
626                }
627            }
628            Err(ConnectorError::ConnectionFailed(msg)) if msg.contains("404") => {
629                // No existing schema — first registration, skip compatibility.
630            }
631            Err(e) => return Err(e),
632        }
633
634        self.register_schema(subject, schema_str, schema_type).await
635    }
636
637    /// Returns `true` if the schema ID is in the local cache.
638    #[must_use]
639    pub fn is_cached(&self, id: i32) -> bool {
640        self.cache.contains(&id)
641    }
642
643    /// Returns the number of cached schemas.
644    #[must_use]
645    pub fn cache_size(&self) -> usize {
646        self.cache.usage()
647    }
648
649    /// Helper to perform a GET request and deserialize JSON.
650    ///
651    /// Retries transient failures (5xx, timeouts) up to 3 attempts with
652    /// exponential backoff (100ms, 500ms). 4xx client errors fail immediately.
653    async fn get_json<T: serde::de::DeserializeOwned>(
654        &self,
655        url: &str,
656    ) -> Result<T, ConnectorError> {
657        let backoffs = [
658            std::time::Duration::from_millis(100),
659            std::time::Duration::from_millis(500),
660        ];
661        let mut last_err = None;
662
663        for (attempt, backoff) in std::iter::once(&std::time::Duration::ZERO)
664            .chain(backoffs.iter())
665            .enumerate()
666        {
667            if attempt > 0 {
668                tokio::time::sleep(*backoff).await;
669            }
670
671            let mut req = self.client.get(url);
672            if let Some(ref auth) = self.auth {
673                req = req.basic_auth(&auth.username, Some(&auth.password));
674            }
675
676            let resp = match req.send().await {
677                Ok(r) => r,
678                Err(e) => {
679                    tracing::warn!(
680                        attempt = attempt + 1,
681                        error = %e,
682                        "schema registry request failed, retrying"
683                    );
684                    last_err = Some(ConnectorError::ConnectionFailed(format!(
685                        "schema registry: {e}"
686                    )));
687                    continue;
688                }
689            };
690
691            let status = resp.status();
692            if status.is_success() {
693                return resp.json::<T>().await.map_err(|e| {
694                    ConnectorError::Internal(format!(
695                        "failed to parse schema registry response: {e}"
696                    ))
697                });
698            }
699
700            // Client errors (4xx) are not retryable.
701            if status.is_client_error() {
702                let text = resp.text().await.unwrap_or_default();
703                return Err(ConnectorError::ConnectionFailed(format!(
704                    "schema registry client error: {status} {text}"
705                )));
706            }
707
708            // Server errors (5xx) are retryable.
709            let text = resp.text().await.unwrap_or_default();
710            tracing::warn!(
711                attempt = attempt + 1,
712                status = %status,
713                "schema registry server error, retrying"
714            );
715            last_err = Some(ConnectorError::ConnectionFailed(format!(
716                "schema registry request failed: {status} {text}"
717            )));
718        }
719
720        Err(last_err.unwrap_or_else(|| {
721            ConnectorError::ConnectionFailed("schema registry: all retries exhausted".into())
722        }))
723    }
724}
725
726impl std::fmt::Debug for SchemaRegistryClient {
727    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
728        f.debug_struct("SchemaRegistryClient")
729            .field("base_url", &self.base_url)
730            .field("has_auth", &self.auth.is_some())
731            .field("cached_schemas", &self.cache.usage())
732            .field("cached_subjects", &self.subject_cache.usage())
733            .finish_non_exhaustive()
734    }
735}
736
737/// Converts an Avro JSON schema string to an Arrow [`SchemaRef`].
738///
739/// Supports Avro record schemas with primitive field types.
740///
741/// # Errors
742///
743/// Returns `ConnectorError::SchemaMismatch` if the schema JSON is invalid
744/// or contains unsupported types.
745pub fn avro_to_arrow_schema(avro_schema_str: &str) -> Result<SchemaRef, ConnectorError> {
746    let avro: serde_json::Value = serde_json::from_str(avro_schema_str)
747        .map_err(|e| ConnectorError::SchemaMismatch(format!("invalid Avro schema JSON: {e}")))?;
748
749    let fields_val = avro.get("fields").ok_or_else(|| {
750        ConnectorError::SchemaMismatch("Avro schema missing 'fields' array".into())
751    })?;
752
753    let fields_arr = fields_val.as_array().ok_or_else(|| {
754        ConnectorError::SchemaMismatch("Avro schema 'fields' is not an array".into())
755    })?;
756
757    let mut arrow_fields = Vec::with_capacity(fields_arr.len());
758    for field in fields_arr {
759        let name = field
760            .get("name")
761            .and_then(|v| v.as_str())
762            .ok_or_else(|| ConnectorError::SchemaMismatch("Avro field missing 'name'".into()))?;
763
764        let (data_type, nullable) = parse_avro_type(field.get("type").ok_or_else(|| {
765            ConnectorError::SchemaMismatch(format!("Avro field '{name}' missing 'type'"))
766        })?)?;
767
768        arrow_fields.push(Field::new(name, data_type, nullable));
769    }
770
771    Ok(Arc::new(Schema::new(arrow_fields)))
772}
773
774/// Parses an Avro type definition to an Arrow `DataType` and nullable flag.
775#[allow(clippy::too_many_lines)]
776fn parse_avro_type(avro_type: &serde_json::Value) -> Result<(DataType, bool), ConnectorError> {
777    match avro_type {
778        serde_json::Value::String(s) => Ok((avro_primitive_to_arrow(s)?, false)),
779        serde_json::Value::Array(union) => {
780            // Union type — check for ["null", T] pattern
781            let non_null: Vec<_> = union
782                .iter()
783                .filter(|v| v.as_str() != Some("null"))
784                .collect();
785            let nullable = union.iter().any(|v| v.as_str() == Some("null"));
786
787            if non_null.len() == 1 {
788                let (dt, _) = parse_avro_type(non_null[0])?;
789                Ok((dt, nullable))
790            } else {
791                // Multi-type union — fall back to string
792                Ok((DataType::Utf8, nullable))
793            }
794        }
795        serde_json::Value::Object(obj) => {
796            // Check logical type first.
797            if let Some(logical) = obj.get("logicalType").and_then(|v| v.as_str()) {
798                return match logical {
799                    "timestamp-millis" | "timestamp-micros" => Ok((DataType::Int64, false)),
800                    "date" => Ok((DataType::Int32, false)),
801                    "decimal" => Ok((DataType::Float64, false)),
802                    _ => Ok((DataType::Utf8, false)),
803                };
804            }
805
806            let type_str = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
807            match type_str {
808                "array" => {
809                    let items = obj.get("items").ok_or_else(|| {
810                        ConnectorError::SchemaMismatch("Avro array type missing 'items'".into())
811                    })?;
812                    let (item_type, _) = parse_avro_type(items)?;
813                    Ok((
814                        DataType::List(Arc::new(Field::new("item", item_type, true))),
815                        false,
816                    ))
817                }
818                "map" => {
819                    let values = obj.get("values").ok_or_else(|| {
820                        ConnectorError::SchemaMismatch("Avro map type missing 'values'".into())
821                    })?;
822                    let (value_type, _) = parse_avro_type(values)?;
823                    Ok((
824                        DataType::Map(
825                            Arc::new(Field::new(
826                                "entries",
827                                DataType::Struct(Fields::from(vec![
828                                    Field::new("key", DataType::Utf8, false),
829                                    Field::new("value", value_type, true),
830                                ])),
831                                false,
832                            )),
833                            false,
834                        ),
835                        false,
836                    ))
837                }
838                "record" => {
839                    let fields_val = obj.get("fields").ok_or_else(|| {
840                        ConnectorError::SchemaMismatch("Avro nested record missing 'fields'".into())
841                    })?;
842                    let fields_arr = fields_val.as_array().ok_or_else(|| {
843                        ConnectorError::SchemaMismatch(
844                            "Avro nested record 'fields' is not an array".into(),
845                        )
846                    })?;
847                    let mut arrow_fields = Vec::with_capacity(fields_arr.len());
848                    for f in fields_arr {
849                        let name = f.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
850                            ConnectorError::SchemaMismatch(
851                                "Avro nested record field missing 'name'".into(),
852                            )
853                        })?;
854                        let f_type = f.get("type").ok_or_else(|| {
855                            ConnectorError::SchemaMismatch(format!(
856                                "Avro nested field '{name}' missing 'type'"
857                            ))
858                        })?;
859                        let (dt, nullable) = parse_avro_type(f_type)?;
860                        arrow_fields.push(Field::new(name, dt, nullable));
861                    }
862                    Ok((DataType::Struct(Fields::from(arrow_fields)), false))
863                }
864                "enum" => Ok((
865                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
866                    false,
867                )),
868                "fixed" => {
869                    let size = obj
870                        .get("size")
871                        .and_then(serde_json::Value::as_u64)
872                        .ok_or_else(|| {
873                            ConnectorError::SchemaMismatch("Avro fixed type missing 'size'".into())
874                        })?;
875                    #[allow(clippy::cast_possible_truncation)]
876                    Ok((DataType::FixedSizeBinary(size as i32), false))
877                }
878                _ => Ok((DataType::Utf8, false)),
879            }
880        }
881        _ => Err(ConnectorError::SchemaMismatch(format!(
882            "unsupported Avro type: {avro_type}"
883        ))),
884    }
885}
886
887/// Maps an Avro primitive type name to Arrow `DataType`.
888fn avro_primitive_to_arrow(avro_type: &str) -> Result<DataType, ConnectorError> {
889    match avro_type {
890        "null" => Ok(DataType::Null),
891        "boolean" => Ok(DataType::Boolean),
892        "int" => Ok(DataType::Int32),
893        "long" => Ok(DataType::Int64),
894        "float" => Ok(DataType::Float32),
895        "double" => Ok(DataType::Float64),
896        "bytes" => Ok(DataType::Binary),
897        "string" => Ok(DataType::Utf8),
898        other => Err(ConnectorError::SchemaMismatch(format!(
899            "unsupported Avro primitive type: '{other}'"
900        ))),
901    }
902}
903
904/// Converts an Arrow [`SchemaRef`] to an Avro JSON schema string.
905///
906/// Generates a record schema named `"record"` with fields mapped from
907/// Arrow data types to Avro primitives.
908///
909/// # Errors
910///
911/// Returns `SerdeError` if an Arrow type has no Avro equivalent.
912pub fn arrow_to_avro_schema(schema: &SchemaRef, record_name: &str) -> Result<String, SerdeError> {
913    let mut fields = Vec::with_capacity(schema.fields().len());
914
915    for field in schema.fields() {
916        let avro_type = arrow_to_avro_type(field.data_type())?;
917
918        let field_type = if field.is_nullable() {
919            serde_json::json!(["null", avro_type])
920        } else {
921            avro_type
922        };
923
924        fields.push(serde_json::json!({
925            "name": field.name(),
926            "type": field_type,
927        }));
928    }
929
930    let schema = serde_json::json!({
931        "type": "record",
932        "name": record_name,
933        "fields": fields,
934    });
935
936    serde_json::to_string(&schema)
937        .map_err(|e| SerdeError::MalformedInput(format!("failed to serialize Avro schema: {e}")))
938}
939
940/// Maps an Arrow `DataType` to an Avro type JSON value.
941fn arrow_to_avro_type(data_type: &DataType) -> Result<serde_json::Value, SerdeError> {
942    match data_type {
943        DataType::Null => Ok(serde_json::json!("null")),
944        DataType::Boolean => Ok(serde_json::json!("boolean")),
945        DataType::Int8
946        | DataType::Int16
947        | DataType::Int32
948        | DataType::UInt8
949        | DataType::UInt16
950        | DataType::UInt32 => Ok(serde_json::json!("int")),
951        DataType::Int64 | DataType::UInt64 => Ok(serde_json::json!("long")),
952        DataType::Float32 => Ok(serde_json::json!("float")),
953        DataType::Float64 => Ok(serde_json::json!("double")),
954        DataType::Utf8 | DataType::LargeUtf8 => Ok(serde_json::json!("string")),
955        DataType::Binary | DataType::LargeBinary => Ok(serde_json::json!("bytes")),
956        DataType::List(item_field) => {
957            let items = arrow_to_avro_type(item_field.data_type())?;
958            Ok(serde_json::json!({
959                "type": "array",
960                "items": items,
961            }))
962        }
963        DataType::Map(entries_field, _) => {
964            // Map entries field is a Struct with "key" and "value" children.
965            if let DataType::Struct(fields) = entries_field.data_type() {
966                let value_field = fields.iter().find(|f| f.name() == "value").ok_or_else(|| {
967                    SerdeError::UnsupportedFormat(
968                        "Arrow Map missing 'value' field in entries struct".into(),
969                    )
970                })?;
971                let values = arrow_to_avro_type(value_field.data_type())?;
972                Ok(serde_json::json!({
973                    "type": "map",
974                    "values": values,
975                }))
976            } else {
977                Err(SerdeError::UnsupportedFormat(
978                    "Arrow Map entries field is not a Struct".into(),
979                ))
980            }
981        }
982        DataType::Struct(fields) => {
983            let mut avro_fields = Vec::with_capacity(fields.len());
984            for field in fields {
985                let avro_type = arrow_to_avro_type(field.data_type())?;
986                let field_type = if field.is_nullable() {
987                    serde_json::json!(["null", avro_type])
988                } else {
989                    avro_type
990                };
991                avro_fields.push(serde_json::json!({
992                    "name": field.name(),
993                    "type": field_type,
994                }));
995            }
996            Ok(serde_json::json!({
997                "type": "record",
998                "name": "nested",
999                "fields": avro_fields,
1000            }))
1001        }
1002        DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::Utf8 => {
1003            Ok(serde_json::json!({
1004                "type": "enum",
1005                "name": "enum_field",
1006                "symbols": [],
1007            }))
1008        }
1009        DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
1010            Ok(serde_json::json!({"type": "long", "logicalType": "timestamp-millis"}))
1011        }
1012        DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
1013            Ok(serde_json::json!({"type": "long", "logicalType": "timestamp-micros"}))
1014        }
1015        DataType::Date32 => Ok(serde_json::json!({"type": "int", "logicalType": "date"})),
1016        DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
1017            Ok(serde_json::json!({"type": "int", "logicalType": "time-millis"}))
1018        }
1019        DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
1020            Ok(serde_json::json!({"type": "long", "logicalType": "time-micros"}))
1021        }
1022        DataType::FixedSizeBinary(size) => Ok(serde_json::json!({
1023            "type": "fixed",
1024            "name": "fixed_field",
1025            "size": size,
1026        })),
1027        other => Err(SerdeError::UnsupportedFormat(format!(
1028            "no Avro equivalent for Arrow type: {other}"
1029        ))),
1030    }
1031}
1032
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_avro_to_arrow_simple_record() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "name", "type": "string"},
                {"name": "active", "type": "boolean"}
            ]
        }"#;

        let schema = avro_to_arrow_schema(avro).unwrap();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(0).data_type(), &DataType::Int64);
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
        assert_eq!(schema.field(2).name(), "active");
        assert_eq!(schema.field(2).data_type(), &DataType::Boolean);
    }

    #[test]
    fn test_avro_to_arrow_nullable_union() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "email", "type": ["null", "string"]}
            ]
        }"#;

        let schema = avro_to_arrow_schema(avro).unwrap();
        assert_eq!(schema.fields().len(), 2);
        assert!(!schema.field(0).is_nullable());
        assert!(schema.field(1).is_nullable());
        assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
    }

    #[test]
    fn test_avro_to_arrow_all_primitives() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "b", "type": "boolean"},
                {"name": "i", "type": "int"},
                {"name": "l", "type": "long"},
                {"name": "f", "type": "float"},
                {"name": "d", "type": "double"},
                {"name": "s", "type": "string"},
                {"name": "raw", "type": "bytes"}
            ]
        }"#;

        let schema = avro_to_arrow_schema(avro).unwrap();
        assert_eq!(schema.field(0).data_type(), &DataType::Boolean);
        assert_eq!(schema.field(1).data_type(), &DataType::Int32);
        assert_eq!(schema.field(2).data_type(), &DataType::Int64);
        assert_eq!(schema.field(3).data_type(), &DataType::Float32);
        assert_eq!(schema.field(4).data_type(), &DataType::Float64);
        assert_eq!(schema.field(5).data_type(), &DataType::Utf8);
        assert_eq!(schema.field(6).data_type(), &DataType::Binary);
    }

    #[test]
    fn test_avro_to_arrow_invalid_json() {
        assert!(avro_to_arrow_schema("not json").is_err());
    }

    #[test]
    fn test_avro_to_arrow_missing_fields() {
        let avro = r#"{"type": "record", "name": "test"}"#;
        assert!(avro_to_arrow_schema(avro).is_err());
    }

    #[test]
    fn test_avro_primitive_to_arrow_mapping() {
        assert_eq!(avro_primitive_to_arrow("null").unwrap(), DataType::Null);
        assert_eq!(avro_primitive_to_arrow("int").unwrap(), DataType::Int32);
        assert_eq!(avro_primitive_to_arrow("bytes").unwrap(), DataType::Binary);
        // Names that are not Avro primitives are rejected.
        assert!(avro_primitive_to_arrow("uuid").is_err());
        assert!(avro_primitive_to_arrow("").is_err());
    }

    #[test]
    fn test_schema_type_parsing() {
        assert_eq!("AVRO".parse::<SchemaType>().unwrap(), SchemaType::Avro);
        assert_eq!(
            "PROTOBUF".parse::<SchemaType>().unwrap(),
            SchemaType::Protobuf
        );
        assert_eq!("JSON".parse::<SchemaType>().unwrap(), SchemaType::Json);
        assert!("UNKNOWN".parse::<SchemaType>().is_err());
    }

    #[test]
    fn test_schema_type_display() {
        assert_eq!(SchemaType::Avro.to_string(), "AVRO");
        assert_eq!(SchemaType::Protobuf.to_string(), "PROTOBUF");
        assert_eq!(SchemaType::Json.to_string(), "JSON");
    }

    #[test]
    fn test_client_creation() {
        let client = SchemaRegistryClient::new("http://localhost:8081", None);
        assert_eq!(client.base_url(), "http://localhost:8081");
        assert!(!client.has_auth());
        assert_eq!(client.cache_size(), 0);
    }

    #[test]
    fn test_client_with_auth() {
        let auth = SrAuth {
            username: "user".into(),
            password: "pass".into(),
        };
        let client = SchemaRegistryClient::new("http://localhost:8081", Some(auth));
        assert!(client.has_auth());
    }

    #[test]
    fn test_client_trailing_slash_stripped() {
        let client = SchemaRegistryClient::new("http://localhost:8081/", None);
        assert_eq!(client.base_url(), "http://localhost:8081");
    }

    #[test]
    fn test_arrow_to_avro_schema_simple() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let avro_str = arrow_to_avro_schema(&schema, "test_record").unwrap();
        let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();

        assert_eq!(avro["type"], "record");
        assert_eq!(avro["name"], "test_record");

        let fields = avro["fields"].as_array().unwrap();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0]["name"], "id");
        assert_eq!(fields[0]["type"], "long");
        assert_eq!(fields[1]["name"], "name");
        assert_eq!(fields[1]["type"], "string");
    }

    #[test]
    fn test_arrow_to_avro_schema_nullable() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("email", DataType::Utf8, true),
        ]));

        let avro_str = arrow_to_avro_schema(&schema, "record").unwrap();
        let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();

        let fields = avro["fields"].as_array().unwrap();
        // Non-nullable: plain type
        assert_eq!(fields[0]["type"], "long");
        // Nullable: union ["null", "string"]
        let union = fields[1]["type"].as_array().unwrap();
        assert_eq!(union.len(), 2);
        assert_eq!(union[0], "null");
        assert_eq!(union[1], "string");
    }

    #[test]
    fn test_arrow_to_avro_all_primitives() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("b", DataType::Boolean, false),
            Field::new("i32", DataType::Int32, false),
            Field::new("i64", DataType::Int64, false),
            Field::new("f32", DataType::Float32, false),
            Field::new("f64", DataType::Float64, false),
            Field::new("s", DataType::Utf8, false),
            Field::new("bin", DataType::Binary, false),
        ]));

        let avro_str = arrow_to_avro_schema(&schema, "all_types").unwrap();
        let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();
        let fields = avro["fields"].as_array().unwrap();

        assert_eq!(fields[0]["type"], "boolean");
        assert_eq!(fields[1]["type"], "int");
        assert_eq!(fields[2]["type"], "long");
        assert_eq!(fields[3]["type"], "float");
        assert_eq!(fields[4]["type"], "double");
        assert_eq!(fields[5]["type"], "string");
        assert_eq!(fields[6]["type"], "bytes");
    }

    #[test]
    fn test_arrow_to_avro_logical_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("day", DataType::Date32, false),
        ]));

        let avro_str = arrow_to_avro_schema(&schema, "test").unwrap();
        let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();
        let fields = avro["fields"].as_array().unwrap();

        assert_eq!(fields[0]["type"]["type"], "long");
        assert_eq!(fields[0]["type"]["logicalType"], "timestamp-millis");
        assert_eq!(fields[1]["type"]["type"], "int");
        assert_eq!(fields[1]["type"]["logicalType"], "date");
    }

    #[test]
    fn test_arrow_to_avro_unsupported_type() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "half",
            DataType::Float16,
            false,
        )]));
        assert!(arrow_to_avro_schema(&schema, "test").is_err());
    }

    #[test]
    fn test_arrow_to_avro_roundtrip() {
        let original = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("active", DataType::Boolean, false),
        ]));

        let avro_str = arrow_to_avro_schema(&original, "roundtrip").unwrap();
        let recovered = avro_to_arrow_schema(&avro_str).unwrap();

        assert_eq!(recovered.fields().len(), 3);
        assert_eq!(recovered.field(0).data_type(), &DataType::Int64);
        assert!(!recovered.field(0).is_nullable());
        assert_eq!(recovered.field(1).data_type(), &DataType::Utf8);
        assert!(recovered.field(1).is_nullable());
        assert_eq!(recovered.field(2).data_type(), &DataType::Boolean);
    }

    // ---- Complex type tests ----

    #[test]
    fn test_avro_to_arrow_array_type() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "tags", "type": {"type": "array", "items": "string"}}
            ]
        }"#;

        let schema = avro_to_arrow_schema(avro).unwrap();
        assert_eq!(schema.fields().len(), 1);
        match schema.field(0).data_type() {
            DataType::List(item) => {
                assert_eq!(item.data_type(), &DataType::Utf8);
            }
            other => panic!("expected List, got {other:?}"),
        }
    }

    #[test]
    fn test_avro_to_arrow_map_type() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "metadata", "type": {"type": "map", "values": "long"}}
            ]
        }"#;

        let schema = avro_to_arrow_schema(avro).unwrap();
        assert_eq!(schema.fields().len(), 1);
        match schema.field(0).data_type() {
            DataType::Map(entries, _) => {
                if let DataType::Struct(fields) = entries.data_type() {
                    assert_eq!(fields.len(), 2);
                    assert_eq!(fields[0].name(), "key");
                    assert_eq!(fields[0].data_type(), &DataType::Utf8);
                    assert_eq!(fields[1].name(), "value");
                    assert_eq!(fields[1].data_type(), &DataType::Int64);
                } else {
                    panic!("expected Struct entries");
                }
            }
            other => panic!("expected Map, got {other:?}"),
        }
    }

    #[test]
    fn test_avro_to_arrow_nested_record() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "address",
                    "type": {
                        "type": "record",
                        "name": "Address",
                        "fields": [
                            {"name": "street", "type": "string"},
                            {"name": "zip", "type": "int"}
                        ]
                    }
                }
            ]
        }"#;

        let schema = avro_to_arrow_schema(avro).unwrap();
        assert_eq!(schema.fields().len(), 1);
        match schema.field(0).data_type() {
            DataType::Struct(fields) => {
                assert_eq!(fields.len(), 2);
                assert_eq!(fields[0].name(), "street");
                assert_eq!(fields[0].data_type(), &DataType::Utf8);
                assert_eq!(fields[1].name(), "zip");
                assert_eq!(fields[1].data_type(), &DataType::Int32);
            }
            other => panic!("expected Struct, got {other:?}"),
        }
    }

    #[test]
    fn test_avro_to_arrow_enum_type() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "status",
                    "type": {
                        "type": "enum",
                        "name": "Status",
                        "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
                    }
                }
            ]
        }"#;

        let schema = avro_to_arrow_schema(avro).unwrap();
        assert_eq!(schema.fields().len(), 1);
        match schema.field(0).data_type() {
            DataType::Dictionary(key, value) => {
                assert_eq!(key.as_ref(), &DataType::Int32);
                assert_eq!(value.as_ref(), &DataType::Utf8);
            }
            other => panic!("expected Dictionary, got {other:?}"),
        }
    }

    #[test]
    fn test_avro_to_arrow_fixed_type() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "uuid",
                    "type": {"type": "fixed", "name": "uuid", "size": 16}
                }
            ]
        }"#;

        let schema = avro_to_arrow_schema(avro).unwrap();
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0).data_type(), &DataType::FixedSizeBinary(16));
    }

    #[test]
    fn test_avro_to_arrow_nullable_complex_in_union() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "tags",
                    "type": ["null", {"type": "array", "items": "string"}]
                }
            ]
        }"#;

        let schema = avro_to_arrow_schema(avro).unwrap();
        assert!(schema.field(0).is_nullable());
        assert!(matches!(schema.field(0).data_type(), DataType::List(_)));
    }

    #[test]
    fn test_avro_array_missing_items() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "bad", "type": {"type": "array"}}
            ]
        }"#;
        assert!(avro_to_arrow_schema(avro).is_err());
    }

    #[test]
    fn test_avro_map_missing_values() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "bad", "type": {"type": "map"}}
            ]
        }"#;
        assert!(avro_to_arrow_schema(avro).is_err());
    }

    #[test]
    fn test_arrow_to_avro_array_type() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "tags",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            false,
        )]));

        let avro_str = arrow_to_avro_schema(&schema, "test").unwrap();
        let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();
        let field = &avro["fields"][0];
        assert_eq!(field["type"]["type"], "array");
        assert_eq!(field["type"]["items"], "string");
    }

    #[test]
    fn test_arrow_to_avro_map_type() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "metadata",
            DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Int64, true),
                    ])),
                    false,
                )),
                false,
            ),
            false,
        )]));

        let avro_str = arrow_to_avro_schema(&schema, "test").unwrap();
        let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();
        let field = &avro["fields"][0];
        assert_eq!(field["type"]["type"], "map");
        assert_eq!(field["type"]["values"], "long");
    }

    #[test]
    fn test_arrow_to_avro_struct_type() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "address",
            DataType::Struct(Fields::from(vec![
                Field::new("street", DataType::Utf8, false),
                Field::new("zip", DataType::Int32, false),
            ])),
            false,
        )]));

        let avro_str = arrow_to_avro_schema(&schema, "test").unwrap();
        let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();
        let field = &avro["fields"][0];
        assert_eq!(field["type"]["type"], "record");
        let nested = field["type"]["fields"].as_array().unwrap();
        assert_eq!(nested.len(), 2);
        assert_eq!(nested[0]["name"], "street");
        assert_eq!(nested[0]["type"], "string");
        assert_eq!(nested[1]["name"], "zip");
        assert_eq!(nested[1]["type"], "int");
    }

    #[test]
    fn test_arrow_to_avro_fixed_type() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "uuid",
            DataType::FixedSizeBinary(16),
            false,
        )]));

        let avro_str = arrow_to_avro_schema(&schema, "test").unwrap();
        let avro: serde_json::Value = serde_json::from_str(&avro_str).unwrap();
        let field = &avro["fields"][0];
        assert_eq!(field["type"]["type"], "fixed");
        assert_eq!(field["type"]["size"], 16);
    }

    // ---- Cache eviction tests ----

    fn make_cached_schema(id: i32) -> CachedSchema {
        CachedSchema {
            id,
            version: 1,
            schema_type: SchemaType::Avro,
            schema_str: format!(
                r#"{{"type":"record","name":"t{id}","fields":[{{"name":"x","type":"int"}}]}}"#
            ),
            arrow_schema: Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])),
            inserted_at: Instant::now(),
        }
    }

    #[test]
    fn test_cache_config_defaults() {
        let config = SchemaRegistryCacheConfig::default();
        assert_eq!(config.max_entries, 1000);
        assert_eq!(config.ttl, Some(Duration::from_secs(3600)));
    }

    #[test]
    fn test_cache_capacity_eviction() {
        let config = SchemaRegistryCacheConfig {
            max_entries: 3,
            ttl: None,
        };
        let client = SchemaRegistryClient::with_cache_config("http://localhost:8081", None, config);

        // Insert 3 schemas.
        client.cache_insert(1, make_cached_schema(1));
        client.cache_insert(2, make_cached_schema(2));
        client.cache_insert(3, make_cached_schema(3));
        assert_eq!(client.cache_size(), 3);

        // Insert a 4th — should evict one entry (foyer S3-FIFO eviction).
        client.cache_insert(4, make_cached_schema(4));
        assert!(client.cache_size() <= 3);
        // The most recently inserted should always be present.
        assert!(client.cache_get(4).is_some());
    }

    #[test]
    fn test_cache_ttl_expiration() {
        let config = SchemaRegistryCacheConfig {
            max_entries: 100,
            ttl: Some(Duration::from_millis(50)),
        };
        let client = SchemaRegistryClient::with_cache_config("http://localhost:8081", None, config);

        client.cache_insert(1, make_cached_schema(1));
        assert!(client.cache_get(1).is_some());

        // Wait for TTL to expire.
        std::thread::sleep(Duration::from_millis(60));
        // Lazy TTL: expired entry returns None on access.
        assert!(client.cache_get(1).is_none());
    }

    #[test]
    fn test_cache_no_ttl() {
        let config = SchemaRegistryCacheConfig {
            max_entries: 100,
            ttl: None,
        };
        let client = SchemaRegistryClient::with_cache_config("http://localhost:8081", None, config);

        client.cache_insert(1, make_cached_schema(1));
        // No TTL — entry should stay.
        assert!(client.cache_get(1).is_some());
    }

    #[test]
    fn test_cache_replace_existing_id() {
        let config = SchemaRegistryCacheConfig {
            max_entries: 10,
            ttl: None,
        };
        let client = SchemaRegistryClient::with_cache_config("http://localhost:8081", None, config);

        client.cache_insert(1, make_cached_schema(1));
        client.cache_insert(2, make_cached_schema(2));
        assert_eq!(client.cache_size(), 2);

        // Re-insert 1 with an actually different entry (bumped version):
        // the size must not grow and lookups must see the replacement.
        let mut updated = make_cached_schema(1);
        updated.version = 2;
        client.cache_insert(1, updated);
        assert_eq!(client.cache_size(), 2);
        assert_eq!(client.cache_get(1).map(|entry| entry.version), Some(2));
    }

    #[test]
    fn test_schema_incompatible_error_via_serde() {
        let err = SerdeError::SchemaIncompatible {
            subject: "orders-value".into(),
            message: "READER_FIELD_MISSING_DEFAULT_VALUE: field 'new_field'".into(),
        };
        let conn_err: ConnectorError = err.into();
        assert!(matches!(
            conn_err,
            ConnectorError::Serde(SerdeError::SchemaIncompatible { .. })
        ));
        assert!(conn_err.to_string().contains("orders-value"));
    }

    #[test]
    fn test_validate_and_register_method_exists() {
        // NOTE: despite its name, this test never references
        // `validate_and_register`; it only checks that a client can be
        // constructed. Exercising the method itself needs a live or mocked
        // Schema Registry.
        let client = SchemaRegistryClient::new("http://localhost:8081", None);
        let _ = &client;
    }

    #[test]
    fn test_complex_type_roundtrip() {
        let avro = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "tags", "type": {"type": "array", "items": "string"}},
                {"name": "metadata", "type": {"type": "map", "values": "long"}}
            ]
        }"#;

        let arrow_schema = avro_to_arrow_schema(avro).unwrap();
        assert!(matches!(
            arrow_schema.field(0).data_type(),
            DataType::List(_)
        ));
        assert!(matches!(
            arrow_schema.field(1).data_type(),
            DataType::Map(_, _)
        ));

        // Convert back to Avro
        let avro_str = arrow_to_avro_schema(&arrow_schema, "test").unwrap();
        let recovered = avro_to_arrow_schema(&avro_str).unwrap();

        assert!(matches!(recovered.field(0).data_type(), DataType::List(_)));
        assert!(matches!(
            recovered.field(1).data_type(),
            DataType::Map(_, _)
        ));
    }
}