// laminar_connectors/connector.rs
1//! Core connector traits.
2//!
3//! Defines the async traits that all source and sink connectors implement:
4//! - `SourceConnector`: Reads data from external systems
5//! - `SinkConnector`: Writes data to external systems
6//!
7//! These traits operate in Ring 1 (background) and communicate with Ring 0
8//! through the streaming API (`Source<T>::push_arrow()` and subscriptions).
9
10use std::fmt;
11use std::sync::Arc;
12
13use arrow_array::RecordBatch;
14use arrow_schema::SchemaRef;
15use async_trait::async_trait;
16use tokio::sync::Notify;
17
18use crate::checkpoint::SourceCheckpoint;
19use crate::config::ConnectorConfig;
20use crate::error::ConnectorError;
21use crate::health::HealthStatus;
22use crate::metrics::ConnectorMetrics;
23
/// Delivery guarantee level for the pipeline.
///
/// Configures the expected end-to-end delivery semantics. The pipeline
/// validates at startup that all sources and sinks meet the requirements
/// for the chosen guarantee level.
///
/// The `Display` impl renders the variants in kebab-case
/// (`at-least-once` / `exactly-once`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DeliveryGuarantee {
    /// At-least-once: records may be replayed on recovery. Requires
    /// checkpointing but tolerates non-replayable sources (with degradation).
    /// This is the default guarantee level.
    #[default]
    AtLeastOnce,
    /// Exactly-once: no duplicates or losses. Requires all sources to
    /// support replay, all sinks to support exactly-once, and checkpoint
    /// to be enabled.
    ExactlyOnce,
}
40
41impl std::fmt::Display for DeliveryGuarantee {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        match self {
44            DeliveryGuarantee::AtLeastOnce => write!(f, "at-least-once"),
45            DeliveryGuarantee::ExactlyOnce => write!(f, "exactly-once"),
46        }
47    }
48}
49
/// A batch of records read from a source connector.
///
/// Produced by [`SourceConnector::poll_batch`]; wraps the Arrow data
/// together with optional partition attribution.
#[derive(Debug, Clone)]
pub struct SourceBatch {
    /// The records as an Arrow `RecordBatch`.
    pub records: RecordBatch,

    /// The partition this batch came from, if applicable.
    /// `None` for unpartitioned sources (e.g. single-stream or file sources).
    pub partition: Option<PartitionInfo>,
}
59
60impl SourceBatch {
61    /// Creates a new source batch.
62    #[must_use]
63    pub fn new(records: RecordBatch) -> Self {
64        Self {
65            records,
66            partition: None,
67        }
68    }
69
70    /// Creates a new source batch from a specific partition.
71    #[must_use]
72    pub fn with_partition(records: RecordBatch, partition: PartitionInfo) -> Self {
73        Self {
74            records,
75            partition: Some(partition),
76        }
77    }
78
79    /// Returns the number of records in the batch.
80    #[must_use]
81    pub fn num_rows(&self) -> usize {
82        self.records.num_rows()
83    }
84}
85
/// Information about a source partition.
///
/// Both fields are strings so that heterogeneous partitioning schemes
/// (numeric Kafka partitions, named CDC slots, file paths) share one type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionInfo {
    /// Partition identifier (e.g., Kafka partition number, CDC slot name).
    pub id: String,

    /// Current offset within this partition.
    pub offset: String,
}
95
96impl PartitionInfo {
97    /// Creates a new partition info.
98    #[must_use]
99    pub fn new(id: impl Into<String>, offset: impl Into<String>) -> Self {
100        Self {
101            id: id.into(),
102            offset: offset.into(),
103        }
104    }
105}
106
107impl fmt::Display for PartitionInfo {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        write!(f, "{}@{}", self.id, self.offset)
110    }
111}
112
/// Result of writing a batch to a sink connector.
///
/// Returned by [`SinkConnector::write_batch`]; feeds connector metrics.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// Number of records successfully written.
    pub records_written: usize,

    /// Number of bytes written.
    pub bytes_written: u64,
}
122
123impl WriteResult {
124    /// Creates a new write result.
125    #[must_use]
126    pub fn new(records_written: usize, bytes_written: u64) -> Self {
127        Self {
128            records_written,
129            bytes_written,
130        }
131    }
132}
133
/// Capabilities declared by a sink connector.
///
/// Describes the capabilities of an external sink connector implementation.
/// `Default` yields all flags `false`; use the `with_*` builder methods
/// to enable individual capabilities.
#[derive(Debug, Clone, Default)]
#[allow(clippy::struct_excessive_bools)] // deliberate flag set, not a state machine
pub struct SinkConnectorCapabilities {
    /// Whether the sink supports exactly-once semantics via epochs.
    pub exactly_once: bool,

