laminar_connectors/lakehouse/
delta_source.rs1use std::collections::VecDeque;
23use std::sync::Arc;
24
25use arrow_array::RecordBatch;
26use arrow_schema::SchemaRef;
27use async_trait::async_trait;
28#[cfg(feature = "delta-lake")]
29use std::time::Instant;
30#[cfg(feature = "delta-lake")]
31use tracing::debug;
32use tracing::info;
33
34#[cfg(feature = "delta-lake")]
35use deltalake::DeltaTable;
36
37use crate::checkpoint::SourceCheckpoint;
38use crate::config::{ConnectorConfig, ConnectorState};
39use crate::connector::{SourceBatch, SourceConnector};
40use crate::error::ConnectorError;
41use crate::health::HealthStatus;
42use crate::metrics::ConnectorMetrics;
43
44use super::delta_source_config::DeltaSourceConfig;
45
/// Source connector that reads Arrow record batches from a Delta Lake table.
///
/// The connector remembers the last table version it has fully handed out
/// (`current_version`) and keeps batches that were read but not yet returned
/// in an in-memory queue. Fields gated on the `delta-lake` feature only exist
/// when the real table I/O path is compiled in.
pub struct DeltaSource {
    /// Connector settings (table path, storage options, starting version, poll interval).
    config: DeltaSourceConfig,
    /// Lifecycle state; a freshly constructed connector is `Created`.
    state: ConnectorState,
    /// Arrow schema of the table, once it has been discovered.
    schema: Option<SchemaRef>,
    /// Version written into checkpoints; `-1` until a version has been resolved.
    current_version: i64,
    /// Version whose batches are still buffered; it is promoted to
    /// `current_version` when the last buffered batch is handed out.
    #[cfg(feature = "delta-lake")]
    inflight_version: Option<i64>,
    /// Batches that have been read but not yet returned from `poll_batch`.
    pending_batches: VecDeque<RecordBatch>,
    /// Running total of rows returned from `poll_batch`.
    records_read: u64,
    /// Handle to the opened table; `None` before `open` and after `close`.
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    /// Time of the most recent "is there a newer version?" check, used to
    /// honour the configured poll interval.
    #[cfg(feature = "delta-lake")]
    last_version_check: Option<Instant>,
}
86
87impl DeltaSource {
88 #[must_use]
90 pub fn new(config: DeltaSourceConfig) -> Self {
91 Self {
92 config,
93 state: ConnectorState::Created,
94 schema: None,
95 current_version: -1,
96 #[cfg(feature = "delta-lake")]
97 inflight_version: None,
98 pending_batches: VecDeque::new(),
99 records_read: 0,
100 #[cfg(feature = "delta-lake")]
101 table: None,
102 #[cfg(feature = "delta-lake")]
103 last_version_check: None,
104 }
105 }
106
107 #[must_use]
109 pub fn state(&self) -> ConnectorState {
110 self.state
111 }
112
113 #[must_use]
115 pub fn current_version(&self) -> i64 {
116 self.current_version
117 }
118
119 #[must_use]
121 pub fn config(&self) -> &DeltaSourceConfig {
122 &self.config
123 }
124}
125
126#[async_trait]
127impl SourceConnector for DeltaSource {
128 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
129 self.state = ConnectorState::Initializing;
130
131 if !config.properties().is_empty() {
133 self.config = DeltaSourceConfig::from_config(config)?;
134 }
135
136 info!(
137 table_path = %self.config.table_path,
138 starting_version = ?self.config.starting_version,
139 "opening Delta Lake source connector"
140 );
141
142 #[cfg(feature = "delta-lake")]
143 {
144 use super::delta_io;
145
146 let table = delta_io::open_or_create_table(
148 &self.config.table_path,
149 self.config.storage_options.clone(),
150 None,
151 )
152 .await?;
153
154 if let Ok(schema) = delta_io::get_table_schema(&table) {
156 self.schema = Some(schema);
157 }
158
159 let table_version = table.version().unwrap_or(0);
161 #[allow(clippy::cast_possible_wrap)]
162 if let Some(start) = self.config.starting_version {
163 self.current_version = start;
164 } else {
165 self.current_version = table_version;
166 }
167
168 info!(
169 table_path = %self.config.table_path,
170 table_version,
171 current_version = self.current_version,
172 "Delta Lake source: resolved starting version"
173 );
174
175 self.table = Some(table);
176 }
177
178 #[cfg(not(feature = "delta-lake"))]
179 {
180 if let Some(start) = self.config.starting_version {
183 self.current_version = start;
184 }
185 }
186
187 self.state = ConnectorState::Running;
188 info!("Delta Lake source connector opened successfully");
189 Ok(())
190 }
191
192 #[allow(unused_variables)]
193 async fn poll_batch(
194 &mut self,
195 max_records: usize,
196 ) -> Result<Option<SourceBatch>, ConnectorError> {
197 if self.state != ConnectorState::Running {
198 return Err(ConnectorError::InvalidState {
199 expected: "Running".into(),
200 actual: self.state.to_string(),
201 });
202 }
203
204 if let Some(batch) = self.pending_batches.pop_front() {
208 self.records_read += batch.num_rows() as u64;
209
210 #[cfg(feature = "delta-lake")]
211 if self.pending_batches.is_empty() {
212 if let Some(v) = self.inflight_version.take() {
213 self.current_version = v;
214 }
215 }
216
217 return Ok(Some(SourceBatch::new(batch)));
218 }
219
220 #[cfg(feature = "delta-lake")]
222 {
223 use super::delta_io;
224
225 if let Some(last_check) = self.last_version_check {
229 if last_check.elapsed() < self.config.poll_interval {
230 return Ok(None);
231 }
232 }
233 self.last_version_check = Some(Instant::now());
234
235 let table = self
236 .table
237 .as_mut()
238 .ok_or_else(|| ConnectorError::InvalidState {
239 expected: "table initialized".into(),
240 actual: "table not initialized".into(),
241 })?;
242
243 let latest_version = delta_io::get_latest_version(table).await?;
244
245 if latest_version <= self.current_version {
246 return Ok(None); }
248
249 debug!(
250 current_version = self.current_version,
251 latest_version, "Delta Lake source: new version(s) available"
252 );
253
254 let batches =
259 delta_io::read_batches_at_version(table, latest_version, max_records).await?;
260
261 if self.schema.is_none() {
263 if let Some(first) = batches.first() {
264 self.schema = Some(first.schema());
265 }
266 }
267
268 for batch in batches {
272 if batch.num_rows() > 0 {
273 self.pending_batches.push_back(batch);
274 }
275 }
276
277 if self.pending_batches.is_empty() {
278 self.current_version = latest_version;
281 } else {
282 self.inflight_version = Some(latest_version);
283 }
284
285 if let Some(batch) = self.pending_batches.pop_front() {
286 self.records_read += batch.num_rows() as u64;
287
288 if self.pending_batches.is_empty() {
290 if let Some(v) = self.inflight_version.take() {
291 self.current_version = v;
292 }
293 }
294
295 return Ok(Some(SourceBatch::new(batch)));
296 }
297 }
298
299 Ok(None)
300 }
301
302 fn schema(&self) -> SchemaRef {
303 self.schema
304 .clone()
305 .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
306 }
307
308 fn checkpoint(&self) -> SourceCheckpoint {
309 let mut cp = SourceCheckpoint::new(0);
310 cp.set_offset("delta_version", self.current_version.to_string());
311 cp
312 }
313
314 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
315 if let Some(version_str) = checkpoint.get_offset("delta_version") {
316 self.current_version = version_str.parse::<i64>().map_err(|_| {
317 ConnectorError::ConfigurationError(format!(
318 "invalid delta_version in checkpoint: '{version_str}'"
319 ))
320 })?;
321 info!(
322 restored_version = self.current_version,
323 "Delta Lake source: restored from checkpoint"
324 );
325 }
326 Ok(())
327 }
328
329 fn health_check(&self) -> HealthStatus {
330 match self.state {
331 ConnectorState::Running => HealthStatus::Healthy,
332 ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
333 ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
334 ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
335 ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
336 ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
337 }
338 }
339
340 fn metrics(&self) -> ConnectorMetrics {
341 ConnectorMetrics {
342 records_total: self.records_read,
343 ..ConnectorMetrics::default()
344 }
345 }
346
347 async fn close(&mut self) -> Result<(), ConnectorError> {
348 info!("closing Delta Lake source connector");
349
350 #[cfg(feature = "delta-lake")]
351 {
352 self.table = None;
353 }
354
355 self.pending_batches.clear();
356 self.state = ConnectorState::Closed;
357
358 info!(
359 table_path = %self.config.table_path,
360 current_version = self.current_version,
361 records_read = self.records_read,
362 "Delta Lake source connector closed"
363 );
364
365 Ok(())
366 }
367}
368
369impl std::fmt::Debug for DeltaSource {
370 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
371 f.debug_struct("DeltaSource")
372 .field("state", &self.state)
373 .field("table_path", &self.config.table_path)
374 .field("current_version", &self.current_version)
375 .field("pending_batches", &self.pending_batches.len())
376 .field("records_read", &self.records_read)
377 .finish_non_exhaustive()
378 }
379}
380
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Configuration pointing at a throwaway path; no test here touches the table.
    fn test_config() -> DeltaSourceConfig {
        DeltaSourceConfig::new("/tmp/delta_source_test")
    }

    /// Three-column schema (`id`, `name`, `value`) shared by the test batches.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Builds a batch with `n` rows matching `test_schema`.
    #[allow(clippy::cast_precision_loss)]
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    /// A connector in the `Running` state, bypassing `open` (and its I/O).
    fn running_source() -> DeltaSource {
        let mut source = DeltaSource::new(test_config());
        source.state = ConnectorState::Running;
        source
    }

    #[test]
    fn test_new_defaults() {
        let source = DeltaSource::new(test_config());
        assert_eq!(source.state(), ConnectorState::Created);
        assert_eq!(source.current_version(), -1);
        assert!(source.schema.is_none());
        assert!(source.pending_batches.is_empty());
        assert_eq!(source.records_read, 0);
    }

    #[tokio::test]
    async fn test_checkpoint_roundtrip() {
        let mut source = DeltaSource::new(test_config());
        source.current_version = 42;

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("delta_version"), Some("42"));

        // Complete the round trip: a fresh connector must recover the version.
        let mut restored = DeltaSource::new(test_config());
        restored.restore(&cp).await.unwrap();
        assert_eq!(restored.current_version(), 42);
    }

    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut source = DeltaSource::new(test_config());
        assert_eq!(source.current_version(), -1);

        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", "10");
        source.restore(&cp).await.unwrap();

        assert_eq!(source.current_version(), 10);
    }

    #[tokio::test]
    async fn test_restore_rejects_invalid_version() {
        let mut source = DeltaSource::new(test_config());
        source.current_version = 7;

        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", "not-a-number");
        let result = source.restore(&cp).await;

        assert!(matches!(
            result,
            Err(ConnectorError::ConfigurationError(_))
        ));
        // A failed restore must not disturb the version already held.
        assert_eq!(source.current_version(), 7);
    }

    #[test]
    fn test_health_check() {
        let mut source = DeltaSource::new(test_config());
        assert_eq!(source.health_check(), HealthStatus::Unknown);

        source.state = ConnectorState::Running;
        assert_eq!(source.health_check(), HealthStatus::Healthy);

        source.state = ConnectorState::Closed;
        assert!(matches!(source.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_schema_empty_when_none() {
        let source = DeltaSource::new(test_config());
        let schema = source.schema();
        assert_eq!(schema.fields().len(), 0);
    }

    #[tokio::test]
    async fn test_poll_not_running() {
        let mut source = DeltaSource::new(test_config());
        let result = source.poll_batch(100).await;
        // Pin the error kind, not just "some error".
        assert!(matches!(
            result,
            Err(ConnectorError::InvalidState { .. })
        ));
    }

    #[tokio::test]
    async fn test_poll_returns_buffered_batches() {
        let mut source = running_source();

        source.pending_batches.push_back(test_batch(5));
        source.pending_batches.push_back(test_batch(3));

        let batch1 = source.poll_batch(100).await.unwrap();
        assert!(batch1.is_some());
        assert_eq!(batch1.unwrap().records.num_rows(), 5);

        let batch2 = source.poll_batch(100).await.unwrap();
        assert!(batch2.is_some());
        assert_eq!(batch2.unwrap().records.num_rows(), 3);

        assert_eq!(source.records_read, 8);
    }

    #[tokio::test]
    async fn test_poll_batch_returns_buffered_incrementally() {
        let mut source = running_source();

        for _ in 0..10 {
            source.pending_batches.push_back(test_batch(100));
        }

        // Exactly one buffered batch per call; buffered batches are returned
        // whole even when `max_records` is smaller than the batch.
        let batch = source.poll_batch(50).await.unwrap();
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().records.num_rows(), 100);
        assert_eq!(source.pending_batches.len(), 9);
    }

    #[tokio::test]
    async fn test_version_deferred_until_buffer_drained() {
        let mut source = running_source();
        source.current_version = 5;

        source.pending_batches.push_back(test_batch(10));
        source.pending_batches.push_back(test_batch(10));
        source.pending_batches.push_back(test_batch(10));

        // The in-flight marker only exists with the `delta-lake` feature;
        // without it the version must simply stay where it was.
        #[cfg(feature = "delta-lake")]
        {
            source.inflight_version = Some(6);
        }
        let version_after_drain = if cfg!(feature = "delta-lake") { 6 } else { 5 };

        let b1 = source.poll_batch(100).await.unwrap();
        assert!(b1.is_some());
        assert_eq!(source.pending_batches.len(), 2);
        assert_eq!(source.current_version(), 5);

        let b2 = source.poll_batch(100).await.unwrap();
        assert!(b2.is_some());
        assert_eq!(source.pending_batches.len(), 1);
        assert_eq!(source.current_version(), 5);

        let b3 = source.poll_batch(100).await.unwrap();
        assert!(b3.is_some());
        assert!(source.pending_batches.is_empty());
        assert_eq!(source.records_read, 30);
        assert_eq!(source.current_version(), version_after_drain);
    }

    #[test]
    fn test_poll_interval_is_stored() {
        let mut config = test_config();
        config.poll_interval = std::time::Duration::from_millis(500);
        let source = DeltaSource::new(config);
        assert_eq!(
            source.config().poll_interval,
            std::time::Duration::from_millis(500)
        );
    }

    #[test]
    fn test_debug_output() {
        let source = DeltaSource::new(test_config());
        let debug = format!("{source:?}");
        assert!(debug.contains("DeltaSource"));
        assert!(debug.contains("/tmp/delta_source_test"));
    }

    #[tokio::test]
    async fn test_close() {
        let mut source = running_source();
        source.pending_batches.push_back(test_batch(5));

        source.close().await.unwrap();
        assert_eq!(source.state(), ConnectorState::Closed);
        assert!(source.pending_batches.is_empty());
    }
}