pub struct PostgresSink { /* private fields */ }Expand description
PostgreSQL sink connector.
Writes Arrow RecordBatch to PostgreSQL tables using COPY BINARY
(append) or UNNEST-based upsert, with optional exactly-once semantics
via co-transactional epoch storage.
Implementations§
Source§impl PostgresSink
impl PostgresSink
Sourcepub fn new(schema: SchemaRef, config: PostgresSinkConfig) -> Self
pub fn new(schema: SchemaRef, config: PostgresSinkConfig) -> Self
Creates a new PostgreSQL sink connector.
Sourcepub fn state(&self) -> ConnectorState
pub fn state(&self) -> ConnectorState
Returns the current connector state.
Sourcepub fn current_epoch(&self) -> u64
pub fn current_epoch(&self) -> u64
Returns the current epoch.
Sourcepub fn last_committed_epoch(&self) -> u64
pub fn last_committed_epoch(&self) -> u64
Returns the last committed epoch.
Sourcepub fn buffered_rows(&self) -> usize
pub fn buffered_rows(&self) -> usize
Returns the number of buffered rows pending flush.
Sourcepub fn sink_metrics(&self) -> &PostgresSinkMetrics
pub fn sink_metrics(&self) -> &PostgresSinkMetrics
Returns a reference to the sink metrics.
Sourcepub fn build_copy_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String
pub fn build_copy_sql(schema: &SchemaRef, config: &PostgresSinkConfig) -> String
Builds the COPY BINARY SQL statement.
COPY public.events (id, value, ts) FROM STDIN BINARYSourcepub fn build_upsert_sql(
schema: &SchemaRef,
config: &PostgresSinkConfig,
) -> String
pub fn build_upsert_sql( schema: &SchemaRef, config: &PostgresSinkConfig, ) -> String
Builds the UNNEST-based upsert SQL statement.
INSERT INTO public.target (id, value, updated_at)
SELECT * FROM UNNEST($1::int8[], $2::text[], $3::timestamptz[])
ON CONFLICT (id) DO UPDATE SET
value = EXCLUDED.value,
updated_at = EXCLUDED.updated_atSourcepub fn build_delete_sql(
schema: &SchemaRef,
config: &PostgresSinkConfig,
) -> String
pub fn build_delete_sql( schema: &SchemaRef, config: &PostgresSinkConfig, ) -> String
Builds the DELETE SQL for changelog deletes.
DELETE FROM public.events WHERE id = ANY($1::int8[])Sourcepub fn build_create_table_sql(
schema: &SchemaRef,
config: &PostgresSinkConfig,
) -> String
pub fn build_create_table_sql( schema: &SchemaRef, config: &PostgresSinkConfig, ) -> String
Builds CREATE TABLE DDL from the Arrow schema.
CREATE TABLE IF NOT EXISTS public.events (
id BIGINT NOT NULL,
value TEXT,
ts TIMESTAMPTZ,
PRIMARY KEY (id)
)Sourcepub fn build_offset_table_sql() -> &'static str
pub fn build_offset_table_sql() -> &'static str
Builds CREATE TABLE DDL for the offset tracking table.
Sourcepub fn build_epoch_commit_sql() -> &'static str
pub fn build_epoch_commit_sql() -> &'static str
Builds the epoch commit SQL.
Sourcepub fn build_epoch_recover_sql() -> &'static str
pub fn build_epoch_recover_sql() -> &'static str
Builds the epoch recovery SQL.
Sourcepub fn split_changelog_batch(
batch: &RecordBatch,
) -> Result<(RecordBatch, RecordBatch), ConnectorError>
pub fn split_changelog_batch( batch: &RecordBatch, ) -> Result<(RecordBatch, RecordBatch), ConnectorError>
Splits a changelog RecordBatch into insert and delete batches.
Uses the _op metadata column:
"I"(insert),"U"(update-after),"r"(snapshot read) → insert batch"D"(delete) → delete batch
The returned batches exclude metadata columns (those starting with _).
§Errors
Returns ConnectorError::ConfigurationError if the _op column is
missing or not a string type.
Trait Implementations§
Source§impl Debug for PostgresSink
impl Debug for PostgresSink
Source§impl SinkConnector for PostgresSink
impl SinkConnector for PostgresSink
Source§fn open<'life0, 'life1, 'async_trait>(
&'life0 mut self,
config: &'life1 ConnectorConfig,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn open<'life0, 'life1, 'async_trait>(
&'life0 mut self,
config: &'life1 ConnectorConfig,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn write_batch<'life0, 'life1, 'async_trait>(
&'life0 mut self,
batch: &'life1 RecordBatch,
) -> Pin<Box<dyn Future<Output = Result<WriteResult, ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn write_batch<'life0, 'life1, 'async_trait>(
&'life0 mut self,
batch: &'life1 RecordBatch,
) -> Pin<Box<dyn Future<Output = Result<WriteResult, ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn begin_epoch<'life0, 'async_trait>(
&'life0 mut self,
epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn begin_epoch<'life0, 'async_trait>(
&'life0 mut self,
epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn pre_commit<'life0, 'async_trait>(
&'life0 mut self,
epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn pre_commit<'life0, 'async_trait>(
&'life0 mut self,
epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn commit_epoch<'life0, 'async_trait>(
&'life0 mut self,
epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn commit_epoch<'life0, 'async_trait>(
&'life0 mut self,
epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn rollback_epoch<'life0, 'async_trait>(
&'life0 mut self,
epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn rollback_epoch<'life0, 'async_trait>(
&'life0 mut self,
epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn health_check(&self) -> HealthStatus
fn health_check(&self) -> HealthStatus
Source§fn metrics(&self) -> ConnectorMetrics
fn metrics(&self) -> ConnectorMetrics
Source§fn capabilities(&self) -> SinkConnectorCapabilities
fn capabilities(&self) -> SinkConnectorCapabilities
Source§fn flush<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn flush<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn close<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn close<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<(), ConnectorError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware>
fn as_schema_registry_aware(&self) -> Option<&dyn SchemaRegistryAware>
SchemaRegistryAware, if supported.Source§fn as_schema_evolvable(&self) -> Option<&dyn SchemaEvolvable>
fn as_schema_evolvable(&self) -> Option<&dyn SchemaEvolvable>
SchemaEvolvable, if supported.Auto Trait Implementations§
impl !Freeze for PostgresSink
impl !RefUnwindSafe for PostgresSink
impl Send for PostgresSink
impl Sync for PostgresSink
impl Unpin for PostgresSink
impl UnsafeUnpin for PostgresSink
impl !UnwindSafe for PostgresSink
Blanket Implementations§
§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
§type ArchivedMetadata = ()
type ArchivedMetadata = ()
§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
§impl<T> AsAny for T
impl<T> AsAny for T
§fn any_ref(&self) -> &(dyn Any + Send + Sync + 'static)
fn any_ref(&self) -> &(dyn Any + Send + Sync + 'static)
dyn Any reference to the object: Read more§fn as_any(self: Arc<T>) -> Arc<dyn Any + Send + Sync>
fn as_any(self: Arc<T>) -> Arc<dyn Any + Send + Sync>
Arc<dyn Any> reference to the object: Read more§fn into_any(self: Box<T>) -> Box<dyn Any + Send + Sync>
fn into_any(self: Box<T>) -> Box<dyn Any + Send + Sync>
Box<dyn Any>: Read more§fn type_name(&self) -> &'static str
fn type_name(&self) -> &'static str
std::any::type_name, since Any does not provide it and
Any::type_id is useless as a debugging aid (its Debug is just a mess of hex digits).Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Pointee for T
impl<T> Pointee for T
§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
§impl<T> Scope for T
impl<T> Scope for T
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.