Skip to main content

laminar_connectors/
connector.rs

//! Connector traits — async `SourceConnector` / `SinkConnector`.
2
3use std::fmt;
4use std::str::FromStr;
5use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use arrow_schema::SchemaRef;
9use async_trait::async_trait;
10use tokio::sync::Notify;
11
12use crate::checkpoint::SourceCheckpoint;
13use crate::config::ConnectorConfig;
14use crate::error::ConnectorError;
15
16/// Delivery guarantee level for the pipeline.
17///
18/// Configures the expected end-to-end delivery semantics. The pipeline
19/// validates at startup that all sources and sinks meet the requirements
20/// for the chosen guarantee level.
21#[derive(
22    Debug, Clone, Copy, PartialEq, Eq, Hash, Default, serde::Serialize, serde::Deserialize,
23)]
24pub enum DeliveryGuarantee {
25    /// At-least-once: records may be replayed on recovery. Requires
26    /// checkpointing but tolerates non-replayable sources (with degradation).
27    #[default]
28    AtLeastOnce,
29    /// Exactly-once: no duplicates or losses. Requires all sources to
30    /// support replay, all sinks to support exactly-once, and checkpoint
31    /// to be enabled.
32    ExactlyOnce,
33}
34
35impl std::fmt::Display for DeliveryGuarantee {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            DeliveryGuarantee::AtLeastOnce => write!(f, "at-least-once"),
39            DeliveryGuarantee::ExactlyOnce => write!(f, "exactly-once"),
40        }
41    }
42}
43
44impl FromStr for DeliveryGuarantee {
45    type Err = String;
46
47    fn from_str(s: &str) -> Result<Self, Self::Err> {
48        match s.to_lowercase().replace('-', "_").as_str() {
49            "at_least_once" | "atleastonce" => Ok(Self::AtLeastOnce),
50            "exactly_once" | "exactlyonce" => Ok(Self::ExactlyOnce),
51            other => Err(format!("unknown delivery guarantee: '{other}'")),
52        }
53    }
54}
55
/// SSL connection mode for `PostgreSQL`-compatible connectors.
///
/// Shared by the `PostgreSQL` sink and the `PostgreSQL` CDC source. Variant
/// names follow the `libpq` `sslmode` parameter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PostgresSslMode {
    /// No SSL.
    Disable,
    /// Try SSL, fall back to unencrypted (this is the default mode).
    #[default]
    Prefer,
    /// Require SSL.
    Require,
    /// Require SSL and verify the CA certificate.
    VerifyCa,
    /// Require SSL, verify the certificate and the hostname.
    VerifyFull,
}

impl fmt::Display for PostgresSslMode {
    /// Writes the mode in its hyphenated lower-case spelling
    /// (`disable`, `prefer`, `require`, `verify-ca`, `verify-full`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Disable => "disable",
            Self::Prefer => "prefer",
            Self::Require => "require",
            Self::VerifyCa => "verify-ca",
            Self::VerifyFull => "verify-full",
        })
    }
}

impl FromStr for PostgresSslMode {
    type Err = String;

