laminar_connectors/connector.rs

//! Connector traits — async `SourceConnector` / `SinkConnector`.

use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use tokio::sync::Notify;

use crate::checkpoint::SourceCheckpoint;
use crate::config::ConnectorConfig;
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;

/// Delivery guarantee level for the pipeline.
///
/// Configures the expected end-to-end delivery semantics. The pipeline
/// validates at startup that all sources and sinks meet the requirements
/// for the chosen guarantee level.
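///
/// # Example
///
/// A sketch of the parse/display round-trip (the `use` path assumes these
/// items are exported at `laminar_connectors::connector`):
///
/// ```ignore
/// use laminar_connectors::connector::DeliveryGuarantee;
///
/// // `FromStr` lowercases and maps `-` to `_`, so both spellings parse.
/// let g: DeliveryGuarantee = "exactly-once".parse().unwrap();
/// assert_eq!(g, DeliveryGuarantee::ExactlyOnce);
/// assert_eq!(g.to_string(), "exactly-once");
/// ```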
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, Default, serde::Serialize, serde::Deserialize,
)]
pub enum DeliveryGuarantee {
    /// At-least-once: records may be replayed on recovery. Requires
    /// checkpointing but tolerates non-replayable sources (with degradation).
    #[default]
    AtLeastOnce,
    /// Exactly-once: no duplicates or losses. Requires all sources to
    /// support replay, all sinks to support exactly-once, and checkpoint
    /// to be enabled.
    ExactlyOnce,
}

impl std::fmt::Display for DeliveryGuarantee {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DeliveryGuarantee::AtLeastOnce => write!(f, "at-least-once"),
            DeliveryGuarantee::ExactlyOnce => write!(f, "exactly-once"),
        }
    }
}

impl FromStr for DeliveryGuarantee {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().replace('-', "_").as_str() {
            "at_least_once" | "atleastonce" => Ok(Self::AtLeastOnce),
            "exactly_once" | "exactlyonce" => Ok(Self::ExactlyOnce),
            other => Err(format!("unknown delivery guarantee: '{other}'")),
        }
    }
}

/// SSL connection mode for `PostgreSQL`-compatible connectors.
///
/// Shared by the `PostgreSQL` sink and `PostgreSQL` CDC source. Variant names
/// follow the `libpq` `sslmode` parameter.
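///
/// # Example
///
/// A sketch of parsing a connector property (both `-` and `_` spellings
/// are accepted by `FromStr`):
///
/// ```ignore
/// let mode: PostgresSslMode = "verify-full".parse().unwrap();
/// assert_eq!(mode, PostgresSslMode::VerifyFull);
/// assert_eq!(PostgresSslMode::default(), PostgresSslMode::Prefer);
/// ```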
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PostgresSslMode {
    /// No SSL.
    Disable,
    /// Try SSL, fall back to unencrypted.
    #[default]
    Prefer,
    /// Require SSL.
    Require,
    /// Require SSL and verify CA certificate.
    VerifyCa,
    /// Require SSL, verify certificate and hostname.
    VerifyFull,
}

impl std::fmt::Display for PostgresSslMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Disable => write!(f, "disable"),
            Self::Prefer => write!(f, "prefer"),
            Self::Require => write!(f, "require"),
            Self::VerifyCa => write!(f, "verify-ca"),
            Self::VerifyFull => write!(f, "verify-full"),
        }
    }
}

impl FromStr for PostgresSslMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().replace('-', "_").as_str() {
            "disable" | "off" => Ok(Self::Disable),
            "prefer" => Ok(Self::Prefer),
            "require" => Ok(Self::Require),
            "verify_ca" | "verifyca" => Ok(Self::VerifyCa),
            "verify_full" | "verifyfull" => Ok(Self::VerifyFull),
            other => Err(format!("unknown SSL mode: '{other}'")),
        }
    }
}

/// A batch of records read from a source connector.
#[derive(Debug, Clone)]
pub struct SourceBatch {
    /// Arrow batch carrying the records.
    pub records: RecordBatch,
    /// The partition this batch came from, if the source is partitioned.
    pub partition: Option<PartitionInfo>,
}

impl SourceBatch {
    /// Construct without partition metadata.
    #[must_use]
    pub fn new(records: RecordBatch) -> Self {
        Self {
            records,
            partition: None,
        }
    }

    /// Construct with partition metadata attached.
    #[must_use]
    pub fn with_partition(records: RecordBatch, partition: PartitionInfo) -> Self {
        Self {
            records,
            partition: Some(partition),
        }
    }

    /// Record count in the batch.
    #[must_use]
    pub fn num_rows(&self) -> usize {
        self.records.num_rows()
    }
}

/// Source partition identity + current offset (Kafka partition number,
/// CDC slot name, etc.).
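///
/// Displays as `id@offset`:
///
/// ```ignore
/// let p = PartitionInfo::new("3", "42");
/// assert_eq!(p.to_string(), "3@42");
/// ```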
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionInfo {
    /// Partition id — free-form string (Kafka partition number as string,
    /// CDC slot name, file path, …).
    pub id: String,
    /// Current offset — interpretation is connector-specific (Kafka offset
    /// as string, CDC LSN, etc.).
    pub offset: String,
}

impl PartitionInfo {
    /// Construct from id/offset strings or anything that converts.
    #[must_use]
    pub fn new(id: impl Into<String>, offset: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            offset: offset.into(),
        }
    }
}

impl fmt::Display for PartitionInfo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}@{}", self.id, self.offset)
    }
}

/// Summary of a successful `write_batch` call.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// Records accepted by the sink.
    pub records_written: usize,
    /// Bytes written to the underlying transport (may be estimated).
    pub bytes_written: u64,
}

impl WriteResult {
    /// Construct with raw counts.
    #[must_use]
    pub fn new(records_written: usize, bytes_written: u64) -> Self {
        Self {
            records_written,
            bytes_written,
        }
    }
}

/// Capabilities declared by a sink connector.
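///
/// A sketch of declaring capabilities for a transactional sink (the
/// timeout value is illustrative):
///
/// ```ignore
/// let caps = SinkConnectorCapabilities::new(std::time::Duration::from_secs(30))
///     .with_exactly_once()
///     .with_two_phase_commit()
///     .with_idempotent();
/// assert!(caps.exactly_once && caps.two_phase_commit && caps.idempotent);
/// assert!(!caps.upsert); // anything not toggled stays `false`
/// ```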
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct SinkConnectorCapabilities {
    /// Whether the sink supports exactly-once semantics via epochs.
    pub exactly_once: bool,

    /// Whether the sink supports idempotent writes.
    pub idempotent: bool,

    /// Whether the sink supports upsert (insert-or-update) writes.
    pub upsert: bool,

    /// Whether the sink can handle changelog/retraction records.
    pub changelog: bool,

    /// Whether the sink supports two-phase commit (pre-commit + commit).
    pub two_phase_commit: bool,

    /// Whether the sink supports schema evolution.
    pub schema_evolution: bool,

    /// Whether the sink supports partitioned writes.
    pub partitioned: bool,

    /// Default per-call `write_batch` I/O timeout. Users can override via
    /// the `sink.write.timeout.ms` connector property.
    pub suggested_write_timeout: std::time::Duration,
}

impl SinkConnectorCapabilities {
    /// All booleans default to `false`; flip via `with_*` below or by
    /// assigning the fields directly (they're `pub`).
    #[must_use]
    pub fn new(suggested_write_timeout: std::time::Duration) -> Self {
        Self {
            exactly_once: false,
            idempotent: false,
            upsert: false,
            changelog: false,
            two_phase_commit: false,
            schema_evolution: false,
            partitioned: false,
            suggested_write_timeout,
        }
    }

    /// Enable exactly-once semantics (requires epoch + 2PC impl).
    #[must_use]
    pub fn with_exactly_once(mut self) -> Self {
        self.exactly_once = true;
        self
    }
    /// Enable idempotent writes (safe re-delivery on retry).
    #[must_use]
    pub fn with_idempotent(mut self) -> Self {
        self.idempotent = true;
        self
    }
    /// Enable upsert (key-based insert-or-update).
    #[must_use]
    pub fn with_upsert(mut self) -> Self {
        self.upsert = true;
        self
    }
    /// Enable changelog/retraction records.
    #[must_use]
    pub fn with_changelog(mut self) -> Self {
        self.changelog = true;
        self
    }
    /// Enable two-phase commit (`pre_commit` + `commit_epoch`).
    #[must_use]
    pub fn with_two_phase_commit(mut self) -> Self {
        self.two_phase_commit = true;
        self
    }
    /// Enable additive schema evolution.
    #[must_use]
    pub fn with_schema_evolution(mut self) -> Self {
        self.schema_evolution = true;
        self
    }
    /// Enable partitioned writes.
    #[must_use]
    pub fn with_partitioned(mut self) -> Self {
        self.partitioned = true;
        self
    }
}

/// Trait for source connectors that read data from external systems.
///
/// Source connectors operate in Ring 1 and push data into Ring 0 via
/// the streaming `Source<ArrowRecord>::push_arrow()` API.
///
/// # Lifecycle
///
/// 1. `open()` — initialize connection, discover schema
/// 2. `poll_batch()` — read batches in a loop
/// 3. `checkpoint()` / `restore()` — manage offsets
/// 4. `close()` — clean shutdown
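///
/// A sketch of a runtime driving that lifecycle (`MySource`, `config`,
/// `process`, and `budget` are illustrative, not part of this crate):
///
/// ```ignore
/// let mut source = MySource::default();
/// source.open(&config).await?;
/// for _ in 0..budget {
///     match source.poll_batch(1024).await? {
///         Some(batch) => process(batch.records),
///         // No data yet: retry after a delay (or await `data_ready_notify()`).
///         None => tokio::time::sleep(std::time::Duration::from_millis(50)).await,
///     }
/// }
/// let ckpt = source.checkpoint(); // persist; `restore(&ckpt)` resumes here
/// source.close().await?;
/// ```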
#[async_trait]
pub trait SourceConnector: Send {
    /// Called once before polling begins.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;

    /// `Ok(None)` = no data currently available; runtime retries after a delay.
    /// `max_records` is a hint — implementations may return fewer.
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError>;

    /// Resolve the source schema from the `WITH (...)` properties before
    /// DDL reaches the planner. Implementations that hit the network
    /// (e.g. Kafka fetching an Avro schema from a Schema Registry) must
    /// bound their I/O with a timeout and leave the schema empty on
    /// failure rather than hang.
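    ///
    /// A sketch of that pattern (`fetch_remote_schema` is a hypothetical
    /// helper, not part of this crate):
    ///
    /// ```ignore
    /// if let Ok(Ok(schema)) = tokio::time::timeout(
    ///     std::time::Duration::from_secs(5),
    ///     fetch_remote_schema(properties),
    /// )
    /// .await
    /// {
    ///     self.schema = schema;
    /// }
    /// // On timeout or error: fall through, leaving the schema empty.
    /// ```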
    async fn discover_schema(&mut self, _properties: &std::collections::HashMap<String, String>) {}

    /// Arrow schema of records this source produces.
    fn schema(&self) -> SchemaRef;

    /// Returned checkpoint must contain enough info to resume from the
    /// current position after a restart.
    fn checkpoint(&self) -> SourceCheckpoint;

    /// Called during recovery before polling resumes.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError>;

    /// Defaults to `Unknown`; connectors should override.
    fn health_check(&self) -> HealthStatus {
        HealthStatus::Unknown
    }

    /// Current connector metrics snapshot.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Close the connection and release resources.
    async fn close(&mut self) -> Result<(), ConnectorError>;

    /// Returns a [`Notify`] handle that is signalled when new data is available.
    ///
    /// When `Some`, the pipeline coordinator awaits the notification instead of
    /// polling on a timer, eliminating idle CPU usage. Sources that receive data
    /// asynchronously (WebSocket, CDC replication streams, Kafka) should return
    /// `Some` and call `notify.notify_one()` when data arrives.
    ///
    /// The default implementation returns `None`, which causes the pipeline to
    /// fall back to timer-based polling (suitable for batch/file sources).
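    ///
    /// A sketch for a push-style source (`WsSource` and its `notify` field
    /// are illustrative):
    ///
    /// ```ignore
    /// // Inside `impl SourceConnector for WsSource`:
    /// fn data_ready_notify(&self) -> Option<Arc<Notify>> {
    ///     Some(Arc::clone(&self.notify)) // handle shared with the receive task
    /// }
    ///
    /// // In the receive task (which holds a clone of `notify`),
    /// // after enqueueing a frame:
    /// notify.notify_one();
    /// ```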
    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        None
    }

    /// Returns this connector as a [`SchemaProvider`](crate::schema::SchemaProvider), if supported.
    fn as_schema_provider(&self) -> Option<&dyn crate::schema::SchemaProvider> {
        None
    }

    /// Returns this connector as a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware), if supported.
    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
        None
    }

    /// Whether this source supports replay from a checkpointed position.
    ///
    /// Sources that return `false` (e.g., WebSocket, raw TCP) cannot seek
    /// back to a previous offset on recovery. Checkpointing still captures
    /// their state for best-effort recovery, but exactly-once semantics
    /// are degraded to at-most-once for events from this source.
    ///
    /// The default implementation returns `true` because most durable
    /// sources (Kafka, CDC, files) support replay.
    fn supports_replay(&self) -> bool {
        true
    }

    /// Returns a shared flag that the source sets to `true` when it
    /// requests an immediate checkpoint.
    ///
    /// This is used by sources that detect external state changes requiring
    /// a checkpoint before proceeding — for example, Kafka consumer group
    /// rebalance (partition revocation). The pipeline coordinator polls
    /// this flag each cycle and clears it after triggering the checkpoint.
    ///
    /// The default returns `None` (no source-initiated checkpoints).
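    ///
    /// A sketch of the source side (`checkpoint_flag` is an illustrative
    /// field backing this method):
    ///
    /// ```ignore
    /// // e.g. in a Kafka rebalance callback, before partitions are revoked:
    /// self.checkpoint_flag
    ///     .store(true, std::sync::atomic::Ordering::Release);
    /// ```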
    fn checkpoint_requested(&self) -> Option<Arc<std::sync::atomic::AtomicBool>> {
        None
    }

    /// Acknowledge that `epoch` has been durably committed.
    ///
    /// Called after the manifest is persisted and every exactly-once sink
    /// committed the epoch. Idempotent — a retry after cancellation is
    /// legal.
    ///
    /// # Errors
    ///
    /// Errors are logged; they do not roll back the committed epoch.
    async fn notify_epoch_committed(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }
}

/// Trait for sink connectors that write data to external systems.
///
/// Sink connectors operate in Ring 1, receiving data from Ring 0 and
/// writing to external systems. Implementations that advertise
/// `exactly_once` also implement `begin_epoch`/`pre_commit`/`commit_epoch`/
/// `rollback_epoch`; the runtime drives them via the checkpoint coordinator.
///
/// Lifecycle: `open()` → loop over epochs of `begin_epoch()`,
/// `write_batch()*`, `pre_commit()`, `commit_epoch()` (or `rollback_epoch()`
/// on failure) → `close()`.
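///
/// A sketch of one exactly-once epoch as described above (error handling
/// and the surrounding loop elided):
///
/// ```ignore
/// sink.begin_epoch(epoch).await?;
/// for batch in &batches {
///     sink.write_batch(batch).await?;
/// }
/// sink.pre_commit(epoch).await?; // phase 1: flush + prepare
/// // ...the runtime persists the checkpoint manifest here...
/// sink.commit_epoch(epoch).await?; // phase 2: finalize
/// // On failure after `begin_epoch`: sink.rollback_epoch(epoch).await
/// ```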
#[async_trait]
pub trait SinkConnector: Send {
    /// Open the connection and prepare to accept writes.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;

    /// Must be cancellation-safe: the runtime wraps this in
    /// `tokio::time::timeout`. Don't split a `&mut self` mutation
    /// across an `.await`. In-flight transactional state may remain open
    /// after cancellation; the caller will `rollback_epoch` it.
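    ///
    /// Caller-side sketch of the timeout wrapping (conversion of the
    /// `Elapsed` error elided):
    ///
    /// ```ignore
    /// let budget = sink.capabilities().suggested_write_timeout;
    /// let result = tokio::time::timeout(budget, sink.write_batch(&batch)).await??;
    /// ```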
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError>;

    /// Expected Arrow schema of input batches.
    fn schema(&self) -> SchemaRef;

    /// Default: no-op (at-least-once semantics).
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Phase 1 of 2PC: flush + prepare, do NOT finalize the txn. The
    /// runtime persists the manifest between `pre_commit` and
    /// `commit_epoch`; on failure it calls `rollback_epoch`. Default
    /// delegates to `flush()`.
    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        self.flush().await
    }

    /// Phase 2 of 2PC: finalize the txn. Called after the manifest is
    /// durable. Default: no-op (at-least-once semantics).
    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Must be idempotent: the runtime calls this on every exactly-once
    /// sink after a `pre_commit` failure, including sinks that never
    /// `pre_commit`ed.
    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Defaults to `Unknown`; connectors should override.
    fn health_check(&self) -> HealthStatus {
        HealthStatus::Unknown
    }

    /// Current sink metrics snapshot.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Required (no default) so every implementation declares
    /// `suggested_write_timeout`.
    fn capabilities(&self) -> SinkConnectorCapabilities;

    /// Must be internally bounded — the sink task's periodic timer
    /// calls this on every tick. Thorough drains belong in `pre_commit`
    /// / `commit_epoch` / `close`, not here.
    async fn flush(&mut self) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Close the sink and release resources.
    async fn close(&mut self) -> Result<(), ConnectorError>;

    /// Return a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware)
    /// view, if the sink speaks a schema registry protocol.
    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
        None
    }
}

#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        RecordBatch::try_new(test_schema(), vec![Arc::new(Int64Array::from(ids))]).unwrap()
    }

    #[test]
    fn test_source_batch() {
        let batch = SourceBatch::new(test_batch(10));
        assert_eq!(batch.num_rows(), 10);
        assert!(batch.partition.is_none());
    }

    #[test]
    fn test_source_batch_with_partition() {
        let partition = PartitionInfo::new("0", "1234");
        let batch = SourceBatch::with_partition(test_batch(5), partition);
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(batch.partition.as_ref().unwrap().id, "0");
        assert_eq!(batch.partition.as_ref().unwrap().offset, "1234");
    }

    #[test]
    fn test_partition_info_display() {
        let p = PartitionInfo::new("3", "42");
        assert_eq!(p.to_string(), "3@42");
    }

    #[test]
    fn test_write_result() {
        let result = WriteResult::new(100, 5000);
        assert_eq!(result.records_written, 100);
        assert_eq!(result.bytes_written, 5000);
    }

    #[test]
    fn test_sink_capabilities_builder() {
        let caps = SinkConnectorCapabilities::new(std::time::Duration::from_secs(5))
            .with_exactly_once()
            .with_changelog()
            .with_partitioned();

        assert!(caps.exactly_once);
        assert!(!caps.idempotent);
        assert!(!caps.upsert);
        assert!(caps.changelog);
        assert!(!caps.schema_evolution);
        assert!(caps.partitioned);
        assert_eq!(
            caps.suggested_write_timeout,
            std::time::Duration::from_secs(5)
        );
    }
}