pub trait SourceConnector: Send {
// Required methods
fn open<'life0, 'life1, 'async_trait>(
&'life0 mut self,
config: &'life1 ConnectorConfig,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn poll_batch<'life0, 'async_trait>(
&'life0 mut self,
max_records: usize,
) -> Pin<Box<dyn Future<Output = Result<Option<SourceBatch>, ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn schema(&self) -> SchemaRef;
fn checkpoint(&self) -> SourceCheckpoint;
fn restore<'life0, 'life1, 'async_trait>(
&'life0 mut self,
checkpoint: &'life1 SourceCheckpoint,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn close<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
// Provided methods
fn discover_schema<'life0, 'life1, 'async_trait>(
&'life0 mut self,
_properties: &'life1 HashMap<String, String>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn health_check(&self) -> HealthStatus { ... }
fn metrics(&self) -> ConnectorMetrics { ... }
fn data_ready_notify(&self) -> Option<Arc<Notify>> { ... }
fn as_schema_provider(&self) -> Option<&dyn SchemaProvider> { ... }
fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware> { ... }
fn supports_replay(&self) -> bool { ... }
fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> { ... }
fn notify_epoch_committed<'life0, 'async_trait>(
&'life0 mut self,
_epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
}
Trait for source connectors that read data from external systems.
Source connectors operate in Ring 1 and push data into Ring 0 via
the streaming Source<ArrowRecord>::push_arrow() API.
§Lifecycle
1. open(): initialize the connection and discover the schema.
2. poll_batch(): read batches in a loop.
3. checkpoint() / restore(): manage offsets.
4. close(): shut down cleanly.
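The lifecycle above can be sketched as a small driver loop. This is a synchronous, std-only stand-in written for illustration: SimpleSource, CounterSource, run_source and the Config/Batch/Checkpoint/Error aliases are hypothetical and not part of this crate. The real trait is async (its signatures are the async_trait desugaring) and uses ConnectorConfig, SourceBatch, SourceCheckpoint and ConnectorError.

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the real connector types.
type Config = HashMap<String, String>;
type Batch = Vec<u64>;
type Checkpoint = u64; // next offset to read
type Error = String;

// Synchronous stand-in for SourceConnector: same lifecycle, no async.
trait SimpleSource {
    fn open(&mut self, config: &Config) -> Result<(), Error>;
    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Batch>, Error>;
    fn checkpoint(&self) -> Checkpoint;
    fn restore(&mut self, cp: &Checkpoint) -> Result<(), Error>;
    fn close(&mut self) -> Result<(), Error>;
}

/// Toy source that emits offsets 0..limit.
struct CounterSource {
    next: u64,
    limit: u64,
    opened: bool,
}

impl SimpleSource for CounterSource {
    fn open(&mut self, _config: &Config) -> Result<(), Error> {
        self.opened = true;
        Ok(())
    }
    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Batch>, Error> {
        if !self.opened {
            return Err("poll before open".into());
        }
        if self.next >= self.limit {
            return Ok(None);
        }
        let end = (self.next + max_records as u64).min(self.limit);
        let batch: Batch = (self.next..end).collect();
        self.next = end;
        Ok(Some(batch))
    }
    fn checkpoint(&self) -> Checkpoint {
        self.next
    }
    fn restore(&mut self, cp: &Checkpoint) -> Result<(), Error> {
        self.next = *cp;
        Ok(())
    }
    fn close(&mut self) -> Result<(), Error> {
        self.opened = false;
        Ok(())
    }
}

/// Drives one source through open -> (restore) -> poll loop -> checkpoint -> close.
fn run_source<S: SimpleSource>(
    src: &mut S,
    config: &Config,
    recovered: Option<Checkpoint>,
    max_polls: usize,
) -> Result<(Vec<u64>, Checkpoint), Error> {
    src.open(config)?;
    if let Some(cp) = recovered {
        src.restore(&cp)?; // recovery: restore before polling resumes
    }
    let mut out = Vec::new();
    for _ in 0..max_polls {
        match src.poll_batch(4)? {
            Some(batch) => out.extend(batch),
            None => break, // a real runtime would retry after a delay instead
        }
    }
    let cp = src.checkpoint();
    src.close()?;
    Ok((out, cp))
}
```

Stopping on the first Ok(None) keeps the sketch finite; it is a simplification of the runtime's retry behaviour, not a claim about how the pipeline terminates.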
Required Methods
fn open<'life0, 'life1, 'async_trait>(
&'life0 mut self,
config: &'life1 ConnectorConfig,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Called once before polling begins.
fn poll_batch<'life0, 'async_trait>(
&'life0 mut self,
max_records: usize,
) -> Pin<Box<dyn Future<Output = Result<Option<SourceBatch>, ConnectorError>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Returns Ok(None) when no data is currently available; the runtime retries after a delay.
max_records is a hint: implementations may return fewer records.
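The two rules above (Ok(None) means "nothing right now", and max_records is only an upper bound) can be illustrated with a toy buffered source and a simplified runtime-side retry loop. BufferedSource, poll_with_retry and the fixed sleep are hypothetical; the real method is async and returns a SourceBatch, and the runtime's actual delay policy is not documented here.

```rust
use std::collections::VecDeque;
use std::time::Duration;

// Hypothetical stand-in for one record of a SourceBatch.
type Record = String;

/// Toy buffered source: returns *at most* max_records, possibly fewer.
struct BufferedSource {
    buffer: VecDeque<Record>,
}

impl BufferedSource {
    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Vec<Record>>, String> {
        if self.buffer.is_empty() {
            // Nothing available right now: neither an error nor end-of-stream.
            return Ok(None);
        }
        let n = max_records.min(self.buffer.len());
        Ok(Some(self.buffer.drain(..n).collect()))
    }
}

/// Simplified runtime side: on Ok(None), sleep and retry, up to `attempts` tries.
fn poll_with_retry(
    src: &mut BufferedSource,
    max_records: usize,
    delay: Duration,
    attempts: usize,
) -> Result<Option<Vec<Record>>, String> {
    for _ in 0..attempts {
        if let Some(batch) = src.poll_batch(max_records)? {
            return Ok(Some(batch));
        }
        std::thread::sleep(delay);
    }
    Ok(None)
}
```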
fn checkpoint(&self) -> SourceCheckpoint
The returned checkpoint must contain enough information to resume from the current position after a restart.
fn restore<'life0, 'life1, 'async_trait>(
&'life0 mut self,
checkpoint: &'life1 SourceCheckpoint,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Called during recovery before polling resumes.
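The checkpoint/restore contract can be sketched with a toy multi-partition source. The shape of SourceCheckpoint is not shown in these docs, so the Checkpoint struct below (partition to next offset) and PartitionedSource are assumptions made for illustration. Storing the *next* offset to read, rather than the last one read, is what lets a restored source resume without skipping or duplicating the boundary record.

```rust
use std::collections::BTreeMap;

/// Hypothetical checkpoint shape: partition -> next offset to read.
#[derive(Clone, Debug, PartialEq)]
struct Checkpoint {
    offsets: BTreeMap<u32, u64>,
}

/// Toy multi-partition source where partition p holds records 0..lens[p].
struct PartitionedSource {
    lens: Vec<u64>,
    next: BTreeMap<u32, u64>,
}

impl PartitionedSource {
    fn new(lens: Vec<u64>) -> Self {
        let next = (0..lens.len() as u32).map(|p| (p, 0)).collect();
        Self { lens, next }
    }

    /// Reads up to `n` records from partition `p` as (partition, offset) pairs.
    fn read(&mut self, p: u32, n: u64) -> Vec<(u32, u64)> {
        let start = self.next[&p];
        let end = (start + n).min(self.lens[p as usize]);
        self.next.insert(p, end);
        (start..end).map(|o| (p, o)).collect()
    }

    /// Captures the next offset per partition.
    fn checkpoint(&self) -> Checkpoint {
        Checkpoint { offsets: self.next.clone() }
    }

    /// Called during recovery, before any further reads.
    fn restore(&mut self, cp: &Checkpoint) {
        self.next = cp.offsets.clone();
    }
}
```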
Provided Methods
fn discover_schema<'life0, 'life1, 'async_trait>(
&'life0 mut self,
_properties: &'life1 HashMap<String, String>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Resolve the source schema from the WITH (...) properties before
DDL reaches the planner. Implementations that hit the network
(e.g. Kafka fetching an Avro schema from a Schema Registry) must
bound their I/O with a timeout and leave the schema empty on
failure rather than hang.
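The "bound the I/O and fall back to an empty schema" rule can be sketched with std threads and a channel timeout. discover_with_timeout and the Fields alias are hypothetical; in the real async code the analogous tool would likely be a future-level timeout such as tokio::time::timeout rather than a helper thread.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a resolved schema: (field name, type name) pairs.
type Fields = Vec<(String, String)>;

/// Runs a possibly slow lookup (e.g. a Schema Registry fetch) on a helper
/// thread and waits at most `timeout`. On timeout or error the schema is
/// left empty instead of blocking DDL planning.
fn discover_with_timeout<F>(fetch: F, timeout: Duration) -> Fields
where
    F: FnOnce() -> Result<Fields, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver is gone if we already timed out; ignore send errors.
        let _ = tx.send(fetch());
    });
    match rx.recv_timeout(timeout) {
        Ok(Ok(fields)) => fields,
        Ok(Err(_)) | Err(_) => Vec::new(), // fetch failed or timed out
    }
}
```

Note that the helper thread is detached: after a timeout it keeps running until the fetch returns, and its result is discarded.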
fn health_check(&self) -> HealthStatus
Defaults to Unknown; connectors should override.
fn metrics(&self) -> ConnectorMetrics
Current connector metrics snapshot.
fn data_ready_notify(&self) -> Option<Arc<Notify>>
Returns a Notify handle that is signalled when new data is available.
When Some, the pipeline coordinator awaits the notification instead of
polling on a timer, eliminating idle CPU usage. Sources that receive data
asynchronously (WebSocket, CDC replication streams, Kafka) should return
Some and call notify.notify_one() when data arrives.
The default implementation returns None, which causes the pipeline to
fall back to timer-based polling (suitable for batch/file sources).
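The choice between notification-driven and timer-based waiting can be sketched without tokio. DataReady is a hypothetical std-only analogue of tokio::sync::Notify built on Mutex and Condvar: like Notify, notify_one stores a single permit, so a wakeup that arrives before the coordinator starts waiting is not lost. wait_for_data and its safety timeout on the notified path are assumptions of this sketch, not documented coordinator behaviour.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Std-only analogue of a Notify handle: one stored permit plus a condvar.
#[derive(Default)]
struct DataReady {
    permit: Mutex<bool>,
    cv: Condvar,
}

impl DataReady {
    /// Called by the source when data arrives.
    fn notify_one(&self) {
        *self.permit.lock().unwrap() = true;
        self.cv.notify_one();
    }

    /// Waits until notified or `timeout` elapses; true if a permit was consumed.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        let guard = self.permit.lock().unwrap();
        let (mut permit, _res) = self
            .cv
            .wait_timeout_while(guard, timeout, |p| !*p)
            .unwrap();
        std::mem::replace(&mut *permit, false)
    }
}

/// Coordinator side: event-driven when the source provides a handle,
/// timer-based otherwise.
fn wait_for_data(notify: Option<&Arc<DataReady>>, poll_interval: Duration) -> &'static str {
    match notify {
        Some(n) => {
            if n.wait_timeout(poll_interval) {
                "notified"
            } else {
                "timed out"
            }
        }
        None => {
            std::thread::sleep(poll_interval);
            "timer"
        }
    }
}
```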
fn as_schema_provider(&self) -> Option<&dyn SchemaProvider>
Returns this connector as a SchemaProvider, if supported.
fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware>
Returns this connector as a SchemaRegistryAware, if supported.
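The as_schema_provider / as_schema_registry_aware methods follow an "optional capability" pattern: the trait supplies a default that returns None, and a connector opts in by returning a trait-object view of itself. The sketch below shows that pattern with hypothetical, trimmed-down traits (the real SchemaProvider's methods are not documented here, so field_names is invented for illustration).

```rust
/// Hypothetical, trimmed-down capability trait.
trait SchemaProvider {
    fn field_names(&self) -> Vec<String>;
}

/// Hypothetical stand-in for the connector trait.
trait ConnectorLike {
    /// Default: no schema-provider capability.
    fn as_schema_provider(&self) -> Option<&dyn SchemaProvider> {
        None
    }
}

struct PlainSource;
impl ConnectorLike for PlainSource {}

struct TypedSource {
    fields: Vec<String>,
}

impl SchemaProvider for TypedSource {
    fn field_names(&self) -> Vec<String> {
        self.fields.clone()
    }
}

impl ConnectorLike for TypedSource {
    // Opt in by returning a trait-object view of self.
    fn as_schema_provider(&self) -> Option<&dyn SchemaProvider> {
        Some(self)
    }
}

/// Caller side: works on any `dyn ConnectorLike` without downcasting.
fn describe(src: &dyn ConnectorLike) -> String {
    match src.as_schema_provider() {
        Some(p) => p.field_names().join(","),
        None => "<no schema>".to_string(),
    }
}
```

This keeps the connector trait object-safe while letting callers probe for extra capabilities at runtime.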
fn supports_replay(&self) -> bool
Whether this source supports replay from a checkpointed position.
Sources that return false (e.g., WebSocket, raw TCP) cannot seek
back to a previous offset on recovery. Checkpointing still captures
their state for best-effort recovery, but exactly-once semantics
are degraded to at-most-once for events from this source.
The default implementation returns true because most durable
sources (Kafka, CDC, files) support replay.
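The effect of supports_replay on delivery guarantees can be expressed as a small mapping. Guarantee, ReplayCapable, effective_guarantee, KafkaLike and WebSocketLike are hypothetical names for illustration; only the default of true and the downgrade from exactly-once to at-most-once come from the description above.

```rust
/// Delivery guarantees named in the supports_replay description.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Guarantee {
    ExactlyOnce,
    AtMostOnce,
}

trait ReplayCapable {
    /// Mirrors the documented default: durable sources can replay.
    fn supports_replay(&self) -> bool {
        true
    }
}

struct KafkaLike;
impl ReplayCapable for KafkaLike {}

struct WebSocketLike;
impl ReplayCapable for WebSocketLike {
    fn supports_replay(&self) -> bool {
        false
    }
}

/// Effective guarantee for events from one source, given the pipeline's mode.
fn effective_guarantee(pipeline: Guarantee, supports_replay: bool) -> Guarantee {
    match (pipeline, supports_replay) {
        // Without replay, events in flight at a failure cannot be re-read.
        (Guarantee::ExactlyOnce, false) => Guarantee::AtMostOnce,
        (g, _) => g,
    }
}
```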
fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>>
Returns a shared flag that the source sets to true when it
requests an immediate checkpoint.
This is used by sources that detect external state changes requiring a checkpoint before proceeding, for example a Kafka consumer group rebalance (partition revocation). The pipeline coordinator polls this flag each cycle and clears it after triggering the checkpoint.
The default returns None (no source-initiated checkpoints).
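A source-initiated checkpoint request can be sketched with a shared AtomicBool. RebalancingSource, on_partitions_revoked and coordinator_cycle are hypothetical. One design choice here differs from a literal reading of the description: the sketch checks and clears the flag in a single swap, so a request raised while a checkpoint is being triggered is not wiped by a later clear. Whether the real coordinator does this is not stated.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Toy source that asks for a checkpoint when it sees a (simulated) rebalance.
struct RebalancingSource {
    requested: Arc<AtomicBool>,
}

impl RebalancingSource {
    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
        Some(Arc::clone(&self.requested))
    }

    /// Hypothetical hook, e.g. invoked from a partition-revocation callback.
    fn on_partitions_revoked(&self) {
        self.requested.store(true, Ordering::Release);
    }
}

/// One coordinator cycle: atomically check-and-clear, then "checkpoint".
fn coordinator_cycle(flag: &Option<Arc<AtomicBool>>, checkpoints: &mut u32) {
    if let Some(f) = flag {
        if f.swap(false, Ordering::AcqRel) {
            *checkpoints += 1; // stand-in for triggering a real checkpoint
        }
    }
}
```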
fn notify_epoch_committed<'life0, 'async_trait>(
&'life0 mut self,
_epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Acknowledges that the given epoch has been durably committed.
Called after the manifest is persisted and every exactly-once sink has committed the epoch. Implementations must be idempotent, because a retry after cancellation is legal.
§Errors
Errors are logged; they do not roll back the committed epoch.
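An idempotent acknowledgement handler, plus the coordinator's "log, don't roll back" error handling, might look like the synchronous sketch below. AckingSource, after_commit and the acks_sent list are hypothetical, and the sketch assumes epochs are committed in increasing order, which is not stated above.

```rust
/// Toy source that releases upstream resources (e.g. acks) once an epoch is
/// durably committed. The real method is async.
struct AckingSource {
    last_committed: Option<u64>,
    acks_sent: Vec<u64>,
}

impl AckingSource {
    /// Idempotent: an epoch at or below the last acknowledged one is a no-op,
    /// so a retry after cancellation is harmless.
    fn notify_epoch_committed(&mut self, epoch: u64) -> Result<(), String> {
        if self.last_committed.map_or(false, |last| epoch <= last) {
            return Ok(());
        }
        self.acks_sent.push(epoch); // stand-in for the upstream ack
        self.last_committed = Some(epoch);
        Ok(())
    }
}

/// Coordinator side: errors are logged and never roll back the epoch.
fn after_commit(src: &mut AckingSource, epoch: u64) {
    if let Err(e) = src.notify_epoch_committed(epoch) {
        eprintln!("notify_epoch_committed({}) failed: {}", epoch, e);
    }
}
```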