1use std::ffi::{c_char, c_void, CStr, CString};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread::{self, JoinHandle};
7
8use super::connection::LaminarConnection;
9use super::error::{
10 clear_last_error, set_last_error, LAMINAR_ERR_INVALID_UTF8, LAMINAR_ERR_NULL_POINTER,
11 LAMINAR_OK,
12};
13use super::query::LaminarRecordBatch;
14
// Event-type codes passed as the `event_type` argument of a
// `LaminarSubscriptionCallback`. The numeric values are part of the C ABI.

/// Event code `0`: insert. This is the only code the subscription worker in
/// this file currently passes to the data callback.
pub const LAMINAR_EVENT_INSERT: i32 = 0;
/// Event code `1`: delete. Not emitted by the code visible in this file.
pub const LAMINAR_EVENT_DELETE: i32 = 1;
/// Event code `2`: update. Not emitted by the code visible in this file.
pub const LAMINAR_EVENT_UPDATE: i32 = 2;
/// Event code `3`: watermark. Not emitted by the code visible in this file.
pub const LAMINAR_EVENT_WATERMARK: i32 = 3;
/// Event code `4`: snapshot. Not emitted by the code visible in this file.
pub const LAMINAR_EVENT_SNAPSHOT: i32 = 4;
29
/// Optional C callback invoked for every record batch a subscription produces.
///
/// Arguments, in order:
/// * `user_data` - the pointer supplied when subscribing, passed back verbatim.
/// * `batch` - a heap-allocated [`LaminarRecordBatch`] created with
///   `Box::into_raw`. This module does not free it after the callback returns,
///   so the callback is expected to release it (the tests in this file use
///   `laminar_batch_free`).
/// * `event_type` - one of the `LAMINAR_EVENT_*` codes.
///
/// The callback is called from the subscription's worker thread, not from the
/// thread that subscribed. `None` (a null function pointer) disables it.
pub type LaminarSubscriptionCallback = Option<
    unsafe extern "C" fn(user_data: *mut c_void, batch: *mut LaminarRecordBatch, event_type: i32),
>;
48
/// Optional C callback invoked when polling the subscription's stream fails.
///
/// Arguments, in order:
/// * `user_data` - the pointer supplied when subscribing, passed back verbatim.
/// * `error_code` - the code reported by the failing stream operation.
/// * `error_message` - NUL-terminated message. It points into a temporary
///   `CString` that is dropped when the callback returns, so it must be copied
///   if it is needed afterwards.
///
/// The callback is called from the subscription's worker thread. `None`
/// (a null function pointer) disables it.
pub type LaminarErrorCallback = Option<
    unsafe extern "C" fn(user_data: *mut c_void, error_code: i32, error_message: *const c_char),
>;
59
/// Handle given to C callers for a running callback subscription.
///
/// It owns the cancellation flag shared with the worker thread and the worker's
/// join handle, and remembers the caller's `user_data` pointer so it can be
/// returned later.
///
/// NOTE(review): the struct is `#[repr(C)]` but its fields are Rust-only types,
/// so C code should presumably treat it as opaque - confirm against the header.
#[repr(C)]
pub struct LaminarSubscriptionHandle {
    /// Shared flag; storing `true` asks the worker thread to stop.
    cancelled: Arc<AtomicBool>,
    /// Join handle of the worker thread; `None` once the worker has been joined.
    thread_handle: Option<JoinHandle<()>>,
    /// Caller-supplied pointer, stored and returned verbatim; never dereferenced here.
    user_data: *mut c_void,
}

// SAFETY: the raw `user_data` pointer is the only field that is not `Send`, and
// this type never dereferences it. Whether the pointee may be touched from
// another thread remains the C caller's responsibility.
unsafe impl Send for LaminarSubscriptionHandle {}

impl LaminarSubscriptionHandle {
    /// Bundles the cancellation flag, the worker's join handle and the caller's
    /// `user_data` pointer into a handle.
    fn new(
        cancelled: Arc<AtomicBool>,
        thread_handle: JoinHandle<()>,
        user_data: *mut c_void,
    ) -> Self {
        Self {
            cancelled,
            thread_handle: Some(thread_handle),
            user_data,
        }
    }

