// laminar_connectors/connector.rs
1//! Core connector traits.
2//!
3//! Defines the async traits that all source and sink connectors implement:
4//! - `SourceConnector`: Reads data from external systems
5//! - `SinkConnector`: Writes data to external systems
6//!
7//! These traits operate in Ring 1 (background) and communicate with Ring 0
8//! through the streaming API (`Source<T>::push_arrow()` and subscriptions).
9
10use std::fmt;
11use std::sync::Arc;
12
13use arrow_array::RecordBatch;
14use arrow_schema::SchemaRef;
15use async_trait::async_trait;
16use tokio::sync::Notify;
17
18use crate::checkpoint::SourceCheckpoint;
19use crate::config::ConnectorConfig;
20use crate::error::ConnectorError;
21use crate::health::HealthStatus;
22use crate::metrics::ConnectorMetrics;
23
/// Delivery guarantee level for the pipeline.
///
/// Configures the expected end-to-end delivery semantics. The pipeline
/// validates at startup that all sources and sinks meet the requirements
/// for the chosen guarantee level.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DeliveryGuarantee {
    /// At-least-once: records may be replayed on recovery. Requires
    /// checkpointing but tolerates non-replayable sources (with degradation).
    ///
    /// This is the default guarantee level (via `#[derive(Default)]`).
    #[default]
    AtLeastOnce,
    /// Exactly-once: no duplicates or losses. Requires all sources to
    /// support replay, all sinks to support exactly-once, and checkpoint
    /// to be enabled.
    ExactlyOnce,
}
40
41impl std::fmt::Display for DeliveryGuarantee {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 match self {
44 DeliveryGuarantee::AtLeastOnce => write!(f, "at-least-once"),
45 DeliveryGuarantee::ExactlyOnce => write!(f, "exactly-once"),
46 }
47 }
48}
49
/// A batch of records read from a source connector.
///
/// Pairs the Arrow payload with optional partition attribution so the
/// runtime can track per-partition offsets.
#[derive(Debug, Clone)]
pub struct SourceBatch {
    /// The records as an Arrow `RecordBatch`.
    pub records: RecordBatch,

    /// The partition this batch came from, if applicable.
    /// `None` for unpartitioned sources (e.g. single-stream or file sources).
    pub partition: Option<PartitionInfo>,
}
59
60impl SourceBatch {
61 /// Creates a new source batch.
62 #[must_use]
63 pub fn new(records: RecordBatch) -> Self {
64 Self {
65 records,
66 partition: None,
67 }
68 }
69
70 /// Creates a new source batch from a specific partition.
71 #[must_use]
72 pub fn with_partition(records: RecordBatch, partition: PartitionInfo) -> Self {
73 Self {
74 records,
75 partition: Some(partition),
76 }
77 }
78
79 /// Returns the number of records in the batch.
80 #[must_use]
81 pub fn num_rows(&self) -> usize {
82 self.records.num_rows()
83 }
84}
85
/// Information about a source partition.
///
/// Both fields are strings so that heterogeneous sources (numeric Kafka
/// partitions, named CDC slots, file paths) share one representation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionInfo {
    /// Partition identifier (e.g., Kafka partition number, CDC slot name).
    pub id: String,

    /// Current offset within this partition.
    pub offset: String,
}
95
96impl PartitionInfo {
97 /// Creates a new partition info.
98 #[must_use]
99 pub fn new(id: impl Into<String>, offset: impl Into<String>) -> Self {
100 Self {
101 id: id.into(),
102 offset: offset.into(),
103 }
104 }
105}
106
107impl fmt::Display for PartitionInfo {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 write!(f, "{}@{}", self.id, self.offset)
110 }
111}
112
/// Result of writing a batch to a sink connector.
///
/// Returned by `SinkConnector::write_batch` so the runtime can account
/// for throughput.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// Number of records successfully written.
    pub records_written: usize,

    /// Number of bytes written.
    pub bytes_written: u64,
}
122
123impl WriteResult {
124 /// Creates a new write result.
125 #[must_use]
126 pub fn new(records_written: usize, bytes_written: u64) -> Self {
127 Self {
128 records_written,
129 bytes_written,
130 }
131 }
132}
133
/// Capabilities declared by a sink connector.
///
/// Describes the capabilities of an external sink connector implementation.
/// `Default` yields all-false (no optional capabilities); enable individual
/// flags via the `with_*` builder methods.
#[derive(Debug, Clone, Default)]
#[allow(clippy::struct_excessive_bools)] // capability flags, not state machine — bools are the right model
pub struct SinkConnectorCapabilities {
    /// Whether the sink supports exactly-once semantics via epochs.
    pub exactly_once: bool,

