pub struct KafkaSourceConfig {
pub bootstrap_servers: String,
pub group_id: String,
pub subscription: TopicSubscription,
pub security_protocol: SecurityProtocol,
pub sasl_mechanism: Option<SaslMechanism>,
pub sasl_username: Option<String>,
pub sasl_password: Option<String>,
pub ssl_ca_location: Option<String>,
pub ssl_certificate_location: Option<String>,
pub ssl_key_location: Option<String>,
pub ssl_key_password: Option<String>,
pub format: Format,
pub schema_registry_url: Option<String>,
pub schema_registry_auth: Option<SrAuth>,
pub schema_compatibility: Option<CompatibilityLevel>,
pub schema_registry_ssl_ca_location: Option<String>,
pub schema_registry_ssl_certificate_location: Option<String>,
pub schema_registry_ssl_key_location: Option<String>,
pub event_time_column: Option<String>,
pub include_metadata: bool,
pub include_headers: bool,
pub startup_mode: StartupMode,
pub auto_offset_reset: OffsetReset,
pub isolation_level: IsolationLevel,
pub max_poll_records: usize,
pub partition_assignment_strategy: AssignmentStrategy,
pub fetch_min_bytes: Option<i32>,
pub fetch_max_bytes: Option<i32>,
pub fetch_max_wait_ms: Option<i32>,
pub max_partition_fetch_bytes: Option<i32>,
pub max_out_of_orderness: Duration,
pub idle_timeout: Duration,
pub enable_watermark_tracking: bool,
pub alignment_group_id: Option<String>,
pub alignment_max_drift: Option<Duration>,
pub alignment_mode: Option<KafkaAlignmentMode>,
pub broker_commit_interval: Duration,
pub backpressure_high_watermark: f64,
pub backpressure_low_watermark: f64,
pub kafka_properties: HashMap<String, String>,
}
Kafka source connector configuration.
Uses a custom Debug impl that redacts sasl_password and
ssl_key_password to prevent credential leakage in logs.
Fields§
bootstrap_servers: String — Comma-separated list of broker addresses.
group_id: String — Consumer group identifier.
subscription: TopicSubscription — Topic subscription (explicit list or regex pattern).
security_protocol: SecurityProtocol — Security protocol for broker connections.
sasl_mechanism: Option<SaslMechanism> — SASL authentication mechanism.
sasl_username: Option<String> — SASL username (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
sasl_password: Option<String> — SASL password (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
ssl_ca_location: Option<String> — Path to SSL CA certificate file (PEM format).
ssl_certificate_location: Option<String> — Path to client SSL certificate file (PEM format).
ssl_key_location: Option<String> — Path to client SSL private key file (PEM format).
ssl_key_password: Option<String> — Password for encrypted SSL private key.
format: Format — Data format for deserialization.
schema_registry_url: Option<String> — Confluent Schema Registry URL.
schema_registry_auth: Option<SrAuth> — Schema Registry authentication credentials.
schema_compatibility: Option<CompatibilityLevel> — Override compatibility level for the subject.
schema_registry_ssl_ca_location: Option<String> — Schema Registry SSL CA certificate path.
schema_registry_ssl_certificate_location: Option<String> — Schema Registry SSL client certificate path.
schema_registry_ssl_key_location: Option<String> — Schema Registry SSL client key path.
event_time_column: Option<String> — Column name containing the event timestamp.
include_metadata: bool — Whether to include Kafka metadata columns (_partition, _offset, _timestamp).
include_headers: bool — Whether to include Kafka headers as a map column (_headers).
startup_mode: StartupMode — Consumer startup mode (controls initial offset positioning).
auto_offset_reset: OffsetReset — Where to start reading when no committed offset exists.
isolation_level: IsolationLevel — Consumer transaction isolation level.
max_poll_records: usize — Maximum records per poll batch.
partition_assignment_strategy: AssignmentStrategy — Partition assignment strategy.
fetch_min_bytes: Option<i32> — Minimum bytes to return from a fetch (allows batching).
fetch_max_bytes: Option<i32> — Maximum bytes to return from broker per request.
fetch_max_wait_ms: Option<i32> — Maximum time broker waits for fetch.min.bytes.
max_partition_fetch_bytes: Option<i32> — Maximum bytes per partition to return from broker.
max_out_of_orderness: Duration — Maximum expected out-of-orderness for watermark generation.
idle_timeout: Duration — Timeout before marking a partition as idle.
enable_watermark_tracking: bool — Enable per-partition watermark tracking (integrates with watermark tracking).
alignment_group_id: Option<String> — Alignment group ID for multi-source coordination (integrates with watermark tracking).
alignment_max_drift: Option<Duration> — Maximum allowed drift between sources in alignment group.
alignment_mode: Option<KafkaAlignmentMode> — Enforcement mode for watermark alignment.
broker_commit_interval: Duration — Interval at which to asynchronously commit offsets to the Kafka broker.
This is advisory — it keeps kafka-consumer-groups lag monitoring
accurate. The authoritative offset state lives in LaminarDB’s
checkpoint system. Set to Duration::ZERO to disable periodic
broker commits (default: 60s).
backpressure_high_watermark: f64 — Channel fill ratio at which to pause consumption.
backpressure_low_watermark: f64 — Channel fill ratio at which to resume consumption.
kafka_properties: HashMap<String, String> — Additional rdkafka properties passed directly to librdkafka.
Implementations§
Source§impl KafkaSourceConfig
impl KafkaSourceConfig
Sourcepub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError>
pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError>
Parses a KafkaSourceConfig from a ConnectorConfig.
§Errors
Returns ConnectorError if required fields are missing or values are invalid.
Sourcepub fn validate(&self) -> Result<(), ConnectorError>
pub fn validate(&self) -> Result<(), ConnectorError>
Validates the configuration.
§Errors
Returns ConnectorError::ConfigurationError if the configuration is invalid.
Sourcepub fn to_rdkafka_config(&self) -> ClientConfig
pub fn to_rdkafka_config(&self) -> ClientConfig
Builds an rdkafka [ClientConfig] from this configuration.
Trait Implementations§
Source§impl Clone for KafkaSourceConfig
impl Clone for KafkaSourceConfig
Source§fn clone(&self) -> KafkaSourceConfig
fn clone(&self) -> KafkaSourceConfig
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Source§impl Debug for KafkaSourceConfig
impl Debug for KafkaSourceConfig
Auto Trait Implementations§
impl Freeze for KafkaSourceConfig
impl RefUnwindSafe for KafkaSourceConfig
impl Send for KafkaSourceConfig
impl Sync for KafkaSourceConfig
impl Unpin for KafkaSourceConfig
impl UnsafeUnpin for KafkaSourceConfig
impl UnwindSafe for KafkaSourceConfig
Blanket Implementations§
§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
§type ArchivedMetadata = ()
type ArchivedMetadata = ()
§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
§impl<T> AsAny for T
impl<T> AsAny for T
§fn any_ref(&self) -> &(dyn Any + Sync + Send + 'static)
fn any_ref(&self) -> &(dyn Any + Sync + Send + 'static)
Obtains a dyn Any reference to the object: Read more
§fn as_any(self: Arc<T>) -> Arc<dyn Any + Sync + Send>
fn as_any(self: Arc<T>) -> Arc<dyn Any + Sync + Send>
Obtains an Arc<dyn Any> reference to the object: Read more
§fn into_any(self: Box<T>) -> Box<dyn Any + Sync + Send>
fn into_any(self: Box<T>) -> Box<dyn Any + Sync + Send>
Converts the object to Box<dyn Any>: Read more
§fn type_name(&self) -> &'static str
fn type_name(&self) -> &'static str
Convenient wrapper for std::any::type_name, since Any does not provide it and
Any::type_id is useless as a debugging aid (its Debug is just a mess of hex digits).
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
Causes self to use its Display implementation when
Debug-formatted.
§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
Causes self to use its LowerExp implementation when
Debug-formatted.
§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
Causes self to use its LowerHex implementation when
Debug-formatted.
§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
Causes self to use its Pointer implementation when
Debug-formatted.
§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
Causes self to use its UpperExp implementation when
Debug-formatted.
§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
Causes self to use its UpperHex implementation when
Debug-formatted.
§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
Borrows self and passes that borrow into the pipe function. Read more
§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
Mutably borrows self and passes that borrow into the pipe function. Read more
§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe
function.
§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Pointee for T
impl<T> Pointee for T
§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
§impl<T> Scope for T
impl<T> Scope for T
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value. Read more
§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value. Read more
§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value. Read more
§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value. Read more
§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value. Read more
§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value. Read more
§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release
builds.
§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release
builds.
§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release
builds.
§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release
builds.
§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release
builds.
§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release
builds.