// laminar_connectors/schema/traits.rs

//! Capability traits for the extensible connector framework.
//!
//! Six traits define the schema capabilities a connector can implement:
//!
//! | Trait | Purpose | Sync/Async |
//! |-------|---------|------------|
//! | [`FormatDecoder`] | Decode raw bytes → Arrow | Sync |
//! | [`FormatEncoder`] | Encode Arrow → raw bytes | Sync |
//! | [`SchemaProvider`] | Authoritative schema from source | Async |
//! | [`SchemaInferable`] | Sample-based schema inference | Async |
//! | [`SchemaRegistryAware`] | Schema registry integration | Async |
//! | [`SchemaEvolvable`] | Schema evolution & diffing | Sync |
//!
//! Connectors opt in to capabilities by implementing the relevant traits
//! and returning `Some(self)` from the corresponding `as_*()` method on
//! [`SourceConnector`](crate::connector::SourceConnector) or
//! [`SinkConnector`](crate::connector::SinkConnector).
#![allow(clippy::disallowed_types)] // cold path: schema management

use std::collections::HashMap;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;

use super::error::SchemaResult;
use super::types::RawRecord;

// ── FormatDecoder ──────────────────────────────────────────────────

/// Decodes raw bytes into Arrow `RecordBatch`es.
///
/// Unlike [`RecordDeserializer`](crate::serde::RecordDeserializer) which
/// takes `&[u8]` slices, `FormatDecoder` works with [`RawRecord`]s that
/// carry metadata, headers, and timestamps alongside the payload.
pub trait FormatDecoder: Send + Sync {
    /// Returns the Arrow schema produced by this decoder.
    fn output_schema(&self) -> SchemaRef;

    /// Decodes a batch of raw records into an Arrow `RecordBatch`.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::DecodeError`](super::error::SchemaError::DecodeError)
    /// if the input cannot be parsed.
    fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch>;