    /// Whether the sink supports idempotent writes.
    pub idempotent: bool,

    /// Whether the sink supports upsert (insert-or-update) writes.
    pub upsert: bool,

    /// Whether the sink can handle changelog/retraction records.
    pub changelog: bool,

    /// Whether the sink supports two-phase commit (pre-commit + commit).
    pub two_phase_commit: bool,

    /// Whether the sink supports schema evolution.
    pub schema_evolution: bool,

    /// Whether the sink supports partitioned writes.
    pub partitioned: bool,
}
161
162impl SinkConnectorCapabilities {
163 /// Creates capabilities with exactly-once support.
164 #[must_use]
165 pub fn with_exactly_once(mut self) -> Self {
166 self.exactly_once = true;
167 self
168 }
169
170 /// Creates capabilities with idempotent write support.
171 #[must_use]
172 pub fn with_idempotent(mut self) -> Self {
173 self.idempotent = true;
174 self
175 }
176
177 /// Creates capabilities with upsert support.
178 #[must_use]
179 pub fn with_upsert(mut self) -> Self {
180 self.upsert = true;
181 self
182 }
183
184 /// Creates capabilities with changelog support.
185 #[must_use]
186 pub fn with_changelog(mut self) -> Self {
187 self.changelog = true;
188 self
189 }
190
191 /// Creates capabilities with two-phase commit support (pre-commit + commit).
192 #[must_use]
193 pub fn with_two_phase_commit(mut self) -> Self {
194 self.two_phase_commit = true;
195 self
196 }
197
198 /// Creates capabilities with schema evolution support.
199 #[must_use]
200 pub fn with_schema_evolution(mut self) -> Self {
201 self.schema_evolution = true;
202 self
203 }
204
205 /// Creates capabilities with partitioned write support.
206 #[must_use]
207 pub fn with_partitioned(mut self) -> Self {
208 self.partitioned = true;
209 self
210 }
211}
212
/// Trait for source connectors that read data from external systems.
///
/// Source connectors operate in Ring 1 and push data into Ring 0 via
/// the streaming `Source<ArrowRecord>::push_arrow()` API.
///
/// # Lifecycle
///
/// 1. `open()` - Initialize connection, discover schema
/// 2. `poll_batch()` - Read batches in a loop
/// 3. `checkpoint()` / `restore()` - Manage offsets
/// 4. `close()` - Clean shutdown
///
/// # Example
///
/// ```rust,ignore
/// struct MySource { /* ... */ }
///
/// #[async_trait]
/// impl SourceConnector for MySource {
///     async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
///         // Connect to external system
///         Ok(())
///     }
///
///     async fn poll_batch(&mut self, max_records: usize) -> Result<Option<SourceBatch>, ConnectorError> {
///         // Read up to max_records from external system
///         Ok(None) // None = no data available yet
///     }
///
///     // ... other methods
/// }
/// ```
// NOTE(review): bound is `Send` only (not `Sync`) — presumably each connector
// instance is driven by a single task at a time; confirm before sharing
// `&dyn SourceConnector` across threads.
#[async_trait]
pub trait SourceConnector: Send {
    /// Opens the connector and initializes the connection.
    ///
    /// Called once before any polling begins. The connector should establish
    /// connections, discover the schema, and prepare for reading.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if connection or initialization fails.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;

    /// Polls for the next batch of records.
    ///
    /// Returns `Ok(Some(batch))` when records are available, or `Ok(None)` when
    /// no data is currently available (the runtime will poll again after a delay).
    ///
    /// The `max_records` parameter is a hint; implementations may return fewer.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` on read failure.
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError>;

    /// Returns the schema of records produced by this source.
    fn schema(&self) -> SchemaRef;

    /// Creates a checkpoint of the current source position.
    ///
    /// The returned checkpoint contains enough information to resume
    /// reading from this position after a restart.
    fn checkpoint(&self) -> SourceCheckpoint;