    /// Parse an SSL mode from user-supplied text.
    ///
    /// Matching is case-insensitive, ignores surrounding whitespace, and
    /// treats `-` and `_` as the same separator. Besides the spellings
    /// produced by `Display`, `off` is accepted as an alias for
    /// [`PostgresSslMode::Disable`], and `verifyca` / `verifyfull` are
    /// accepted without a separator.
    ///
    /// # Errors
    ///
    /// Returns a message quoting the input exactly as the caller supplied
    /// it when it names no known mode.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Normalise only for matching; the error below must quote what the
        // user actually typed, not this rewritten form.
        let normalized = s.trim().to_lowercase().replace('-', "_");
        match normalized.as_str() {
            "disable" | "off" => Ok(Self::Disable),
            "prefer" => Ok(Self::Prefer),
            "require" => Ok(Self::Require),
            "verify_ca" | "verifyca" => Ok(Self::VerifyCa),
            "verify_full" | "verifyfull" => Ok(Self::VerifyFull),
            _ => Err(format!("unknown SSL mode: '{s}'")),
        }
    }
}
101
102/// A batch of records read from a source connector.
103#[derive(Debug, Clone)]
104pub struct SourceBatch {
105    /// Arrow batch carrying the records.
106    pub records: RecordBatch,
107    /// The partition this batch came from, if the source is partitioned.
108    pub partition: Option<PartitionInfo>,
109}
110
111impl SourceBatch {
112    /// Construct without partition metadata.
113    #[must_use]
114    pub fn new(records: RecordBatch) -> Self {
115        Self {
116            records,
117            partition: None,
118        }
119    }
120
121    /// Construct with partition metadata attached.
122    #[must_use]
123    pub fn with_partition(records: RecordBatch, partition: PartitionInfo) -> Self {
124        Self {
125            records,
126            partition: Some(partition),
127        }
128    }
129
130    /// Record count in the batch.
131    #[must_use]
132    pub fn num_rows(&self) -> usize {
133        self.records.num_rows()
134    }
135}
136
/// Identity of a source partition together with its current offset
/// (Kafka partition number, CDC slot name, etc.).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionInfo {
    /// Free-form partition identifier: a Kafka partition number rendered as
    /// a string, a CDC slot name, a file path, …
    pub id: String,
    /// Current offset. Its meaning is connector-specific (a Kafka offset
    /// rendered as a string, a CDC LSN, etc.).
    pub offset: String,
}

impl PartitionInfo {
    /// Build a `PartitionInfo` from an id and an offset; both arguments
    /// accept anything convertible into a `String`.
    #[must_use]
    pub fn new(id: impl Into<String>, offset: impl Into<String>) -> Self {
        let id = id.into();
        let offset = offset.into();
        Self { id, offset }
    }
}

impl fmt::Display for PartitionInfo {
    /// Renders as `<id>@<offset>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { id, offset } = self;
        write!(f, "{id}@{offset}")
    }
}
165
/// Outcome reported by a successful `write_batch` call.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// Number of records the sink accepted.
    pub records_written: usize,
    /// Number of bytes written to the underlying transport (may be an
    /// estimate).
    pub bytes_written: u64,
}

impl WriteResult {
    /// Build a result from raw record and byte counts.
    #[must_use]
    pub fn new(records_written: usize, bytes_written: u64) -> Self {
        WriteResult {
            bytes_written,
            records_written,
        }
    }
}
185
/// Feature set a sink connector declares about itself.
#[derive(Debug, Clone)]
// Each flag is an independent capability toggled through its own `with_*`
// builder, so a set of plain bools is kept deliberately.
#[allow(clippy::struct_excessive_bools)]
pub struct SinkConnectorCapabilities {
    /// Exactly-once semantics via epochs are supported.
    pub exactly_once: bool,

    /// Idempotent writes are supported.
    pub idempotent: bool,

    /// Upsert (insert-or-update) writes are supported.
    pub upsert: bool,

    /// Changelog/retraction records can be handled.
    pub changelog: bool,

    /// Two-phase commit (pre-commit + commit) is supported.
    pub two_phase_commit: bool,

    /// Schema evolution is supported.
    pub schema_evolution: bool,

    /// Partitioned writes are supported.
    pub partitioned: bool,

    /// Output pending in an abandoned epoch survives into the next epoch's
    /// commit WITHOUT a connector rollback (e.g. the single open
    /// transaction of a Kafka transactional producer).
    ///
    /// When set, a live coordination failure skips the connector rollback
    /// so that the pending rows are not discarded — sources only rewind on
    /// restart, never on a live abort. When unset, live failures roll the
    /// connector back.
    pub preserves_pending_on_abandon: bool,

    /// Default I/O timeout for a single `write_batch` call. Users can
    /// override it through the `sink.write.timeout.ms` connector property.
    pub suggested_write_timeout: std::time::Duration,
}