    /// Decodes a single raw record into an Arrow `RecordBatch` with one row.
    ///
    /// Default implementation delegates to [`decode_batch`](Self::decode_batch)
    /// with a single-element slice (via `std::slice::from_ref`, which
    /// borrows the record without allocating).
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::DecodeError`](super::error::SchemaError::DecodeError)
    /// if the input cannot be parsed.
    fn decode_one(&self, record: &RawRecord) -> SchemaResult<RecordBatch> {
        self.decode_batch(std::slice::from_ref(record))
    }

    /// Returns the name of the format this decoder handles (e.g., `"json"`).
    fn format_name(&self) -> &str;
}

// ── FormatEncoder ──────────────────────────────────────────────────

/// Encodes Arrow `RecordBatch`es into raw bytes.
pub trait FormatEncoder: Send + Sync {
    /// Returns the expected input schema.
    ///
    /// NOTE(review): presumably batches passed to
    /// [`encode_batch`](Self::encode_batch) must match this schema —
    /// confirm whether implementations enforce it.
    fn input_schema(&self) -> SchemaRef;

    /// Encodes a `RecordBatch` into a vector of byte records.
    ///
    /// Each element in the returned vector represents one serialized record.
    ///
    /// # Errors
    ///
    /// Returns a schema error if encoding fails.
    fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>>;

    /// Returns the name of the format this encoder produces (e.g., `"json"`).
    fn format_name(&self) -> &str;
}

// ── SchemaProvider ─────────────────────────────────────────────────

/// A connector that can provide its schema from the source system.
///
/// Examples: `PostgreSQL` CDC reading `information_schema`, a Kafka topic
/// with an embedded Avro schema, or a file source with a header row.
#[async_trait]
pub trait SchemaProvider: Send + Sync {
    /// Fetches the schema from the source system.
    ///
    /// # Errors
    ///
    /// Returns a schema error if the schema cannot be retrieved.
    async fn provide_schema(&self) -> SchemaResult<SchemaRef>;

    /// Returns `true` if this provider's schema is authoritative
    /// (i.e., should take precedence over inference).
    ///
    /// Defaults to `false`.
    fn is_authoritative(&self) -> bool {
        false
    }

    /// Returns per-field metadata for the provided schema.
    ///
    /// Keys are presumably field names of the provided schema — confirm
    /// against implementors.
    ///
    /// Default returns an empty map.
    ///
    /// # Errors
    ///
    /// Returns a schema error if the metadata cannot be retrieved.
    async fn field_metadata(&self) -> SchemaResult<HashMap<String, super::types::FieldMeta>> {
        Ok(HashMap::new())
    }
}

// ── SchemaInferable ────────────────────────────────────────────────

/// A connector that supports sample-based schema inference.
///
/// The connector provides raw sample records, and the inference engine
/// determines the Arrow schema from the data.
#[async_trait]
pub trait SchemaInferable: Send + Sync {
    /// Reads sample records from the source for inference.
    ///
    /// The `max_records` parameter is a hint; implementations may return fewer.
    ///
    /// # Errors
    ///
    /// Returns a schema error if samples cannot be read.
    async fn sample_records(&self, max_records: usize) -> SchemaResult<Vec<RawRecord>>;

    /// Infers a schema from sample records.
    ///
    /// Default implementation delegates to the global
    /// [`FormatInferenceRegistry`](super::inference::FormatInferenceRegistry).
    /// The method is `async` so implementors may perform I/O, but note
    /// that the default delegate is a plain synchronous call.
    ///
    /// # Errors
    ///
    /// Returns a schema error if inference fails.
    async fn infer_from_samples(
        &self,
        samples: &[RawRecord],
        config: &InferenceConfig,
    ) -> SchemaResult<InferredSchema> {
        super::inference::default_infer_from_samples(samples, config)
    }
}

// ── Inference types ────────────────────────────────────────────────

/// Configuration for schema inference.
///
/// Consumed by [`SchemaInferable::infer_from_samples`]; see
/// [`InferenceConfig::new`] and the `with_*` builders for construction.
#[derive(Debug, Clone)]
pub struct InferenceConfig {
    /// Data format to use for inference.
    pub format: String,

    /// How to handle number type inference.
    pub number_inference: NumberInference,

    /// How to handle array type inference.
    pub array_inference: ArrayInference,

    /// Maximum number of samples to use.
    pub max_samples: usize,

    /// Minimum confidence threshold (0.0–1.0) for accepting an inferred type.
    pub min_confidence: f64,

    /// Type hints for specific fields, keyed by field name.
    pub type_hints: HashMap<String, DataType>,

    /// Whether to treat empty strings as nulls.
    pub empty_as_null: bool,
}

175impl Default for InferenceConfig {
176    fn default() -> Self {
177        Self {
178            format: "json".to_string(),
179            number_inference: NumberInference::PreferLarger,
180            array_inference: ArrayInference::Utf8,
181            max_samples: 1000,
182            min_confidence: 0.8,
183            type_hints: HashMap::new(),
184            empty_as_null: false,
185        }
186    }
187}
188
189impl InferenceConfig {
190    /// Creates a new inference config for the given format.
191    #[must_use]
192    pub fn new(format: impl Into<String>) -> Self {
193        Self {
194            format: format.into(),
195            ..Self::default()
196        }
197    }
198
199    /// Sets the minimum confidence threshold.
200    #[must_use]
201    pub fn with_min_confidence(mut self, confidence: f64) -> Self {
202        self.min_confidence = confidence;
203        self
204    }
205
206    /// Sets the maximum number of samples.
207    #[must_use]
208    pub fn with_max_samples(mut self, n: usize) -> Self {
209        self.max_samples = n;
210        self
211    }
212
213    /// Adds a type hint for a specific field.
214    #[must_use]
215    pub fn with_type_hint(mut self, field: impl Into<String>, data_type: DataType) -> Self {
216        self.type_hints.insert(field.into(), data_type);
217        self
218    }
219
220    /// Enables treating empty strings as nulls.
221    #[must_use]
222    pub fn with_empty_as_null(mut self) -> Self {
223        self.empty_as_null = true;
224        self
225    }
226}
227
/// How to infer numeric types.
///
/// Selected via [`InferenceConfig::number_inference`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NumberInference {
    /// Prefer the smallest type that fits (i32 before i64).
    PreferSmallest,
    /// Prefer larger types (always i64, always f64).
    PreferLarger,
}

/// How to infer array/object types in JSON.
///
/// Selected via [`InferenceConfig::array_inference`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArrayInference {
    /// Store arrays/objects as JSON-encoded Utf8 strings.
    Utf8,
    /// Attempt to infer Arrow List / Struct types.
    NativeArrow,
}

/// The result of schema inference.
///
/// Produced by [`SchemaInferable::infer_from_samples`].
#[derive(Debug, Clone)]
pub struct InferredSchema {
    /// The inferred Arrow schema.
    pub schema: SchemaRef,

    /// Overall confidence score (0.0–1.0).
    pub confidence: f64,

    /// Number of samples that were analyzed.
    pub sample_count: usize,

    /// Per-field inference details.
    pub field_details: Vec<FieldInferenceDetail>,

    /// Warnings generated during inference (may be empty).
    pub warnings: Vec<InferenceWarning>,
}

/// Per-field detail from inference.
#[derive(Debug, Clone)]
pub struct FieldInferenceDetail {
    /// The field name.
    pub field_name: String,

    /// The inferred Arrow data type.
    pub inferred_type: DataType,

    /// Confidence for this specific field (0.0–1.0).
    pub confidence: f64,

    /// Number of non-null samples seen for this field
    /// (expected to be ≤ `total_count`).
    pub non_null_count: usize,

    /// Total number of samples that included this field.
    pub total_count: usize,

    /// Whether a type hint from [`InferenceConfig::type_hints`] was applied.
    pub hint_applied: bool,
}

/// A warning generated during inference.
#[derive(Debug, Clone)]
pub struct InferenceWarning {
    /// The field this warning relates to, if any
    /// (`None` for schema-wide warnings).
    pub field: Option<String>,

    /// Human-readable warning message.
    pub message: String,

    /// Severity level.
    pub severity: WarningSeverity,
}

/// Severity level for inference warnings.
///
/// Variants are ordered from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WarningSeverity {
    /// Informational — inference succeeded but with caveats.
    Info,
    /// Warning — inference may be inaccurate.
    Warning,
    /// Error — inference for this field failed; a fallback type was used.
    Error,
}

// ── SchemaRegistryAware ────────────────────────────────────────────

/// A connector that integrates with a schema registry.
///
/// Supports fetching, registering, and checking compatibility of schemas
/// against an external registry (e.g., Confluent Schema Registry).
#[async_trait]
pub trait SchemaRegistryAware: Send + Sync {
    /// Fetches the latest schema for a subject.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::RegistryError`](super::error::SchemaError::RegistryError)
    /// if the registry is unreachable or the subject is not found.
    async fn fetch_schema(&self, subject: &str) -> SchemaResult<RegisteredSchema>;

    /// Fetches a schema by its numeric ID.
    ///
    /// # Errors
    ///
    /// Returns a schema error if the ID is not found.
    async fn fetch_schema_by_id(&self, schema_id: i32) -> SchemaResult<RegisteredSchema>;

    /// Checks whether `proposed` is compatible with the existing schema
    /// for `subject` under the configured compatibility mode.
    ///
    /// Returns `Ok(true)` when the proposed schema is accepted.
    ///
    /// # Errors
    ///
    /// Returns a schema error if the compatibility check itself fails.
    async fn check_compatibility(&self, subject: &str, proposed: &SchemaRef) -> SchemaResult<bool>;

    /// Registers a new schema version for a subject.
    ///
    /// # Errors
    ///
    /// Returns a schema error if registration fails.
    async fn register_schema(
        &self,
        subject: &str,
        schema: &SchemaRef,
    ) -> SchemaResult<RegisteredSchema>;

    /// Builds a boxed [`FormatDecoder`] configured for the registry schema.
    ///
    /// This is a sync method because the decoder construction itself
    /// is not async — the registry lookup should happen beforehand
    /// (via [`fetch_schema`](Self::fetch_schema) or
    /// [`fetch_schema_by_id`](Self::fetch_schema_by_id)).
    ///
    /// # Errors
    ///
    /// Returns a schema error if the decoder cannot be constructed
    /// (e.g., unsupported schema type).
    fn build_registry_decoder(
        &self,
        schema: &RegisteredSchema,
    ) -> SchemaResult<Box<dyn FormatDecoder>>;
}

