Skip to main content

laminar_db/ffi/
arrow_ffi.rs

1//! Arrow C Data Interface for zero-copy data exchange.
2//!
3//! This module provides `extern "C"` functions for exporting and importing
4//! Arrow data via the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html).
5//!
6//! # Zero-Copy Export
7//!
8//! The export functions transfer ownership of data buffers to the consumer.
9//! The consumer must call the release callbacks when done.
10//!
11//! # Usage
12//!
13//! ```c
14//! #include "laminar.h"
15//!
16//! // Query and get a batch
17//! LaminarRecordBatch* batch = ...;
18//!
19//! // Export to Arrow C Data Interface (caller allocates structs)
20//! struct ArrowArray array;
21//! struct ArrowSchema schema;
22//! int32_t rc = laminar_batch_export(batch, &array, &schema);
23//!
24//! // Consumer uses the data...
25//!
26//! // Consumer releases when done
27//! if (array.release) array.release(&array);
28//! if (schema.release) schema.release(&schema);
29//! ```
30
31use arrow::array::{Array, RecordBatch, StructArray};
32use arrow::ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema};
33
34use super::error::{
35    clear_last_error, set_last_error, LAMINAR_ERR_INTERNAL, LAMINAR_ERR_NULL_POINTER, LAMINAR_OK,
36};
37use super::query::LaminarRecordBatch;
38use super::schema::LaminarSchema;
39use crate::api::ApiError;
40
41/// Export a `RecordBatch` to the Arrow C Data Interface.
42///
43/// The batch is exported as a struct array (Arrow convention for record batches).
44/// The caller must allocate the `ArrowArray` and `ArrowSchema` structs before calling.
45///
46/// # Arguments
47///
48/// * `batch` - Record batch to export
49/// * `out_array` - Pointer to caller-allocated `ArrowArray` struct
50/// * `out_schema` - Pointer to caller-allocated `ArrowSchema` struct
51///
52/// # Returns
53///
54/// `LAMINAR_OK` on success, or an error code.
55///
56/// # Safety
57///
58/// * `batch` must be a valid batch handle
59/// * `out_array` and `out_schema` must be valid pointers to uninitialized structs
60/// * Caller must eventually call the release callbacks on both structs
61#[no_mangle]
62pub unsafe extern "C" fn laminar_batch_export(
63    batch: *mut LaminarRecordBatch,
64    out_array: *mut FFI_ArrowArray,
65    out_schema: *mut FFI_ArrowSchema,
66) -> i32 {
67    clear_last_error();
68
69    if batch.is_null() || out_array.is_null() || out_schema.is_null() {
70        return LAMINAR_ERR_NULL_POINTER;
71    }
72
73    // SAFETY: batch is non-null (checked above)
74    let batch_ref = unsafe { &(*batch) };
75    let record_batch = batch_ref.inner();
76
77    // Convert RecordBatch to StructArray for export (Arrow convention)
78    let struct_array: StructArray = record_batch.clone().into();
79    let data = struct_array.into_data();
80
81    match to_ffi(&data) {
82        Ok((array, schema)) => {
83            // SAFETY: out_array and out_schema are non-null (checked above)
84            unsafe {
85                std::ptr::write(out_array, array);
86                std::ptr::write(out_schema, schema);
87            }
88            LAMINAR_OK
89        }
90        Err(e) => {
91            set_last_error(ApiError::internal(format!("Arrow FFI export failed: {e}")));
92            LAMINAR_ERR_INTERNAL
93        }
94    }
95}
96
97/// Export just the schema to the Arrow C Data Interface.
98///
99/// Useful when consumers need the schema before receiving data.
100///
101/// # Arguments
102///
103/// * `schema` - Schema to export
104/// * `out_schema` - Pointer to caller-allocated `ArrowSchema` struct
105///
106/// # Returns
107///
108/// `LAMINAR_OK` on success, or an error code.
109///
110/// # Safety
111///
112/// * `schema` must be a valid schema handle
113/// * `out_schema` must be a valid pointer to an uninitialized struct
114/// * Caller must eventually call the release callback
115#[no_mangle]
116pub unsafe extern "C" fn laminar_schema_export(
117    schema: *mut LaminarSchema,
118    out_schema: *mut FFI_ArrowSchema,
119) -> i32 {
120    clear_last_error();
121
122    if schema.is_null() || out_schema.is_null() {
123        return LAMINAR_ERR_NULL_POINTER;
124    }
125
126    // SAFETY: schema is non-null (checked above)
127    let schema_ref = unsafe { (*schema).schema() };
128
129    match FFI_ArrowSchema::try_from(schema_ref.as_ref()) {
130        Ok(ffi_schema) => {
131            // SAFETY: out_schema is non-null (checked above)
132            unsafe {
133                std::ptr::write(out_schema, ffi_schema);
134            }
135            LAMINAR_OK
136        }
137        Err(e) => {
138            set_last_error(ApiError::internal(format!(
139                "Arrow FFI schema export failed: {e}"
140            )));
141            LAMINAR_ERR_INTERNAL
142        }
143    }
144}
145
146/// Export a single column from a `RecordBatch` to the Arrow C Data Interface.
147///
148/// # Arguments
149///
150/// * `batch` - Record batch containing the column
151/// * `column_index` - Zero-based index of the column to export
152/// * `out_array` - Pointer to caller-allocated `ArrowArray` struct
153/// * `out_schema` - Pointer to caller-allocated `ArrowSchema` struct
154///
155/// # Returns
156///
157/// `LAMINAR_OK` on success, or an error code.
158///
159/// # Safety
160///
161/// * `batch` must be a valid batch handle
162/// * `column_index` must be less than the number of columns
163/// * `out_array` and `out_schema` must be valid pointers
164/// * Caller must eventually call the release callbacks
165#[no_mangle]
166pub unsafe extern "C" fn laminar_batch_export_column(
167    batch: *mut LaminarRecordBatch,
168    column_index: usize,
169    out_array: *mut FFI_ArrowArray,
170    out_schema: *mut FFI_ArrowSchema,
171) -> i32 {
172    clear_last_error();
173
174    if batch.is_null() || out_array.is_null() || out_schema.is_null() {
175        return LAMINAR_ERR_NULL_POINTER;
176    }
177
178    // SAFETY: batch is non-null (checked above)
179    let batch_ref = unsafe { &(*batch) };
180    let record_batch = batch_ref.inner();
181
182    if column_index >= record_batch.num_columns() {
183        set_last_error(ApiError::internal(format!(
184            "Column index {column_index} out of bounds (batch has {} columns)",
185            record_batch.num_columns()
186        )));
187        return LAMINAR_ERR_NULL_POINTER;
188    }
189
190    let column = record_batch.column(column_index);
191    let data = column.to_data();
192
193    match to_ffi(&data) {
194        Ok((array, schema)) => {
195            // SAFETY: out_array and out_schema are non-null (checked above)
196            unsafe {
197                std::ptr::write(out_array, array);
198                std::ptr::write(out_schema, schema);
199            }
200            LAMINAR_OK
201        }
202        Err(e) => {
203            set_last_error(ApiError::internal(format!(
204                "Arrow FFI column export failed: {e}"
205            )));
206            LAMINAR_ERR_INTERNAL
207        }
208    }
209}
210
211/// Import a `RecordBatch` from the Arrow C Data Interface.
212///
213/// Takes ownership of the `ArrowArray` and `ArrowSchema` structs.
214/// The release callbacks will be called automatically.
215///
216/// # Arguments
217///
218/// * `array` - Pointer to `ArrowArray` struct (ownership transferred)
219/// * `schema` - Pointer to `ArrowSchema` struct (ownership transferred)
220/// * `out` - Pointer to receive the new batch handle
221///
222/// # Returns
223///
224/// `LAMINAR_OK` on success, or an error code.
225///
226/// # Safety
227///
228/// * `array` and `schema` must be valid Arrow C Data Interface structs
229/// * Ownership is transferred - the structs will be released by this function
230/// * `out` must be a valid pointer
231#[no_mangle]
232pub unsafe extern "C" fn laminar_batch_import(
233    array: *mut FFI_ArrowArray,
234    schema: *mut FFI_ArrowSchema,
235    out: *mut *mut LaminarRecordBatch,
236) -> i32 {
237    clear_last_error();
238
239    if array.is_null() || schema.is_null() || out.is_null() {
240        return LAMINAR_ERR_NULL_POINTER;
241    }
242
243    // SAFETY: array and schema are non-null (checked above)
244    // Take ownership by reading the structs
245    let ffi_array = unsafe { std::ptr::read(array) };
246    let ffi_schema = unsafe { std::ptr::read(schema) };
247
248    // Clear the original pointers to prevent double-free
249    // (The consumer should not use these after calling import)
250    unsafe {
251        std::ptr::write_bytes(array, 0, 1);
252        std::ptr::write_bytes(schema, 0, 1);
253    }
254
255    match from_ffi(ffi_array, &ffi_schema) {
256        Ok(data) => {
257            // Convert ArrayData to StructArray then to RecordBatch
258            let struct_array = StructArray::from(data);
259            let batch = RecordBatch::from(struct_array);
260
261            let handle = Box::new(LaminarRecordBatch::new(batch));
262            // SAFETY: out is non-null (checked above)
263            unsafe { *out = Box::into_raw(handle) };
264            LAMINAR_OK
265        }
266        Err(e) => {
267            set_last_error(ApiError::internal(format!("Arrow FFI import failed: {e}")));
268            LAMINAR_ERR_INTERNAL
269        }
270    }
271}
272
273/// Create a `RecordBatch` from Arrow C Data Interface for writing.
274///
275/// This is an alias for `laminar_batch_import` for clarity when the
276/// intent is to create data for writing rather than receiving query results.
277///
278/// # Safety
279///
280/// Same requirements as `laminar_batch_import`.
281#[no_mangle]
282pub unsafe extern "C" fn laminar_batch_create(
283    array: *mut FFI_ArrowArray,
284    schema: *mut FFI_ArrowSchema,
285    out: *mut *mut LaminarRecordBatch,
286) -> i32 {
287    laminar_batch_import(array, schema, out)
288}
289
#[cfg(test)]
// The FFI entry points take raw pointers, so the tests pass `&mut value` and rely
// on reference-to-pointer coercion.
#[allow(clippy::borrow_as_ptr)]
mod tests {
    use super::*;
    use arrow::array::{Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a three-row batch with a non-nullable `id` column and a nullable
    /// `name` column whose last value is null.
    fn create_test_batch() -> RecordBatch {
        let ids = Int64Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec![Some("Alice"), Some("Bob"), None]);

        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ];

        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ids), Arc::new(names)],
        )
        .unwrap()
    }

    /// Exporting a batch and importing the result yields a batch of the same shape.
    #[test]
    fn test_export_import_roundtrip() {
        let original = create_test_batch();
        let mut source_handle = LaminarRecordBatch::new(original.clone());

        // Export into caller-owned FFI structs.
        let mut ffi_array = FFI_ArrowArray::empty();
        let mut ffi_schema = FFI_ArrowSchema::empty();
        let export_rc =
            unsafe { laminar_batch_export(&mut source_handle, &mut ffi_array, &mut ffi_schema) };
        assert_eq!(export_rc, LAMINAR_OK);

        // Import them back into a fresh handle.
        let mut roundtripped: *mut LaminarRecordBatch = std::ptr::null_mut();
        let import_rc =
            unsafe { laminar_batch_import(&mut ffi_array, &mut ffi_schema, &mut roundtripped) };
        assert_eq!(import_rc, LAMINAR_OK);
        assert!(!roundtripped.is_null());

        // The shape must survive the round trip.
        let restored = unsafe { (*roundtripped).inner() };
        assert_eq!(original.num_rows(), restored.num_rows());
        assert_eq!(original.num_columns(), restored.num_columns());

        // Free the imported handle.
        unsafe {
            super::super::query::laminar_batch_free(roundtripped);
        }
    }

    /// Exporting the first column produces the expected `Int64` values.
    #[test]
    fn test_export_column() {
        let mut handle = LaminarRecordBatch::new(create_test_batch());

        let mut ffi_array = FFI_ArrowArray::empty();
        let mut ffi_schema = FFI_ArrowSchema::empty();

        // Column 0 is `id`.
        let rc = unsafe {
            laminar_batch_export_column(&mut handle, 0, &mut ffi_array, &mut ffi_schema)
        };
        assert_eq!(rc, LAMINAR_OK);

        // Read the exported column back and compare values.
        let imported = unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap();
        let ids = Int64Array::from(imported);
        assert_eq!(ids.len(), 3);
        assert_eq!(ids.value(0), 1);
        assert_eq!(ids.value(1), 2);
        assert_eq!(ids.value(2), 3);
    }

    /// An out-of-range column index is rejected with an error code.
    #[test]
    fn test_export_column_out_of_bounds() {
        let mut handle = LaminarRecordBatch::new(create_test_batch());

        let mut ffi_array = FFI_ArrowArray::empty();
        let mut ffi_schema = FFI_ArrowSchema::empty();

        // The fixture only has two columns, so index 99 does not exist.
        let rc = unsafe {
            laminar_batch_export_column(&mut handle, 99, &mut ffi_array, &mut ffi_schema)
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
    }

    /// A schema handle can be exported on its own.
    #[test]
    fn test_schema_export() {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ];
        let mut schema_handle = LaminarSchema::new(Arc::new(Schema::new(fields)));
        let mut exported = FFI_ArrowSchema::empty();

        let rc = unsafe { laminar_schema_export(&mut schema_handle, &mut exported) };
        assert_eq!(rc, LAMINAR_OK);

        // Dropping the FFI schema runs its release callback.
        drop(exported);
    }

    /// Null arguments are reported with `LAMINAR_ERR_NULL_POINTER`.
    #[test]
    fn test_null_pointer_checks() {
        let mut ffi_array = FFI_ArrowArray::empty();
        let mut ffi_schema = FFI_ArrowSchema::empty();
        let mut imported: *mut LaminarRecordBatch = std::ptr::null_mut();

        // Null batch handle on export.
        let export_rc =
            unsafe { laminar_batch_export(std::ptr::null_mut(), &mut ffi_array, &mut ffi_schema) };
        assert_eq!(export_rc, LAMINAR_ERR_NULL_POINTER);

        // Null array on import.
        let import_rc =
            unsafe { laminar_batch_import(std::ptr::null_mut(), &mut ffi_schema, &mut imported) };
        assert_eq!(import_rc, LAMINAR_ERR_NULL_POINTER);

        // Null schema handle on schema export.
        let schema_rc = unsafe { laminar_schema_export(std::ptr::null_mut(), &mut ffi_schema) };
        assert_eq!(schema_rc, LAMINAR_ERR_NULL_POINTER);
    }
}