    /// Whether the sink supports idempotent writes.
    pub idempotent: bool,

    /// Whether the sink supports upsert (insert-or-update) writes.
    pub upsert: bool,

    /// Whether the sink can handle changelog/retraction records.
    pub changelog: bool,

    /// Whether the sink supports two-phase commit (pre-commit + commit).
    pub two_phase_commit: bool,

    /// Whether the sink supports schema evolution.
    pub schema_evolution: bool,

    /// Whether the sink supports partitioned writes.
    pub partitioned: bool,
}
161
162impl SinkConnectorCapabilities {
163    /// Creates capabilities with exactly-once support.
164    #[must_use]
165    pub fn with_exactly_once(mut self) -> Self {
166        self.exactly_once = true;
167        self
168    }
169
170    /// Creates capabilities with idempotent write support.
171    #[must_use]
172    pub fn with_idempotent(mut self) -> Self {
173        self.idempotent = true;
174        self
175    }
176
177    /// Creates capabilities with upsert support.
178    #[must_use]
179    pub fn with_upsert(mut self) -> Self {
180        self.upsert = true;
181        self
182    }
183
184    /// Creates capabilities with changelog support.
185    #[must_use]
186    pub fn with_changelog(mut self) -> Self {
187        self.changelog = true;
188        self
189    }
190
191    /// Creates capabilities with two-phase commit support (pre-commit + commit).
192    #[must_use]
193    pub fn with_two_phase_commit(mut self) -> Self {
194        self.two_phase_commit = true;
195        self
196    }
197
198    /// Creates capabilities with schema evolution support.
199    #[must_use]
200    pub fn with_schema_evolution(mut self) -> Self {
201        self.schema_evolution = true;
202        self
203    }
204
205    /// Creates capabilities with partitioned write support.
206    #[must_use]
207    pub fn with_partitioned(mut self) -> Self {
208        self.partitioned = true;
209        self
210    }
211}
212
/// Trait for source connectors that read data from external systems.
///
/// Source connectors operate in Ring 1 and push data into Ring 0 via
/// the streaming `Source<ArrowRecord>::push_arrow()` API.
///
/// # Lifecycle
///
/// 1. `open()` - Initialize connection, discover schema
/// 2. `poll_batch()` - Read batches in a loop
/// 3. `checkpoint()` / `restore()` - Manage offsets
/// 4. `close()` - Clean shutdown
///
/// # Example
///
/// ```rust,ignore
/// struct MySource { /* ... */ }
///
/// #[async_trait]
/// impl SourceConnector for MySource {
///     async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
///         // Connect to external system
///         Ok(())
///     }
///
///     async fn poll_batch(&mut self, max_records: usize) -> Result<Option<SourceBatch>, ConnectorError> {
///         // Read up to max_records from external system
///         Ok(None) // None = no data available yet
///     }
///
///     // ... other methods
/// }
/// ```
#[async_trait]
pub trait SourceConnector: Send {
    /// Opens the connector and initializes the connection.
    ///
    /// Called once before any polling begins. The connector should establish
    /// connections, discover the schema, and prepare for reading.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if connection or initialization fails.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;

    /// Polls for the next batch of records.
    ///
    /// Returns `Ok(Some(batch))` when records are available, or `Ok(None)` when
    /// no data is currently available (the runtime will poll again after a delay).
    ///
    /// The `max_records` parameter is a hint; implementations may return fewer.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` on read failure.
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError>;

    /// Returns the schema of records produced by this source.
    fn schema(&self) -> SchemaRef;

    /// Creates a checkpoint of the current source position.
    ///
    /// The returned checkpoint contains enough information to resume
    /// reading from this position after a restart.
    fn checkpoint(&self) -> SourceCheckpoint;

    /// Restores the source to a previously checkpointed position.
    ///
    /// Called during recovery before polling resumes.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the checkpoint is invalid or the
    /// seek operation fails.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError>;

