laminar_connectors/connector.rs
1//! Connector traits — async `SourceConnector` / `SinkConnector`.
2
3use std::fmt;
4use std::str::FromStr;
5use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use arrow_schema::SchemaRef;
9use async_trait::async_trait;
10use tokio::sync::Notify;
11
12use crate::checkpoint::SourceCheckpoint;
13use crate::config::ConnectorConfig;
14use crate::error::ConnectorError;
15
16/// Delivery guarantee level for the pipeline.
17///
18/// Configures the expected end-to-end delivery semantics. The pipeline
19/// validates at startup that all sources and sinks meet the requirements
20/// for the chosen guarantee level.
21#[derive(
22 Debug, Clone, Copy, PartialEq, Eq, Hash, Default, serde::Serialize, serde::Deserialize,
23)]
24pub enum DeliveryGuarantee {
25 /// At-least-once: records may be replayed on recovery. Requires
26 /// checkpointing but tolerates non-replayable sources (with degradation).
27 #[default]
28 AtLeastOnce,
29 /// Exactly-once: no duplicates or losses. Requires all sources to
30 /// support replay, all sinks to support exactly-once, and checkpoint
31 /// to be enabled.
32 ExactlyOnce,
33}
34
35impl std::fmt::Display for DeliveryGuarantee {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 match self {
38 DeliveryGuarantee::AtLeastOnce => write!(f, "at-least-once"),
39 DeliveryGuarantee::ExactlyOnce => write!(f, "exactly-once"),
40 }
41 }
42}
43
44impl FromStr for DeliveryGuarantee {
45 type Err = String;
46
47 fn from_str(s: &str) -> Result<Self, Self::Err> {
48 match s.to_lowercase().replace('-', "_").as_str() {
49 "at_least_once" | "atleastonce" => Ok(Self::AtLeastOnce),
50 "exactly_once" | "exactlyonce" => Ok(Self::ExactlyOnce),
51 other => Err(format!("unknown delivery guarantee: '{other}'")),
52 }
53 }
54}
55
/// How a `PostgreSQL`-compatible connector negotiates SSL.
///
/// Used by both the `PostgreSQL` sink and the `PostgreSQL` CDC source. The
/// variants are named after the values of the `libpq` `sslmode` parameter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PostgresSslMode {
    /// Never use SSL.
    Disable,
    /// Attempt SSL first and fall back to an unencrypted connection.
    /// This is the default mode.
    #[default]
    Prefer,
    /// SSL is mandatory.
    Require,
    /// SSL is mandatory and the CA certificate is verified.
    VerifyCa,
    /// SSL is mandatory; both the certificate and the hostname are verified.
    VerifyFull,
}
74
75impl std::fmt::Display for PostgresSslMode {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 match self {
78 Self::Disable => write!(f, "disable"),
79 Self::Prefer => write!(f, "prefer"),
80 Self::Require => write!(f, "require"),
81 Self::VerifyCa => write!(f, "verify-ca"),
82 Self::VerifyFull => write!(f, "verify-full"),
83 }
84 }
85}
86
87impl FromStr for PostgresSslMode {
88 type Err = String;
89
90 fn from_str(s: &str) -> Result<Self, Self::Err> {
91 match s.to_lowercase().replace('-', "_").as_str() {
92 "disable" | "off" => Ok(Self::Disable),
93 "prefer" => Ok(Self::Prefer),
94 "require" => Ok(Self::Require),
95 "verify_ca" | "verifyca" => Ok(Self::VerifyCa),
96 "verify_full" | "verifyfull" => Ok(Self::VerifyFull),
97 other => Err(format!("unknown SSL mode: '{other}'")),
98 }
99 }
100}
101
102/// A batch of records read from a source connector.
103#[derive(Debug, Clone)]
104pub struct SourceBatch {
105 /// Arrow batch carrying the records.
106 pub records: RecordBatch,
107 /// The partition this batch came from, if the source is partitioned.
108 pub partition: Option<PartitionInfo>,
109}
110
111impl SourceBatch {
112 /// Construct without partition metadata.
113 #[must_use]
114 pub fn new(records: RecordBatch) -> Self {
115 Self {
116 records,
117 partition: None,
118 }
119 }
120
121 /// Construct with partition metadata attached.
122 #[must_use]
123 pub fn with_partition(records: RecordBatch, partition: PartitionInfo) -> Self {
124 Self {
125 records,
126 partition: Some(partition),
127 }
128 }
129
130 /// Record count in the batch.
131 #[must_use]
132 pub fn num_rows(&self) -> usize {
133 self.records.num_rows()
134 }
135}
136
/// Identity of a source partition together with its current offset
/// (for example a Kafka partition number or a CDC slot name).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionInfo {
    /// Free-form partition identifier: a Kafka partition number rendered
    /// as a string, a CDC slot name, a file path, …
    pub id: String,
    /// Current offset. Its meaning is connector-specific (a Kafka offset
    /// rendered as a string, a CDC LSN, etc.).
    pub offset: String,
}
148
149impl PartitionInfo {
150 /// Construct from id/offset strings or anything that converts.
151 #[must_use]
152 pub fn new(id: impl Into<String>, offset: impl Into<String>) -> Self {
153 Self {
154 id: id.into(),
155 offset: offset.into(),
156 }
157 }
158}
159
160impl fmt::Display for PartitionInfo {
161 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162 write!(f, "{}@{}", self.id, self.offset)
163 }
164}
165
/// Outcome reported by a successful `write_batch` call.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// How many records the sink accepted.
    pub records_written: usize,
    /// How many bytes went to the underlying transport; may be an estimate.
    pub bytes_written: u64,
}
174
175impl WriteResult {
176 /// Construct with raw counts.
177 #[must_use]
178 pub fn new(records_written: usize, bytes_written: u64) -> Self {
179 Self {
180 records_written,
181 bytes_written,
182 }
183 }
184}
185
/// Feature flags and tuning hints that a sink connector declares about itself.
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct SinkConnectorCapabilities {
    /// Exactly-once semantics via epochs are supported.
    pub exactly_once: bool,

    /// Writes are idempotent.
    pub idempotent: bool,

    /// Upsert (insert-or-update) writes are supported.
    pub upsert: bool,

    /// Changelog/retraction records can be handled.
    pub changelog: bool,

    /// Two-phase commit (pre-commit + commit) is supported.
    pub two_phase_commit: bool,

    /// Schema evolution is supported.
    pub schema_evolution: bool,

    /// Partitioned writes are supported.
    pub partitioned: bool,

    /// Output left pending in an abandoned epoch survives into the next
    /// epoch's commit WITHOUT a connector rollback (e.g. the single open
    /// transaction of a Kafka transactional producer).
    ///
    /// When set, a live coordination failure skips the connector rollback
    /// so the pending rows are not thrown away — sources rewind only on
    /// restart, never on a live abort. When unset, live failures roll the
    /// connector back.
    pub preserves_pending_on_abandon: bool,

    /// Default I/O timeout applied to each `write_batch` call. Users can
    /// override it through the `sink.write.timeout.ms` connector property.
    pub suggested_write_timeout: std::time::Duration,
}
224
225impl SinkConnectorCapabilities {
226 /// All booleans default to `false`; flip via `with_*` below or by
227 /// assigning the fields directly (they're `pub`).
228 #[must_use]
229 pub fn new(suggested_write_timeout: std::time::Duration) -> Self {
230 Self {
231 exactly_once: false,
232 idempotent: false,
233 upsert: false,
234 changelog: false,
235 two_phase_commit: false,
236 schema_evolution: false,
237 partitioned: false,
238 preserves_pending_on_abandon: false,
239 suggested_write_timeout,
240 }
241 }
242
243 /// Declare that pending output survives an abandoned epoch into the
244 /// next commit without a rollback (see the field docs).
245 #[must_use]
246 pub fn with_preserves_pending_on_abandon(mut self) -> Self {
247 self.preserves_pending_on_abandon = true;
248 self
249 }
250
251 /// Enable exactly-once semantics (requires epoch + 2PC impl).
252 #[must_use]
253 pub fn with_exactly_once(mut self) -> Self {
254 self.exactly_once = true;
255 self
256 }
257 /// Enable idempotent writes (safe re-delivery on retry).
258 #[must_use]
259 pub fn with_idempotent(mut self) -> Self {
260 self.idempotent = true;
261 self
262 }
263 /// Enable upsert (key-based insert-or-update).
264 #[must_use]
265 pub fn with_upsert(mut self) -> Self {
266 self.upsert = true;
267 self
268 }
269 /// Enable changelog/retraction records.
270 #[must_use]
271 pub fn with_changelog(mut self) -> Self {
272 self.changelog = true;
273 self
274 }
275 /// Enable two-phase commit (`pre_commit` + `commit_epoch`).
276 #[must_use]
277 pub fn with_two_phase_commit(mut self) -> Self {
278 self.two_phase_commit = true;
279 self
280 }
281 /// Enable additive schema evolution.
282 #[must_use]
283 pub fn with_schema_evolution(mut self) -> Self {
284 self.schema_evolution = true;
285 self
286 }
287 /// Enable partitioned writes.
288 #[must_use]
289 pub fn with_partitioned(mut self) -> Self {
290 self.partitioned = true;
291 self
292 }
293}
294
295/// Trait for source connectors that read data from external systems.
296///
297/// Source connectors operate in Ring 1 and push data into Ring 0 via
298/// the streaming `Source<ArrowRecord>::push_arrow()` API.
299///
300/// # Lifecycle
301///
302/// 1. `open()` — initialize connection, discover schema
303/// 2. `poll_batch()` — read batches in a loop
304/// 3. `checkpoint()` / `restore()` — manage offsets
305/// 4. `close()` — clean shutdown
306#[async_trait]
307pub trait SourceConnector: Send {
308 /// Called once before polling begins.
309 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;
310
311 /// `Ok(None)` = no data currently available; runtime retries after a delay.
312 /// `max_records` is a hint — implementations may return fewer.
313 async fn poll_batch(
314 &mut self,
315 max_records: usize,
316 ) -> Result<Option<SourceBatch>, ConnectorError>;
317
318 /// Resolve the source schema from the `WITH (...)` properties before
319 /// DDL reaches the planner. Implementations that hit the network
320 /// (e.g. Kafka fetching an Avro schema from a Schema Registry) must
321 /// bound their I/O with a timeout. Return `Err(ConnectorError::…)` on
322 /// failure so the runtime can surface the cause to DDL — do not log
323 /// and swallow.
324 async fn discover_schema(
325 &mut self,
326 _properties: &std::collections::HashMap<String, String>,
327 ) -> Result<(), ConnectorError> {
328 Ok(())
329 }
330
331 /// Arrow schema of records this source produces.
332 fn schema(&self) -> SchemaRef;
333
334 /// Returned checkpoint must contain enough info to resume from the
335 /// current position after a restart.
336 fn checkpoint(&self) -> SourceCheckpoint;
337
338 /// Called during recovery before polling resumes.
339 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError>;
340
341 /// Install the cluster vnode assignment so a partitioned source can bind
342 /// its input partitions to vnodes (`partition % vnode_count`) and consume
343 /// only those it owns, re-binding when the assignment rotates. Called by
344 /// the cluster startup wiring before [`open`](Self::open).
345 ///
346 /// Default: no-op — single-node deployments and sources without a natural
347 /// partitioning ignore it. Only the Kafka source overrides it today.
348 fn set_vnode_assignment(
349 &mut self,
350 _registry: Arc<laminar_core::state::VnodeRegistry>,
351 _self_id: laminar_core::state::NodeId,
352 ) {
353 }
354
355 /// Close the connection and release resources.
356 async fn close(&mut self) -> Result<(), ConnectorError>;
357
358 /// Returns a [`Notify`] handle that is signalled when new data is available.
359 ///
360 /// When `Some`, the pipeline coordinator awaits the notification instead of
361 /// polling on a timer, eliminating idle CPU usage. Sources that receive data
362 /// asynchronously (WebSocket, CDC replication streams, Kafka) should return
363 /// `Some` and call `notify.notify_one()` when data arrives.
364 ///
365 /// The default implementation returns `None`, which causes the pipeline to
366 /// fall back to timer-based polling (suitable for batch/file sources).
367 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
368 None
369 }
370
371 /// Returns this connector as a [`SchemaProvider`](crate::schema::SchemaProvider), if supported.
372 fn as_schema_provider(&self) -> Option<&dyn crate::schema::SchemaProvider> {
373 None
374 }
375
376 /// Returns this connector as a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware), if supported.
377 fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
378 None
379 }
380
381 /// Whether this source supports replay from a checkpointed position.
382 ///
383 /// Sources that return `false` (e.g., WebSocket, raw TCP) cannot seek
384 /// back to a previous offset on recovery. Checkpointing still captures
385 /// their state for best-effort recovery, but exactly-once semantics
386 /// are degraded to at-most-once for events from this source.
387 ///
388 /// The default implementation returns `true` because most durable
389 /// sources (Kafka, CDC, files) support replay.
390 fn supports_replay(&self) -> bool {
391 true
392 }
393
394 /// Returns a shared flag that the source sets to `true` when it
395 /// requests an immediate checkpoint.
396 ///
397 /// This is used by sources that detect external state changes requiring
398 /// a checkpoint before proceeding — for example, Kafka consumer group
399 /// rebalance (partition revocation). The pipeline coordinator polls
400 /// this flag each cycle and clears it after triggering the checkpoint.
401 ///
402 /// The default returns `None` (no source-initiated checkpoints).
403 fn checkpoint_requested(&self) -> Option<Arc<std::sync::atomic::AtomicBool>> {
404 None
405 }
406
407 /// Acknowledge that `epoch` has been durably committed.
408 ///
409 /// Called after the manifest is persisted and every exactly-once sink
410 /// committed the epoch. The `checkpoint` argument is the exact
411 /// per-source `SourceCheckpoint` that was persisted into the manifest
412 /// for this epoch — sources can rely on it to advance external offset
413 /// state (broker group offsets, lookup-DB cursors, ack tokens) using
414 /// values that match what's durable.
415 ///
416 /// May be called with an empty `checkpoint` for timer-driven commits
417 /// where no per-source state was captured; implementations should
418 /// treat that as a no-op for any externally-visible advancement.
419 ///
420 /// Idempotent — a retry after cancellation is legal.
421 ///
422 /// # Errors
423 ///
424 /// Errors are logged; they do not roll back the committed epoch.
425 async fn notify_epoch_committed(
426 &mut self,
427 _epoch: u64,
428 _checkpoint: &SourceCheckpoint,
429 ) -> Result<(), ConnectorError> {
430 Ok(())
431 }
432}
433
434/// Trait for sink connectors that write data to external systems.
435///
436/// Sink connectors operate in Ring 1, receiving data from Ring 0 and
437/// writing to external systems. Implementations that advertise
438/// `exactly_once` also implement `begin_epoch`/`pre_commit`/`commit_epoch`/
439/// `rollback_epoch`; the runtime drives them via the checkpoint coordinator.
440///
441/// Lifecycle: `open()` → loop over epochs of `begin_epoch()`,
442/// `write_batch()*`, `pre_commit()`, `commit_epoch()` (or `rollback_epoch()`
443/// on failure) → `close()`.
444#[async_trait]
445pub trait SinkConnector: Send {
446 /// Open the connection and prepare to accept writes.
447 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError>;
448
449 /// Must be cancellation-safe: the runtime wraps this in
450 /// `tokio::time::timeout`. Don't split a `&mut self` mutation
451 /// across an `.await`. In-flight transactional state may remain open
452 /// after cancellation; the caller will `rollback_epoch` it.
453 async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError>;
454
455 /// Expected Arrow schema of input batches.
456 fn schema(&self) -> SchemaRef;
457
458 /// Default: no-op (at-least-once semantics).
459 async fn begin_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
460 Ok(())
461 }
462
463 /// Phase 1 of 2PC: flush + prepare, do NOT finalize the txn. The
464 /// runtime persists the manifest between `pre_commit` and
465 /// `commit_epoch`; on failure it calls `rollback_epoch`. Default
466 /// delegates to `flush()`.
467 async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
468 self.flush().await
469 }
470
471 /// Phase 2 of 2PC: finalize the txn. Called after the manifest is
472 /// durable. Default: no-op (at-least-once semantics).
473 async fn commit_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
474 Ok(())
475 }
476
477 /// Must be idempotent: the runtime calls this on every exactly-once
478 /// sink after a `pre_commit` failure, including sinks that never
479 /// `pre_commit`ed.
480 async fn rollback_epoch(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
481 Ok(())
482 }
483
484 /// Required (no default) so every implementation declares
485 /// `suggested_write_timeout`.
486 fn capabilities(&self) -> SinkConnectorCapabilities;
487
488 /// Must be internally bounded — the sink task's periodic timer
489 /// calls this on every tick. Thorough drains belong in `pre_commit`
490 /// / `commit_epoch` / `close`, not here.
491 async fn flush(&mut self) -> Result<(), ConnectorError> {
492 Ok(())
493 }
494
495 /// Close the sink and release resources.
496 async fn close(&mut self) -> Result<(), ConnectorError>;
497
498 /// Return a [`SchemaRegistryAware`](crate::schema::SchemaRegistryAware)
499 /// view, if the sink speaks a schema registry protocol.
500 fn as_schema_registry_aware(&self) -> Option<&dyn crate::schema::SchemaRegistryAware> {
501 None
502 }
503}
504
#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;
    use std::time::Duration;

    /// Single non-nullable `id: Int64` column shared by the batch fixtures.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    /// Builds a batch whose `id` column holds `0..n`.
    fn test_batch(n: usize) -> RecordBatch {
        // The module-level `allow` covers this cast; test sizes are tiny.
        let ids: Vec<i64> = (0..n as i64).collect();
        RecordBatch::try_new(test_schema(), vec![Arc::new(Int64Array::from(ids))]).unwrap()
    }

    #[test]
    fn test_delivery_guarantee_parse_and_display() {
        for guarantee in [DeliveryGuarantee::AtLeastOnce, DeliveryGuarantee::ExactlyOnce] {
            // Display output must parse back to the same variant.
            assert_eq!(
                guarantee.to_string().parse::<DeliveryGuarantee>(),
                Ok(guarantee)
            );
        }
        assert_eq!(
            "AtLeastOnce".parse::<DeliveryGuarantee>(),
            Ok(DeliveryGuarantee::AtLeastOnce)
        );
        assert_eq!(
            "EXACTLY_ONCE".parse::<DeliveryGuarantee>(),
            Ok(DeliveryGuarantee::ExactlyOnce)
        );
        assert!("at-most-once".parse::<DeliveryGuarantee>().is_err());
        assert_eq!(DeliveryGuarantee::default(), DeliveryGuarantee::AtLeastOnce);
    }

    #[test]
    fn test_postgres_ssl_mode_parse_and_display() {
        let modes = [
            PostgresSslMode::Disable,
            PostgresSslMode::Prefer,
            PostgresSslMode::Require,
            PostgresSslMode::VerifyCa,
            PostgresSslMode::VerifyFull,
        ];
        for mode in modes {
            // Display output must parse back to the same variant.
            assert_eq!(mode.to_string().parse::<PostgresSslMode>(), Ok(mode));
        }
        assert_eq!("off".parse::<PostgresSslMode>(), Ok(PostgresSslMode::Disable));
        assert_eq!(
            "VERIFY_FULL".parse::<PostgresSslMode>(),
            Ok(PostgresSslMode::VerifyFull)
        );
        assert!("tls".parse::<PostgresSslMode>().is_err());
        assert_eq!(PostgresSslMode::default(), PostgresSslMode::Prefer);
    }

    #[test]
    fn test_source_batch() {
        let batch = SourceBatch::new(test_batch(10));
        assert_eq!(batch.num_rows(), 10);
        assert!(batch.partition.is_none());
    }

    #[test]
    fn test_source_batch_with_partition() {
        let partition = PartitionInfo::new("0", "1234");
        let batch = SourceBatch::with_partition(test_batch(5), partition);
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(batch.partition.as_ref().unwrap().id, "0");
        assert_eq!(batch.partition.as_ref().unwrap().offset, "1234");
    }

    #[test]
    fn test_partition_info_display() {
        let p = PartitionInfo::new("3", "42");
        assert_eq!(p.to_string(), "3@42");
    }

    #[test]
    fn test_write_result() {
        let result = WriteResult::new(100, 5000);
        assert_eq!(result.records_written, 100);
        assert_eq!(result.bytes_written, 5000);
    }

    #[test]
    fn test_sink_capabilities_defaults() {
        let caps = SinkConnectorCapabilities::new(Duration::from_secs(5));

        assert!(!caps.exactly_once);
        assert!(!caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
        assert!(!caps.two_phase_commit);
        assert!(!caps.schema_evolution);
        assert!(!caps.partitioned);
        assert!(!caps.preserves_pending_on_abandon);
        assert_eq!(caps.suggested_write_timeout, Duration::from_secs(5));
    }

    #[test]
    fn test_sink_capabilities_builder() {
        let caps = SinkConnectorCapabilities::new(Duration::from_secs(5))
            .with_exactly_once()
            .with_changelog()
            .with_partitioned();

        assert!(caps.exactly_once);
        assert!(!caps.idempotent);
        assert!(!caps.upsert);
        assert!(caps.changelog);
        assert!(!caps.two_phase_commit);
        assert!(!caps.schema_evolution);
        assert!(caps.partitioned);
        assert!(!caps.preserves_pending_on_abandon);
        assert_eq!(caps.suggested_write_timeout, Duration::from_secs(5));
    }

    #[test]
    fn test_sink_capabilities_remaining_builders() {
        let caps = SinkConnectorCapabilities::new(Duration::from_millis(250))
            .with_idempotent()
            .with_upsert()
            .with_two_phase_commit()
            .with_schema_evolution()
            .with_preserves_pending_on_abandon();

        assert!(!caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.upsert);
        assert!(!caps.changelog);
        assert!(caps.two_phase_commit);
        assert!(caps.schema_evolution);
        assert!(!caps.partitioned);
        assert!(caps.preserves_pending_on_abandon);
        assert_eq!(caps.suggested_write_timeout, Duration::from_millis(250));
    }
}