laminar_connectors/lakehouse/iceberg.rs

use std::time::Duration;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
#[cfg(feature = "iceberg")]
use tracing::info;
use tracing::{debug, warn};

use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;

use super::iceberg_config::IcebergSinkConfig;

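/// Apache Iceberg sink with buffered, epoch-based two-phase commit.
///
/// Batches written during an epoch are buffered in memory, moved to a staging
/// area at `pre_commit`, and written to the table as Parquet data files at
/// `commit_epoch`. The `(writer_id, epoch)` pair recorded with each commit is
/// read back on `open`, so epochs replayed after a restart are skipped.
///
/// A minimal usage sketch (assumes a Tokio runtime, the `iceberg` feature, a
/// reachable REST catalog, and that the `SinkConnector` trait is in scope; the
/// property keys mirror the test config at the bottom of this file and may
/// differ in your deployment):
///
/// ```ignore
/// let mut cfg = ConnectorConfig::new("iceberg");
/// cfg.set("catalog.uri", "http://localhost:8181");
/// cfg.set("warehouse", "s3://bucket/warehouse");
/// cfg.set("namespace", "analytics");
/// cfg.set("table.name", "events");
///
/// let mut sink = IcebergSink::new(IcebergSinkConfig::from_config(&cfg)?, None);
/// sink.open(&cfg).await?;
/// sink.begin_epoch(1).await?;
/// sink.write_batch(&batch).await?;
/// sink.pre_commit(1).await?;
/// sink.commit_epoch(1).await?;
/// sink.close().await?;
/// ```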
pub struct IcebergSink {
    /// Sink configuration (catalog, table, writer identity, file format).
    config: IcebergSinkConfig,
    /// Pipeline Arrow schema, captured at open or from the first batch.
    schema: Option<SchemaRef>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Epoch currently being written.
    current_epoch: u64,
    /// Highest epoch known to be committed for this writer (used to skip replays).
    last_committed_epoch: u64,
    /// Batches buffered for the current epoch.
    buffer: Vec<RecordBatch>,
    /// Row count across the buffered batches.
    buffered_rows: usize,
    /// Batches staged by `pre_commit`, awaiting `commit_epoch`.
    staged_batches: Vec<RecordBatch>,
    /// Row count across the staged batches.
    staged_rows: usize,
    /// True when the current epoch was already committed, making writes no-ops.
    epoch_skipped: bool,
    /// Iceberg catalog handle, created in `open`.
    #[cfg(feature = "iceberg")]
    catalog: Option<std::sync::Arc<dyn iceberg::Catalog>>,
    /// Loaded Iceberg table handle.
    #[cfg(feature = "iceberg")]
    table: Option<iceberg::table::Table>,
    /// Arrow projection of the table's current Iceberg schema, used to align batches.
    #[cfg(feature = "iceberg")]
    iceberg_arrow_schema: Option<SchemaRef>,
}
66
67impl IcebergSink {
68 #[must_use]
70 pub fn new(config: IcebergSinkConfig, _registry: Option<&prometheus::Registry>) -> Self {
71 Self {
72 config,
73 schema: None,
74 state: ConnectorState::Created,
75 current_epoch: 0,
76 last_committed_epoch: 0,
77 buffer: Vec::new(),
78 buffered_rows: 0,
79 staged_batches: Vec::new(),
80 staged_rows: 0,
81 epoch_skipped: false,
82 #[cfg(feature = "iceberg")]
83 catalog: None,
84 #[cfg(feature = "iceberg")]
85 table: None,
86 #[cfg(feature = "iceberg")]
87 iceberg_arrow_schema: None,
88 }
89 }
90
91 fn clear_buffer(&mut self) {
92 self.buffer.clear();
93 self.buffered_rows = 0;
94 }
95
96 fn clear_staged(&mut self) {
97 self.staged_batches.clear();
98 self.staged_rows = 0;
99 }
100
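    /// Aligns a pipeline batch to the table's current Arrow schema.
    ///
    /// Columns are matched by name: matching columns are cast to the target
    /// Arrow type when needed, missing nullable columns are filled with nulls,
    /// and a missing NOT NULL column or an extra pipeline column is rejected
    /// as a schema mismatch. The fast path reuses the batch columns unchanged
    /// when names and types already line up.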
    #[cfg(feature = "iceberg")]
    fn align_batch_to_iceberg_schema(
        &self,
        batch: &RecordBatch,
    ) -> Result<RecordBatch, ConnectorError> {
        let target_schema =
            self.iceberg_arrow_schema
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "open".into(),
                    actual: "iceberg arrow schema not initialized".into(),
                })?;

        let batch_schema = batch.schema();
        // Fast path: identical layout, just rewrap the columns with the target schema.
        if batch_schema.fields().len() == target_schema.fields().len()
            && batch_schema
                .fields()
                .iter()
                .zip(target_schema.fields().iter())
                .all(|(a, b)| a.name() == b.name() && a.data_type() == b.data_type())
        {
            return RecordBatch::try_new(target_schema.clone(), batch.columns().to_vec()).map_err(
                |e| ConnectorError::WriteError(format!("align batch to iceberg schema: {e}")),
            );
        }

        // Project, cast, or null-fill each target column by name.
        let mut columns = Vec::with_capacity(target_schema.fields().len());

        for field in target_schema.fields() {
            if let Ok(col_idx) = batch_schema.index_of(field.name()) {
                let col = batch.column(col_idx);
                if col.data_type() == field.data_type() {
                    columns.push(col.clone());
                } else {
                    columns.push(arrow_cast::cast(col, field.data_type()).map_err(|e| {
                        ConnectorError::WriteError(format!(
                            "cast field '{}' from {} to {}: {e}",
                            field.name(),
                            col.data_type(),
                            field.data_type(),
                        ))
                    })?);
                }
            } else if field.is_nullable() {
                columns.push(arrow_array::new_null_array(
                    field.data_type(),
                    batch.num_rows(),
                ));
            } else {
                return Err(ConnectorError::SchemaMismatch(format!(
                    "Iceberg column '{}' is NOT NULL but missing from pipeline",
                    field.name(),
                )));
            }
        }

        // Reject pipeline columns the table schema no longer contains.
        for field in batch_schema.fields() {
            if target_schema.field_with_name(field.name()).is_err() {
                return Err(ConnectorError::SchemaMismatch(format!(
                    "pipeline column '{}' has no matching field in Iceberg table schema \
                     (schema evolved since open?)",
                    field.name(),
                )));
            }
        }

        RecordBatch::try_new(target_schema.clone(), columns)
            .map_err(|e| ConnectorError::WriteError(format!("align batch to iceberg schema: {e}")))
    }

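    /// Maps a configured compression name to a Parquet codec. Unrecognized
    /// names fall back to zstd (level 3).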
    #[cfg(feature = "iceberg")]
    fn parquet_compression(name: &str) -> parquet::basic::Compression {
        match name.to_lowercase().as_str() {
            "snappy" => parquet::basic::Compression::SNAPPY,
            "none" | "uncompressed" => parquet::basic::Compression::UNCOMPRESSED,
            "lz4" => parquet::basic::Compression::LZ4,
            _ => parquet::basic::Compression::ZSTD(
                parquet::basic::ZstdLevel::try_new(3).unwrap_or_default(),
            ),
        }
    }

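    /// Verifies that every pipeline field still exists in the cached Iceberg
    /// Arrow schema, guarding against concurrent schema evolution between
    /// `open` and commit time. Type changes are handled later by per-column
    /// casts during batch alignment.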
    #[cfg(feature = "iceberg")]
    fn validate_schema_not_drifted(&self) -> Result<(), ConnectorError> {
        if let (Some(pipeline_schema), Some(target_schema)) =
            (&self.schema, &self.iceberg_arrow_schema)
        {
            for field in pipeline_schema.fields() {
                if target_schema.field_with_name(field.name()).is_err() {
                    return Err(ConnectorError::SchemaMismatch(format!(
                        "pipeline field '{}' no longer exists in Iceberg table schema \
                         (concurrent schema evolution?)",
                        field.name(),
                    )));
                }
            }
        }
        Ok(())
    }

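    /// Writes the staged batches to the Iceberg table and commits them as a
    /// single snapshot.
    ///
    /// Each non-empty staged batch becomes one Parquet data file under the
    /// table's `data/` prefix, named after the writer id, epoch, and batch
    /// index. All data files are then committed together, passing
    /// `(writer_id, epoch)` along with the commit so a replayed epoch can be
    /// detected after restart.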
    #[cfg(feature = "iceberg")]
    async fn commit_to_iceberg(&mut self) -> Result<(), ConnectorError> {
        use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder};

        let catalog = self
            .catalog
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "catalog not initialized".into(),
            })?;
        let table = self
            .table
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "table not loaded".into(),
            })?;

        let file_io = table.file_io().clone();
        let location = table.metadata().location().to_string();
        let schema = table.current_schema_ref();

        self.validate_schema_not_drifted()?;

        let props = parquet::file::properties::WriterProperties::builder()
            .set_compression(Self::parquet_compression(&self.config.compression))
            .build();
        let writer_builder = ParquetWriterBuilder::new(props, schema);

        let mut all_data_files = Vec::new();

        for (idx, batch) in self.staged_batches.iter().enumerate() {
            if batch.num_rows() == 0 {
                continue;
            }

            // Align the staged batch to the table's current Arrow schema before writing.
            let aligned = self.align_batch_to_iceberg_schema(batch)?;

            let file_path = format!(
                "{location}/data/{}-{}-{idx}.parquet",
                self.config.writer_id, self.current_epoch,
            );

            let output_file = file_io
                .new_output(&file_path)
                .map_err(|e| ConnectorError::WriteError(format!("create output: {e}")))?;

            let mut writer = writer_builder
                .clone()
                .build(output_file)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("build parquet writer: {e}")))?;

            writer
                .write(&aligned)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("parquet write: {e}")))?;

            let data_file_builders = writer
                .close()
                .await
                .map_err(|e| ConnectorError::WriteError(format!("close parquet writer: {e}")))?;

            for dfb in data_file_builders {
                let data_file = dfb
                    .build()
                    .map_err(|e| ConnectorError::WriteError(format!("data file build: {e}")))?;
                all_data_files.push(data_file);
            }
        }

        if all_data_files.is_empty() {
            debug!(epoch = self.current_epoch, "no data files to commit");
            return Ok(());
        }

        let updated_table = super::iceberg_io::commit_data_files(
            table,
            catalog.as_ref(),
            all_data_files,
            Some((&self.config.writer_id, self.current_epoch)),
        )
        .await?;

        // Keep the refreshed table handle returned by the commit.
        self.table = Some(updated_table);

        // The commit may carry new table metadata (for example a schema change merged
        // by the catalog), so rebuild the cached Arrow projection from the updated table.
        let table = self.table.as_ref().expect("just set above");
        let new_iceberg_schema = table.current_schema_ref();
        match iceberg::arrow::schema_to_arrow_schema(&new_iceberg_schema) {
            Ok(arrow_schema) => {
                self.iceberg_arrow_schema = Some(std::sync::Arc::new(arrow_schema));
            }
            Err(e) => {
                // Drop the stale projection; batch alignment will surface the problem on
                // the next commit instead of writing files against an outdated schema.
                self.iceberg_arrow_schema = None;
                warn!(
                    epoch = self.current_epoch,
                    error = %e,
                    "failed to refresh Iceberg Arrow schema; cache invalidated"
                );
            }
        }

        info!(
            epoch = self.current_epoch,
            rows = self.staged_rows,
            "iceberg commit succeeded"
        );

        Ok(())
    }
}

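// Expected call order for the epoch protocol:
//   begin_epoch(n)    -> reset buffers; mark the epoch skipped if n was already committed
//   write_batch(..)   -> buffer batches in memory for the current epoch
//   pre_commit(n)     -> move the buffer into the staging area (prepare phase)
//   commit_epoch(n)   -> write staged batches to Iceberg and record the epoch (commit phase)
//   rollback_epoch(n) -> discard both buffered and staged data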
#[async_trait]
impl SinkConnector for IcebergSink {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        // Re-derive the sink config when the runtime supplies connector properties.
        if !config.properties().is_empty() {
            self.config = IcebergSinkConfig::from_config(config)?;
        }

        #[cfg(feature = "iceberg")]
        {
            let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
            let ns = &self.config.catalog.namespace;
            let tbl = &self.config.catalog.table_name;

            if self.config.auto_create {
                if let Some(schema) = config.arrow_schema() {
                    super::iceberg_io::ensure_table_exists(catalog.as_ref(), ns, tbl, &schema)
                        .await?;
                }
            }

            let table = super::iceberg_io::load_table(catalog.as_ref(), ns, tbl).await?;

            // Project the table's current Iceberg schema into Arrow for batch alignment.
            let iceberg_schema = table.current_schema_ref();
            let table_schema = std::sync::Arc::new(
                iceberg::arrow::schema_to_arrow_schema(&iceberg_schema).map_err(|e| {
                    ConnectorError::SchemaMismatch(format!("iceberg→arrow schema: {e}"))
                })?,
            );

            self.iceberg_arrow_schema = Some(table_schema.clone());

            // Fall back to the table schema until the pipeline provides its own.
            if self.schema.is_none() {
                self.schema = Some(table_schema.clone());
            }

            // Recover the last committed epoch for this writer so replayed epochs are skipped.
            if let Some(epoch) =
                super::iceberg_io::get_last_committed_epoch(&table, &self.config.writer_id)
            {
                self.last_committed_epoch = epoch;
                info!(writer_id = %self.config.writer_id, epoch, "recovered last committed epoch");
            }

            // Prefer the pipeline schema once it validates against the table schema.
            if let Some(pipeline_schema) = config.arrow_schema() {
                super::iceberg_config::validate_sink_schema(&pipeline_schema, &table_schema)?;
                self.schema = Some(pipeline_schema);
            }

            self.catalog = Some(catalog);
            self.table = Some(table);
            self.state = ConnectorState::Running;

            info!(table = tbl, namespace = ns, "iceberg sink connected");
            return Ok(());
        }

        #[cfg(not(feature = "iceberg"))]
        {
            self.state = ConnectorState::Failed;
            Err(ConnectorError::ConfigurationError(
                "Apache Iceberg requires the 'iceberg' feature".into(),
            ))
        }
    }

    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        if batch.num_rows() == 0 || self.epoch_skipped {
            return Ok(WriteResult::new(0, 0));
        }

        if self.schema.is_none() {
            self.schema = Some(batch.schema());
        }

        let rows = batch.num_rows();
        self.buffer.push(batch.clone());
        self.buffered_rows += rows;

        Ok(WriteResult::new(rows, 0))
    }

    fn schema(&self) -> SchemaRef {
        self.schema
            .clone()
            .unwrap_or_else(|| std::sync::Arc::new(arrow_schema::Schema::empty()))
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;
        self.epoch_skipped = false;
        self.clear_buffer();
        self.clear_staged();

        if epoch > 0 && epoch <= self.last_committed_epoch {
            debug!(
                epoch,
                last = self.last_committed_epoch,
                "epoch already committed, skipping"
            );
            self.epoch_skipped = true;
        }

        Ok(())
    }

    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        if self.epoch_skipped {
            return Ok(());
        }

        // Move the buffered batches into the staging area for the commit phase.
        std::mem::swap(&mut self.staged_batches, &mut self.buffer);
        self.staged_rows = self.buffered_rows;
        self.clear_buffer();

        Ok(())
    }

    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.epoch_skipped || self.staged_rows == 0 {
            self.clear_staged();
            return Ok(());
        }

        #[cfg(feature = "iceberg")]
        {
            self.commit_to_iceberg().await?;
        }

        self.last_committed_epoch = epoch;
        self.clear_staged();
        Ok(())
    }

    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        warn!(epoch, "iceberg rollback: discarding staged data");
        self.clear_buffer();
        self.clear_staged();
        self.epoch_skipped = false;
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Failed => HealthStatus::Unhealthy("sink failed".into()),
            _ => HealthStatus::Unknown,
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    fn capabilities(&self) -> SinkConnectorCapabilities {
        // Commits are idempotent per (writer_id, epoch), so the sink can advertise
        // exactly-once delivery on top of the two-phase commit protocol.
        SinkConnectorCapabilities::new(Duration::from_secs(300))
            .with_exactly_once()
            .with_two_phase_commit()
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        #[cfg(feature = "iceberg")]
        {
            self.catalog = None;
            self.table = None;
            self.iceberg_arrow_schema = None;
        }
        self.state = ConnectorState::Closed;
        Ok(())
    }
}

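// The tests below exercise the buffering, staging, and epoch-skip state machine in
// isolation; they do not need the `iceberg` feature or a live catalog, since
// `commit_epoch` returns early when nothing has been staged.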
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        RecordBatch::try_new(test_schema(), vec![Arc::new(Int64Array::from(ids))]).unwrap()
    }

    fn test_config() -> IcebergSinkConfig {
        let mut config = ConnectorConfig::new("iceberg");
        config.set("catalog.uri", "http://localhost:8181");
        config.set("warehouse", "s3://test/wh");
        config.set("namespace", "test");
        config.set("table.name", "events");
        IcebergSinkConfig::from_config(&config).unwrap()
    }

    #[test]
    fn test_new_sink() {
        let sink = IcebergSink::new(test_config(), None);
        assert!(sink.schema.is_none());
        assert_eq!(sink.current_epoch, 0);
        assert_eq!(sink.buffered_rows, 0);
    }

    #[tokio::test]
    async fn test_write_buffers_batches() {
        let mut sink = IcebergSink::new(test_config(), None);
        sink.begin_epoch(1).await.unwrap();

        let result = sink.write_batch(&test_batch(100)).await.unwrap();
        assert_eq!(result.records_written, 100);
        assert_eq!(sink.buffered_rows, 100);
        assert_eq!(sink.buffer.len(), 1);

        let result = sink.write_batch(&test_batch(50)).await.unwrap();
        assert_eq!(result.records_written, 50);
        assert_eq!(sink.buffered_rows, 150);
        assert_eq!(sink.buffer.len(), 2);
    }

    #[tokio::test]
    async fn test_pre_commit_stages_buffer() {
        let mut sink = IcebergSink::new(test_config(), None);
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(100)).await.unwrap();

        sink.pre_commit(1).await.unwrap();
        assert_eq!(sink.staged_rows, 100);
        assert_eq!(sink.staged_batches.len(), 1);
        assert!(sink.buffer.is_empty());
        assert_eq!(sink.buffered_rows, 0);
    }

    #[tokio::test]
    async fn test_rollback_clears_staged() {
        let mut sink = IcebergSink::new(test_config(), None);
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(100)).await.unwrap();
        sink.pre_commit(1).await.unwrap();

        sink.rollback_epoch(1).await.unwrap();
        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.staged_rows, 0);
        assert!(sink.buffer.is_empty());
    }

    #[tokio::test]
    async fn test_epoch_skip_when_already_committed() {
        let mut sink = IcebergSink::new(test_config(), None);
        sink.last_committed_epoch = 5;

        sink.begin_epoch(3).await.unwrap();
        assert!(sink.epoch_skipped);

        let result = sink.write_batch(&test_batch(100)).await.unwrap();
        assert_eq!(result.records_written, 0);
    }

    #[tokio::test]
    async fn test_empty_epoch_commit() {
        let mut sink = IcebergSink::new(test_config(), None);
        sink.begin_epoch(1).await.unwrap();
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
    }

    #[test]
    fn test_capabilities() {
        let sink = IcebergSink::new(test_config(), None);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.two_phase_commit);
        assert!(!caps.partitioned);
        assert!(!caps.upsert);
    }
}