    /// Requests cancellation and waits for the worker thread to exit.
    ///
    /// Calling this more than once is harmless: the flag stays set and the join
    /// handle is only consumed the first time.
    ///
    /// When invoked *on the worker thread itself* (for example from inside a
    /// data or error callback) only the flag is set. Joining the current thread
    /// would make std panic or block forever, so the join handle is left in
    /// place; a later `cancel` from another thread joins it, and dropping the
    /// handle instead simply detaches the worker, which exits once it observes
    /// the flag.
    fn cancel(&mut self) {
        self.cancelled.store(true, Ordering::SeqCst);

        let called_from_worker = self
            .thread_handle
            .as_ref()
            .is_some_and(|worker| worker.thread().id() == thread::current().id());
        if called_from_worker {
            // A thread cannot join itself; the flag alone stops the loop.
            return;
        }

        if let Some(worker) = self.thread_handle.take() {
            // A panic in the worker is deliberately ignored: there is nothing
            // useful to do with it during teardown.
            let _ = worker.join();
        }
    }
}
106
/// Everything the worker thread needs to deliver events to the C caller.
struct CallbackContext {
    /// Caller-supplied pointer handed back as the first argument of every callback.
    user_data: *mut c_void,
    /// Callback for record batches; `None` means data events are not delivered.
    on_data: LaminarSubscriptionCallback,
    /// Callback for stream errors; `None` means errors are not delivered.
    on_error: LaminarErrorCallback,
    /// Cancellation flag shared with the `LaminarSubscriptionHandle`.
    cancelled: Arc<AtomicBool>,
}

// SAFETY: the raw `user_data` pointer is the only field that is not `Send`.
// This type only forwards it to the callbacks and never dereferences it.
// NOTE(review): soundness relies on the C caller guaranteeing that `user_data`
// may be used from the worker thread - this is not enforced here.
unsafe impl Send for CallbackContext {}
126
127impl CallbackContext {
128 fn call_on_data(&self, batch: LaminarRecordBatch, event_type: i32) {
130 if let Some(callback) = self.on_data {
131 let batch_ptr = Box::into_raw(Box::new(batch));
132 unsafe {
134 callback(self.user_data, batch_ptr, event_type);
135 }
136 }
137 }
138
139 fn call_on_error(&self, error_code: i32, message: &str) {
141 if let Some(callback) = self.on_error {
142 let c_message = CString::new(message)
143 .unwrap_or_else(|_| CString::new("Error message contained null byte").unwrap());
144 unsafe {
146 callback(self.user_data, error_code, c_message.as_ptr());
147 }
148 }
149 }
150
151 fn is_cancelled(&self) -> bool {
153 self.cancelled.load(Ordering::SeqCst)
154 }
155}
156
157#[no_mangle]
186pub unsafe extern "C" fn laminar_subscribe_callback(
187 conn: *mut LaminarConnection,
188 query: *const c_char,
189 on_data: LaminarSubscriptionCallback,
190 on_error: LaminarErrorCallback,
191 user_data: *mut c_void,
192 out: *mut *mut LaminarSubscriptionHandle,
193) -> i32 {
194 clear_last_error();
195
196 if conn.is_null() || query.is_null() || out.is_null() {
197 return LAMINAR_ERR_NULL_POINTER;
198 }
199
200 let Ok(query_str) = (unsafe { CStr::from_ptr(query) }).to_str() else {
202 return LAMINAR_ERR_INVALID_UTF8;
203 };
204
205 let conn_ref = unsafe { &(*conn).inner };
207
208 let stream = match conn_ref.query_stream(query_str) {
210 Ok(s) => s,
211 Err(e) => {
212 let code = e.code();
213 set_last_error(e);
214 return code;
215 }
216 };
217
218 let cancelled = Arc::new(AtomicBool::new(false));
220 let cancelled_clone = Arc::clone(&cancelled);
221
222 let ctx = CallbackContext {
224 user_data,
225 on_data,
226 on_error,
227 cancelled: cancelled_clone,
228 };
229
230 let thread_handle = thread::spawn(move || {
232 subscription_thread(stream, ctx);
233 });
234
235 let handle = Box::new(LaminarSubscriptionHandle::new(
237 cancelled,
238 thread_handle,
239 user_data,
240 ));
241
242 unsafe { *out = Box::into_raw(handle) };
244
245 LAMINAR_OK
246}
247
248#[allow(clippy::needless_pass_by_value)] fn subscription_thread(mut stream: crate::api::QueryStream, ctx: CallbackContext) {
251 loop {
252 if ctx.is_cancelled() {
254 break;
255 }
256
257 match stream.try_next() {
259 Ok(Some(batch)) => {
260 let event_type = LAMINAR_EVENT_INSERT;
262 ctx.call_on_data(LaminarRecordBatch::new(batch), event_type);
263 }
264 Ok(None) => {
265 if !stream.is_active() {
267 break;
268 }
269 std::thread::sleep(std::time::Duration::from_millis(1));
271 }
272 Err(e) => {
273 ctx.call_on_error(e.code(), e.message());
274 if !stream.is_active() {
276 break;
277 }
278 }
279 }
280 }
281}
282
283#[no_mangle]
300pub unsafe extern "C" fn laminar_subscription_cancel(
301 handle: *mut LaminarSubscriptionHandle,
302) -> i32 {
303 clear_last_error();
304
305 if handle.is_null() {
306 return LAMINAR_ERR_NULL_POINTER;
307 }
308
309 let handle_ref = unsafe { &mut *handle };
311 handle_ref.cancel();
312
313 LAMINAR_OK
314}
315
316#[no_mangle]
332pub unsafe extern "C" fn laminar_subscription_is_active(
333 handle: *mut LaminarSubscriptionHandle,
334 out: *mut bool,
335) -> i32 {
336 clear_last_error();
337
338 if handle.is_null() || out.is_null() {
339 return LAMINAR_ERR_NULL_POINTER;
340 }
341
342 let handle_ref = unsafe { &*handle };
344 let active = !handle_ref.cancelled.load(Ordering::SeqCst) && handle_ref.thread_handle.is_some();
345
346 unsafe { *out = active };
347
348 LAMINAR_OK
349}
350
351#[no_mangle]
365pub unsafe extern "C" fn laminar_subscription_user_data(
366 handle: *mut LaminarSubscriptionHandle,
367) -> *mut c_void {
368 if handle.is_null() {
369 return std::ptr::null_mut();
370 }
371
372 unsafe { (*handle).user_data }
374}
375
376#[no_mangle]
388pub unsafe extern "C" fn laminar_subscription_free(handle: *mut LaminarSubscriptionHandle) {
389 if !handle.is_null() {
390 let mut boxed = unsafe { Box::from_raw(handle) };
392 boxed.cancel();
394 drop(boxed);
395 }
396}
397
// Tests for the callback subscription FFI surface. They drive the exported
// functions through raw pointers exactly as a C caller would.
#[cfg(test)]
#[allow(
    clippy::borrow_as_ptr,
    clippy::manual_c_str_literals,
    clippy::items_after_statements
)]
mod tests {
    use super::*;
    use crate::ffi::connection::{laminar_close, laminar_execute, laminar_open};
    use std::ptr;
    use std::sync::atomic::AtomicUsize;

