Skip to main content

SinkConnector

Trait SinkConnector 

Source
pub trait SinkConnector: Send {
Show 14 methods // Required methods fn open<'life0, 'life1, 'async_trait>( &'life0 mut self, config: &'life1 ConnectorConfig, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn write_batch<'life0, 'life1, 'async_trait>( &'life0 mut self, batch: &'life1 RecordBatch, ) -> Pin<Box<dyn Future<Output = Result<WriteResult, ConnectorError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn schema(&self) -> SchemaRef; fn close<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; // Provided methods fn begin_epoch<'life0, 'async_trait>( &'life0 mut self, _epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn pre_commit<'life0, 'async_trait>( &'life0 mut self, _epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn commit_epoch<'life0, 'async_trait>( &'life0 mut self, _epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn rollback_epoch<'life0, 'async_trait>( &'life0 mut self, _epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn health_check(&self) -> HealthStatus { ... } fn metrics(&self) -> ConnectorMetrics { ... } fn capabilities(&self) -> SinkConnectorCapabilities { ... } fn flush<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware> { ... } fn as_schema_evolvable(&self) -> Option<&dyn SchemaEvolvable> { ... }
}
Expand description

Trait for sink connectors that write data to external systems.

Sink connectors operate in Ring 1, receiving data from Ring 0 via subscriptions and writing to external systems.

§Exactly-Once Support

Sinks that support exactly-once semantics implement the epoch-based methods (begin_epoch, commit_epoch, rollback_epoch). The runtime calls these in coordination with the checkpoint manager.

§Lifecycle

  1. open() - Initialize connection
  2. For each epoch: a. begin_epoch() - Start transaction b. write_batch() - Write records (may be called multiple times) c. commit_epoch() - Commit transaction
  3. close() - Clean shutdown

Required Methods§

Source

fn open<'life0, 'life1, 'async_trait>( &'life0 mut self, config: &'life1 ConnectorConfig, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Opens the connector and initializes the connection.

§Errors

Returns ConnectorError if connection or initialization fails.

Source

fn write_batch<'life0, 'life1, 'async_trait>( &'life0 mut self, batch: &'life1 RecordBatch, ) -> Pin<Box<dyn Future<Output = Result<WriteResult, ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Writes a batch of records to the external system.

§Errors

Returns ConnectorError on write failure.

Source

fn schema(&self) -> SchemaRef

Returns the expected input schema for this sink.

Source

fn close<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Closes the connector and releases all resources.

§Errors

Returns ConnectorError if cleanup fails.

Provided Methods§

Source

fn begin_epoch<'life0, 'async_trait>( &'life0 mut self, _epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Begins a new epoch for exactly-once processing.

Called by the runtime when a new checkpoint epoch starts. Default implementation does nothing (at-least-once semantics).

§Errors

Returns ConnectorError if the epoch cannot be started.

Source

fn pre_commit<'life0, 'async_trait>( &'life0 mut self, _epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Pre-commits the current epoch (phase 1 of two-phase commit).

Called after all writes for this epoch are complete but before the checkpoint manifest is persisted. The sink should flush any buffered data and prepare for commit, but must NOT finalize the transaction.

The protocol is:

  1. pre_commit(epoch) — flush/prepare (this method)
  2. Manifest persisted to disk
  3. commit_epoch(epoch) — finalize transaction
  4. On failure: rollback_epoch(epoch)

Default implementation delegates to flush().

§Errors

Returns ConnectorError if the pre-commit fails.

Source

fn commit_epoch<'life0, 'async_trait>( &'life0 mut self, _epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Commits the current epoch (phase 2 of two-phase commit).

Called by the runtime after the checkpoint manifest is successfully persisted. The sink should finalize any pending transactions. Default implementation does nothing (at-least-once semantics).

§Errors

Returns ConnectorError if the commit fails.

Source

fn rollback_epoch<'life0, 'async_trait>( &'life0 mut self, _epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Rolls back the current epoch.

Called by the runtime when a checkpoint fails. Default implementation does nothing.

§Errors

Returns ConnectorError if the rollback fails.

Source

fn health_check(&self) -> HealthStatus

Returns the current health status of the connector.

Defaults to Unknown; connectors should override with actual status.

Source

fn metrics(&self) -> ConnectorMetrics

Returns current metrics from the connector.

Source

fn capabilities(&self) -> SinkConnectorCapabilities

Returns the capabilities of this sink connector.

Source

fn flush<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Flushes any buffered data to the external system.

§Errors

Returns ConnectorError if the flush fails.

Source

fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware>

Returns this connector as a SchemaRegistryAware, if supported.

Source

fn as_schema_evolvable(&self) -> Option<&dyn SchemaEvolvable>

Returns this connector as a SchemaEvolvable, if supported.

Implementors§