pub trait SourceConnector: Send {
    // Required methods
    fn open<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        config: &'life1 ConnectorConfig,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn poll_batch<'life0, 'async_trait>(
        &'life0 mut self,
        max_records: usize,
    ) -> Pin<Box<dyn Future<Output = Result<Option<SourceBatch>, ConnectorError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn schema(&self) -> SchemaRef;
    fn checkpoint(&self) -> SourceCheckpoint;
    fn restore<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        checkpoint: &'life1 SourceCheckpoint,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn close<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided methods
    fn health_check(&self) -> HealthStatus { ... }
    fn metrics(&self) -> ConnectorMetrics { ... }
    fn data_ready_notify(&self) -> Option<Arc<Notify>> { ... }
    fn as_schema_provider(&self) -> Option<&dyn SchemaProvider> { ... }
    fn as_schema_inferable(&self) -> Option<&dyn SchemaInferable> { ... }
    fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware> { ... }
    fn as_schema_evolvable(&self) -> Option<&dyn SchemaEvolvable> { ... }
    fn supports_replay(&self) -> bool { ... }
    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> { ... }
}
Trait for source connectors that read data from external systems.
Source connectors operate in Ring 1 and push data into Ring 0 via
the streaming Source<ArrowRecord>::push_arrow() API.
§Lifecycle
1. open(): initialize the connection and discover the schema
2. poll_batch(): read batches in a loop
3. checkpoint() / restore(): manage offsets
4. close(): shut down cleanly
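The lifecycle order can be illustrated with a small synchronous model. This is a hypothetical sketch: `LifecycleSource`, `VecSource`, and `run_lifecycle` are illustrative names, the real trait is async and works with ConnectorConfig, SourceBatch, and SourceCheckpoint rather than plain vectors and offsets, and running restore() after open() is an assumption of the sketch.

```rust
/// Hypothetical synchronous stand-in for `SourceConnector` (the real trait is async).
trait LifecycleSource {
    fn open(&mut self) -> Result<(), String>;
    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Vec<i64>>, String>;
    fn checkpoint(&self) -> usize;
    fn restore(&mut self, offset: usize) -> Result<(), String>;
    fn close(&mut self) -> Result<(), String>;
}

/// Hypothetical in-memory source that reads from a fixed vector.
struct VecSource {
    data: Vec<i64>,
    offset: usize,
    open: bool,
}

impl LifecycleSource for VecSource {
    fn open(&mut self) -> Result<(), String> {
        self.open = true;
        Ok(())
    }

    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Vec<i64>>, String> {
        if !self.open {
            return Err("poll_batch called before open".into());
        }
        if self.offset >= self.data.len() {
            return Ok(None); // no data available right now
        }
        let end = (self.offset + max_records).min(self.data.len());
        let batch = self.data[self.offset..end].to_vec();
        self.offset = end;
        Ok(Some(batch))
    }

    fn checkpoint(&self) -> usize {
        self.offset
    }

    fn restore(&mut self, offset: usize) -> Result<(), String> {
        if offset > self.data.len() {
            return Err(format!("offset {offset} out of range"));
        }
        self.offset = offset;
        Ok(())
    }

    fn close(&mut self) -> Result<(), String> {
        self.open = false;
        Ok(())
    }
}

/// Drives a source through open -> (restore) -> poll loop -> checkpoint -> close.
/// Returns every record read plus the final checkpoint.
fn run_lifecycle<S: LifecycleSource>(
    source: &mut S,
    resume_from: Option<usize>,
    max_records: usize,
) -> Result<(Vec<i64>, usize), String> {
    source.open()?;
    if let Some(offset) = resume_from {
        source.restore(offset)?; // recovery path: seek before polling resumes
    }
    let mut out = Vec::new();
    // A real runtime keeps polling after Ok(None); this sketch stops once drained.
    while let Some(batch) = source.poll_batch(max_records)? {
        out.extend(batch);
    }
    let cp = source.checkpoint();
    source.close()?;
    Ok((out, cp))
}
```

Passing `Some(offset)` as `resume_from` models recovery: the source resumes from the checkpointed offset instead of the beginning.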
§Example
use async_trait::async_trait;
// ConnectorConfig, ConnectorError, SourceBatch, and SourceConnector are assumed to be in scope.

struct MySource { /* ... */ }

#[async_trait]
impl SourceConnector for MySource {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        // Connect to the external system using `config`
        Ok(())
    }

    async fn poll_batch(&mut self, max_records: usize) -> Result<Option<SourceBatch>, ConnectorError> {
        // Read up to max_records records from the external system
        Ok(None) // None = no data available yet
    }

    // ... schema(), checkpoint(), restore(), and close() are also required
}
Required Methods§
fn open<'life0, 'life1, 'async_trait>(
    &'life0 mut self,
    config: &'life1 ConnectorConfig,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Opens the connector and initializes the connection.
Called once before any polling begins. The connector should establish connections, discover the schema, and prepare for reading.
§Errors
Returns ConnectorError if connection or initialization fails.
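One way to make open() fail fast is to validate configuration before connecting. The sketch assumes a plain string map in place of ConnectorConfig; the option keys `endpoint` and `batch.size`, the `OpenError` type, and `open_from_config` are hypothetical and chosen only for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical error type standing in for `ConnectorError`.
#[derive(Debug, PartialEq)]
enum OpenError {
    MissingOption(&'static str),
    InvalidOption { key: &'static str, value: String },
}

/// Hypothetical connector state produced by a successful open.
#[derive(Debug, PartialEq)]
struct OpenedSource {
    endpoint: String,
    batch_size: usize,
}

/// Validates configuration up front so that a bad setting surfaces from
/// `open` instead of later from `poll_batch`.
fn open_from_config(config: &HashMap<String, String>) -> Result<OpenedSource, OpenError> {
    // Required option: fail if absent.
    let endpoint = config
        .get("endpoint")
        .ok_or(OpenError::MissingOption("endpoint"))?
        .clone();
    // Optional option with a default; reject values that do not parse.
    let batch_size = match config.get("batch.size") {
        None => 1024,
        Some(v) => v.parse::<usize>().map_err(|_| OpenError::InvalidOption {
            key: "batch.size",
            value: v.clone(),
        })?,
    };
    // A real connector would establish the connection and discover the schema here.
    Ok(OpenedSource { endpoint, batch_size })
}
```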
fn poll_batch<'life0, 'async_trait>(
    &'life0 mut self,
    max_records: usize,
) -> Pin<Box<dyn Future<Output = Result<Option<SourceBatch>, ConnectorError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Polls for the next batch of records.
Returns Ok(Some(batch)) when records are available, or Ok(None) when
no data is currently available (the runtime will poll again after a delay).
The max_records parameter is a hint; implementations may return fewer.
§Errors
Returns ConnectorError on read failure.
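The two contract points above (Ok(None) means "no data yet", and max_records is only an upper bound) can be illustrated with a hypothetical buffered source. `BufferedSource` and `on_records_received` are not part of this API, and records are plain strings rather than a SourceBatch.

```rust
use std::collections::VecDeque;

/// Hypothetical source that buffers records received from an external system.
struct BufferedSource {
    buffer: VecDeque<String>,
}

impl BufferedSource {
    /// Simulates records arriving from the external system.
    fn on_records_received(&mut self, records: impl IntoIterator<Item = String>) {
        self.buffer.extend(records);
    }

    /// Synchronous sketch of `poll_batch` semantics.
    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Vec<String>>, String> {
        // `max_records` is an upper bound: return fewer if fewer are buffered.
        let n = max_records.min(self.buffer.len());
        if n == 0 {
            // Nothing to hand over: report "no data yet" instead of an empty batch.
            return Ok(None);
        }
        Ok(Some(self.buffer.drain(..n).collect()))
    }
}
```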
fn checkpoint(&self) -> SourceCheckpoint
Creates a checkpoint of the current source position.
The returned checkpoint contains enough information to resume reading from this position after a restart.
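A checkpoint is typically a cheap snapshot of read positions. In this hypothetical sketch it stores the next offset to read per partition (not the last offset already read), so that, under that convention, resuming from it neither skips nor repeats records. `OffsetCheckpoint` and `PartitionedSource` are illustrative stand-ins for SourceCheckpoint and a real connector.

```rust
use std::collections::BTreeMap;

/// Hypothetical checkpoint: the next offset to read, per partition.
#[derive(Debug, Clone, PartialEq)]
struct OffsetCheckpoint {
    next_offsets: BTreeMap<u32, u64>,
}

/// Hypothetical partitioned source tracking its read position.
struct PartitionedSource {
    next_offsets: BTreeMap<u32, u64>,
}

impl PartitionedSource {
    /// Records that `count` more records were emitted from `partition`.
    fn advance(&mut self, partition: u32, count: u64) {
        *self.next_offsets.entry(partition).or_insert(0) += count;
    }

    /// Mirrors `checkpoint(&self)`: a side-effect-free copy of the current position.
    fn checkpoint(&self) -> OffsetCheckpoint {
        OffsetCheckpoint {
            next_offsets: self.next_offsets.clone(),
        }
    }
}
```

Because the snapshot is a copy, later reads do not change a checkpoint that has already been taken.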
fn restore<'life0, 'life1, 'async_trait>(
    &'life0 mut self,
    checkpoint: &'life1 SourceCheckpoint,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Restores the source to a previously checkpointed position.
Called during recovery before polling resumes.
§Errors
Returns ConnectorError if the checkpoint is invalid or the
seek operation fails.
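Both error cases can be handled by validating the entire checkpoint before seeking, so a rejected checkpoint leaves the source where it was. The partition and offset-range model, `SeekableSource`, and `RestoreError` below are hypothetical.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the `ConnectorError` cases relevant to restore.
#[derive(Debug, PartialEq)]
enum RestoreError {
    UnknownPartition(u32),
    OffsetOutOfRange { partition: u32, offset: u64 },
}

/// Hypothetical source with a retained offset range per partition.
struct SeekableSource {
    // partition -> (earliest retained offset, one past the last retained offset)
    retained: BTreeMap<u32, (u64, u64)>,
    position: BTreeMap<u32, u64>,
}

impl SeekableSource {
    /// Mirrors `restore`: check every entry first, then seek all at once,
    /// so an invalid checkpoint does not leave a half-applied position.
    fn restore(&mut self, checkpoint: &BTreeMap<u32, u64>) -> Result<(), RestoreError> {
        for (&partition, &offset) in checkpoint {
            let &(earliest, end) = self
                .retained
                .get(&partition)
                .ok_or(RestoreError::UnknownPartition(partition))?;
            // Offsets older than retention (or beyond the head) cannot be seeked to.
            if offset < earliest || offset > end {
                return Err(RestoreError::OffsetOutOfRange { partition, offset });
            }
        }
        self.position = checkpoint.clone();
        Ok(())
    }
}
```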
Provided Methods§
fn health_check(&self) -> HealthStatus
Returns the current health status of the connector.
Defaults to Unknown; connectors should override it to report their actual status.
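Overriding the default might look like the following. `Health` is a hypothetical stand-in for HealthStatus (only the Unknown variant is documented; the others are assumed), and the error-count thresholds are arbitrary.

```rust
/// Hypothetical stand-in for `HealthStatus`; real variants may differ.
#[derive(Debug, PartialEq)]
enum Health {
    Healthy,
    Degraded(String),
    Unhealthy(String),
    Unknown,
}

trait HealthReporting {
    /// Provided method mirroring the documented default.
    fn health_check(&self) -> Health {
        Health::Unknown
    }
}

/// A source that keeps the default.
struct SilentSource;
impl HealthReporting for SilentSource {}

/// A source that derives its status from recent poll failures (hypothetical policy).
struct TrackedSource {
    consecutive_errors: u32,
}

impl HealthReporting for TrackedSource {
    fn health_check(&self) -> Health {
        match self.consecutive_errors {
            0 => Health::Healthy,
            n @ 1..=2 => Health::Degraded(format!("{n} consecutive poll errors")),
            n => Health::Unhealthy(format!("{n} consecutive poll errors")),
        }
    }
}
```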
fn metrics(&self) -> ConnectorMetrics
Returns current metrics from the connector.
fn data_ready_notify(&self) -> Option<Arc<Notify>>
Returns a Notify handle that is signalled when new data is available.
When Some, the pipeline coordinator awaits the notification instead of
polling on a timer, eliminating idle CPU usage. Sources that receive data
asynchronously (WebSocket, CDC replication streams, Kafka) should return
Some and call notify.notify_one() when data arrives.
The default implementation returns None, which causes the pipeline to
fall back to timer-based polling (suitable for batch/file sources).
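The notify-or-timer behavior can be modeled with the standard library alone. This is an analogy, not the coordinator's code: `DataReady` uses a `Mutex<bool>` permit and a `Condvar` in place of the async Notify, `wait_timeout` plays the role of the timer fallback, and `spawn_producer` uses a thread to stand in for an async receiver. As with tokio's `Notify` (assuming that is the Notify meant here), a `notify_one()` issued before anyone waits is kept as a permit rather than lost.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical std-only stand-in for an async `Notify`.
#[derive(Default)]
struct DataReady {
    permit: Mutex<bool>,
    cvar: Condvar,
}

impl DataReady {
    /// Called by the source when data arrives; stores a single permit.
    fn notify_one(&self) {
        *self.permit.lock().unwrap() = true;
        self.cvar.notify_one();
    }

    /// Coordinator side: wait for a permit, or give up after `timeout`
    /// (the timer-based fallback). Returns true if woken by a notification.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        let guard = self.permit.lock().unwrap();
        // `wait_timeout_while` also absorbs spurious wakeups.
        let (mut guard, _timed_out) = self
            .cvar
            .wait_timeout_while(guard, timeout, |ready| !*ready)
            .unwrap();
        let notified = *guard;
        *guard = false; // consume the permit
        notified
    }
}

/// Simulates a background receiver (e.g. a socket reader) that signals after `delay`.
fn spawn_producer(ready: Arc<DataReady>, delay: Duration) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        thread::sleep(delay);
        ready.notify_one();
    })
}
```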
fn as_schema_provider(&self) -> Option<&dyn SchemaProvider>
Returns this connector as a SchemaProvider, if supported.
fn as_schema_inferable(&self) -> Option<&dyn SchemaInferable>
Returns this connector as a SchemaInferable, if supported.
fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware>
Returns this connector as a SchemaRegistryAware, if supported.
fn as_schema_evolvable(&self) -> Option<&dyn SchemaEvolvable>
Returns this connector as a SchemaEvolvable, if supported.
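These accessors follow a capability-query pattern: the provided method returns None, and a connector that also implements the capability trait overrides the accessor to return `Some(self)`. Callers holding only a trait object can then discover optional features without `Any`-based downcasting. The traits and types below (`Connector`, `SchemaProviderLike`, `CsvSource`, `RegistrySource`, `describe`) are hypothetical reductions of the real ones.

```rust
/// Hypothetical capability trait standing in for `SchemaProvider`.
trait SchemaProviderLike {
    fn provided_schema(&self) -> Vec<(&'static str, &'static str)>;
}

/// Hypothetical trimmed-down connector trait with one capability accessor.
trait Connector {
    fn name(&self) -> &str;

    /// Provided method: by default the capability is not exposed.
    fn as_schema_provider(&self) -> Option<&dyn SchemaProviderLike> {
        None
    }
}

struct CsvSource;
impl Connector for CsvSource {
    fn name(&self) -> &str {
        "csv"
    }
}

struct RegistrySource;
impl SchemaProviderLike for RegistrySource {
    fn provided_schema(&self) -> Vec<(&'static str, &'static str)> {
        vec![("id", "Int64"), ("payload", "Utf8")]
    }
}
impl Connector for RegistrySource {
    fn name(&self) -> &str {
        "registry"
    }
    // Opt in by returning `self` as the capability trait object.
    fn as_schema_provider(&self) -> Option<&dyn SchemaProviderLike> {
        Some(self)
    }
}

/// Caller side: works on `&dyn Connector` without knowing the concrete type.
fn describe(c: &dyn Connector) -> String {
    match c.as_schema_provider() {
        Some(p) => format!("{}: {} columns", c.name(), p.provided_schema().len()),
        None => format!("{}: schema must be inferred", c.name()),
    }
}
```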
fn supports_replay(&self) -> bool
Whether this source supports replay from a checkpointed position.
Sources that return false (e.g., WebSocket, raw TCP) cannot seek
back to a previous offset on recovery. Checkpointing still captures
their state for best-effort recovery, but exactly-once semantics
are degraded to at-most-once for events from this source.
The default implementation returns true because most durable
sources (Kafka, CDC, files) support replay.
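A coordinator could use this flag to report which sources weaken the delivery guarantee. The sketch is hypothetical: `Guarantee`, `source_guarantee`, and `pipeline_guarantee` are illustrative names, and summarizing the pipeline by the weakest guarantee of any source is an assumption, not documented behavior.

```rust
/// Hypothetical delivery guarantees, ordered weakest to strongest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Guarantee {
    AtMostOnce,
    ExactlyOnce,
}

/// Per-source guarantee: replayable sources keep exactly-once, others degrade.
fn source_guarantee(supports_replay: bool) -> Guarantee {
    if supports_replay {
        Guarantee::ExactlyOnce
    } else {
        Guarantee::AtMostOnce
    }
}

/// Returns the weakest guarantee across `(name, supports_replay)` pairs,
/// plus the names of the sources that caused the degradation.
fn pipeline_guarantee(sources: &[(&str, bool)]) -> (Guarantee, Vec<String>) {
    let mut overall = Guarantee::ExactlyOnce;
    let mut degraded = Vec::new();
    for &(name, replay) in sources {
        let g = source_guarantee(replay);
        if g < overall {
            overall = g;
        }
        if g == Guarantee::AtMostOnce {
            degraded.push(name.to_string());
        }
    }
    (overall, degraded)
}
```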
fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>>
Returns a shared flag that the source sets to true when it
requests an immediate checkpoint.
This is used by sources that detect external state changes requiring a checkpoint before proceeding — for example, Kafka consumer group rebalance (partition revocation). The pipeline coordinator polls this flag each cycle and clears it after triggering the checkpoint.
The default returns None (no source-initiated checkpoints).
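The flag handshake can be sketched with `AtomicBool`. The source type, the rebalance hook name, and `coordinator_cycle` are hypothetical. The sketch uses `swap(false, ...)` to read and clear the flag in one atomic step, so a request raised while a checkpoint is running is kept for the next cycle; this read-and-clear ordering is a design choice of the sketch rather than a description of the actual coordinator.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical source that asks for a checkpoint when partitions are revoked.
struct RebalancingSource {
    checkpoint_flag: Arc<AtomicBool>,
}

impl RebalancingSource {
    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
        Some(Arc::clone(&self.checkpoint_flag))
    }

    /// Would be called from a (hypothetical) consumer-group rebalance callback.
    fn on_partitions_revoked(&self) {
        self.checkpoint_flag.store(true, Ordering::Release);
    }
}

/// Hypothetical coordinator cycle: returns true if a checkpoint was triggered.
fn coordinator_cycle(flag: &AtomicBool, checkpoints_taken: &mut u32) -> bool {
    // Read and clear in one atomic step.
    if flag.swap(false, Ordering::AcqRel) {
        *checkpoints_taken += 1; // stand-in for triggering a real checkpoint
        true
    } else {
        false
    }
}
```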