Skip to main content

laminar_db/ffi/
writer.rs

1//! FFI writer functions.
2//!
3//! Provides `extern "C"` wrappers for data ingestion operations.
4
5use std::ffi::{c_char, CStr};
6
7use crate::api::Writer;
8
9use super::connection::LaminarConnection;
10use super::error::{
11    clear_last_error, set_last_error, LAMINAR_ERR_INVALID_UTF8, LAMINAR_ERR_NULL_POINTER,
12    LAMINAR_OK,
13};
14use super::query::LaminarRecordBatch;
15
/// Opaque writer handle for FFI.
///
/// Handles are heap-allocated by `laminar_writer_create` and released by
/// `laminar_writer_free`; foreign code only ever holds a pointer to one.
#[repr(C)]
pub struct LaminarWriter {
    /// The wrapped writer. `Some` while the writer is open; becomes `None`
    /// once `laminar_writer_close` has taken the writer out of the handle,
    /// after which write/flush calls report "Writer already closed".
    pub(super) inner: Option<Writer>,
}
21
22impl LaminarWriter {
23    /// Create from Writer.
24    fn new(writer: Writer) -> Self {
25        Self {
26            inner: Some(writer),
27        }
28    }
29}
30
31/// Create a writer for a source.
32///
33/// # Arguments
34///
35/// * `conn` - Database connection
36/// * `source_name` - Null-terminated source name
37/// * `out` - Pointer to receive writer handle
38///
39/// # Returns
40///
41/// `LAMINAR_OK` on success, or an error code.
42///
43/// # Safety
44///
45/// * `conn` must be a valid connection handle
46/// * `source_name` must be a valid null-terminated UTF-8 string
47/// * `out` must be a valid pointer
48#[no_mangle]
49pub unsafe extern "C" fn laminar_writer_create(
50    conn: *mut LaminarConnection,
51    source_name: *const c_char,
52    out: *mut *mut LaminarWriter,
53) -> i32 {
54    clear_last_error();
55
56    if conn.is_null() || source_name.is_null() || out.is_null() {
57        return LAMINAR_ERR_NULL_POINTER;
58    }
59
60    // SAFETY: source_name is non-null (checked above)
61    let Ok(name_str) = (unsafe { CStr::from_ptr(source_name) }).to_str() else {
62        return LAMINAR_ERR_INVALID_UTF8;
63    };
64
65    // SAFETY: conn is non-null (checked above)
66    let conn_ref = unsafe { &(*conn).inner };
67
68    match conn_ref.writer(name_str) {
69        Ok(writer) => {
70            let handle = Box::new(LaminarWriter::new(writer));
71            // SAFETY: out is non-null (checked above)
72            unsafe { *out = Box::into_raw(handle) };
73            LAMINAR_OK
74        }
75        Err(e) => {
76            let code = e.code();
77            set_last_error(e);
78            code
79        }
80    }
81}
82
83/// Write a record batch to the source.
84///
85/// # Arguments
86///
87/// * `writer` - Writer handle
88/// * `batch` - Record batch to write (ownership is transferred)
89///
90/// # Returns
91///
92/// `LAMINAR_OK` on success, or an error code.
93///
94/// # Safety
95///
96/// * `writer` must be a valid writer handle
97/// * `batch` must be a valid record batch handle (will be consumed)
98#[no_mangle]
99pub unsafe extern "C" fn laminar_writer_write(
100    writer: *mut LaminarWriter,
101    batch: *mut LaminarRecordBatch,
102) -> i32 {
103    clear_last_error();
104
105    if writer.is_null() || batch.is_null() {
106        return LAMINAR_ERR_NULL_POINTER;
107    }
108
109    // SAFETY: batch is non-null (checked above), take ownership
110    let batch_box = unsafe { Box::from_raw(batch) };
111    let record_batch = batch_box.into_inner();
112
113    // SAFETY: writer is non-null (checked above)
114    let writer_ref = unsafe { &mut (*writer).inner };
115
116    if let Some(w) = writer_ref.as_mut() {
117        match w.write(record_batch) {
118            Ok(()) => LAMINAR_OK,
119            Err(e) => {
120                let code = e.code();
121                set_last_error(e);
122                code
123            }
124        }
125    } else {
126        set_last_error(crate::api::ApiError::internal("Writer already closed"));
127        LAMINAR_ERR_NULL_POINTER
128    }
129}
130
131/// Flush buffered data.
132///
133/// # Arguments
134///
135/// * `writer` - Writer handle
136///
137/// # Returns
138///
139/// `LAMINAR_OK` on success, or an error code.
140///
141/// # Safety
142///
143/// `writer` must be a valid writer handle.
144#[no_mangle]
145pub unsafe extern "C" fn laminar_writer_flush(writer: *mut LaminarWriter) -> i32 {
146    clear_last_error();
147
148    if writer.is_null() {
149        return LAMINAR_ERR_NULL_POINTER;
150    }
151
152    // SAFETY: writer is non-null (checked above)
153    let writer_ref = unsafe { &mut (*writer).inner };
154
155    if let Some(w) = writer_ref.as_mut() {
156        match w.flush() {
157            Ok(()) => LAMINAR_OK,
158            Err(e) => {
159                let code = e.code();
160                set_last_error(e);
161                code
162            }
163        }
164    } else {
165        set_last_error(crate::api::ApiError::internal("Writer already closed"));
166        LAMINAR_ERR_NULL_POINTER
167    }
168}
169
170/// Close the writer and release resources.
171///
172/// After calling this, the writer handle can still be freed with `laminar_writer_free`,
173/// but no more writes are allowed.
174///
175/// # Arguments
176///
177/// * `writer` - Writer handle
178///
179/// # Returns
180///
181/// `LAMINAR_OK` on success, or an error code.
182///
183/// # Safety
184///
185/// `writer` must be a valid writer handle.
186#[no_mangle]
187pub unsafe extern "C" fn laminar_writer_close(writer: *mut LaminarWriter) -> i32 {
188    clear_last_error();
189
190    if writer.is_null() {
191        return LAMINAR_ERR_NULL_POINTER;
192    }
193
194    // SAFETY: writer is non-null (checked above)
195    let writer_ref = unsafe { &mut (*writer).inner };
196
197    match writer_ref.take() {
198        Some(w) => match w.close() {
199            Ok(()) => LAMINAR_OK,
200            Err(e) => {
201                let code = e.code();
202                set_last_error(e);
203                code
204            }
205        },
206        None => {
207            // Already closed, that's fine
208            LAMINAR_OK
209        }
210    }
211}
212
213/// Free a writer handle.
214///
215/// # Arguments
216///
217/// * `writer` - Writer handle to free
218///
219/// # Safety
220///
221/// `writer` must be a valid handle from `laminar_writer_create`, or NULL.
222#[no_mangle]
223pub unsafe extern "C" fn laminar_writer_free(writer: *mut LaminarWriter) {
224    if !writer.is_null() {
225        // SAFETY: writer is non-null and was allocated by Box
226        drop(unsafe { Box::from_raw(writer) });
227    }
228}
229
#[cfg(test)]
#[allow(clippy::borrow_as_ptr)]
mod tests {
    use super::*;
    use crate::ffi::connection::{laminar_close, laminar_execute, laminar_open};
    use std::ptr;

