laminar_connectors/connector.rs
//! Connector traits — async `SourceConnector` / `SinkConnector`.

use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use tokio::sync::Notify;

use crate::checkpoint::SourceCheckpoint;
use crate::config::ConnectorConfig;
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;

/// Delivery guarantee level for the pipeline.
///
/// Configures the expected end-to-end delivery semantics. The pipeline
/// validates at startup that all sources and sinks meet the requirements
/// for the chosen guarantee level.
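///
/// # Examples
///
/// A parse/display round-trip (a minimal sketch; the import path assumes this
/// module is exposed as `laminar_connectors::connector`):
///
/// ```
/// use laminar_connectors::connector::DeliveryGuarantee;
///
/// let g: DeliveryGuarantee = "exactly-once".parse().unwrap();
/// assert_eq!(g, DeliveryGuarantee::ExactlyOnce);
/// assert_eq!(g.to_string(), "exactly-once");
/// assert_eq!(DeliveryGuarantee::default(), DeliveryGuarantee::AtLeastOnce);
/// ```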
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, Default, serde::Serialize, serde::Deserialize,
)]
pub enum DeliveryGuarantee {
    /// At-least-once: records may be replayed on recovery. Requires
    /// checkpointing but tolerates non-replayable sources (with degradation).
    #[default]
    AtLeastOnce,
    /// Exactly-once: no duplicates or losses. Requires all sources to
    /// support replay, all sinks to support exactly-once, and checkpointing
    /// to be enabled.
    ExactlyOnce,
}

impl std::fmt::Display for DeliveryGuarantee {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DeliveryGuarantee::AtLeastOnce => write!(f, "at-least-once"),
            DeliveryGuarantee::ExactlyOnce => write!(f, "exactly-once"),
        }
    }
}

impl FromStr for DeliveryGuarantee {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().replace('-', "_").as_str() {
            "at_least_once" | "atleastonce" => Ok(Self::AtLeastOnce),
            "exactly_once" | "exactlyonce" => Ok(Self::ExactlyOnce),
            other => Err(format!("unknown delivery guarantee: '{other}'")),
        }
    }
}

/// SSL connection mode for `PostgreSQL`-compatible connectors.
///
/// Shared by the `PostgreSQL` sink and `PostgreSQL` CDC source. Variant names
/// follow the `libpq` `sslmode` parameter.
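///
/// # Examples
///
/// Parsing accepts the `libpq` spellings with or without hyphens (a minimal
/// sketch; the import path assumes this module is exposed as
/// `laminar_connectors::connector`):
///
/// ```
/// use laminar_connectors::connector::PostgresSslMode;
///
/// let mode: PostgresSslMode = "verify-full".parse().unwrap();
/// assert_eq!(mode, PostgresSslMode::VerifyFull);
/// assert_eq!(mode.to_string(), "verify-full");
/// assert_eq!(PostgresSslMode::default(), PostgresSslMode::Prefer);
/// ```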
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PostgresSslMode {
    /// No SSL.
    Disable,
    /// Try SSL, fall back to unencrypted.
    #[default]
    Prefer,
    /// Require SSL.
    Require,
    /// Require SSL and verify CA certificate.
    VerifyCa,
    /// Require SSL, verify certificate and hostname.
    VerifyFull,
}

impl std::fmt::Display for PostgresSslMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Disable => write!(f, "disable"),
            Self::Prefer => write!(f, "prefer"),
            Self::Require => write!(f, "require"),
            Self::VerifyCa => write!(f, "verify-ca"),
            Self::VerifyFull => write!(f, "verify-full"),
        }
    }
}

impl FromStr for PostgresSslMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().replace('-', "_").as_str() {
            "disable" | "off" => Ok(Self::Disable),
            "prefer" => Ok(Self::Prefer),
            "require" => Ok(Self::Require),
            "verify_ca" | "verifyca" => Ok(Self::VerifyCa),
            "verify_full" | "verifyfull" => Ok(Self::VerifyFull),
            other => Err(format!("unknown SSL mode: '{other}'")),
        }
    }
}

/// A batch of records read from a source connector.
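///
/// # Examples
///
/// Wrapping an Arrow batch with partition metadata (a minimal sketch; the
/// import path assumes this module is exposed as
/// `laminar_connectors::connector`):
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{Int64Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use laminar_connectors::connector::{PartitionInfo, SourceBatch};
///
/// let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
/// let records = RecordBatch::try_new(
///     schema,
///     vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3]))],
/// )
/// .unwrap();
///
/// let batch = SourceBatch::with_partition(records, PartitionInfo::new("0", "42"));
/// assert_eq!(batch.num_rows(), 3);
/// assert_eq!(batch.partition.unwrap().to_string(), "0@42");
/// ```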
#[derive(Debug, Clone)]
pub struct SourceBatch {
    /// Arrow batch carrying the records.
    pub records: RecordBatch,
    /// The partition this batch came from, if the source is partitioned.
    pub partition: Option<PartitionInfo>,
}

impl SourceBatch {
    /// Construct without partition metadata.
    #[must_use]
    pub fn new(records: RecordBatch) -> Self {
        Self {
            records,
            partition: None,
        }
    }

    /// Construct with partition metadata attached.
    #[must_use]
    pub fn with_partition(records: RecordBatch, partition: PartitionInfo) -> Self {
        Self {
            records,
            partition: Some(partition),
        }
    }

    /// Record count in the batch.
    #[must_use]
    pub fn num_rows(&self) -> usize {
        self.records.num_rows()
    }
}

/// Source partition identity + current offset (Kafka partition number,
/// CDC slot name, etc.).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionInfo {
    /// Partition id — free-form string (Kafka partition number as string,
    /// CDC slot name, file path, …).
    pub id: String,
    /// Current offset — interpretation is connector-specific (Kafka offset
    /// as string, CDC LSN, etc.).
    pub offset: String,
}

impl PartitionInfo {
    /// Construct from id/offset strings or anything that converts.
    #[must_use]
    pub fn new(id: impl Into<String>, offset: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            offset: offset.into(),
        }
    }
}

impl fmt::Display for PartitionInfo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}@{}", self.id, self.offset)
    }
}

/// Summary of a successful `write_batch` call.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// Records accepted by the sink.
    pub records_written: usize,
    /// Bytes written to the underlying transport (may be estimated).
    pub bytes_written: u64,
}

impl WriteResult {
    /// Construct with raw counts.
    #[must_use]
    pub fn new(records_written: usize, bytes_written: u64) -> Self {
        Self {
            records_written,
            bytes_written,
        }
    }
}

/// Capabilities declared by a sink connector.
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct SinkConnectorCapabilities {
    /// Whether the sink supports exactly-once semantics via epochs.
    pub exactly_once: bool,

    /// Whether the sink supports idempotent writes.
    pub idempotent: bool,

    /// Whether the sink supports upsert (insert-or-update) writes.
    pub upsert: bool,

    /// Whether the sink can handle changelog/retraction records.
    pub changelog: bool,

    /// Whether the sink supports two-phase commit (pre-commit + commit).
    pub two_phase_commit: bool,

    /// Whether the sink supports schema evolution.
    pub schema_evolution: bool,

    /// Whether the sink supports partitioned writes.
    pub partitioned: bool,

    /// Default per-call `write_batch` I/O timeout. Users can override via
    /// the `sink.write.timeout.ms` connector property.
    pub suggested_write_timeout: std::time::Duration,
}

impl SinkConnectorCapabilities {
    /// All booleans default to `false`; flip via `with_*` below or by
    /// assigning the fields directly (they're `pub`).
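    ///
    /// # Examples
    ///
    /// A transactional sink might declare something like the following (a
    /// sketch with an illustrative timeout; the import path assumes this
    /// module is exposed as `laminar_connectors::connector`):
    ///
    /// ```
    /// use laminar_connectors::connector::SinkConnectorCapabilities;
    /// use std::time::Duration;
    ///
    /// let caps = SinkConnectorCapabilities::new(Duration::from_secs(30))
    ///     .with_exactly_once()
    ///     .with_two_phase_commit()
    ///     .with_idempotent();
    ///
    /// assert!(caps.exactly_once && caps.two_phase_commit && caps.idempotent);
    /// assert!(!caps.upsert);
    /// assert_eq!(caps.suggested_write_timeout, Duration::from_secs(30));
    /// ```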
    #[must_use]
    pub fn new(suggested_write_timeout: std::time::Duration) -> Self {
        Self {
            exactly_once: false,
            idempotent: false,
            upsert: false,
            changelog: false,
            two_phase_commit: false,
            schema_evolution: false,
            partitioned: false,
            suggested_write_timeout,
        }
    }

    /// Enable exactly-once semantics (requires epoch + 2PC impl).
    #[must_use]
    pub fn with_exactly_once(mut self) -> Self {
        self.exactly_once = true;
        self
    }
    /// Enable idempotent writes (safe re-delivery on retry).
    #[must_use]
    pub fn with_idempotent(mut self) -> Self {
        self.idempotent = true;
        self
    }
    /// Enable upsert (key-based insert-or-update).
    #[must_use]
    pub fn with_upsert(mut self) -> Self {
        self.upsert = true;
        self
    }
    /// Enable changelog/retraction records.
    #[must_use]
    pub fn with_changelog(mut self) -> Self {
        self.changelog = true;
        self
    }
    /// Enable two-phase commit (`pre_commit` + `commit_epoch`).
    #[must_use]
    pub fn with_two_phase_commit(mut self) -> Self {
        self.two_phase_commit = true;
        self
    }
    /// Enable additive schema evolution.
    #[must_use]
    pub fn with_schema_evolution(mut self) -> Self {
        self.schema_evolution = true;
        self
    }
    /// Enable partitioned writes.
    #[must_use]
    pub fn with_partitioned(mut self) -> Self {
        self.partitioned = true;
        self
    }
}

/// Trait for source connectors that read data from external systems.
///
/// Source connectors operate in Ring 1 and push data into Ring 0 via
/// the streaming `Source<ArrowRecord>::push_arrow()` API.
///
/// # Lifecycle
///
/// 1. `open()` — initialize connection, discover schema
/// 2. `poll_batch()` — read batches in a loop
/// 3. `checkpoint()` / `restore()` — manage offsets
/// 4. `close()` — clean shutdown
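///
/// A minimal driver sketch of that lifecycle (illustrative only; the real
/// coordinator also handles retry delays, `data_ready_notify()`, and
/// checkpoint timing, and the import paths here are assumed):
///
/// ```ignore
/// async fn drain_once(
///     source: &mut dyn SourceConnector,
///     config: &ConnectorConfig,
/// ) -> Result<(), ConnectorError> {
///     source.open(config).await?;
///     // Poll until the source reports no data currently available.
///     while let Some(batch) = source.poll_batch(1024).await? {
///         println!("got {} rows from {:?}", batch.num_rows(), batch.partition);
///     }
///     // Capture a resumable position, then shut down cleanly.
///     let _checkpoint = source.checkpoint();
///     source.close().await
/// }
/// ```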
#[async_trait]
pub trait SourceConnector: Send {
    /// Called once before polling begins.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;

    /// `Ok(None)` = no data currently available; runtime retries after a delay.
    /// `max_records` is a hint — implementations may return fewer.
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError>;

    /// Resolve the source schema from the `WITH (...)` properties before
    /// DDL reaches the planner. Implementations that hit the network
    /// (e.g. Kafka fetching an Avro schema from a Schema Registry) must
    /// bound their I/O with a timeout and leave the schema empty on
    /// failure rather than hang.
    async fn discover_schema(&mut self, _properties: &std::collections::HashMap<String, String>) {}

    /// Arrow schema of records this source produces.
    fn schema(&self) -> SchemaRef;

    /// Returned checkpoint must contain enough info to resume from the
    /// current position after a restart.
    fn checkpoint(&self) -> SourceCheckpoint;

    /// Called during recovery before polling resumes.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError>;

    /// Defaults to `Unknown`; connectors should override.
    fn health_check(&self) -> HealthStatus {
        HealthStatus::Unknown
    }

    /// Current connector metrics snapshot.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Close the connection and release resources.
    async fn close(&mut self) -> Result<(), ConnectorError>;

    /// Returns a [`Notify`] handle that is signalled when new data is available.
    ///
    /// When `Some`, the pipeline coordinator awaits the notification instead of
    /// polling on a timer, eliminating idle CPU usage. Sources that receive data
    /// asynchronously (WebSocket, CDC replication streams, Kafka) should return
    /// `Some` and call `notify.notify_one()` when data arrives.
    ///
    /// The default implementation returns `None`, which causes the pipeline to
    /// fall back to timer-based polling (suitable for batch/file sources).
    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        None
    }

    /// Returns this connector as a [`SchemaProvider`](crate::schema::SchemaProvider), if supported.
    fn as_schema_provider(&self) -> Option<&dyn crate::schema::SchemaProvider> {
        None
    }

    /// Returns this connector as a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware), if supported.
    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
        None
    }

    /// Whether this source supports replay from a checkpointed position.
    ///
    /// Sources that return `false` (e.g., WebSocket, raw TCP) cannot seek
    /// back to a previous offset on recovery. Checkpointing still captures
    /// their state for best-effort recovery, but exactly-once semantics
    /// are degraded to at-most-once for events from this source.
    ///
    /// The default implementation returns `true` because most durable
    /// sources (Kafka, CDC, files) support replay.
    fn supports_replay(&self) -> bool {
        true
    }

    /// Returns a shared flag that the source sets to `true` when it
    /// requests an immediate checkpoint.
    ///
    /// This is used by sources that detect external state changes requiring
    /// a checkpoint before proceeding — for example, Kafka consumer group
    /// rebalance (partition revocation). The pipeline coordinator polls
    /// this flag each cycle and clears it after triggering the checkpoint.
    ///
    /// The default returns `None` (no source-initiated checkpoints).
    fn checkpoint_requested(&self) -> Option<Arc<std::sync::atomic::AtomicBool>> {
        None
    }

    /// Acknowledge that `epoch` has been durably committed.
    ///
    /// Called after the manifest is persisted and every exactly-once sink
    /// committed the epoch. Idempotent — a retry after cancellation is
    /// legal.
    ///
    /// # Errors
    ///
    /// Errors are logged; they do not roll back the committed epoch.
    async fn notify_epoch_committed(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }
}

/// Trait for sink connectors that write data to external systems.
///
/// Sink connectors operate in Ring 1, receiving data from Ring 0 and
/// writing to external systems. Implementations that advertise
/// `exactly_once` also implement `begin_epoch`/`pre_commit`/`commit_epoch`/
/// `rollback_epoch`; the runtime drives them via the checkpoint coordinator.
///
/// Lifecycle: `open()` → loop over epochs of `begin_epoch()`,
/// `write_batch()*`, `pre_commit()`, `commit_epoch()` (or `rollback_epoch()`
/// on failure) → `close()`.
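///
/// A sketch of one epoch as a runtime might drive it (simplified; the real
/// coordinator persists the manifest between `pre_commit` and `commit_epoch`
/// and applies write timeouts, and the import paths here are assumed):
///
/// ```ignore
/// async fn run_epoch(
///     sink: &mut dyn SinkConnector,
///     epoch: u64,
///     batches: &[arrow_array::RecordBatch],
/// ) -> Result<(), ConnectorError> {
///     sink.begin_epoch(epoch).await?;
///     for batch in batches {
///         sink.write_batch(batch).await?;
///     }
///     // Phase 1: flush/prepare. On error, roll back instead of committing.
///     if let Err(e) = sink.pre_commit(epoch).await {
///         sink.rollback_epoch(epoch).await?;
///         return Err(e);
///     }
///     // ... persist the checkpoint manifest here ...
///     // Phase 2: finalize only once the manifest is durable.
///     sink.commit_epoch(epoch).await
/// }
/// ```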
#[async_trait]
pub trait SinkConnector: Send {
    /// Open the connection and prepare to accept writes.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;

    /// Must be cancellation-safe: the runtime wraps this in
    /// `tokio::time::timeout`. Don't split a `&mut self` mutation
    /// across an `.await`. In-flight transactional state may remain open
    /// after cancellation; the caller will `rollback_epoch` it.
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError>;

    /// Expected Arrow schema of input batches.
    fn schema(&self) -> SchemaRef;

    /// Default: no-op (at-least-once semantics).
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Phase 1 of 2PC: flush + prepare, do NOT finalize the txn. The
    /// runtime persists the manifest between `pre_commit` and
    /// `commit_epoch`; on failure it calls `rollback_epoch`. Default
    /// delegates to `flush()`.
    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        self.flush().await
    }

    /// Phase 2 of 2PC: finalize the txn. Called after the manifest is
    /// durable. Default: no-op (at-least-once semantics).
    async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Must be idempotent: the runtime calls this on every exactly-once
    /// sink after a `pre_commit` failure, including sinks that never
    /// `pre_commit`ed.
    async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Defaults to `Unknown`; connectors should override.
    fn health_check(&self) -> HealthStatus {
        HealthStatus::Unknown
    }

    /// Current sink metrics snapshot.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Required (no default) so every implementation declares
    /// `suggested_write_timeout`.
    fn capabilities(&self) -> SinkConnectorCapabilities;

    /// Must be internally bounded — the sink task's periodic timer
    /// calls this on every tick. Thorough drains belong in `pre_commit`
    /// / `commit_epoch` / `close`, not here.
    async fn flush(&mut self) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Close the sink and release resources.
    async fn close(&mut self) -> Result<(), ConnectorError>;

    /// Return a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware)
    /// view, if the sink speaks a schema registry protocol.
    fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
        None
    }
}

#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    fn test_batch(n: usize) -> RecordBatch {
        #[allow(clippy::cast_possible_wrap)]
        let ids: Vec<i64> = (0..n as i64).collect();
        RecordBatch::try_new(test_schema(), vec![Arc::new(Int64Array::from(ids))]).unwrap()
    }

    #[test]
    fn test_source_batch() {
        let batch = SourceBatch::new(test_batch(10));
        assert_eq!(batch.num_rows(), 10);
        assert!(batch.partition.is_none());
    }

    #[test]
    fn test_source_batch_with_partition() {
        let partition = PartitionInfo::new("0", "1234");
        let batch = SourceBatch::with_partition(test_batch(5), partition);
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(batch.partition.as_ref().unwrap().id, "0");
        assert_eq!(batch.partition.as_ref().unwrap().offset, "1234");
    }

    #[test]
    fn test_partition_info_display() {
        let p = PartitionInfo::new("3", "42");
        assert_eq!(p.to_string(), "3@42");
    }

    #[test]
    fn test_write_result() {
        let result = WriteResult::new(100, 5000);
        assert_eq!(result.records_written, 100);
        assert_eq!(result.bytes_written, 5000);
    }

    #[test]
    fn test_sink_capabilities_builder() {
        let caps = SinkConnectorCapabilities::new(std::time::Duration::from_secs(5))
            .with_exactly_once()
            .with_changelog()
            .with_partitioned();

        assert!(caps.exactly_once);
        assert!(!caps.idempotent);
        assert!(!caps.upsert);
        assert!(caps.changelog);
        assert!(!caps.schema_evolution);
        assert!(caps.partitioned);
        assert_eq!(
            caps.suggested_write_timeout,
            std::time::Duration::from_secs(5)
        );
    }
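
    // String round-trips for the `DeliveryGuarantee` and `PostgresSslMode`
    // enums above; these exercise only the `FromStr`/`Display` impls defined
    // in this module.
    #[test]
    fn test_delivery_guarantee_parse_display_round_trip() {
        let g: DeliveryGuarantee = "at-least-once".parse().unwrap();
        assert_eq!(g, DeliveryGuarantee::AtLeastOnce);
        assert_eq!(g.to_string(), "at-least-once");
        assert!("sometimes".parse::<DeliveryGuarantee>().is_err());
    }

    #[test]
    fn test_postgres_ssl_mode_parse_display_round_trip() {
        // "off" is an accepted alias for `Disable`; display uses the libpq spelling.
        assert_eq!("off".parse::<PostgresSslMode>().unwrap(), PostgresSslMode::Disable);
        let m: PostgresSslMode = "VERIFY-CA".parse().unwrap();
        assert_eq!(m, PostgresSslMode::VerifyCa);
        assert_eq!(m.to_string(), "verify-ca");
    }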
}