    /// Pins the numeric values of the event-type codes.
    #[test]
    fn test_event_type_constants() {
        assert_eq!(LAMINAR_EVENT_INSERT, 0);
        assert_eq!(LAMINAR_EVENT_DELETE, 1);
        assert_eq!(LAMINAR_EVENT_UPDATE, 2);
        assert_eq!(LAMINAR_EVENT_WATERMARK, 3);
        assert_eq!(LAMINAR_EVENT_SNAPSHOT, 4);
    }

    /// Each of `conn`, `query` and `out` being null must be rejected.
    #[test]
    fn test_subscribe_null_pointer() {
        let mut out: *mut LaminarSubscriptionHandle = ptr::null_mut();

        // Null connection.
        let rc = unsafe {
            laminar_subscribe_callback(
                ptr::null_mut(),
                b"SELECT 1\0".as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                &mut out,
            )
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        // A real connection is needed to reach the remaining null checks.
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        unsafe { laminar_open(&mut conn) };

        // Null query.
        let rc = unsafe {
            laminar_subscribe_callback(conn, ptr::null(), None, None, ptr::null_mut(), &mut out)
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        // Null output pointer.
        let rc = unsafe {
            laminar_subscribe_callback(
                conn,
                b"SELECT 1\0".as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                ptr::null_mut(),
            )
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        unsafe { laminar_close(conn) };
    }

    /// Cancelling a null handle reports a null-pointer error.
    #[test]
    fn test_subscription_cancel_null() {
        let rc = unsafe { laminar_subscription_cancel(ptr::null_mut()) };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
    }

    /// Freeing a null handle must be a harmless no-op.
    #[test]
    fn test_subscription_free_null() {
        unsafe { laminar_subscription_free(ptr::null_mut()) };
    }

    /// Asking a null handle for its user data yields null.
    #[test]
    fn test_subscription_user_data_null() {
        let result = unsafe { laminar_subscription_user_data(ptr::null_mut()) };
        assert!(result.is_null());
    }

    /// Subscribes without callbacks, cancels, and checks the handle reports
    /// inactive afterwards.
    #[test]
    fn test_subscribe_and_cancel() {
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);

            let sql = b"CREATE TABLE callback_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            let query = b"SELECT * FROM callback_test\0";
            let rc = laminar_subscribe_callback(
                conn,
                query.as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                &mut sub,
            );
            assert_eq!(rc, LAMINAR_OK);
            assert!(!sub.is_null());

            // Only the return code is checked before cancelling; the value of
            // `active` at this point is not asserted.
            let mut active = false;
            let rc = laminar_subscription_is_active(sub, &mut active);
            assert_eq!(rc, LAMINAR_OK);
            let rc = laminar_subscription_cancel(sub);
            assert_eq!(rc, LAMINAR_OK);

            // After cancellation the subscription must report inactive.
            let rc = laminar_subscription_is_active(sub, &mut active);
            assert_eq!(rc, LAMINAR_OK);
            assert!(!active);

            laminar_subscription_free(sub);
            laminar_close(conn);
        }
    }

    /// The `user_data` pointer given at subscribe time is returned unchanged.
    #[test]
    fn test_subscribe_with_user_data() {
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        // Only the address of this static is used; it serves as a stable,
        // recognisable pointer value.
        static COUNTER: AtomicUsize = AtomicUsize::new(42);

        unsafe {
            laminar_open(&mut conn);

            let sql = b"CREATE TABLE userdata_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            let query = b"SELECT * FROM userdata_test\0";
            let user_data = std::ptr::addr_of!(COUNTER) as *mut c_void;

            let rc = laminar_subscribe_callback(
                conn,
                query.as_ptr().cast(),
                None,
                None,
                user_data,
                &mut sub,
            );
            assert_eq!(rc, LAMINAR_OK);

            let retrieved = laminar_subscription_user_data(sub);
            assert_eq!(retrieved, user_data);

            laminar_subscription_cancel(sub);
            laminar_subscription_free(sub);
            laminar_close(conn);
        }
    }

    // Invocation counters bumped by the test callbacks below.
    static DATA_CALLBACK_COUNT: AtomicUsize = AtomicUsize::new(0);
    static ERROR_CALLBACK_COUNT: AtomicUsize = AtomicUsize::new(0);

    /// Data callback used by the tests: counts the call and frees the batch,
    /// since ownership of the batch is handed to the callback.
    unsafe extern "C" fn test_data_callback(
        _user_data: *mut c_void,
        batch: *mut LaminarRecordBatch,
        _event_type: i32,
    ) {
        DATA_CALLBACK_COUNT.fetch_add(1, Ordering::SeqCst);
        if !batch.is_null() {
            crate::ffi::query::laminar_batch_free(batch);
        }
    }

    /// Error callback used by the tests: only counts the call.
    unsafe extern "C" fn test_error_callback(
        _user_data: *mut c_void,
        _error_code: i32,
        _error_message: *const c_char,
    ) {
        ERROR_CALLBACK_COUNT.fetch_add(1, Ordering::SeqCst);
    }

    /// Subscribes with both callbacks registered, lets the worker run briefly,
    /// then tears everything down. The counters are reset but not asserted on,
    /// so this only checks that the full lifecycle completes.
    #[test]
    fn test_subscribe_with_callbacks() {
        DATA_CALLBACK_COUNT.store(0, Ordering::SeqCst);
        ERROR_CALLBACK_COUNT.store(0, Ordering::SeqCst);

        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);

            let sql = b"CREATE TABLE callback_data_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            let query = b"SELECT * FROM callback_data_test\0";
            let rc = laminar_subscribe_callback(
                conn,
                query.as_ptr().cast(),
                Some(test_data_callback),
                Some(test_error_callback),
                ptr::null_mut(),
                &mut sub,
            );
            assert_eq!(rc, LAMINAR_OK);

            // Give the worker thread a moment to poll the stream.
            std::thread::sleep(std::time::Duration::from_millis(50));

            laminar_subscription_cancel(sub);
            laminar_subscription_free(sub);
            laminar_close(conn);
        }

    }

    /// `laminar_subscription_is_active` must reject a null handle and a null
    /// output pointer.
    #[test]
    fn test_subscription_is_active_null_pointer() {
        // Null handle.
        let mut active = true;
        let rc = unsafe { laminar_subscription_is_active(ptr::null_mut(), &mut active) };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);
            let sql = b"CREATE TABLE active_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            // NOTE(review): the return code of the subscribe call is not
            // checked here, so `sub` is assumed to be non-null below.
            let query = b"SELECT * FROM active_test\0";
            laminar_subscribe_callback(
                conn,
                query.as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                &mut sub,
            );

            // Null output pointer.
            let rc = laminar_subscription_is_active(sub, ptr::null_mut());
            assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

            laminar_subscription_cancel(sub);
            laminar_subscription_free(sub);
            laminar_close(conn);
        }
    }
}