Trait SourceConnector 

```rust
pub trait SourceConnector: Send {
    // Required methods
    fn open<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        config: &'life1 ConnectorConfig,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn poll_batch<'life0, 'async_trait>(
        &'life0 mut self,
        max_records: usize,
    ) -> Pin<Box<dyn Future<Output = Result<Option<SourceBatch>, ConnectorError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn schema(&self) -> SchemaRef;
    fn checkpoint(&self) -> SourceCheckpoint;
    fn restore<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        checkpoint: &'life1 SourceCheckpoint,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn close<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided methods
    fn discover_schema<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        _properties: &'life1 HashMap<String, String>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait { ... }
    fn health_check(&self) -> HealthStatus { ... }
    fn metrics(&self) -> ConnectorMetrics { ... }
    fn data_ready_notify(&self) -> Option<Arc<Notify>> { ... }
    fn as_schema_provider(&self) -> Option<&dyn SchemaProvider> { ... }
    fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware> { ... }
    fn supports_replay(&self) -> bool { ... }
    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> { ... }
    fn notify_epoch_committed<'life0, 'async_trait>(
        &'life0 mut self,
        _epoch: u64,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
}
```
Trait for source connectors that read data from external systems.

Source connectors operate in Ring 1 and push data into Ring 0 via the streaming Source<ArrowRecord>::push_arrow() API.

Lifecycle

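  1. open(): initialize the connection and discover the schema
  2. poll_batch(): read batches in a loop
  3. checkpoint() / restore(): manage offsets; checkpoint() captures the current position, and restore() is called during recovery, before polling resumes
  4. close(): shut down cleanly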
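A minimal synchronous sketch of a runtime driving a source through these steps. The real methods are async and use the crate's own types; SyncSource, CounterSource, run, and the plain u64 offsets below are hypothetical simplifications, and the sketch assumes restore() is called after open().

```rust
/// Hypothetical synchronous stand-in for the async lifecycle methods.
pub trait SyncSource {
    fn open(&mut self) -> Result<(), String>;
    fn restore(&mut self, offset: u64) -> Result<(), String>;
    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Vec<u64>>, String>;
    fn checkpoint(&self) -> u64;
    fn close(&mut self) -> Result<(), String>;
}

/// Drives a source through open, an optional restore, a polling loop, and close.
/// Returns the final checkpoint and every record read.
pub fn run<S: SyncSource>(
    src: &mut S,
    saved: Option<u64>,
    max_records: usize,
    max_polls: usize,
) -> Result<(u64, Vec<u64>), String> {
    src.open()?;
    if let Some(offset) = saved {
        // Recovery: seek back to the saved position before the first poll.
        src.restore(offset)?;
    }
    let mut records = Vec::new();
    for _ in 0..max_polls {
        match src.poll_batch(max_records)? {
            Some(batch) => records.extend(batch),
            // A real runtime would retry after a delay; this finite sketch stops.
            None => break,
        }
    }
    let checkpoint = src.checkpoint();
    src.close()?;
    Ok((checkpoint, records))
}

/// Hypothetical in-memory source over the offsets 0..len.
pub struct CounterSource {
    pub len: u64,
    pub pos: u64,
    pub is_open: bool,
}

impl SyncSource for CounterSource {
    fn open(&mut self) -> Result<(), String> {
        self.is_open = true;
        Ok(())
    }

    fn restore(&mut self, offset: u64) -> Result<(), String> {
        if offset > self.len {
            return Err(format!("offset {offset} is past the end"));
        }
        self.pos = offset;
        Ok(())
    }

    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Vec<u64>>, String> {
        if !self.is_open {
            return Err("poll_batch called before open".to_string());
        }
        if self.pos >= self.len {
            return Ok(None);
        }
        let end = (self.pos + max_records as u64).min(self.len);
        let batch: Vec<u64> = (self.pos..end).collect();
        self.pos = end;
        Ok(Some(batch))
    }

    fn checkpoint(&self) -> u64 {
        // Next offset to read, so a restore resumes without gaps or repeats.
        self.pos
    }

    fn close(&mut self) -> Result<(), String> {
        self.is_open = false;
        Ok(())
    }
}
```

Restoring from offset 2 on a five-record source yields records 2, 3, and 4 and a final checkpoint of 5, the next offset to read.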

Required Methods

fn open<'life0, 'life1, 'async_trait>( &'life0 mut self, config: &'life1 ConnectorConfig, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Called once before polling begins.
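The signatures on this page use the lifetime names ('life0, 'life1, 'async_trait) generated by the async-trait crate, so an implementation written without the macro returns Box::pin(async move { ... }). The sketch below shows this for a trimmed-down trait containing only open(); ConnectorConfig (with its url field), ConnectorError, OpenOnly, DemoSource, and the std-only block_on executor are hypothetical stand-ins, not the crate's real types.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical stand-ins for the crate's real types.
#[derive(Debug, Default)]
pub struct ConnectorConfig {
    pub url: String,
}

#[derive(Debug, PartialEq)]
pub enum ConnectorError {
    Config(String),
}

// Trimmed-down trait with only `open`, in the same desugared shape.
pub trait OpenOnly: Send {
    fn open<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        config: &'life1 ConnectorConfig,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait;
}

pub struct DemoSource {
    pub opened_with: Option<String>,
}

impl OpenOnly for DemoSource {
    fn open<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        config: &'life1 ConnectorConfig,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait,
    {
        // The async block borrows `self` and `config`; the lifetime bounds
        // guarantee both outlive the boxed future.
        Box::pin(async move {
            if config.url.is_empty() {
                return Err(ConnectorError::Config("url is required".to_string()));
            }
            self.opened_with = Some(config.url.clone());
            Ok(())
        })
    }
}

// Minimal executor: polls the future and parks the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

With the async-trait crate, the same method is normally written as async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> inside an #[async_trait] impl block, and the macro generates this shape.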

fn poll_batch<'life0, 'async_trait>( &'life0 mut self, max_records: usize, ) -> Pin<Box<dyn Future<Output = Result<Option<SourceBatch>, ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Reads the next batch of records. Ok(None) means no data is currently available; the runtime retries after a delay. max_records is a hint, and implementations may return fewer records.
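A helper an in-memory source might use inside poll_batch() to honor both rules: it returns at most max_records items, fewer when less is buffered, and None when nothing is buffered. The VecDeque buffer and the drain_batch name are assumptions for illustration.

```rust
use std::collections::VecDeque;

/// Drains at most `max_records` buffered items. Returns None when nothing is
/// buffered, mirroring the Ok(None) "no data yet" convention of poll_batch().
pub fn drain_batch<T>(buf: &mut VecDeque<T>, max_records: usize) -> Option<Vec<T>> {
    if buf.is_empty() || max_records == 0 {
        return None;
    }
    // Fewer than max_records is fine: the limit is only an upper hint.
    let n = max_records.min(buf.len());
    Some(buf.drain(..n).collect())
}
```

A poll_batch() implementation would wrap the result in Ok(...) and build its batch from the drained rows; how a SourceBatch is constructed is not shown here.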

fn schema(&self) -> SchemaRef

Arrow schema of records this source produces.

fn checkpoint(&self) -> SourceCheckpoint

The returned checkpoint must contain enough information to resume from the current position after a restart.

fn restore<'life0, 'life1, 'async_trait>( &'life0 mut self, checkpoint: &'life1 SourceCheckpoint, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Called during recovery before polling resumes.
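A sketch of the checkpoint()/restore() contract for a partitioned source. Checkpoint, PartitionedSource, and the per-partition next-offset map are hypothetical; the point is that the checkpoint stores the next offset to read, so restoring resumes exactly after the last consumed record.

```rust
use std::collections::BTreeMap;

/// Hypothetical checkpoint: the next offset to read, per partition.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct Checkpoint {
    pub next_offsets: BTreeMap<u32, u64>,
}

/// Hypothetical partitioned in-memory source.
pub struct PartitionedSource {
    pub partitions: BTreeMap<u32, Vec<String>>,
    pub positions: BTreeMap<u32, u64>,
}

impl PartitionedSource {
    /// Reads one record from a partition and advances its position.
    pub fn read(&mut self, partition: u32) -> Option<String> {
        let pos = self.positions.entry(partition).or_insert(0);
        let rec = self.partitions.get(&partition)?.get(*pos as usize)?.clone();
        *pos += 1;
        Some(rec)
    }

    /// Captures the next offset to read, so a restore neither skips nor
    /// replays the last consumed record.
    pub fn checkpoint(&self) -> Checkpoint {
        Checkpoint { next_offsets: self.positions.clone() }
    }

    /// Seeks every partition back to its checkpointed position.
    pub fn restore(&mut self, cp: &Checkpoint) {
        self.positions = cp.next_offsets.clone();
    }
}
```

After reading "a", checkpointing, and reading "b" and "c", a restore makes the next read return "b" again.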

fn close<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Close the connection and release resources.

Provided Methods

fn discover_schema<'life0, 'life1, 'async_trait>( &'life0 mut self, _properties: &'life1 HashMap<String, String>, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Resolve the source schema from the WITH (...) properties before DDL reaches the planner. Implementations that hit the network (e.g. Kafka fetching an Avro schema from a Schema Registry) must bound their I/O with a timeout and leave the schema empty on failure rather than hang.
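One std-only way to bound a blocking schema fetch as this paragraph requires: run the fetch on a helper thread and give up after a timeout, returning an empty field list. The Fields alias and discover_with_timeout are hypothetical, and the field list stands in for an Arrow schema.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical (name, type) field list standing in for an Arrow schema.
pub type Fields = Vec<(String, String)>;

/// Runs `fetch` on a helper thread and waits at most `timeout`.
/// On timeout or fetch error, returns an empty schema instead of hanging.
pub fn discover_with_timeout<F>(fetch: F, timeout: Duration) -> Fields
where
    F: FnOnce() -> Result<Fields, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone after a timeout; ignore that error.
        let _ = tx.send(fetch());
    });
    match rx.recv_timeout(timeout) {
        Ok(Ok(fields)) => fields,
        Ok(Err(_)) | Err(_) => Vec::new(),
    }
}
```

A timed-out fetch thread keeps running in the background until it finishes. In async code built on Tokio (an assumption about this crate), wrapping the fetch future in tokio::time::timeout achieves the same bound without a helper thread.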

fn health_check(&self) -> HealthStatus

Reports the connector's health. The default implementation returns Unknown; connectors should override it.

fn metrics(&self) -> ConnectorMetrics

Current connector metrics snapshot.

fn data_ready_notify(&self) -> Option<Arc<Notify>>

Returns a Notify handle that is signalled when new data is available.

When Some, the pipeline coordinator awaits the notification instead of polling on a timer, eliminating idle CPU usage. Sources that receive data asynchronously (WebSocket, CDC replication streams, Kafka) should return Some and call notify.notify_one() when data arrives.

The default implementation returns None, which causes the pipeline to fall back to timer-based polling (suitable for batch/file sources).
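A std-only sketch of the signalling pattern. The real handle is Arc<Notify>; Notify is assumed here to be tokio::sync::Notify, whose notify_one() stores a permit when no task is waiting. DataReady emulates that with a Mutex<bool> and a Condvar, and wait_for_data mirrors the coordinator's choice between waiting on the handle and timer-based polling. Both names, and bounding the wait by the polling interval, are assumptions.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical std-only stand-in for an async Notify handle.
/// A notify_one() with no waiter is kept as a permit for the next wait.
#[derive(Default)]
pub struct DataReady {
    permit: Mutex<bool>,
    cv: Condvar,
}

impl DataReady {
    /// Source side: called when new data arrives.
    pub fn notify_one(&self) {
        *self.permit.lock().unwrap() = true;
        self.cv.notify_one();
    }

    /// Coordinator side: waits until notified or until `timeout` elapses.
    /// Returns true if a permit was consumed.
    pub fn wait_timeout(&self, timeout: Duration) -> bool {
        let guard = self.permit.lock().unwrap();
        let (mut guard, _timed_out) = self
            .cv
            .wait_timeout_while(guard, timeout, |ready| !*ready)
            .unwrap();
        std::mem::replace(&mut *guard, false)
    }
}

/// Hypothetical coordinator step: wait on the handle when the source provides
/// one, otherwise fall back to sleeping for the polling interval.
pub fn wait_for_data(notify: Option<&Arc<DataReady>>, poll_interval: Duration) {
    match notify {
        Some(handle) => {
            let _ = handle.wait_timeout(poll_interval);
        }
        None => std::thread::sleep(poll_interval),
    }
}
```

Storing the signal as a permit matters: a source may call notify_one() just before the coordinator starts waiting, and without the permit that wake-up would be lost.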

fn as_schema_provider(&self) -> Option<&dyn SchemaProvider>

Returns this connector as a SchemaProvider, if supported.

fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware>

Returns this connector as a SchemaRegistryAware, if supported.
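Both methods follow an optional-capability pattern: the default returns None, and a connector that also implements the capability trait returns Some(self). A sketch with hypothetical traits and types; SchemaProviderLike stands in for SchemaProvider, whose actual methods are not shown on this page.

```rust
/// Hypothetical optional capability (stand-in for SchemaProvider).
pub trait SchemaProviderLike {
    fn field_names(&self) -> Vec<String>;
}

/// Hypothetical base trait with the same default as as_schema_provider().
pub trait Connector {
    /// Default: the capability is not supported.
    fn as_schema_provider(&self) -> Option<&dyn SchemaProviderLike> {
        None
    }
}

pub struct CsvSource {
    pub header: Vec<String>,
}

pub struct SocketSource;

impl SchemaProviderLike for CsvSource {
    fn field_names(&self) -> Vec<String> {
        self.header.clone()
    }
}

impl Connector for CsvSource {
    // Opt in by returning itself as the capability trait object.
    fn as_schema_provider(&self) -> Option<&dyn SchemaProviderLike> {
        Some(self)
    }
}

impl Connector for SocketSource {}

/// Caller side: query the capability through the base trait object.
pub fn describe(c: &dyn Connector) -> String {
    match c.as_schema_provider() {
        Some(p) => p.field_names().join(","),
        None => "<no schema provider>".to_string(),
    }
}
```

The caller only needs a &dyn Connector; it discovers the extra capability at runtime without downcasting to a concrete type.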

fn supports_replay(&self) -> bool

Whether this source supports replay from a checkpointed position.

Sources that return false (e.g., WebSocket, raw TCP) cannot seek back to a previous offset on recovery. Checkpointing still captures their state for best-effort recovery, but exactly-once semantics are degraded to at-most-once for events from this source.

The default implementation returns true because most durable sources (Kafka, CDC, files) support replay.
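A sketch of how the default and an override interact, and of the per-source guarantee described above. Replayable, FileSource, WebSocketSource, Guarantee, and effective_guarantee are hypothetical names, and the mapping assumes the pipeline is configured for exactly-once.

```rust
/// Hypothetical delivery guarantees (not the crate's actual types).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Guarantee {
    ExactlyOnce,
    AtMostOnce,
}

/// Trimmed-down trait with the same default as supports_replay().
pub trait Replayable {
    fn supports_replay(&self) -> bool {
        true
    }
}

pub struct FileSource;
pub struct WebSocketSource;

impl Replayable for FileSource {}

impl Replayable for WebSocketSource {
    // Cannot seek back to a previous offset after a restart.
    fn supports_replay(&self) -> bool {
        false
    }
}

/// Guarantee that holds for events from `source` under an exactly-once pipeline.
pub fn effective_guarantee(source: &dyn Replayable) -> Guarantee {
    if source.supports_replay() {
        Guarantee::ExactlyOnce
    } else {
        Guarantee::AtMostOnce
    }
}
```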

fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>>

Returns a shared flag that the source sets to true when it requests an immediate checkpoint.

This is used by sources that detect external state changes requiring a checkpoint before proceeding — for example, Kafka consumer group rebalance (partition revocation). The pipeline coordinator polls this flag each cycle and clears it after triggering the checkpoint.

The default returns None (no source-initiated checkpoints).
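A sketch of the flag handshake using std atomics. RebalancingSource, on_partitions_revoked, and coordinator_cycle are hypothetical; the coordinator side reads and clears the flag with a single swap(false), which is one way to implement the poll-and-clear step, not necessarily how the real coordinator does it.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical source that requests a checkpoint when partitions are revoked.
#[derive(Default)]
pub struct RebalancingSource {
    flag: Arc<AtomicBool>,
}

impl RebalancingSource {
    /// Same shape as checkpoint_requested(): hand out a clone of the shared flag.
    pub fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
        Some(Arc::clone(&self.flag))
    }

    /// Called from a (hypothetical) rebalance callback on partition revocation.
    pub fn on_partitions_revoked(&self) {
        self.flag.store(true, Ordering::Release);
    }
}

/// One coordinator cycle: swap(false) reads and clears the flag atomically.
/// Returns true if a checkpoint was triggered.
pub fn coordinator_cycle(flag: Option<&Arc<AtomicBool>>, checkpoints: &mut u32) -> bool {
    match flag {
        Some(f) if f.swap(false, Ordering::AcqRel) => {
            *checkpoints += 1;
            true
        }
        _ => false,
    }
}
```

Clearing with swap rather than a separate load and store means a request raised while a checkpoint is in progress triggers another checkpoint on the next cycle instead of being overwritten.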

fn notify_epoch_committed<'life0, 'async_trait>( &'life0 mut self, _epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Acknowledges that the given epoch has been durably committed.

Called after the manifest is persisted and every exactly-once sink has committed the epoch. The call must be idempotent: it may legally be retried after cancellation.

Errors

Errors are logged; they do not roll back the committed epoch.
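A sketch of an idempotent acknowledgement handler. EpochAcks, record_epoch, and the epoch-to-offset bookkeeping are hypothetical (for example, offsets a source might commit back to its external system); a retried or stale acknowledgement returns Ok(()) without doing anything.

```rust
use std::collections::BTreeMap;

/// Hypothetical source-side bookkeeping for epoch acknowledgements.
#[derive(Default)]
pub struct EpochAcks {
    /// Offset reached at the end of each not-yet-acknowledged epoch.
    pending: BTreeMap<u64, u64>,
    /// Highest acknowledged epoch and its offset.
    pub committed: Option<(u64, u64)>,
}

impl EpochAcks {
    pub fn record_epoch(&mut self, epoch: u64, offset: u64) {
        self.pending.insert(epoch, offset);
    }

    /// Idempotent: acknowledging an epoch at or below the last committed one
    /// is a no-op, so a retry after cancellation is harmless.
    pub fn notify_epoch_committed(&mut self, epoch: u64) -> Result<(), String> {
        if matches!(self.committed, Some((done, _)) if epoch <= done) {
            return Ok(());
        }
        let offset = *self
            .pending
            .get(&epoch)
            .ok_or_else(|| format!("unknown epoch {epoch}"))?;
        // Drop bookkeeping for this epoch and every earlier one.
        self.pending.retain(|&e, _| e > epoch);
        self.committed = Some((epoch, offset));
        Ok(())
    }
}
```

Returning an error for an unknown epoch is safe under this contract, since errors are only logged and never roll back the committed epoch.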

Implementors