    /// Returns the current health status of the connector.
    ///
    /// Defaults to `Unknown`; connectors should override with actual status.
    fn health_check(&self) -> HealthStatus {
        HealthStatus::Unknown
    }

    /// Returns current metrics from the connector.
    ///
    /// The default implementation returns empty (default) metrics;
    /// connectors should override to report real counters.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Closes the connector and releases all resources.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if cleanup fails.
    async fn close(&mut self) -> Result<(), ConnectorError>;

    /// Returns a [`Notify`] handle that is signalled when new data is available.
    ///
    /// When `Some`, the pipeline coordinator awaits the notification instead of
    /// polling on a timer, eliminating idle CPU usage. Sources that receive data
    /// asynchronously (WebSocket, CDC replication streams, Kafka) should return
    /// `Some` and call `notify.notify_one()` when data arrives.
    ///
    /// The default implementation returns `None`, which causes the pipeline to
    /// fall back to timer-based polling (suitable for batch/file sources).
    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        None
    }

    /// Returns this connector as a [`SchemaProvider`](crate::schema::SchemaProvider), if supported.
    ///
    /// The default returns `None` (capability not supported).
    fn as_schema_provider(&self) -> Option<&dyn crate::schema::SchemaProvider> {
        None
    }

    /// Returns this connector as a [`SchemaInferable`](crate::schema::SchemaInferable), if supported.
    ///
    /// The default returns `None` (capability not supported).
    fn as_schema_inferable(&self) -> Option<&dyn crate::schema::SchemaInferable> {
        None
    }

    /// Returns this connector as a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware), if supported.
    ///
    /// The default returns `None` (capability not supported).
    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
        None
    }

    /// Returns this connector as a [`SchemaEvolvable`](crate::schema::SchemaEvolvable), if supported.
    ///
    /// The default returns `None` (capability not supported).
    fn as_schema_evolvable(&self) -> Option<&dyn crate::schema::SchemaEvolvable> {
        None
    }

    /// Whether this source supports replay from a checkpointed position.
    ///
    /// Sources that return `false` (e.g., WebSocket, raw TCP) cannot seek
    /// back to a previous offset on recovery. Checkpointing still captures
    /// their state for best-effort recovery, but exactly-once semantics
    /// are degraded to at-most-once for events from this source.
    ///
    /// The default implementation returns `true` because most durable
    /// sources (Kafka, CDC, files) support replay.
    fn supports_replay(&self) -> bool {
        true
    }

    /// Returns a shared flag that the source sets to `true` when it
    /// requests an immediate checkpoint.
    ///
    /// This is used by sources that detect external state changes requiring
    /// a checkpoint before proceeding — for example, Kafka consumer group
    /// rebalance (partition revocation). The pipeline coordinator polls
    /// this flag each cycle and clears it after triggering the checkpoint.
    ///
    /// The default returns `None` (no source-initiated checkpoints).
    fn checkpoint_requested(&self) -> Option<Arc<std::sync::atomic::AtomicBool>> {
        None
    }
}
369
/// Trait for sink connectors that write data to external systems.
///
/// Sink connectors operate in Ring 1, receiving data from Ring 0 via
/// subscriptions and writing to external systems.
///
/// # Exactly-Once Support
///
/// Sinks that support exactly-once semantics implement the epoch-based
/// methods (`begin_epoch`, `commit_epoch`, `rollback_epoch`). The runtime
/// calls these in coordination with the checkpoint manager.
///
/// # Lifecycle
///
/// 1. `open()` - Initialize connection
/// 2. For each epoch:
///    a. `begin_epoch()` - Start transaction
///    b. `write_batch()` - Write records (may be called multiple times)
///    c. `commit_epoch()` - Commit transaction
/// 3. `close()` - Clean shutdown
#[async_trait]
pub trait SinkConnector: Send {
    /// Opens the connector and initializes the connection.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if connection or initialization fails.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;

    /// Writes a batch of records to the external system.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` on write failure.
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError>;

    /// Returns the expected input schema for this sink.
    fn schema(&self) -> SchemaRef;

