laminar_connectors/lakehouse/
iceberg.rs1use std::time::Duration;
13
14use arrow_array::RecordBatch;
15use arrow_schema::SchemaRef;
16use async_trait::async_trait;
17#[cfg(feature = "iceberg")]
18use tracing::info;
19use tracing::{debug, warn};
20
21use crate::config::{ConnectorConfig, ConnectorState};
22use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
23use crate::error::ConnectorError;
24
25use super::iceberg_config::IcebergSinkConfig;
26
27pub struct IcebergSink {
33 config: IcebergSinkConfig,
35 schema: Option<SchemaRef>,
37 state: ConnectorState,
39 current_epoch: u64,
41 last_committed_epoch: u64,
43 buffer: Vec<RecordBatch>,
45 buffered_rows: usize,
47 staged_batches: Vec<RecordBatch>,
49 staged_rows: usize,
51 epoch_skipped: bool,
53 #[cfg(feature = "iceberg")]
55 catalog: Option<std::sync::Arc<dyn iceberg::Catalog>>,
56 #[cfg(feature = "iceberg")]
58 table: Option<iceberg::table::Table>,
59 #[cfg(feature = "iceberg")]
62 iceberg_arrow_schema: Option<SchemaRef>,
63}
64
65impl IcebergSink {
66 #[must_use]
68 pub fn new(config: IcebergSinkConfig, _registry: Option<&prometheus::Registry>) -> Self {
69 Self {
70 config,
71 schema: None,
72 state: ConnectorState::Created,
73 current_epoch: 0,
74 last_committed_epoch: 0,
75 buffer: Vec::new(),
76 buffered_rows: 0,
77 staged_batches: Vec::new(),
78 staged_rows: 0,
79 epoch_skipped: false,
80 #[cfg(feature = "iceberg")]
81 catalog: None,
82 #[cfg(feature = "iceberg")]
83 table: None,
84 #[cfg(feature = "iceberg")]
85 iceberg_arrow_schema: None,
86 }
87 }
88
89 fn clear_buffer(&mut self) {
90 self.buffer.clear();
91 self.buffered_rows = 0;
92 }
93
94 fn clear_staged(&mut self) {
95 self.staged_batches.clear();
96 self.staged_rows = 0;
97 }
98
99 #[cfg(feature = "iceberg")]
109 fn align_batch_to_iceberg_schema(
110 &self,
111 batch: &RecordBatch,
112 ) -> Result<RecordBatch, ConnectorError> {
113 let target_schema =
114 self.iceberg_arrow_schema
115 .as_ref()
116 .ok_or_else(|| ConnectorError::InvalidState {
117 expected: "open".into(),
118 actual: "iceberg arrow schema not initialized".into(),
119 })?;
120
121 let batch_schema = batch.schema();
124 if batch_schema.fields().len() == target_schema.fields().len()
125 && batch_schema
126 .fields()
127 .iter()
128 .zip(target_schema.fields().iter())
129 .all(|(a, b)| a.name() == b.name() && a.data_type() == b.data_type())
130 {
131 return RecordBatch::try_new(target_schema.clone(), batch.columns().to_vec()).map_err(
132 |e| ConnectorError::WriteError(format!("align batch to iceberg schema: {e}")),
133 );
134 }
135
136 let mut columns = Vec::with_capacity(target_schema.fields().len());
138
139 for field in target_schema.fields() {
140 if let Ok(col_idx) = batch_schema.index_of(field.name()) {
141 let col = batch.column(col_idx);
142 if col.data_type() == field.data_type() {
143 columns.push(col.clone());
144 } else {
145 columns.push(arrow_cast::cast(col, field.data_type()).map_err(|e| {
146 ConnectorError::WriteError(format!(
147 "cast field '{}' from {} to {}: {e}",
148 field.name(),
149 col.data_type(),
150 field.data_type(),
151 ))
152 })?);
153 }
154 } else if field.is_nullable() {
155 columns.push(arrow_array::new_null_array(
157 field.data_type(),
158 batch.num_rows(),
159 ));
160 } else {
161 return Err(ConnectorError::SchemaMismatch(format!(
162 "Iceberg column '{}' is NOT NULL but missing from pipeline",
163 field.name(),
164 )));
165 }
166 }
167
168 for field in batch_schema.fields() {
171 if target_schema.field_with_name(field.name()).is_err() {
172 return Err(ConnectorError::SchemaMismatch(format!(
173 "pipeline column '{}' has no matching field in Iceberg table schema \
174 (schema evolved since open?)",
175 field.name(),
176 )));
177 }
178 }
179
180 RecordBatch::try_new(target_schema.clone(), columns)
181 .map_err(|e| ConnectorError::WriteError(format!("align batch to iceberg schema: {e}")))
182 }
183
184 #[cfg(feature = "iceberg")]
186 fn parquet_compression(name: &str) -> parquet::basic::Compression {
187 match name.to_lowercase().as_str() {
188 "snappy" => parquet::basic::Compression::SNAPPY,
189 "none" | "uncompressed" => parquet::basic::Compression::UNCOMPRESSED,
190 "lz4" => parquet::basic::Compression::LZ4,
191 _ => parquet::basic::Compression::ZSTD(
193 parquet::basic::ZstdLevel::try_new(3).unwrap_or_default(),
194 ),
195 }
196 }
197
198 #[cfg(feature = "iceberg")]
201 fn validate_schema_not_drifted(&self) -> Result<(), ConnectorError> {
202 if let (Some(pipeline_schema), Some(target_schema)) =
203 (&self.schema, &self.iceberg_arrow_schema)
204 {
205 for field in pipeline_schema.fields() {
206 if target_schema.field_with_name(field.name()).is_err() {
207 return Err(ConnectorError::SchemaMismatch(format!(
208 "pipeline field '{}' no longer exists in Iceberg table schema \
209 (concurrent schema evolution?)",
210 field.name(),
211 )));
212 }
213 }
214 }
215 Ok(())
216 }
217
218 #[cfg(feature = "iceberg")]
220 async fn commit_to_iceberg(&mut self) -> Result<(), ConnectorError> {
221 use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder};
222
223 let catalog = self
224 .catalog
225 .as_ref()
226 .ok_or_else(|| ConnectorError::InvalidState {
227 expected: "open".into(),
228 actual: "catalog not initialized".into(),
229 })?;
230 let table = self
231 .table
232 .as_ref()
233 .ok_or_else(|| ConnectorError::InvalidState {
234 expected: "open".into(),
235 actual: "table not loaded".into(),
236 })?;
237
238 let file_io = table.file_io().clone();
239 let location = table.metadata().location().to_string();
240 let schema = table.current_schema_ref();
241
242 self.validate_schema_not_drifted()?;
243
244 let props = parquet::file::properties::WriterProperties::builder()
245 .set_compression(Self::parquet_compression(&self.config.compression))
246 .build();
247 let writer_builder = ParquetWriterBuilder::new(props, schema);
248
249 let mut all_data_files = Vec::new();
250
251 for (idx, batch) in self.staged_batches.iter().enumerate() {
252 if batch.num_rows() == 0 {
253 continue;
254 }
255
256 let aligned = self.align_batch_to_iceberg_schema(batch)?;
261
262 let file_path = format!(
263 "{location}/data/{}-{}-{}-{idx}.parquet",
264 self.config.writer_id,
265 self.current_epoch,
266 uuid::Uuid::new_v4(),
267 );
268
269 let output_file = file_io
270 .new_output(&file_path)
271 .map_err(|e| ConnectorError::WriteError(format!("create output: {e}")))?;
272
273 let mut writer = writer_builder
274 .clone()
275 .build(output_file)
276 .await
277 .map_err(|e| ConnectorError::WriteError(format!("build parquet writer: {e}")))?;
278
279 writer
280 .write(&aligned)
281 .await
282 .map_err(|e| ConnectorError::WriteError(format!("parquet write: {e}")))?;
283
284 let data_file_builders = writer
285 .close()
286 .await
287 .map_err(|e| ConnectorError::WriteError(format!("close parquet writer: {e}")))?;
288
289 for dfb in data_file_builders {
290 let data_file = dfb
291 .build()
292 .map_err(|e| ConnectorError::WriteError(format!("data file build: {e}")))?;
293 all_data_files.push(data_file);
294 }
295 }
296
297 if all_data_files.is_empty() {
298 debug!(epoch = self.current_epoch, "no data files to commit");
299 return Ok(());
300 }
301
302 let updated_table = super::iceberg_io::commit_data_files(
303 table,
304 catalog.as_ref(),
305 all_data_files,
306 Some((&self.config.writer_id, self.current_epoch)),
307 )
308 .await?;
309
310 self.table = Some(updated_table);
313
314 let table = self.table.as_ref().expect("just set above");
320 let new_iceberg_schema = table.current_schema_ref();
321 match iceberg::arrow::schema_to_arrow_schema(&new_iceberg_schema) {
322 Ok(arrow_schema) => {
323 self.iceberg_arrow_schema = Some(std::sync::Arc::new(arrow_schema));
324 }
325 Err(e) => {
326 self.iceberg_arrow_schema = None;
330 warn!(
331 epoch = self.current_epoch,
332 error = %e,
333 "failed to refresh Iceberg Arrow schema; cache invalidated"
334 );
335 }
336 }
337
338 info!(
339 epoch = self.current_epoch,
340 rows = self.staged_rows,
341 "iceberg commit succeeded"
342 );
343
344 Ok(())
345 }
346}
347
348#[async_trait]
349impl SinkConnector for IcebergSink {
350 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
351 if !config.properties().is_empty() {
353 self.config = IcebergSinkConfig::from_config(config)?;
354 }
355
356 #[cfg(feature = "iceberg")]
357 {
358 let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
359 let ns = &self.config.catalog.namespace;
360 let tbl = &self.config.catalog.table_name;
361
362 if self.config.auto_create {
363 if let Some(schema) = config.arrow_schema() {
364 super::iceberg_io::ensure_table_exists(catalog.as_ref(), ns, tbl, &schema)
365 .await?;
366 }
367 }
368
369 let table = super::iceberg_io::load_table(catalog.as_ref(), ns, tbl).await?;
370
371 let iceberg_schema = table.current_schema_ref();
373 let table_schema = std::sync::Arc::new(
374 iceberg::arrow::schema_to_arrow_schema(&iceberg_schema).map_err(|e| {
375 ConnectorError::SchemaMismatch(format!("iceberg→arrow schema: {e}"))
376 })?,
377 );
378
379 self.iceberg_arrow_schema = Some(table_schema.clone());
382
383 if self.schema.is_none() {
384 self.schema = Some(table_schema.clone());
385 }
386
387 if let Some(epoch) =
389 super::iceberg_io::get_last_committed_epoch(&table, &self.config.writer_id)
390 {
391 self.last_committed_epoch = epoch;
392 info!(writer_id = %self.config.writer_id, epoch, "recovered last committed epoch");
393 }
394
395 if let Some(pipeline_schema) = config.arrow_schema() {
398 super::iceberg_config::validate_sink_schema(&pipeline_schema, &table_schema)?;
399 self.schema = Some(pipeline_schema);
400 }
401
402 self.catalog = Some(catalog);
403 self.table = Some(table);
404 self.state = ConnectorState::Running;
405
406 info!(table = tbl, namespace = ns, "iceberg sink connected");
407 return Ok(());
408 }
409
410 #[cfg(not(feature = "iceberg"))]
411 {
412 self.state = ConnectorState::Failed;
413 Err(ConnectorError::ConfigurationError(
414 "Apache Iceberg requires the 'iceberg' feature".into(),
415 ))
416 }
417 }
418
419 async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
420 if batch.num_rows() == 0 || self.epoch_skipped {
421 return Ok(WriteResult::new(0, 0));
422 }
423
424 if self.schema.is_none() {
425 self.schema = Some(batch.schema());
426 }
427
428 let rows = batch.num_rows();
429 self.buffer.push(batch.clone());
430 self.buffered_rows += rows;
431
432 Ok(WriteResult::new(rows, 0))
433 }
434
435 fn schema(&self) -> SchemaRef {
436 self.schema
437 .clone()
438 .unwrap_or_else(|| std::sync::Arc::new(arrow_schema::Schema::empty()))
439 }
440
441 async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
442 self.current_epoch = epoch;
443 self.epoch_skipped = false;
444 self.clear_buffer();
445 self.clear_staged();
446
447 if epoch > 0 && epoch <= self.last_committed_epoch {
448 debug!(
449 epoch,
450 last = self.last_committed_epoch,
451 "epoch already committed, skipping"
452 );
453 self.epoch_skipped = true;
454 }
455
456 Ok(())
457 }
458
459 async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
460 if self.epoch_skipped {
461 return Ok(());
462 }
463
464 std::mem::swap(&mut self.staged_batches, &mut self.buffer);
465 self.staged_rows = self.buffered_rows;
466 self.clear_buffer();
467
468 Ok(())
469 }
470
471 async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
472 if self.epoch_skipped || self.staged_rows == 0 {
473 self.clear_staged();
474 return Ok(());
475 }
476
477 #[cfg(feature = "iceberg")]
478 {
479 self.commit_to_iceberg().await?;
480 }
481
482 self.last_committed_epoch = epoch;
483 self.clear_staged();
484 Ok(())
485 }
486
487 async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
488 warn!(epoch, "iceberg rollback: discarding staged data");
489 self.clear_buffer();
490 self.clear_staged();
491 self.epoch_skipped = false;
492 Ok(())
493 }
494
495 fn capabilities(&self) -> SinkConnectorCapabilities {
496 SinkConnectorCapabilities::new(Duration::from_secs(300))
498 .with_exactly_once()
499 .with_two_phase_commit()
500 }
501
502 async fn close(&mut self) -> Result<(), ConnectorError> {
503 #[cfg(feature = "iceberg")]
504 {
505 self.catalog = None;
506 self.table = None;
507 self.iceberg_arrow_schema = None;
508 }
509 self.state = ConnectorState::Closed;
510 Ok(())
511 }
512}
513
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Schema with a single non-nullable `id: Int64` column.
    fn test_schema() -> SchemaRef {
        let id_field = Field::new("id", DataType::Int64, false);
        Arc::new(Schema::new(vec![id_field]))
    }

    /// Batch of `n` rows whose `id` column counts up from zero.
    fn test_batch(n: usize) -> RecordBatch {
        let id_column = Int64Array::from((0..n as i64).collect::<Vec<i64>>());
        RecordBatch::try_new(test_schema(), vec![Arc::new(id_column)]).unwrap()
    }

    /// Minimal sink configuration pointing at a local catalog endpoint.
    fn test_config() -> IcebergSinkConfig {
        let mut connector_config = ConnectorConfig::new("iceberg");
        for (key, value) in [
            ("catalog.uri", "http://localhost:8181"),
            ("warehouse", "s3://test/wh"),
            ("namespace", "test"),
            ("table.name", "events"),
        ] {
            connector_config.set(key, value);
        }
        IcebergSinkConfig::from_config(&connector_config).unwrap()
    }

    /// Fresh, unopened sink built from `test_config`.
    fn fresh_sink() -> IcebergSink {
        IcebergSink::new(test_config(), None)
    }

    #[test]
    fn test_new_sink() {
        let sink = fresh_sink();

        assert!(sink.schema.is_none());
        assert_eq!(sink.current_epoch, 0);
        assert_eq!(sink.buffered_rows, 0);
    }

    #[tokio::test]
    async fn test_write_buffers_batches() {
        let mut sink = fresh_sink();
        sink.begin_epoch(1).await.unwrap();

        // (rows written, expected running row total, expected number of buffered batches)
        for (rows, total_rows, batch_count) in [(100, 100, 1), (50, 150, 2)] {
            let outcome = sink.write_batch(&test_batch(rows)).await.unwrap();
            assert_eq!(outcome.records_written, rows);
            assert_eq!(sink.buffered_rows, total_rows);
            assert_eq!(sink.buffer.len(), batch_count);
        }
    }

    #[tokio::test]
    async fn test_pre_commit_stages_buffer() {
        let mut sink = fresh_sink();
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(100)).await.unwrap();

        sink.pre_commit(1).await.unwrap();

        // Everything moved from the buffer to the staged set.
        assert_eq!(sink.staged_rows, 100);
        assert_eq!(sink.staged_batches.len(), 1);
        assert!(sink.buffer.is_empty());
        assert_eq!(sink.buffered_rows, 0);
    }

    #[tokio::test]
    async fn test_rollback_clears_staged() {
        let mut sink = fresh_sink();
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(100)).await.unwrap();
        sink.pre_commit(1).await.unwrap();

        sink.rollback_epoch(1).await.unwrap();

        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.staged_rows, 0);
        assert!(sink.buffer.is_empty());
    }

    #[tokio::test]
    async fn test_epoch_skip_when_already_committed() {
        let mut sink = fresh_sink();
        sink.last_committed_epoch = 5;

        // Epoch 3 is older than the last committed epoch, so it must be skipped.
        sink.begin_epoch(3).await.unwrap();
        assert!(sink.epoch_skipped);

        let outcome = sink.write_batch(&test_batch(100)).await.unwrap();
        assert_eq!(outcome.records_written, 0);
    }

    #[tokio::test]
    async fn test_empty_epoch_commit() {
        let mut sink = fresh_sink();

        // An epoch without any writes must pass through both commit phases without error.
        sink.begin_epoch(1).await.unwrap();
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
    }

    #[test]
    fn test_capabilities() {
        let caps = fresh_sink().capabilities();

        assert!(caps.exactly_once);
        assert!(caps.two_phase_commit);
        assert!(!caps.partitioned);
        assert!(!caps.upsert);
    }
}