    /// Happy path: open a connection, create a source, then create, close and
    /// free a writer for it.
    #[test]
    fn test_writer_create() {
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut writer: *mut LaminarWriter = ptr::null_mut();

        // SAFETY: Test code with valid pointers
        unsafe {
            laminar_open(&mut conn);
            assert!(!conn.is_null(), "laminar_open did not produce a connection");

            // Create source
            let sql = b"CREATE SOURCE writer_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            // Create writer
            let name = b"writer_test\0";
            let rc = laminar_writer_create(conn, name.as_ptr().cast(), &mut writer);
            assert_eq!(rc, LAMINAR_OK);
            assert!(!writer.is_null());

            // Close writer
            let rc = laminar_writer_close(writer);
            assert_eq!(rc, LAMINAR_OK);

            // Closing twice is documented as harmless.
            let rc = laminar_writer_close(writer);
            assert_eq!(rc, LAMINAR_OK);

            // A closed writer rejects further flushes.
            let rc = laminar_writer_flush(writer);
            assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

            laminar_writer_free(writer);
            laminar_close(conn);
        }
    }

    /// Every entry point must reject NULL handles instead of dereferencing them.
    #[test]
    fn test_writer_null_pointer() {
        // SAFETY: Testing null pointer handling
        unsafe {
            assert_eq!(
                laminar_writer_flush(ptr::null_mut()),
                LAMINAR_ERR_NULL_POINTER
            );
            assert_eq!(
                laminar_writer_close(ptr::null_mut()),
                LAMINAR_ERR_NULL_POINTER
            );
            assert_eq!(
                laminar_writer_write(ptr::null_mut(), ptr::null_mut()),
                LAMINAR_ERR_NULL_POINTER
            );
            assert_eq!(
                laminar_writer_create(ptr::null_mut(), ptr::null(), ptr::null_mut()),
                LAMINAR_ERR_NULL_POINTER
            );

            // Freeing NULL is explicitly allowed and must not crash.
            laminar_writer_free(ptr::null_mut());
        }
    }

    /// Creating a writer for an unknown source fails and yields no handle.
    #[test]
    fn test_writer_source_not_found() {
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut writer: *mut LaminarWriter = ptr::null_mut();

        // SAFETY: Test code with valid pointers
        unsafe {
            laminar_open(&mut conn);
            // Without this check a failed open would make the assertions below
            // pass for the wrong reason (null-pointer error instead of "not found").
            assert!(!conn.is_null(), "laminar_open did not produce a connection");

            let name = b"nonexistent_source\0";
            let rc = laminar_writer_create(conn, name.as_ptr().cast(), &mut writer);
            assert_ne!(rc, LAMINAR_OK);
            assert_ne!(rc, LAMINAR_ERR_NULL_POINTER);
            assert!(writer.is_null());

            laminar_close(conn);
        }
    }
}