/// Configuration for connecting to a schema registry.
#[derive(Debug, Clone)]
pub struct RegistryConfig {
    /// Registry URL.
    pub url: String,

    /// Schema type/format used by the registry.
    pub schema_type: RegistrySchemaType,

    /// Compatibility mode.
    pub compatibility: CompatibilityMode,

    /// Optional credentials (`None` for unauthenticated registries).
    pub credentials: Option<RegistryCredentials>,
}

/// Schema type stored in the registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegistrySchemaType {
    /// Apache Avro.
    Avro,
    /// JSON Schema.
    JsonSchema,
    /// Protocol Buffers.
    Protobuf,
}

/// Compatibility mode for schema evolution.
///
/// The variant set mirrors the compatibility levels used by
/// Confluent-style schema registries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityMode {
    /// No compatibility checks.
    None,
    /// New schema can read old data.
    Backward,
    /// Old schema can read new data.
    Forward,
    /// Both backward and forward compatible.
    Full,
    /// Backward compatible with all previous versions.
    BackwardTransitive,
    /// Forward compatible with all previous versions.
    ForwardTransitive,
    /// Fully compatible with all previous versions.
    FullTransitive,
}

/// Credentials for schema registry authentication.
///
/// `Debug` is implemented by hand (instead of derived) so that the
/// password/secret is never written into logs, panic messages, or
/// `{:?}` output — only the username is shown.
#[derive(Clone)]
pub struct RegistryCredentials {
    /// Username or API key.
    pub username: String,
    /// Password or API secret.
    pub password: String,
}