impl SinkConnectorCapabilities {
    /// Start from a capability set in which every boolean is `false`.
    ///
    /// Turn capabilities on with the `with_*` builders below, or assign the
    /// (public) fields directly.
    #[must_use]
    pub fn new(suggested_write_timeout: std::time::Duration) -> Self {
        Self {
            suggested_write_timeout,
            exactly_once: false,
            idempotent: false,
            upsert: false,
            changelog: false,
            two_phase_commit: false,
            schema_evolution: false,
            partitioned: false,
            preserves_pending_on_abandon: false,
        }
    }

    /// Enable exactly-once semantics (requires an epoch + 2PC
    /// implementation).
    #[must_use]
    pub fn with_exactly_once(self) -> Self {
        Self {
            exactly_once: true,
            ..self
        }
    }

    /// Enable idempotent writes (re-delivery on retry is safe).
    #[must_use]
    pub fn with_idempotent(self) -> Self {
        Self {
            idempotent: true,
            ..self
        }
    }

    /// Enable upsert (key-based insert-or-update).
    #[must_use]
    pub fn with_upsert(self) -> Self {
        Self {
            upsert: true,
            ..self
        }
    }

    /// Enable changelog/retraction records.
    #[must_use]
    pub fn with_changelog(self) -> Self {
        Self {
            changelog: true,
            ..self
        }
    }

    /// Enable two-phase commit (`pre_commit` + `commit_epoch`).
    #[must_use]
    pub fn with_two_phase_commit(self) -> Self {
        Self {
            two_phase_commit: true,
            ..self
        }
    }

    /// Enable additive schema evolution.
    #[must_use]
    pub fn with_schema_evolution(self) -> Self {
        Self {
            schema_evolution: true,
            ..self
        }
    }

    /// Enable partitioned writes.
    #[must_use]
    pub fn with_partitioned(self) -> Self {
        Self {
            partitioned: true,
            ..self
        }
    }

