

Trait SourceConnector 

pub trait SourceConnector: Send {
    // Required methods
    fn open<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        config: &'life1 ConnectorConfig,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait;
    fn poll_batch<'life0, 'async_trait>(
        &'life0 mut self,
        max_records: usize,
    ) -> Pin<Box<dyn Future<Output = Result<Option<SourceBatch>, ConnectorError>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait;
    fn schema(&self) -> SchemaRef;
    fn checkpoint(&self) -> SourceCheckpoint;
    fn restore<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        checkpoint: &'life1 SourceCheckpoint,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait;
    fn close<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait;

    // Provided methods
    fn health_check(&self) -> HealthStatus { ... }
    fn metrics(&self) -> ConnectorMetrics { ... }
    fn data_ready_notify(&self) -> Option<Arc<Notify>> { ... }
    fn as_schema_provider(&self) -> Option<&dyn SchemaProvider> { ... }
    fn as_schema_inferable(&self) -> Option<&dyn SchemaInferable> { ... }
    fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware> { ... }
    fn as_schema_evolvable(&self) -> Option<&dyn SchemaEvolvable> { ... }
    fn supports_replay(&self) -> bool { ... }
    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> { ... }
}

Trait for source connectors that read data from external systems.

Source connectors operate in Ring 1 and push data into Ring 0 via the streaming Source<ArrowRecord>::push_arrow() API.

§Lifecycle

  1. open() - Initialize connection, discover schema
  2. poll_batch() - Read batches in a loop
  3. checkpoint() / restore() - Manage offsets
  4. close() - Clean shutdown
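
The call order above can be sketched with a simplified, synchronous stand-in for the trait. Everything in this block (SimpleSource, Counter, drive, and the use of String as the error type) is hypothetical and exists only to show the sequence of calls; the real trait is async and uses ConnectorConfig, SourceBatch, and ConnectorError. The sketch also stops at the first None so that it terminates, whereas the runtime keeps polling.

```rust
/// Simplified, synchronous stand-in for the connector trait (hypothetical).
trait SimpleSource {
    fn open(&mut self) -> Result<(), String>;
    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Vec<u64>>, String>;
    fn checkpoint(&self) -> u64;
    fn close(&mut self) -> Result<(), String>;
}

/// Toy source that emits the integers `next..limit`.
struct Counter {
    next: u64,
    limit: u64,
    is_open: bool,
}

impl SimpleSource for Counter {
    fn open(&mut self) -> Result<(), String> {
        self.is_open = true;
        Ok(())
    }

    fn poll_batch(&mut self, max_records: usize) -> Result<Option<Vec<u64>>, String> {
        if !self.is_open {
            return Err("poll_batch called before open".to_string());
        }
        if self.next >= self.limit || max_records == 0 {
            return Ok(None); // nothing to hand out right now
        }
        let end = self.next.saturating_add(max_records as u64).min(self.limit);
        let batch: Vec<u64> = (self.next..end).collect();
        self.next = end;
        Ok(Some(batch))
    }

    fn checkpoint(&self) -> u64 {
        self.next // the position to resume from
    }

    fn close(&mut self) -> Result<(), String> {
        self.is_open = false;
        Ok(())
    }
}

/// One pass through the lifecycle: open, poll until `None`, checkpoint, close.
/// Returns the number of records read and the checkpointed position.
fn drive<S: SimpleSource>(source: &mut S, max_records: usize) -> Result<(usize, u64), String> {
    source.open()?;
    let mut total = 0;
    while let Some(batch) = source.poll_batch(max_records)? {
        total += batch.len();
    }
    let position = source.checkpoint();
    source.close()?;
    Ok((total, position))
}
```

With a Counter covering 0..10 and max_records set to 4, drive reads batches of 4, 4, and 2 records and reports a checkpoint position of 10.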

§Example

use async_trait::async_trait;

struct MySource { /* ... */ }

#[async_trait]
impl SourceConnector for MySource {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        // Connect to external system
        Ok(())
    }

    async fn poll_batch(&mut self, max_records: usize) -> Result<Option<SourceBatch>, ConnectorError> {
        // Read up to max_records from external system
        Ok(None) // None = no data available yet
    }

    // ... other methods
}

Required Methods§


fn open<'life0, 'life1, 'async_trait>( &'life0 mut self, config: &'life1 ConnectorConfig, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Opens the connector and initializes the connection.

Called once before any polling begins. The connector should establish connections, discover the schema, and prepare for reading.

§Errors

Returns ConnectorError if connection or initialization fails.


fn poll_batch<'life0, 'async_trait>( &'life0 mut self, max_records: usize, ) -> Pin<Box<dyn Future<Output = Result<Option<SourceBatch>, ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Polls for the next batch of records.

Returns Ok(Some(batch)) when records are available, or Ok(None) when no data is currently available (the runtime will poll again after a delay).

The max_records parameter is a hint; implementations may return fewer.
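
As a rough illustration of this contract, the helper below drains an in-memory queue. The name take_batch and the use of VecDeque<String> in place of SourceBatch are assumptions made for the sketch, not part of the crate.

```rust
use std::collections::VecDeque;

/// Hypothetical helper: returns up to `max_records` pending records,
/// or `None` when nothing is currently available.
fn take_batch(pending: &mut VecDeque<String>, max_records: usize) -> Option<Vec<String>> {
    if pending.is_empty() || max_records == 0 {
        return None;
    }
    // `max_records` is an upper bound; a short queue yields a smaller batch.
    let n = max_records.min(pending.len());
    Some(pending.drain(..n).collect())
}
```

A real poll_batch would wrap this value in Ok(...) and reserve Err for read failures.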

§Errors

Returns ConnectorError on read failure.


fn schema(&self) -> SchemaRef

Returns the schema of records produced by this source.


fn checkpoint(&self) -> SourceCheckpoint

Creates a checkpoint of the current source position.

The returned checkpoint contains enough information to resume reading from this position after a restart.


fn restore<'life0, 'life1, 'async_trait>( &'life0 mut self, checkpoint: &'life1 SourceCheckpoint, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Restores the source to a previously checkpointed position.

Called during recovery before polling resumes.

§Errors

Returns ConnectorError if the checkpoint is invalid or the seek operation fails.
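
The checkpoint and restore pair can be pictured as a round trip over a read position. The sketch below assumes a checkpoint that holds a single offset; OffsetCheckpoint and LogReader are hypothetical stand-ins, and the actual contents of SourceCheckpoint are not shown on this page.

```rust
/// Hypothetical checkpoint holding one read offset.
#[derive(Debug, Clone, PartialEq)]
struct OffsetCheckpoint {
    offset: usize,
}

/// Toy replayable source over an in-memory log.
struct LogReader {
    records: Vec<&'static str>,
    position: usize,
}

impl LogReader {
    /// Returns the record at the current position and advances past it.
    fn next_record(&mut self) -> Option<&'static str> {
        let record = self.records.get(self.position).copied();
        if record.is_some() {
            self.position += 1;
        }
        record
    }

    /// Captures the position of the next unread record.
    fn checkpoint(&self) -> OffsetCheckpoint {
        OffsetCheckpoint { offset: self.position }
    }

    /// Seeks back to a checkpointed position, rejecting offsets past the end.
    fn restore(&mut self, checkpoint: &OffsetCheckpoint) -> Result<(), String> {
        if checkpoint.offset > self.records.len() {
            return Err(format!(
                "offset {} is past the end of the log ({} records)",
                checkpoint.offset,
                self.records.len()
            ));
        }
        self.position = checkpoint.offset;
        Ok(())
    }
}
```

After a restart, a fresh reader that restores a checkpoint taken after the first record resumes with the second record, and an out-of-range offset is reported as an error instead of being silently clamped.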


fn close<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Closes the connector and releases all resources.

§Errors

Returns ConnectorError if cleanup fails.

Provided Methods§


fn health_check(&self) -> HealthStatus

Returns the current health status of the connector.

Defaults to Unknown; connectors should override with actual status.


fn metrics(&self) -> ConnectorMetrics

Returns current metrics from the connector.


fn data_ready_notify(&self) -> Option<Arc<Notify>>

Returns a Notify handle that is signalled when new data is available.

When Some, the pipeline coordinator awaits the notification instead of polling on a timer, eliminating idle CPU usage. Sources that receive data asynchronously (WebSocket, CDC replication streams, Kafka) should return Some and call notify.notify_one() when data arrives.

The default implementation returns None, which causes the pipeline to fall back to timer-based polling (suitable for batch/file sources).
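
The two waiting strategies can be contrasted with a blocking, std-only sketch. BlockingNotify below is a hypothetical stand-in built on Mutex and Condvar, not the async Notify type in the signature, and wait_for_data and Wake are likewise invented for illustration; the actual coordinator awaits the notification instead of blocking a thread.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Std-only stand-in for a notify handle (hypothetical).
/// A notification sent before anyone waits is remembered as one pending permit.
#[derive(Default)]
struct BlockingNotify {
    pending: Mutex<bool>,
    cond: Condvar,
}

impl BlockingNotify {
    /// Source side: called when new data arrives.
    fn notify_one(&self) {
        *self.pending.lock().unwrap() = true;
        self.cond.notify_one();
    }

    /// Coordinator side: blocks until a notification is pending, then consumes it.
    fn wait(&self) {
        let guard = self.pending.lock().unwrap();
        let mut guard = self.cond.wait_while(guard, |pending| !*pending).unwrap();
        *guard = false;
    }
}

#[derive(Debug, PartialEq)]
enum Wake {
    Notified,
    TimerElapsed,
}

/// With a handle, sleep until the source signals; without one, fall back to a timer.
fn wait_for_data(notify: Option<Arc<BlockingNotify>>, poll_interval: Duration) -> Wake {
    match notify {
        Some(handle) => {
            handle.wait();
            Wake::Notified
        }
        None => {
            thread::sleep(poll_interval);
            Wake::TimerElapsed
        }
    }
}
```

The Some branch wakes only when the source signals, which is where the idle CPU saving comes from; the None branch wakes on every interval whether or not data has arrived.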


fn as_schema_provider(&self) -> Option<&dyn SchemaProvider>

Returns this connector as a SchemaProvider, if supported.


fn as_schema_inferable(&self) -> Option<&dyn SchemaInferable>

Returns this connector as a SchemaInferable, if supported.


fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware>

Returns this connector as a SchemaRegistryAware, if supported.


fn as_schema_evolvable(&self) -> Option<&dyn SchemaEvolvable>

Returns this connector as a SchemaEvolvable, if supported.
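
The four as_schema_* accessors share one shape: a provided method that returns None unless the connector opts in by returning Some(self). A minimal sketch of that pattern follows. The traits FieldLister and Connector and the method field_names are hypothetical, since the methods of SchemaProvider and its siblings are not shown on this page.

```rust
/// Hypothetical optional capability, standing in for a trait like SchemaProvider.
trait FieldLister {
    fn field_names(&self) -> Vec<String>;
}

/// Hypothetical connector trait with a capability accessor.
trait Connector {
    /// Provided method: connectors without the capability inherit `None`.
    fn as_field_lister(&self) -> Option<&dyn FieldLister> {
        None
    }
}

/// A connector that does not opt in.
struct PlainSource;

impl Connector for PlainSource {}

/// A connector that opts in by implementing the capability and overriding the accessor.
struct TableSource {
    columns: Vec<String>,
}

impl FieldLister for TableSource {
    fn field_names(&self) -> Vec<String> {
        self.columns.clone()
    }
}

impl Connector for TableSource {
    fn as_field_lister(&self) -> Option<&dyn FieldLister> {
        Some(self)
    }
}

/// Caller side: probe for the capability through a trait object.
fn describe(connector: &dyn Connector) -> String {
    match connector.as_field_lister() {
        Some(lister) => lister.field_names().join(", "),
        None => "schema discovery not supported".to_string(),
    }
}
```

This lets a caller holding only a trait object discover optional capabilities without downcasting to a concrete type.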


fn supports_replay(&self) -> bool

Whether this source supports replay from a checkpointed position.

Sources that return false (e.g., WebSocket, raw TCP) cannot seek back to a previous offset on recovery. Checkpointing still captures their state for best-effort recovery, but exactly-once semantics are degraded to at-most-once for events from this source.

The default implementation returns true because most durable sources (Kafka, CDC, files) support replay.
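
Read as a rule, the paragraphs above map the flag to a per-source guarantee. The sketch below assumes a pipeline otherwise configured for exactly-once processing; Delivery and effective_delivery are hypothetical names and do not appear in the crate.

```rust
/// Hypothetical per-source delivery guarantee.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Delivery {
    ExactlyOnce,
    AtMostOnce,
}

/// Guarantee for events from one source, given its `supports_replay()` answer.
fn effective_delivery(supports_replay: bool) -> Delivery {
    if supports_replay {
        // The source can seek back to the checkpointed offset on recovery.
        Delivery::ExactlyOnce
    } else {
        // Events between the last checkpoint and the failure cannot be re-read.
        Delivery::AtMostOnce
    }
}
```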


fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>>

Returns a shared flag that the source sets to true when it requests an immediate checkpoint.

This is used by sources that detect external state changes requiring a checkpoint before proceeding, for example a Kafka consumer group rebalance (partition revocation). The pipeline coordinator polls this flag each cycle and clears it after triggering the checkpoint.

The default returns None (no source-initiated checkpoints).
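
A minimal sketch of the handshake follows, using only std atomics. RebalancingSource, on_partitions_revoked, and take_checkpoint_request are hypothetical names. The sketch also simplifies the coordinator side by reading and clearing the flag in a single swap, whereas the description above has the flag cleared after the checkpoint is triggered.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical source that asks for a checkpoint when partitions are revoked.
struct RebalancingSource {
    checkpoint_flag: Arc<AtomicBool>,
}

impl RebalancingSource {
    fn new() -> Self {
        Self { checkpoint_flag: Arc::new(AtomicBool::new(false)) }
    }

    /// Same shape as the trait method: hands out a clone of the shared flag.
    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
        Some(Arc::clone(&self.checkpoint_flag))
    }

    /// Hypothetical callback invoked when a rebalance revokes partitions.
    fn on_partitions_revoked(&self) {
        self.checkpoint_flag.store(true, Ordering::Release);
    }
}

/// Coordinator side (simplified): returns true if a checkpoint was requested,
/// resetting the flag in the same atomic step.
fn take_checkpoint_request(flag: &AtomicBool) -> bool {
    flag.swap(false, Ordering::AcqRel)
}
```

Because the swap resets the flag as it reads it, a request raised between two coordinator cycles is observed exactly once.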

Implementors§