Skip to main content

laminar_db/ffi/
arrow_ffi.rs

1//! Arrow C Data Interface for zero-copy data exchange.
2
3use arrow::array::{Array, RecordBatch, StructArray};
4use arrow::ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema};
5
6use super::error::{
7    clear_last_error, set_last_error, LAMINAR_ERR_INTERNAL, LAMINAR_ERR_NULL_POINTER, LAMINAR_OK,
8};
9use super::query::LaminarRecordBatch;
10use super::schema::LaminarSchema;
11use crate::api::ApiError;
12
13/// Export a `RecordBatch` to the Arrow C Data Interface.
14///
15/// The batch is exported as a struct array (Arrow convention for record batches).
16/// The caller must allocate the `ArrowArray` and `ArrowSchema` structs before calling.
17///
18/// # Arguments
19///
20/// * `batch` - Record batch to export
21/// * `out_array` - Pointer to caller-allocated `ArrowArray` struct
22/// * `out_schema` - Pointer to caller-allocated `ArrowSchema` struct
23///
24/// # Returns
25///
26/// `LAMINAR_OK` on success, or an error code.
27///
28/// # Safety
29///
30/// * `batch` must be a valid batch handle
31/// * `out_array` and `out_schema` must be valid pointers to uninitialized structs
32/// * Caller must eventually call the release callbacks on both structs
33#[no_mangle]
34pub unsafe extern "C" fn laminar_batch_export(
35    batch: *mut LaminarRecordBatch,
36    out_array: *mut FFI_ArrowArray,
37    out_schema: *mut FFI_ArrowSchema,
38) -> i32 {
39    clear_last_error();
40
41    if batch.is_null() || out_array.is_null() || out_schema.is_null() {
42        return LAMINAR_ERR_NULL_POINTER;
43    }
44
45    // SAFETY: batch is non-null (checked above)
46    let batch_ref = unsafe { &(*batch) };
47    let record_batch = batch_ref.inner();
48
49    // Convert RecordBatch to StructArray for export (Arrow convention)
50    let struct_array: StructArray = record_batch.clone().into();
51    let data = struct_array.into_data();
52
53    match to_ffi(&data) {
54        Ok((array, schema)) => {
55            // SAFETY: out_array and out_schema are non-null (checked above)
56            unsafe {
57                std::ptr::write(out_array, array);
58                std::ptr::write(out_schema, schema);
59            }
60            LAMINAR_OK
61        }
62        Err(e) => {
63            set_last_error(ApiError::internal(format!("Arrow FFI export failed: {e}")));
64            LAMINAR_ERR_INTERNAL
65        }
66    }
67}
68
69/// Export just the schema to the Arrow C Data Interface.
70///
71/// Useful when consumers need the schema before receiving data.
72///
73/// # Arguments
74///
75/// * `schema` - Schema to export
76/// * `out_schema` - Pointer to caller-allocated `ArrowSchema` struct
77///
78/// # Returns
79///
80/// `LAMINAR_OK` on success, or an error code.
81///
82/// # Safety
83///
84/// * `schema` must be a valid schema handle
85/// * `out_schema` must be a valid pointer to an uninitialized struct
86/// * Caller must eventually call the release callback
87#[no_mangle]
88pub unsafe extern "C" fn laminar_schema_export(
89    schema: *mut LaminarSchema,
90    out_schema: *mut FFI_ArrowSchema,
91) -> i32 {
92    clear_last_error();
93
94    if schema.is_null() || out_schema.is_null() {
95        return LAMINAR_ERR_NULL_POINTER;
96    }
97
98    // SAFETY: schema is non-null (checked above)
99    let schema_ref = unsafe { (*schema).schema() };
100
101    match FFI_ArrowSchema::try_from(schema_ref.as_ref()) {
102        Ok(ffi_schema) => {
103            // SAFETY: out_schema is non-null (checked above)
104            unsafe {
105                std::ptr::write(out_schema, ffi_schema);
106            }
107            LAMINAR_OK
108        }
109        Err(e) => {
110            set_last_error(ApiError::internal(format!(
111                "Arrow FFI schema export failed: {e}"
112            )));
113            LAMINAR_ERR_INTERNAL
114        }
115    }
116}
117
118/// Export a single column from a `RecordBatch` to the Arrow C Data Interface.
119///
120/// # Arguments
121///
122/// * `batch` - Record batch containing the column
123/// * `column_index` - Zero-based index of the column to export
124/// * `out_array` - Pointer to caller-allocated `ArrowArray` struct
125/// * `out_schema` - Pointer to caller-allocated `ArrowSchema` struct
126///
127/// # Returns
128///
129/// `LAMINAR_OK` on success, or an error code.
130///
131/// # Safety
132///
133/// * `batch` must be a valid batch handle
134/// * `column_index` must be less than the number of columns
135/// * `out_array` and `out_schema` must be valid pointers
136/// * Caller must eventually call the release callbacks
137#[no_mangle]
138pub unsafe extern "C" fn laminar_batch_export_column(
139    batch: *mut LaminarRecordBatch,
140    column_index: usize,
141    out_array: *mut FFI_ArrowArray,
142    out_schema: *mut FFI_ArrowSchema,
143) -> i32 {
144    clear_last_error();
145
146    if batch.is_null() || out_array.is_null() || out_schema.is_null() {
147        return LAMINAR_ERR_NULL_POINTER;
148    }
149
150    // SAFETY: batch is non-null (checked above)
151    let batch_ref = unsafe { &(*batch) };
152    let record_batch = batch_ref.inner();
153
154    if column_index >= record_batch.num_columns() {
155        set_last_error(ApiError::internal(format!(
156            "Column index {column_index} out of bounds (batch has {} columns)",
157            record_batch.num_columns()
158        )));
159        return LAMINAR_ERR_NULL_POINTER;
160    }
161
162    let column = record_batch.column(column_index);
163    let data = column.to_data();
164
165    match to_ffi(&data) {
166        Ok((array, schema)) => {
167            // SAFETY: out_array and out_schema are non-null (checked above)
168            unsafe {
169                std::ptr::write(out_array, array);
170                std::ptr::write(out_schema, schema);
171            }
172            LAMINAR_OK
173        }
174        Err(e) => {
175            set_last_error(ApiError::internal(format!(
176                "Arrow FFI column export failed: {e}"
177            )));
178            LAMINAR_ERR_INTERNAL
179        }
180    }
181}
182
183/// Import a `RecordBatch` from the Arrow C Data Interface.
184///
185/// Takes ownership of the `ArrowArray` and `ArrowSchema` structs.
186/// The release callbacks will be called automatically.
187///
188/// # Arguments
189///
190/// * `array` - Pointer to `ArrowArray` struct (ownership transferred)
191/// * `schema` - Pointer to `ArrowSchema` struct (ownership transferred)
192/// * `out` - Pointer to receive the new batch handle
193///
194/// # Returns
195///
196/// `LAMINAR_OK` on success, or an error code.
197///
198/// # Safety
199///
200/// * `array` and `schema` must be valid Arrow C Data Interface structs
201/// * Ownership is transferred - the structs will be released by this function
202/// * `out` must be a valid pointer
203#[no_mangle]
204pub unsafe extern "C" fn laminar_batch_import(
205    array: *mut FFI_ArrowArray,
206    schema: *mut FFI_ArrowSchema,
207    out: *mut *mut LaminarRecordBatch,
208) -> i32 {
209    clear_last_error();
210
211    if array.is_null() || schema.is_null() || out.is_null() {
212        return LAMINAR_ERR_NULL_POINTER;
213    }
214
215    // SAFETY: array and schema are non-null (checked above)
216    // Take ownership by reading the structs
217    let ffi_array = unsafe { std::ptr::read(array) };
218    let ffi_schema = unsafe { std::ptr::read(schema) };
219
220    // Clear the original pointers to prevent double-free
221    // (The consumer should not use these after calling import)
222    unsafe {
223        std::ptr::write_bytes(array, 0, 1);
224        std::ptr::write_bytes(schema, 0, 1);
225    }
226
227    match from_ffi(ffi_array, &ffi_schema) {
228        Ok(data) => {
229            // Convert ArrayData to StructArray then to RecordBatch
230            let struct_array = StructArray::from(data);
231            let batch = RecordBatch::from(struct_array);
232
233            let handle = Box::new(LaminarRecordBatch::new(batch));
234            // SAFETY: out is non-null (checked above)
235            unsafe { *out = Box::into_raw(handle) };
236            LAMINAR_OK
237        }
238        Err(e) => {
239            set_last_error(ApiError::internal(format!("Arrow FFI import failed: {e}")));
240            LAMINAR_ERR_INTERNAL
241        }
242    }
243}
244
245/// Create a `RecordBatch` from Arrow C Data Interface for writing.
246///
247/// This is an alias for `laminar_batch_import` for clarity when the
248/// intent is to create data for writing rather than receiving query results.
249///
250/// # Safety
251///
252/// Same requirements as `laminar_batch_import`.
253#[no_mangle]
254pub unsafe extern "C" fn laminar_batch_create(
255    array: *mut FFI_ArrowArray,
256    schema: *mut FFI_ArrowSchema,
257    out: *mut *mut LaminarRecordBatch,
258) -> i32 {
259    laminar_batch_import(array, schema, out)
260}
261
#[cfg(test)]
// Presumably allowed because the tests hand `&mut local` references straight to
// functions that take raw pointers.
#[allow(clippy::borrow_as_ptr)]
mod tests {
    use super::*;
    use arrow::array::{Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a three-row batch with a non-nullable `id` column and a nullable
    /// `name` column whose last value is null.
    fn create_test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ];
        let ids = Int64Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec![Some("Alice"), Some("Bob"), None]);

        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ids), Arc::new(names)],
        )
        .unwrap()
    }

    /// Exporting a batch and importing it again yields a batch of the same shape.
    #[test]
    fn test_export_import_roundtrip() {
        let original = create_test_batch();
        let mut handle = LaminarRecordBatch::new(original.clone());

        // Export into freshly created (empty) FFI structs.
        let mut ffi_array = FFI_ArrowArray::empty();
        let mut ffi_schema = FFI_ArrowSchema::empty();
        let export_rc =
            unsafe { laminar_batch_export(&mut handle, &mut ffi_array, &mut ffi_schema) };
        assert_eq!(export_rc, LAMINAR_OK);

        // Import the exported structs back into a new handle.
        let mut imported: *mut LaminarRecordBatch = std::ptr::null_mut();
        let import_rc =
            unsafe { laminar_batch_import(&mut ffi_array, &mut ffi_schema, &mut imported) };
        assert_eq!(import_rc, LAMINAR_OK);
        assert!(!imported.is_null());

        // Row and column counts must survive the round trip.
        let roundtripped = unsafe { (*imported).inner() };
        assert_eq!(original.num_rows(), roundtripped.num_rows());
        assert_eq!(original.num_columns(), roundtripped.num_columns());

        // Release the handle created by the import.
        unsafe {
            super::super::query::laminar_batch_free(imported);
        }
    }

    /// Exporting column 0 yields the `id` values 1, 2, 3.
    #[test]
    fn test_export_column() {
        let mut handle = LaminarRecordBatch::new(create_test_batch());

        let mut ffi_array = FFI_ArrowArray::empty();
        let mut ffi_schema = FFI_ArrowSchema::empty();

        let rc = unsafe {
            laminar_batch_export_column(&mut handle, 0, &mut ffi_array, &mut ffi_schema)
        };
        assert_eq!(rc, LAMINAR_OK);

        // Re-import through arrow's own FFI entry point and inspect the values.
        let column_data = unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap();
        let ids = Int64Array::from(column_data);
        assert_eq!(ids.len(), 3);
        assert_eq!(ids.value(0), 1);
        assert_eq!(ids.value(1), 2);
        assert_eq!(ids.value(2), 3);
    }

    /// An out-of-range column index is rejected with the currently used error code.
    #[test]
    fn test_export_column_out_of_bounds() {
        let mut handle = LaminarRecordBatch::new(create_test_batch());

        let mut ffi_array = FFI_ArrowArray::empty();
        let mut ffi_schema = FFI_ArrowSchema::empty();

        // The test batch has two columns, so index 99 does not exist.
        let rc = unsafe {
            laminar_batch_export_column(&mut handle, 99, &mut ffi_array, &mut ffi_schema)
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
    }

    /// A schema handle can be exported on its own.
    #[test]
    fn test_schema_export() {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ];
        let arrow_schema = Arc::new(Schema::new(fields));

        let mut schema_handle = LaminarSchema::new(arrow_schema);
        let mut exported = FFI_ArrowSchema::empty();

        let rc = unsafe { laminar_schema_export(&mut schema_handle, &mut exported) };
        assert_eq!(rc, LAMINAR_OK);

        // The schema is released when dropped
        drop(exported);
    }

    /// Each entry point reports a null required pointer instead of dereferencing it.
    #[test]
    fn test_null_pointer_checks() {
        let mut ffi_array = FFI_ArrowArray::empty();
        let mut ffi_schema = FFI_ArrowSchema::empty();
        let mut imported: *mut LaminarRecordBatch = std::ptr::null_mut();

        // Export with a null batch handle.
        let export_rc =
            unsafe { laminar_batch_export(std::ptr::null_mut(), &mut ffi_array, &mut ffi_schema) };
        assert_eq!(export_rc, LAMINAR_ERR_NULL_POINTER);

        // Import with a null array pointer.
        let import_rc =
            unsafe { laminar_batch_import(std::ptr::null_mut(), &mut ffi_schema, &mut imported) };
        assert_eq!(import_rc, LAMINAR_ERR_NULL_POINTER);

        // Schema export with a null schema handle.
        let schema_rc = unsafe { laminar_schema_export(std::ptr::null_mut(), &mut ffi_schema) };
        assert_eq!(schema_rc, LAMINAR_ERR_NULL_POINTER);
    }
}