    /// Declare that pending output survives an abandoned epoch into the
    /// next commit without a rollback (sets
    /// `preserves_pending_on_abandon`).
    #[must_use]
    pub fn with_preserves_pending_on_abandon(self) -> Self {
        Self {
            preserves_pending_on_abandon: true,
            ..self
        }
    }
}
294
295/// Trait for source connectors that read data from external systems.
296///
297/// Source connectors operate in Ring 1 and push data into Ring 0 via
298/// the streaming `Source<ArrowRecord>::push_arrow()` API.
299///
300/// # Lifecycle
301///
302/// 1. `open()` — initialize connection, discover schema
303/// 2. `poll_batch()` — read batches in a loop
304/// 3. `checkpoint()` / `restore()` — manage offsets
305/// 4. `close()` — clean shutdown
306#[async_trait]
307pub trait SourceConnector: Send {
308    /// Called once before polling begins.
309    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;
310
311    /// `Ok(None)` = no data currently available; runtime retries after a delay.
312    /// `max_records` is a hint — implementations may return fewer.
313    async fn poll_batch(
314        &mut self,
315        max_records: usize,
316    ) -> Result<Option<SourceBatch>, ConnectorError>;
317
318    /// Resolve the source schema from the `WITH (...)` properties before
319    /// DDL reaches the planner. Implementations that hit the network
320    /// (e.g. Kafka fetching an Avro schema from a Schema Registry) must
321    /// bound their I/O with a timeout. Return `Err(ConnectorError::…)` on
322    /// failure so the runtime can surface the cause to DDL — do not log
323    /// and swallow.
324    async fn discover_schema(
325        &mut self,
326        _properties: &std::collections::HashMap<String, String>,
327    ) -> Result<(), ConnectorError> {
328        Ok(())
329    }
330
331    /// Arrow schema of records this source produces.
332    fn schema(&self) -> SchemaRef;
333
334    /// Returned checkpoint must contain enough info to resume from the
335    /// current position after a restart.
336    fn checkpoint(&self) -> SourceCheckpoint;
337
338    /// Called during recovery before polling resumes.
339    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError>;
340
341    /// Install the cluster vnode assignment so a partitioned source can bind
342    /// its input partitions to vnodes (`partition % vnode_count`) and consume
343    /// only those it owns, re-binding when the assignment rotates. Called by
344    /// the cluster startup wiring before [`open`](Self::open).
345    ///
346    /// Default: no-op — single-node deployments and sources without a natural
347    /// partitioning ignore it. Only the Kafka source overrides it today.
348    fn set_vnode_assignment(
349        &mut self,
350        _registry: Arc<laminar_core::state::VnodeRegistry>,
351        _self_id: laminar_core::state::NodeId,
352    ) {
353    }
354
355    /// Close the connection and release resources.
356    async fn close(&mut self) -> Result<(), ConnectorError>;
357
358    /// Returns a [`Notify`] handle that is signalled when new data is available.
359    ///
360    /// When `Some`, the pipeline coordinator awaits the notification instead of
361    /// polling on a timer, eliminating idle CPU usage. Sources that receive data
362    /// asynchronously (WebSocket, CDC replication streams, Kafka) should return
363    /// `Some` and call `notify.notify_one()` when data arrives.
364    ///
365    /// The default implementation returns `None`, which causes the pipeline to
366    /// fall back to timer-based polling (suitable for batch/file sources).
367    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
368        None
369    }
370
371    /// Returns this connector as a [`SchemaProvider`](crate::schema::SchemaProvider), if supported.
372    fn as_schema_provider(&self) -> Option<&dyn crate::schema::SchemaProvider> {
373        None
374    }
375
376    /// Returns this connector as a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware), if supported.
377    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
378        None
379    }
380
381    /// Whether this source supports replay from a checkpointed position.
382    ///
383    /// Sources that return `false` (e.g., WebSocket, raw TCP) cannot seek
384    /// back to a previous offset on recovery. Checkpointing still captures
385    /// their state for best-effort recovery, but exactly-once semantics
386    /// are degraded to at-most-once for events from this source.
387    ///
388    /// The default implementation returns `true` because most durable
389    /// sources (Kafka, CDC, files) support replay.
390    fn supports_replay(&self) -> bool {
391        true
392    }
393
394    /// Returns a shared flag that the source sets to `true` when it
395    /// requests an immediate checkpoint.
396    ///
397    /// This is used by sources that detect external state changes requiring
398    /// a checkpoint before proceeding — for example, Kafka consumer group
399    /// rebalance (partition revocation). The pipeline coordinator polls
400    /// this flag each cycle and clears it after triggering the checkpoint.
401    ///
402    /// The default returns `None` (no source-initiated checkpoints).
403    fn checkpoint_requested(&self) -> Option<Arc<std::sync::atomic::AtomicBool>> {
404        None
405    }
406
407    /// Acknowledge that `epoch` has been durably committed.
408    ///
409    /// Called after the manifest is persisted and every exactly-once sink
410    /// committed the epoch. The `checkpoint` argument is the exact
411    /// per-source `SourceCheckpoint` that was persisted into the manifest
412    /// for this epoch — sources can rely on it to advance external offset
413    /// state (broker group offsets, lookup-DB cursors, ack tokens) using
414    /// values that match what's durable.
415    ///
416    /// May be called with an empty `checkpoint` for timer-driven commits
417    /// where no per-source state was captured; implementations should
418    /// treat that as a no-op for any externally-visible advancement.
419    ///
420    /// Idempotent — a retry after cancellation is legal.
421    ///
422    /// # Errors
423    ///
424    /// Errors are logged; they do not roll back the committed epoch.
425    async fn notify_epoch_committed(
426        &mut self,
427        _epoch: u64,
428        _checkpoint: &SourceCheckpoint,
429    ) -> Result<(), ConnectorError> {
430        Ok(())
431    }
432}
433
434/// Trait for sink connectors that write data to external systems.
435///
436/// Sink connectors operate in Ring 1, receiving data from Ring 0 and
437/// writing to external systems. Implementations that advertise
438/// `exactly_once` also implement `begin_epoch`/`pre_commit`/`commit_epoch`/
439/// `rollback_epoch`; the runtime drives them via the checkpoint coordinator.
440///
441/// Lifecycle: `open()` → loop over epochs of `begin_epoch()`,
442/// `write_batch()*`, `pre_commit()`, `commit_epoch()` (or `rollback_epoch()`
443/// on failure) → `close()`.
444#[async_trait]
445pub trait SinkConnector: Send {
446    /// Open the connection and prepare to accept writes.
447    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;
448
449    /// Must be cancellation-safe: the runtime wraps this in
450    /// `tokio::time::timeout`. Don't split a `&mut self` mutation
451    /// across an `.await`. In-flight transactional state may remain open
452    /// after cancellation; the caller will `rollback_epoch` it.
453    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError>;
454
455    /// Expected Arrow schema of input batches.
456    fn schema(&self) -> SchemaRef;
457
458    /// Default: no-op (at-least-once semantics).
459    async fn begin_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
460        Ok(())
461    }
462
463    /// Phase 1 of 2PC: flush + prepare, do NOT finalize the txn. The
464    /// runtime persists the manifest between `pre_commit` and
465    /// `commit_epoch`; on failure it calls `rollback_epoch`. Default
466    /// delegates to `flush()`.
467    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
468        self.flush().await
469    }
470
471    /// Phase 2 of 2PC: finalize the txn. Called after the manifest is
472    /// durable. Default: no-op (at-least-once semantics).
473    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
474        Ok(())
475    }
476
477    /// Must be idempotent: the runtime calls this on every exactly-once
478    /// sink after a `pre_commit` failure, including sinks that never
479    /// `pre_commit`ed.
480    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
481        Ok(())
482    }
483
484    /// Required (no default) so every implementation declares
485    /// `suggested_write_timeout`.
486    fn capabilities(&self) -> SinkConnectorCapabilities;
487
488    /// Must be internally bounded — the sink task's periodic timer
489    /// calls this on every tick. Thorough drains belong in `pre_commit`
490    /// / `commit_epoch` / `close`, not here.
491    async fn flush(&mut self) -> Result<(), ConnectorError> {
492        Ok(())
493    }
494
495    /// Close the sink and release resources.
496    async fn close(&mut self) -> Result<(), ConnectorError>;
497
498    /// Return a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware)
499    /// view, if the sink speaks a schema registry protocol.
500    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
501        None
502    }
503}
504
/// Unit tests for the plain data types and string conversions declared in
/// this module. The connector traits themselves are not exercised here.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Single non-nullable `id: Int64` column.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    /// Batch of `n` rows whose `id` column counts up from zero.
    fn test_batch(n: usize) -> RecordBatch {
        // Checked conversion instead of `as`, so no lint allowance is needed.
        let end = i64::try_from(n).expect("test batch size fits in i64");
        let ids: Vec<i64> = (0..end).collect();
        RecordBatch::try_new(test_schema(), vec![Arc::new(Int64Array::from(ids))]).unwrap()
    }

    #[test]
    fn test_source_batch() {
        let batch = SourceBatch::new(test_batch(10));
        assert_eq!(batch.num_rows(), 10);
        assert!(batch.partition.is_none());
    }

    #[test]
    fn test_source_batch_with_partition() {
        let partition = PartitionInfo::new("0", "1234");
        let batch = SourceBatch::with_partition(test_batch(5), partition);
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(batch.partition.as_ref().unwrap().id, "0");
        assert_eq!(batch.partition.as_ref().unwrap().offset, "1234");
    }

    #[test]
    fn test_partition_info_display() {
        let p = PartitionInfo::new("3", "42");
        assert_eq!(p.to_string(), "3@42");
    }

    #[test]
    fn test_write_result() {
        let result = WriteResult::new(100, 5000);
        assert_eq!(result.records_written, 100);
        assert_eq!(result.bytes_written, 5000);
    }

    #[test]
    fn test_sink_capabilities_builder() {
        let caps = SinkConnectorCapabilities::new(std::time::Duration::from_secs(5))
            .with_exactly_once()
            .with_changelog()
            .with_partitioned();

        assert!(caps.exactly_once);
        assert!(!caps.idempotent);
        assert!(!caps.upsert);
        assert!(caps.changelog);
        assert!(!caps.two_phase_commit);
        assert!(!caps.schema_evolution);
        assert!(caps.partitioned);
        assert!(!caps.preserves_pending_on_abandon);
        assert_eq!(
            caps.suggested_write_timeout,
            std::time::Duration::from_secs(5)
        );
    }

    #[test]
    fn test_sink_capabilities_defaults_off() {
        let caps = SinkConnectorCapabilities::new(std::time::Duration::from_secs(1));

        assert!(!caps.exactly_once);
        assert!(!caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.two_phase_commit);
        assert!(!caps.schema_evolution);
        assert!(!caps.partitioned);
        assert!(!caps.preserves_pending_on_abandon);
    }

    #[test]
    fn test_sink_capabilities_remaining_builders() {
        let caps = SinkConnectorCapabilities::new(std::time::Duration::from_secs(1))
            .with_idempotent()
            .with_upsert()
            .with_two_phase_commit()
            .with_schema_evolution()
            .with_preserves_pending_on_abandon();

        assert!(caps.idempotent);
        assert!(caps.upsert);
        assert!(caps.two_phase_commit);
        assert!(caps.schema_evolution);
        assert!(caps.preserves_pending_on_abandon);
        assert!(!caps.exactly_once);
        assert!(!caps.changelog);
        assert!(!caps.partitioned);
    }

    #[test]
    fn test_delivery_guarantee_display_round_trip() {
        for guarantee in [DeliveryGuarantee::AtLeastOnce, DeliveryGuarantee::ExactlyOnce] {
            assert_eq!(guarantee.to_string().parse(), Ok(guarantee));
        }
        assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
        assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
        assert_eq!(DeliveryGuarantee::default(), DeliveryGuarantee::AtLeastOnce);
    }

    #[test]
    fn test_delivery_guarantee_parse_aliases() {
        assert_eq!(
            "AtLeastOnce".parse::<DeliveryGuarantee>(),
            Ok(DeliveryGuarantee::AtLeastOnce)
        );
        assert_eq!(
            "EXACTLY_ONCE".parse::<DeliveryGuarantee>(),
            Ok(DeliveryGuarantee::ExactlyOnce)
        );
        assert!("at-most-once".parse::<DeliveryGuarantee>().is_err());
    }

    #[test]
    fn test_postgres_ssl_mode_display_round_trip() {
        let modes = [
            PostgresSslMode::Disable,
            PostgresSslMode::Prefer,
            PostgresSslMode::Require,
            PostgresSslMode::VerifyCa,
            PostgresSslMode::VerifyFull,
        ];
        for mode in modes {
            assert_eq!(mode.to_string().parse(), Ok(mode));
        }
        assert_eq!(PostgresSslMode::default(), PostgresSslMode::Prefer);
    }

    #[test]
    fn test_postgres_ssl_mode_parse_aliases() {
        assert_eq!(
            "off".parse::<PostgresSslMode>(),
            Ok(PostgresSslMode::Disable)
        );
        assert_eq!(
            "VerifyFull".parse::<PostgresSslMode>(),
            Ok(PostgresSslMode::VerifyFull)
        );
        assert_eq!(
            "VERIFY_CA".parse::<PostgresSslMode>(),
            Ok(PostgresSslMode::VerifyCa)
        );
        assert!("bogus".parse::<PostgresSslMode>().is_err());
    }
}