    /// Begins a new epoch for exactly-once processing.
    ///
    /// Called by the runtime when a new checkpoint epoch starts.
    /// Default implementation does nothing (at-least-once semantics).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the epoch cannot be started.
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Pre-commits the current epoch (phase 1 of two-phase commit).
    ///
    /// Called after all writes for this epoch are complete but before the
    /// checkpoint manifest is persisted. The sink should flush any buffered
    /// data and prepare for commit, but must NOT finalize the transaction.
    ///
    /// The protocol is:
    /// 1. `pre_commit(epoch)` — flush/prepare (this method)
    /// 2. Manifest persisted to disk
    /// 3. `commit_epoch(epoch)` — finalize transaction
    /// 4. On failure: `rollback_epoch(epoch)`
    ///
    /// Default implementation delegates to `flush()`.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the pre-commit fails.
    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        self.flush().await
    }

    /// Commits the current epoch (phase 2 of two-phase commit).
    ///
    /// Called by the runtime after the checkpoint manifest is successfully
    /// persisted. The sink should finalize any pending transactions.
    /// Default implementation does nothing (at-least-once semantics).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the commit fails.
    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Rolls back the current epoch.
    ///
    /// Called by the runtime when a checkpoint fails.
    /// Default implementation does nothing.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the rollback fails.
    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Returns the current health status of the connector.
    ///
    /// Defaults to `Unknown`; connectors should override with actual status.
    fn health_check(&self) -> HealthStatus {
        HealthStatus::Unknown
    }

    /// Returns current metrics from the connector.
    ///
    /// The default implementation returns empty (default) metrics;
    /// connectors should override to report real counters.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Returns the capabilities of this sink connector.
    ///
    /// The default reports no capabilities (all flags `false`).
    fn capabilities(&self) -> SinkConnectorCapabilities {
        SinkConnectorCapabilities::default()
    }

    /// Flushes any buffered data to the external system.
    ///
    /// Default implementation is a no-op (nothing buffered).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the flush fails.
    async fn flush(&mut self) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Closes the connector and releases all resources.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if cleanup fails.
    async fn close(&mut self) -> Result<(), ConnectorError>;

    /// Returns this connector as a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware), if supported.
    ///
    /// The default returns `None` (capability not supported).
    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
        None
    }

    /// Returns this connector as a [`SchemaEvolvable`](crate::schema::SchemaEvolvable), if supported.
    ///
    /// The default returns `None` (capability not supported).
    fn as_schema_evolvable(&self) -> Option<&dyn crate::schema::SchemaEvolvable> {
        None
    }
}
509
#[cfg(test)]
// Module-level allow covers the `n as i64` cast in `test_batch` below.
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Single non-nullable `id: Int64` column shared by all test batches.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    /// Builds a batch of `n` sequential ids `0..n`.
    fn test_batch(n: usize) -> RecordBatch {
        // Removed the duplicate `#[allow(clippy::cast_possible_wrap)]` here:
        // the module-level attribute already covers this cast.
        let ids: Vec<i64> = (0..n as i64).collect();
        RecordBatch::try_new(test_schema(), vec![Arc::new(Int64Array::from(ids))]).unwrap()
    }

    #[test]
    fn test_source_batch() {
        let batch = SourceBatch::new(test_batch(10));
        assert_eq!(batch.num_rows(), 10);
        assert!(batch.partition.is_none());
    }

    #[test]
    fn test_source_batch_with_partition() {
        let partition = PartitionInfo::new("0", "1234");
        let batch = SourceBatch::with_partition(test_batch(5), partition);
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(batch.partition.as_ref().unwrap().id, "0");
        assert_eq!(batch.partition.as_ref().unwrap().offset, "1234");
    }

    #[test]
    fn test_partition_info_display() {
        let p = PartitionInfo::new("3", "42");
        assert_eq!(p.to_string(), "3@42");
    }

    #[test]
    fn test_write_result() {
        let result = WriteResult::new(100, 5000);
        assert_eq!(result.records_written, 100);
        assert_eq!(result.bytes_written, 5000);
    }

    #[test]
    fn test_sink_capabilities_builder() {
        let caps = SinkConnectorCapabilities::default()
            .with_exactly_once()
            .with_changelog()
            .with_partitioned();

        assert!(caps.exactly_once);
        assert!(!caps.idempotent);
        assert!(!caps.upsert);
        assert!(caps.changelog);
        assert!(!caps.schema_evolution);
        assert!(caps.partitioned);
    }
}
571}