    /// Restores the source to a previously checkpointed position.
    ///
    /// Called during recovery before polling resumes.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the checkpoint is invalid or the
    /// seek operation fails.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError>;

    /// Returns the current health status of the connector.
    ///
    /// Defaults to `Unknown`; connectors should override with actual status.
    fn health_check(&self) -> HealthStatus {
        HealthStatus::Unknown
    }

    /// Returns current metrics from the connector.
    ///
    /// Defaults to `ConnectorMetrics::default()` (empty metrics); connectors
    /// should override to report real counters.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Closes the connector and releases all resources.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if cleanup fails.
    async fn close(&mut self) -> Result<(), ConnectorError>;

    /// Returns a [`Notify`] handle that is signalled when new data is available.
    ///
    /// When `Some`, the pipeline coordinator awaits the notification instead of
    /// polling on a timer, eliminating idle CPU usage. Sources that receive data
    /// asynchronously (WebSocket, CDC replication streams, Kafka) should return
    /// `Some` and call `notify.notify_one()` when data arrives.
    ///
    /// The default implementation returns `None`, which causes the pipeline to
    /// fall back to timer-based polling (suitable for batch/file sources).
    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        None
    }

    /// Returns this connector as a [`SchemaProvider`](crate::schema::SchemaProvider), if supported.
    ///
    /// Default is `None` (capability not supported); same pattern for the
    /// three `as_schema_*` downcast hooks below.
    fn as_schema_provider(&self) -> Option<&dyn crate::schema::SchemaProvider> {
        None
    }

    /// Returns this connector as a [`SchemaInferable`](crate::schema::SchemaInferable), if supported.
    fn as_schema_inferable(&self) -> Option<&dyn crate::schema::SchemaInferable> {
        None
    }

    /// Returns this connector as a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware), if supported.
    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
        None
    }

    /// Returns this connector as a [`SchemaEvolvable`](crate::schema::SchemaEvolvable), if supported.
    fn as_schema_evolvable(&self) -> Option<&dyn crate::schema::SchemaEvolvable> {
        None
    }

    /// Whether this source supports replay from a checkpointed position.
    ///
    /// Sources that return `false` (e.g., WebSocket, raw TCP) cannot seek
    /// back to a previous offset on recovery. Checkpointing still captures
    /// their state for best-effort recovery, but exactly-once semantics
    /// are degraded to at-most-once for events from this source.
    ///
    /// The default implementation returns `true` because most durable
    /// sources (Kafka, CDC, files) support replay.
    fn supports_replay(&self) -> bool {
        true
    }

    /// Returns a shared flag that the source sets to `true` when it
    /// requests an immediate checkpoint.
    ///
    /// This is used by sources that detect external state changes requiring
    /// a checkpoint before proceeding — for example, Kafka consumer group
    /// rebalance (partition revocation). The pipeline coordinator polls
    /// this flag each cycle and clears it after triggering the checkpoint.
    ///
    /// The default returns `None` (no source-initiated checkpoints).
    fn checkpoint_requested(&self) -> Option<Arc<std::sync::atomic::AtomicBool>> {
        None
    }
}
369
/// Trait for sink connectors that write data to external systems.
///
/// Sink connectors operate in Ring 1, receiving data from Ring 0 via
/// subscriptions and writing to external systems.
///
/// # Exactly-Once Support
///
/// Sinks that support exactly-once semantics implement the epoch-based
/// methods (`begin_epoch`, `commit_epoch`, `rollback_epoch`). The runtime
/// calls these in coordination with the checkpoint manager.
///
/// # Lifecycle
///
/// 1. `open()` - Initialize connection
/// 2. For each epoch:
///    a. `begin_epoch()` - Start transaction
///    b. `write_batch()` - Write records (may be called multiple times)
///    c. `commit_epoch()` - Commit transaction
/// 3. `close()` - Clean shutdown
#[async_trait]
pub trait SinkConnector: Send {
    /// Opens the connector and initializes the connection.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if connection or initialization fails.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;

    /// Writes a batch of records to the external system.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` on write failure.
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError>;

    /// Returns the expected input schema for this sink.
    fn schema(&self) -> SchemaRef;

