1use std::ffi::{c_char, CStr};
6
7use crate::api::Writer;
8
9use super::connection::LaminarConnection;
10use super::error::{
11 clear_last_error, set_last_error, LAMINAR_ERR_INVALID_UTF8, LAMINAR_ERR_NULL_POINTER,
12 LAMINAR_OK,
13};
14use super::query::LaminarRecordBatch;
15
16#[repr(C)]
18pub struct LaminarWriter {
19 pub(super) inner: Option<Writer>,
20}
21
22impl LaminarWriter {
23 fn new(writer: Writer) -> Self {
25 Self {
26 inner: Some(writer),
27 }
28 }
29}
30
31#[no_mangle]
49pub unsafe extern "C" fn laminar_writer_create(
50 conn: *mut LaminarConnection,
51 source_name: *const c_char,
52 out: *mut *mut LaminarWriter,
53) -> i32 {
54 clear_last_error();
55
56 if conn.is_null() || source_name.is_null() || out.is_null() {
57 return LAMINAR_ERR_NULL_POINTER;
58 }
59
60 let Ok(name_str) = (unsafe { CStr::from_ptr(source_name) }).to_str() else {
62 return LAMINAR_ERR_INVALID_UTF8;
63 };
64
65 let conn_ref = unsafe { &(*conn).inner };
67
68 match conn_ref.writer(name_str) {
69 Ok(writer) => {
70 let handle = Box::new(LaminarWriter::new(writer));
71 unsafe { *out = Box::into_raw(handle) };
73 LAMINAR_OK
74 }
75 Err(e) => {
76 let code = e.code();
77 set_last_error(e);
78 code
79 }
80 }
81}
82
83#[no_mangle]
99pub unsafe extern "C" fn laminar_writer_write(
100 writer: *mut LaminarWriter,
101 batch: *mut LaminarRecordBatch,
102) -> i32 {
103 clear_last_error();
104
105 if writer.is_null() || batch.is_null() {
106 return LAMINAR_ERR_NULL_POINTER;
107 }
108
109 let batch_box = unsafe { Box::from_raw(batch) };
111 let record_batch = batch_box.into_inner();
112
113 let writer_ref = unsafe { &mut (*writer).inner };
115
116 if let Some(w) = writer_ref.as_mut() {
117 match w.write(record_batch) {
118 Ok(()) => LAMINAR_OK,
119 Err(e) => {
120 let code = e.code();
121 set_last_error(e);
122 code
123 }
124 }
125 } else {
126 set_last_error(crate::api::ApiError::internal("Writer already closed"));
127 LAMINAR_ERR_NULL_POINTER
128 }
129}
130
131#[no_mangle]
145pub unsafe extern "C" fn laminar_writer_flush(writer: *mut LaminarWriter) -> i32 {
146 clear_last_error();
147
148 if writer.is_null() {
149 return LAMINAR_ERR_NULL_POINTER;
150 }
151
152 let writer_ref = unsafe { &mut (*writer).inner };
154
155 if let Some(w) = writer_ref.as_mut() {
156 match w.flush() {
157 Ok(()) => LAMINAR_OK,
158 Err(e) => {
159 let code = e.code();
160 set_last_error(e);
161 code
162 }
163 }
164 } else {
165 set_last_error(crate::api::ApiError::internal("Writer already closed"));
166 LAMINAR_ERR_NULL_POINTER
167 }
168}
169
170#[no_mangle]
187pub unsafe extern "C" fn laminar_writer_close(writer: *mut LaminarWriter) -> i32 {
188 clear_last_error();
189
190 if writer.is_null() {
191 return LAMINAR_ERR_NULL_POINTER;
192 }
193
194 let writer_ref = unsafe { &mut (*writer).inner };
196
197 match writer_ref.take() {
198 Some(w) => match w.close() {
199 Ok(()) => LAMINAR_OK,
200 Err(e) => {
201 let code = e.code();
202 set_last_error(e);
203 code
204 }
205 },
206 None => {
207 LAMINAR_OK
209 }
210 }
211}
212
213#[no_mangle]
223pub unsafe extern "C" fn laminar_writer_free(writer: *mut LaminarWriter) {
224 if !writer.is_null() {
225 drop(unsafe { Box::from_raw(writer) });
227 }
228}
229
#[cfg(test)]
// The FFI entry points take raw pointers; the tests pass `&mut local` and rely
// on the implicit reference-to-pointer coercion.
#[allow(clippy::borrow_as_ptr)]
mod tests {
    use super::*;
    use crate::ffi::connection::{laminar_close, laminar_execute, laminar_open};
    use std::ptr;

    /// Creating a writer for an existing source succeeds, and the handle can
    /// be closed (repeatedly) and freed.
    #[test]
    fn test_writer_create() {
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut writer: *mut LaminarWriter = ptr::null_mut();

        // SAFETY: test code; every pointer refers to a live local or to a
        // NUL-terminated byte string literal.
        unsafe {
            laminar_open(&mut conn);

            // The source must exist before a writer can be created for it.
            let sql = b"CREATE SOURCE writer_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            let name = b"writer_test\0";
            let rc = laminar_writer_create(conn, name.as_ptr().cast(), &mut writer);
            assert_eq!(rc, LAMINAR_OK);
            assert!(!writer.is_null());

            let rc = laminar_writer_close(writer);
            assert_eq!(rc, LAMINAR_OK);

            // Closing an already-closed writer is a no-op that reports success.
            let rc = laminar_writer_close(writer);
            assert_eq!(rc, LAMINAR_OK);

            // Flushing a closed writer is rejected.
            let rc = laminar_writer_flush(writer);
            assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

            laminar_writer_free(writer);
            laminar_close(conn);
        }
    }

    /// Null handles are rejected with `LAMINAR_ERR_NULL_POINTER`, and freeing
    /// a null handle is a no-op.
    #[test]
    fn test_writer_null_pointer() {
        // SAFETY: every entry point checks for null before dereferencing.
        unsafe {
            assert_eq!(
                laminar_writer_flush(ptr::null_mut()),
                LAMINAR_ERR_NULL_POINTER
            );
            assert_eq!(
                laminar_writer_close(ptr::null_mut()),
                LAMINAR_ERR_NULL_POINTER
            );
            assert_eq!(
                laminar_writer_write(ptr::null_mut(), ptr::null_mut()),
                LAMINAR_ERR_NULL_POINTER
            );
            laminar_writer_free(ptr::null_mut());
        }
    }

    /// Creating a writer for an unknown source fails and hands back no handle.
    #[test]
    fn test_writer_source_not_found() {
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut writer: *mut LaminarWriter = ptr::null_mut();

        // SAFETY: test code; every pointer refers to a live local or to a
        // NUL-terminated byte string literal.
        unsafe {
            laminar_open(&mut conn);

            let name = b"nonexistent_source\0";
            let rc = laminar_writer_create(conn, name.as_ptr().cast(), &mut writer);
            assert_ne!(rc, LAMINAR_OK);
            // The previous `writer.is_null() || rc != LAMINAR_OK` could never
            // fail after the assertion above; check the handle itself.
            assert!(writer.is_null());

            laminar_close(conn);
        }
    }
}