1use std::ffi::{c_char, c_void, CStr, CString};
44use std::sync::atomic::{AtomicBool, Ordering};
45use std::sync::Arc;
46use std::thread::{self, JoinHandle};
47
48use super::connection::LaminarConnection;
49use super::error::{
50 clear_last_error, set_last_error, LAMINAR_ERR_INVALID_UTF8, LAMINAR_ERR_NULL_POINTER,
51 LAMINAR_OK,
52};
53use super::query::LaminarRecordBatch;
54
55pub const LAMINAR_EVENT_INSERT: i32 = 0;
61pub const LAMINAR_EVENT_DELETE: i32 = 1;
63pub const LAMINAR_EVENT_UPDATE: i32 = 2;
65pub const LAMINAR_EVENT_WATERMARK: i32 = 3;
67pub const LAMINAR_EVENT_SNAPSHOT: i32 = 4;
69
70pub type LaminarSubscriptionCallback = Option<
86 unsafe extern "C" fn(user_data: *mut c_void, batch: *mut LaminarRecordBatch, event_type: i32),
87>;
88
89pub type LaminarErrorCallback = Option<
97 unsafe extern "C" fn(user_data: *mut c_void, error_code: i32, error_message: *const c_char),
98>;
99
100#[repr(C)]
108pub struct LaminarSubscriptionHandle {
109 cancelled: Arc<AtomicBool>,
111 thread_handle: Option<JoinHandle<()>>,
113 user_data: *mut c_void,
115}
116
117unsafe impl Send for LaminarSubscriptionHandle {}
120
121impl LaminarSubscriptionHandle {
122 fn new(
124 cancelled: Arc<AtomicBool>,
125 thread_handle: JoinHandle<()>,
126 user_data: *mut c_void,
127 ) -> Self {
128 Self {
129 cancelled,
130 thread_handle: Some(thread_handle),
131 user_data,
132 }
133 }
134
135 fn cancel(&mut self) {
137 self.cancelled.store(true, Ordering::SeqCst);
139
140 if let Some(handle) = self.thread_handle.take() {
142 let _ = handle.join();
143 }
144 }
145}
146
147struct CallbackContext {
153 user_data: *mut c_void,
155 on_data: LaminarSubscriptionCallback,
157 on_error: LaminarErrorCallback,
159 cancelled: Arc<AtomicBool>,
161}
162
163unsafe impl Send for CallbackContext {}
166
167impl CallbackContext {
168 fn call_on_data(&self, batch: LaminarRecordBatch, event_type: i32) {
170 if let Some(callback) = self.on_data {
171 let batch_ptr = Box::into_raw(Box::new(batch));
172 unsafe {
174 callback(self.user_data, batch_ptr, event_type);
175 }
176 }
177 }
178
179 fn call_on_error(&self, error_code: i32, message: &str) {
181 if let Some(callback) = self.on_error {
182 let c_message = CString::new(message)
183 .unwrap_or_else(|_| CString::new("Error message contained null byte").unwrap());
184 unsafe {
186 callback(self.user_data, error_code, c_message.as_ptr());
187 }
188 }
189 }
190
191 fn is_cancelled(&self) -> bool {
193 self.cancelled.load(Ordering::SeqCst)
194 }
195}
196
197#[no_mangle]
226pub unsafe extern "C" fn laminar_subscribe_callback(
227 conn: *mut LaminarConnection,
228 query: *const c_char,
229 on_data: LaminarSubscriptionCallback,
230 on_error: LaminarErrorCallback,
231 user_data: *mut c_void,
232 out: *mut *mut LaminarSubscriptionHandle,
233) -> i32 {
234 clear_last_error();
235
236 if conn.is_null() || query.is_null() || out.is_null() {
237 return LAMINAR_ERR_NULL_POINTER;
238 }
239
240 let Ok(query_str) = (unsafe { CStr::from_ptr(query) }).to_str() else {
242 return LAMINAR_ERR_INVALID_UTF8;
243 };
244
245 let conn_ref = unsafe { &(*conn).inner };
247
248 let stream = match conn_ref.query_stream(query_str) {
250 Ok(s) => s,
251 Err(e) => {
252 let code = e.code();
253 set_last_error(e);
254 return code;
255 }
256 };
257
258 let cancelled = Arc::new(AtomicBool::new(false));
260 let cancelled_clone = Arc::clone(&cancelled);
261
262 let ctx = CallbackContext {
264 user_data,
265 on_data,
266 on_error,
267 cancelled: cancelled_clone,
268 };
269
270 let thread_handle = thread::spawn(move || {
272 subscription_thread(stream, ctx);
273 });
274
275 let handle = Box::new(LaminarSubscriptionHandle::new(
277 cancelled,
278 thread_handle,
279 user_data,
280 ));
281
282 unsafe { *out = Box::into_raw(handle) };
284
285 LAMINAR_OK
286}
287
288#[allow(clippy::needless_pass_by_value)] fn subscription_thread(mut stream: crate::api::QueryStream, ctx: CallbackContext) {
291 loop {
292 if ctx.is_cancelled() {
294 break;
295 }
296
297 match stream.try_next() {
299 Ok(Some(batch)) => {
300 let event_type = LAMINAR_EVENT_INSERT;
302 ctx.call_on_data(LaminarRecordBatch::new(batch), event_type);
303 }
304 Ok(None) => {
305 if !stream.is_active() {
307 break;
308 }
309 std::thread::sleep(std::time::Duration::from_millis(1));
311 }
312 Err(e) => {
313 ctx.call_on_error(e.code(), e.message());
314 if !stream.is_active() {
316 break;
317 }
318 }
319 }
320 }
321}
322
323#[no_mangle]
340pub unsafe extern "C" fn laminar_subscription_cancel(
341 handle: *mut LaminarSubscriptionHandle,
342) -> i32 {
343 clear_last_error();
344
345 if handle.is_null() {
346 return LAMINAR_ERR_NULL_POINTER;
347 }
348
349 let handle_ref = unsafe { &mut *handle };
351 handle_ref.cancel();
352
353 LAMINAR_OK
354}
355
356#[no_mangle]
372pub unsafe extern "C" fn laminar_subscription_is_active(
373 handle: *mut LaminarSubscriptionHandle,
374 out: *mut bool,
375) -> i32 {
376 clear_last_error();
377
378 if handle.is_null() || out.is_null() {
379 return LAMINAR_ERR_NULL_POINTER;
380 }
381
382 let handle_ref = unsafe { &*handle };
384 let active = !handle_ref.cancelled.load(Ordering::SeqCst) && handle_ref.thread_handle.is_some();
385
386 unsafe { *out = active };
387
388 LAMINAR_OK
389}
390
391#[no_mangle]
405pub unsafe extern "C" fn laminar_subscription_user_data(
406 handle: *mut LaminarSubscriptionHandle,
407) -> *mut c_void {
408 if handle.is_null() {
409 return std::ptr::null_mut();
410 }
411
412 unsafe { (*handle).user_data }
414}
415
416#[no_mangle]
428pub unsafe extern "C" fn laminar_subscription_free(handle: *mut LaminarSubscriptionHandle) {
429 if !handle.is_null() {
430 let mut boxed = unsafe { Box::from_raw(handle) };
432 boxed.cancel();
434 drop(boxed);
435 }
436}
437
438#[cfg(test)]
444#[allow(
445 clippy::borrow_as_ptr,
446 clippy::manual_c_str_literals,
447 clippy::items_after_statements
448)]
449mod tests {
450 use super::*;
451 use crate::ffi::connection::{laminar_close, laminar_execute, laminar_open};
452 use std::ptr;
453 use std::sync::atomic::AtomicUsize;
454
455 #[test]
456 fn test_event_type_constants() {
457 assert_eq!(LAMINAR_EVENT_INSERT, 0);
458 assert_eq!(LAMINAR_EVENT_DELETE, 1);
459 assert_eq!(LAMINAR_EVENT_UPDATE, 2);
460 assert_eq!(LAMINAR_EVENT_WATERMARK, 3);
461 assert_eq!(LAMINAR_EVENT_SNAPSHOT, 4);
462 }
463
464 #[test]
465 fn test_subscribe_null_pointer() {
466 let mut out: *mut LaminarSubscriptionHandle = ptr::null_mut();
467
468 let rc = unsafe {
470 laminar_subscribe_callback(
471 ptr::null_mut(),
472 b"SELECT 1\0".as_ptr().cast(),
473 None,
474 None,
475 ptr::null_mut(),
476 &mut out,
477 )
478 };
479 assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
480
481 let mut conn: *mut LaminarConnection = ptr::null_mut();
483 unsafe { laminar_open(&mut conn) };
484
485 let rc = unsafe {
486 laminar_subscribe_callback(conn, ptr::null(), None, None, ptr::null_mut(), &mut out)
487 };
488 assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
489
490 let rc = unsafe {
492 laminar_subscribe_callback(
493 conn,
494 b"SELECT 1\0".as_ptr().cast(),
495 None,
496 None,
497 ptr::null_mut(),
498 ptr::null_mut(),
499 )
500 };
501 assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
502
503 unsafe { laminar_close(conn) };
504 }
505
506 #[test]
507 fn test_subscription_cancel_null() {
508 let rc = unsafe { laminar_subscription_cancel(ptr::null_mut()) };
509 assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
510 }
511
512 #[test]
513 fn test_subscription_free_null() {
514 unsafe { laminar_subscription_free(ptr::null_mut()) };
516 }
517
518 #[test]
519 fn test_subscription_user_data_null() {
520 let result = unsafe { laminar_subscription_user_data(ptr::null_mut()) };
521 assert!(result.is_null());
522 }
523
524 #[test]
525 fn test_subscribe_and_cancel() {
526 let mut conn: *mut LaminarConnection = ptr::null_mut();
527 let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();
528
529 unsafe {
530 laminar_open(&mut conn);
531
532 let sql = b"CREATE TABLE callback_test (id BIGINT)\0";
534 laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());
535
536 let query = b"SELECT * FROM callback_test\0";
538 let rc = laminar_subscribe_callback(
539 conn,
540 query.as_ptr().cast(),
541 None,
542 None,
543 ptr::null_mut(),
544 &mut sub,
545 );
546 assert_eq!(rc, LAMINAR_OK);
547 assert!(!sub.is_null());
548
549 let mut active = false;
551 let rc = laminar_subscription_is_active(sub, &mut active);
552 assert_eq!(rc, LAMINAR_OK);
553 let rc = laminar_subscription_cancel(sub);
557 assert_eq!(rc, LAMINAR_OK);
558
559 let rc = laminar_subscription_is_active(sub, &mut active);
561 assert_eq!(rc, LAMINAR_OK);
562 assert!(!active);
563
564 laminar_subscription_free(sub);
565 laminar_close(conn);
566 }
567 }
568
569 #[test]
570 fn test_subscribe_with_user_data() {
571 let mut conn: *mut LaminarConnection = ptr::null_mut();
572 let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();
573
574 static COUNTER: AtomicUsize = AtomicUsize::new(42);
576
577 unsafe {
578 laminar_open(&mut conn);
579
580 let sql = b"CREATE TABLE userdata_test (id BIGINT)\0";
581 laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());
582
583 let query = b"SELECT * FROM userdata_test\0";
584 let user_data = std::ptr::addr_of!(COUNTER) as *mut c_void;
585
586 let rc = laminar_subscribe_callback(
587 conn,
588 query.as_ptr().cast(),
589 None,
590 None,
591 user_data,
592 &mut sub,
593 );
594 assert_eq!(rc, LAMINAR_OK);
595
596 let retrieved = laminar_subscription_user_data(sub);
598 assert_eq!(retrieved, user_data);
599
600 laminar_subscription_cancel(sub);
601 laminar_subscription_free(sub);
602 laminar_close(conn);
603 }
604 }
605
606 static DATA_CALLBACK_COUNT: AtomicUsize = AtomicUsize::new(0);
608 static ERROR_CALLBACK_COUNT: AtomicUsize = AtomicUsize::new(0);
609
610 unsafe extern "C" fn test_data_callback(
611 _user_data: *mut c_void,
612 batch: *mut LaminarRecordBatch,
613 _event_type: i32,
614 ) {
615 DATA_CALLBACK_COUNT.fetch_add(1, Ordering::SeqCst);
616 if !batch.is_null() {
618 crate::ffi::query::laminar_batch_free(batch);
619 }
620 }
621
622 unsafe extern "C" fn test_error_callback(
623 _user_data: *mut c_void,
624 _error_code: i32,
625 _error_message: *const c_char,
626 ) {
627 ERROR_CALLBACK_COUNT.fetch_add(1, Ordering::SeqCst);
628 }
629
630 #[test]
631 fn test_subscribe_with_callbacks() {
632 DATA_CALLBACK_COUNT.store(0, Ordering::SeqCst);
634 ERROR_CALLBACK_COUNT.store(0, Ordering::SeqCst);
635
636 let mut conn: *mut LaminarConnection = ptr::null_mut();
637 let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();
638
639 unsafe {
640 laminar_open(&mut conn);
641
642 let sql = b"CREATE TABLE callback_data_test (id BIGINT)\0";
643 laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());
644
645 let query = b"SELECT * FROM callback_data_test\0";
646 let rc = laminar_subscribe_callback(
647 conn,
648 query.as_ptr().cast(),
649 Some(test_data_callback),
650 Some(test_error_callback),
651 ptr::null_mut(),
652 &mut sub,
653 );
654 assert_eq!(rc, LAMINAR_OK);
655
656 std::thread::sleep(std::time::Duration::from_millis(50));
658
659 laminar_subscription_cancel(sub);
660 laminar_subscription_free(sub);
661 laminar_close(conn);
662 }
663
664 }
667
668 #[test]
669 fn test_subscription_is_active_null_pointer() {
670 let mut active = true;
671 let rc = unsafe { laminar_subscription_is_active(ptr::null_mut(), &mut active) };
672 assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
673
674 let mut conn: *mut LaminarConnection = ptr::null_mut();
675 let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();
676
677 unsafe {
678 laminar_open(&mut conn);
679 let sql = b"CREATE TABLE active_test (id BIGINT)\0";
680 laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());
681
682 let query = b"SELECT * FROM active_test\0";
683 laminar_subscribe_callback(
684 conn,
685 query.as_ptr().cast(),
686 None,
687 None,
688 ptr::null_mut(),
689 &mut sub,
690 );
691
692 let rc = laminar_subscription_is_active(sub, ptr::null_mut());
694 assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
695
696 laminar_subscription_cancel(sub);
697 laminar_subscription_free(sub);
698 laminar_close(conn);
699 }
700 }
701}