impl std::fmt::Debug for RegistryCredentials {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RegistryCredentials")
            .field("username", &self.username)
            // Redact the secret: a derived Debug would leak it verbatim.
            .field("password", &"<redacted>")
            .finish()
    }
}

/// A schema registered in a schema registry.
///
/// Returned by [`SchemaRegistryAware::fetch_schema`],
/// [`SchemaRegistryAware::fetch_schema_by_id`], and
/// [`SchemaRegistryAware::register_schema`].
#[derive(Debug, Clone)]
pub struct RegisteredSchema {
    /// Numeric schema ID assigned by the registry.
    pub id: i32,

    /// Schema version within its subject.
    pub version: i32,

    /// The subject this schema belongs to.
    pub subject: String,

    /// The Arrow schema.
    pub schema: SchemaRef,

    /// The schema type in the registry.
    pub schema_type: RegistrySchemaType,
}

// ── SchemaEvolvable ────────────────────────────────────────────────

/// A connector that supports schema evolution.
///
/// Provides diffing, evaluation, and application of schema changes.
pub trait SchemaEvolvable: Send + Sync {
    /// Computes the differences between two schemas, expressed as
    /// changes from `old` to `new`.
    fn diff_schemas(&self, old: &SchemaRef, new: &SchemaRef) -> Vec<SchemaChange>;

    /// Evaluates whether a set of schema changes is acceptable.
    fn evaluate_evolution(&self, changes: &[SchemaChange]) -> EvolutionVerdict;

    /// Applies schema changes, returning a column projection that maps
    /// old data to the new schema.
    ///
    /// # Errors
    ///
    /// Returns a schema error if the evolution cannot be applied.
    fn apply_evolution(
        &self,
        old: &SchemaRef,
        changes: &[SchemaChange],
    ) -> SchemaResult<ColumnProjection>;

    /// Returns the current schema version, if tracked.
    ///
    /// Defaults to `None` (no version tracking).
    fn schema_version(&self) -> Option<u32> {
        None
    }
}

/// A single schema change detected by [`SchemaEvolvable::diff_schemas`].
///
/// Each change is described relative to the old schema.
#[derive(Debug, Clone, PartialEq)]
pub enum SchemaChange {
    /// A new column was added.
    ColumnAdded {
        /// Column name.
        name: String,
        /// The new column's data type.
        data_type: DataType,
        /// Whether the column is nullable.
        nullable: bool,
    },

    /// An existing column was removed.
    ColumnRemoved {
        /// Column name.
        name: String,
    },

    /// A column's data type changed.
    TypeChanged {
        /// Column name.
        name: String,
        /// Previous data type.
        old_type: DataType,
        /// New data type.
        new_type: DataType,
    },

    /// A column's nullability changed.
    NullabilityChanged {
        /// Column name.
        name: String,
        /// Previous nullable flag.
        was_nullable: bool,
        /// New nullable flag.
        now_nullable: bool,
    },