    /// Begins a new epoch for exactly-once processing.
    ///
    /// Called by the runtime when a new checkpoint epoch starts.
    /// Default implementation does nothing (at-least-once semantics).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the epoch cannot be started.
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Pre-commits the current epoch (phase 1 of two-phase commit).
    ///
    /// Called after all writes for this epoch are complete but before the
    /// checkpoint manifest is persisted. The sink should flush any buffered
    /// data and prepare for commit, but must NOT finalize the transaction.
    ///
    /// The protocol is:
    /// 1. `pre_commit(epoch)` — flush/prepare (this method)
    /// 2. Manifest persisted to disk
    /// 3. `commit_epoch(epoch)` — finalize transaction
    /// 4. On failure: `rollback_epoch(epoch)`
    ///
    /// Default implementation delegates to `flush()`.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the pre-commit fails.
    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        self.flush().await
    }

    /// Commits the current epoch (phase 2 of two-phase commit).
    ///
    /// Called by the runtime after the checkpoint manifest is successfully
    /// persisted. The sink should finalize any pending transactions.
    /// Default implementation does nothing (at-least-once semantics).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the commit fails.
    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Rolls back the current epoch.
    ///
    /// Called by the runtime when a checkpoint fails.
    /// Default implementation does nothing.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the rollback fails.
    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Returns the current health status of the connector.
    ///
    /// Defaults to `Unknown`; connectors should override with actual status.
    fn health_check(&self) -> HealthStatus {
        HealthStatus::Unknown
    }

    /// Returns current metrics from the connector.
    ///
    /// Defaults to `ConnectorMetrics::default()` (empty metrics); connectors
    /// should override to report real counters.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Returns the capabilities of this sink connector.
    ///
    /// Defaults to all-false capabilities (no exactly-once, upsert,
    /// changelog, etc.); sinks should override to advertise what they support.
    fn capabilities(&self) -> SinkConnectorCapabilities {
        SinkConnectorCapabilities::default()
    }

    /// Flushes any buffered data to the external system.
    ///
    /// Default is a no-op, which also makes the default `pre_commit` a no-op.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the flush fails.
    async fn flush(&mut self) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Closes the connector and releases all resources.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if cleanup fails.
    async fn close(&mut self) -> Result<(), ConnectorError>;

    /// Returns this connector as a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware), if supported.
    ///
    /// Default is `None` (capability not supported).
    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
        None
    }

    /// Returns this connector as a [`SchemaEvolvable`](crate::schema::SchemaEvolvable), if supported.
    fn as_schema_evolvable(&self) -> Option<&dyn crate::schema::SchemaEvolvable> {
        None
    }
}
509
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Single-column (`id: Int64`, non-nullable) schema shared by the tests.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    /// Builds a batch whose `id` column holds `0..n`.
    ///
    /// Counting with an open `i64` range and `take(n)` avoids casting `n`
    /// (`n as i64`), so the previously duplicated
    /// `#[allow(clippy::cast_possible_wrap)]` attributes (module-level and
    /// function-level) are no longer needed at all.
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0i64..).take(n).collect();
        RecordBatch::try_new(test_schema(), vec![Arc::new(Int64Array::from(ids))]).unwrap()
    }

    #[test]
    fn test_source_batch() {
        let batch = SourceBatch::new(test_batch(10));
        assert_eq!(batch.num_rows(), 10);
        assert!(batch.partition.is_none());
    }

    #[test]
    fn test_source_batch_with_partition() {
        let partition = PartitionInfo::new("0", "1234");
        let batch = SourceBatch::with_partition(test_batch(5), partition);
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(batch.partition.as_ref().unwrap().id, "0");
        assert_eq!(batch.partition.as_ref().unwrap().offset, "1234");
    }

    #[test]
    fn test_partition_info_display() {
        let p = PartitionInfo::new("3", "42");
        assert_eq!(p.to_string(), "3@42");
    }

    #[test]
    fn test_write_result() {
        let result = WriteResult::new(100, 5000);
        assert_eq!(result.records_written, 100);
        assert_eq!(result.bytes_written, 5000);
    }

    #[test]
    fn test_sink_capabilities_builder() {
        let caps = SinkConnectorCapabilities::default()
            .with_exactly_once()
            .with_changelog()
            .with_partitioned();

        assert!(caps.exactly_once);
        assert!(!caps.idempotent);
        assert!(!caps.upsert);
        assert!(caps.changelog);
        assert!(!caps.schema_evolution);
        assert!(caps.partitioned);
    }
}