    /// A column was renamed.
    ColumnRenamed {
        /// Previous name.
        old_name: String,
        /// New name.
        new_name: String,
    },
}

/// The result of evaluating a set of schema changes
/// (see [`SchemaEvolvable::evaluate_evolution`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EvolutionVerdict {
    /// All changes are compatible — evolution can proceed.
    Compatible,

    /// Changes require data migration but are feasible.
    RequiresMigration,

    /// Changes are incompatible — evolution is rejected.
    /// The payload is a human-readable reason.
    Incompatible(String),
}

/// Describes how to project columns from the old schema to the new schema.
#[derive(Debug, Clone)]
pub struct ColumnProjection {
    /// For each column in the new schema, the index in the old schema
    /// (or `None` if the column is newly added and should be filled
    /// with the default/null).
    ///
    /// `mappings.len()` is expected to equal the number of fields in
    /// `target_schema`.
    pub mappings: Vec<Option<usize>>,

    /// The resulting schema after projection.
    pub target_schema: SchemaRef,
}

// ── Connector config schema (self-describing connectors) ───────────

/// Self-describing connector configuration schema.
///
/// Allows connectors to declare the configuration options they accept,
/// enabling UI generation and validation.
#[derive(Debug, Clone)]
pub struct ConnectorConfigSchema {
    /// The connector type name (e.g., `"kafka"`).
    pub connector_type: String,

    /// Configuration options accepted by this connector type.
    pub options: Vec<ConfigOption>,
}

/// A single configuration option in a connector's config schema.
#[derive(Debug, Clone)]
pub struct ConfigOption {
    /// The option key.
    pub key: String,

    /// Human-readable description.
    pub description: String,

    /// Whether this option is required.
    pub required: bool,

    /// The expected value type.
    pub value_type: ConfigValueType,

    /// Default value, if any — stored as a string; presumably parsed
    /// according to `value_type` by the consumer.
    pub default: Option<String>,
}

/// The expected type of a configuration value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfigValueType {
    /// A string value.
    String,
    /// An integer value.
    Integer,
    /// A floating-point value.
    Float,
    /// A boolean value.
    Boolean,
    /// A duration (e.g., `"30s"`, `"5m"`).
    Duration,
    /// A URL.
    Url,
    /// A file path.
    Path,
}

598// ── Object-safety assertions ───────────────────────────────────────
599
600// Compile-time checks that all six traits are object-safe.
601const _: () = {
602    fn _assert_format_decoder_object_safe(_: &dyn FormatDecoder) {}
603    fn _assert_format_encoder_object_safe(_: &dyn FormatEncoder) {}
604    fn _assert_schema_provider_object_safe(_: &dyn SchemaProvider) {}
605    fn _assert_schema_inferable_object_safe(_: &dyn SchemaInferable) {}
606    fn _assert_schema_registry_aware_object_safe(_: &dyn SchemaRegistryAware) {}
607    fn _assert_schema_evolvable_object_safe(_: &dyn SchemaEvolvable) {}
608};
609
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow_schema::{Field, Schema};

    /// The default config carries the documented default values.
    #[test]
    fn test_inference_config_defaults() {
        let defaults = InferenceConfig::default();
        assert_eq!(defaults.format, "json");
        assert_eq!(defaults.max_samples, 1000);
        assert!((defaults.min_confidence - 0.8).abs() < f64::EPSILON);
        assert!(defaults.type_hints.is_empty());
    }

    /// Each builder method sets its corresponding field.
    #[test]
    fn test_inference_config_builder() {
        let built = InferenceConfig::new("csv")
            .with_min_confidence(0.9)
            .with_max_samples(500)
            .with_type_hint("id", DataType::Int32)
            .with_empty_as_null();

        assert_eq!(built.format, "csv");
        assert!((built.min_confidence - 0.9).abs() < f64::EPSILON);
        assert_eq!(built.max_samples, 500);
        assert_eq!(built.type_hints.get("id"), Some(&DataType::Int32));
        assert!(built.empty_as_null);
    }

    /// An `InferredSchema` can be assembled with per-field details.
    #[test]
    fn test_inferred_schema() {
        let arrow_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let id_detail = FieldInferenceDetail {
            field_name: "id".into(),
            inferred_type: DataType::Int64,
            confidence: 1.0,
            non_null_count: 100,
            total_count: 100,
            hint_applied: false,
        };
        let name_detail = FieldInferenceDetail {
            field_name: "name".into(),
            inferred_type: DataType::Utf8,
            confidence: 0.9,
            non_null_count: 90,
            total_count: 100,
            hint_applied: false,
        };

        let result = InferredSchema {
            schema: arrow_schema.clone(),
            confidence: 0.95,
            sample_count: 100,
            field_details: vec![id_detail, name_detail],
            warnings: vec![],
        };

        assert_eq!(result.schema.fields().len(), 2);
        assert!((result.confidence - 0.95).abs() < f64::EPSILON);
        assert_eq!(result.field_details.len(), 2);
    }

    /// All five change variants can be constructed.
    #[test]
    fn test_schema_change_variants() {
        let changes = vec![
            SchemaChange::ColumnAdded {
                name: "email".into(),
                data_type: DataType::Utf8,
                nullable: true,
            },
            SchemaChange::ColumnRemoved {
                name: "legacy".into(),
            },
            SchemaChange::TypeChanged {
                name: "age".into(),
                old_type: DataType::Int32,
                new_type: DataType::Int64,
            },
            SchemaChange::NullabilityChanged {
                name: "name".into(),
                was_nullable: false,
                now_nullable: true,
            },
            SchemaChange::ColumnRenamed {
                old_name: "fname".into(),
                new_name: "first_name".into(),
            },
        ];
        assert_eq!(changes.len(), 5);
    }

    /// Verdict equality distinguishes the variants.
    #[test]
    fn test_evolution_verdict() {
        assert_eq!(EvolutionVerdict::Compatible, EvolutionVerdict::Compatible);
        assert_ne!(
            EvolutionVerdict::Compatible,
            EvolutionVerdict::RequiresMigration
        );
    }

    /// Projection mappings line up with the target schema's columns.
    #[test]
    fn test_column_projection() {
        let target = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, false),
        ]));

        let projection = ColumnProjection {
            mappings: vec![Some(0), None, Some(1)],
            target_schema: target,
        };

        assert_eq!(projection.mappings.len(), 3);
        assert_eq!(projection.mappings[0], Some(0));
        assert_eq!(projection.mappings[1], None); // newly added column
        assert_eq!(projection.mappings[2], Some(1));
    }

    /// Warnings carry field, message, and severity.
    #[test]
    fn test_warning_severity() {
        let warning = InferenceWarning {
            field: Some("price".into()),
            message: "mixed int/float".into(),
            severity: WarningSeverity::Warning,
        };
        assert_eq!(warning.severity, WarningSeverity::Warning);
        assert_eq!(warning.field.as_deref(), Some("price"));
    }

    /// Registry config holds type, compatibility, and credentials.
    #[test]
    fn test_registry_config() {
        let credentials = RegistryCredentials {
            username: "user".into(),
            password: "pass".into(),
        };
        let config = RegistryConfig {
            url: "http://localhost:8081".into(),
            schema_type: RegistrySchemaType::Avro,
            compatibility: CompatibilityMode::Backward,
            credentials: Some(credentials),
        };
        assert_eq!(config.schema_type, RegistrySchemaType::Avro);
        assert_eq!(config.compatibility, CompatibilityMode::Backward);
        assert!(config.credentials.is_some());
    }

    /// A connector config schema lists its options with required flags.
    #[test]
    fn test_connector_config_schema() {
        let servers_option = ConfigOption {
            key: "bootstrap.servers".into(),
            description: "Kafka broker addresses".into(),
            required: true,
            value_type: ConfigValueType::String,
            default: None,
        };
        let batch_option = ConfigOption {
            key: "batch.size".into(),
            description: "Batch size".into(),
            required: false,
            value_type: ConfigValueType::Integer,
            default: Some("1000".into()),
        };
        let config_schema = ConnectorConfigSchema {
            connector_type: "kafka".into(),
            options: vec![servers_option, batch_option],
        };
        assert_eq!(config_schema.options.len(), 2);
        assert!(config_schema.options[0].required);
        assert!(!config_schema.options[1].required);
    }
}