// laminar_db/ffi/callback.rs

1//! Async FFI callbacks for push-based notifications.
2
3use std::ffi::{c_char, c_void, CStr, CString};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread::{self, JoinHandle};
7
8use super::connection::LaminarConnection;
9use super::error::{
10    clear_last_error, set_last_error, LAMINAR_ERR_INVALID_UTF8, LAMINAR_ERR_NULL_POINTER,
11    LAMINAR_OK,
12};
13use super::query::LaminarRecordBatch;
14
15// ============================================================================
16// Event Type Constants
17// ============================================================================
18
// Values passed as the `event_type` argument of `LaminarSubscriptionCallback`.
// They are plain `i32`s so they can cross the C ABI unchanged; the exact
// numeric values are pinned by `test_event_type_constants`.
// The subscription thread in this file currently reports every batch as
// `LAMINAR_EVENT_INSERT`.

/// Event type: insert (+1 weight).
pub const LAMINAR_EVENT_INSERT: i32 = 0;
/// Event type: delete (-1 weight).
pub const LAMINAR_EVENT_DELETE: i32 = 1;
/// Event type: update (decomposed to delete + insert).
pub const LAMINAR_EVENT_UPDATE: i32 = 2;
/// Event type: watermark progress (no data).
pub const LAMINAR_EVENT_WATERMARK: i32 = 3;
/// Event type: snapshot (initial state load).
pub const LAMINAR_EVENT_SNAPSHOT: i32 = 4;
29
30// ============================================================================
31// Callback Function Types
32// ============================================================================
33
/// Callback function type for subscription data.
///
/// Invoked from the background subscription thread, once for every batch the
/// stream produces. `None` (a NULL function pointer on the C side) is accepted;
/// in that case batches are dropped by the library instead of being delivered.
///
/// # Arguments
///
/// * `user_data` - Opaque pointer passed to `laminar_subscribe_callback`,
///   forwarded unchanged
/// * `batch` - Record batch, heap-allocated by the library (ownership
///   transferred to callback, must free)
/// * `event_type` - One of `LAMINAR_EVENT_*` constants
///
/// # Safety
///
/// The callback must free the batch with `laminar_batch_free`.
pub type LaminarSubscriptionCallback = Option<
    unsafe extern "C" fn(user_data: *mut c_void, batch: *mut LaminarRecordBatch, event_type: i32),
>;
48
/// Callback function type for errors.
///
/// Invoked from the background subscription thread when fetching the next
/// batch from the stream fails. `None` (a NULL function pointer on the C side)
/// is accepted; errors are then not reported.
///
/// # Arguments
///
/// * `user_data` - Opaque pointer passed to `laminar_subscribe_callback`,
///   forwarded unchanged
/// * `error_code` - One of `LAMINAR_ERR_*` constants
/// * `error_message` - NUL-terminated error message owned by the library
///   (valid only during callback; copy it if it must outlive the call)
pub type LaminarErrorCallback = Option<
    unsafe extern "C" fn(user_data: *mut c_void, error_code: i32, error_message: *const c_char),
>;
59
60// ============================================================================
61// Subscription Handle
62// ============================================================================
63
/// Opaque handle for a callback-based subscription.
///
/// Created by `laminar_subscribe_callback`, freed by `laminar_subscription_free`.
/// Foreign callers only ever hold a pointer to this type and must not inspect
/// its fields.
#[repr(C)]
pub struct LaminarSubscriptionHandle {
    /// Flag to signal cancellation to the background thread.
    cancelled: Arc<AtomicBool>,
    /// Background thread handle; `None` once `cancel` has consumed it.
    thread_handle: Option<JoinHandle<()>>,
    /// User data pointer (stored for reference, not owned).
    user_data: *mut c_void,
}

// SAFETY: `Arc<AtomicBool>` and `JoinHandle<()>` are `Send`; the only field that
// is not is the raw `user_data` pointer, which this type never dereferences.
// Thread safety of whatever it points to is the caller's responsibility.
unsafe impl Send for LaminarSubscriptionHandle {}

impl LaminarSubscriptionHandle {
    /// Create a new subscription handle.
    ///
    /// * `cancelled` - flag shared with the background thread
    /// * `thread_handle` - join handle of the background thread
    /// * `user_data` - caller-supplied pointer, stored but never dereferenced
    fn new(
        cancelled: Arc<AtomicBool>,
        thread_handle: JoinHandle<()>,
        user_data: *mut c_void,
    ) -> Self {
        Self {
            cancelled,
            thread_handle: Some(thread_handle),
            user_data,
        }
    }

    /// Cancel the subscription and wait for the thread to stop.
    ///
    /// Idempotent: the join handle is consumed by the first call, so later
    /// calls only re-store the flag.
    ///
    /// When called from the subscription thread itself (i.e. from inside a
    /// data or error callback), the join is skipped and the thread is detached
    /// instead: a thread cannot join itself, and attempting it would deadlock
    /// or panic inside an `extern "C"` frame. The flag is still set, so the
    /// thread's loop exits once the running callback returns.
    fn cancel(&mut self) {
        // Signal cancellation first so the worker can start winding down.
        self.cancelled.store(true, Ordering::SeqCst);

        let Some(worker) = self.thread_handle.take() else {
            return;
        };

        if worker.thread().id() == thread::current().id() {
            // Re-entrant cancel from a callback: dropping the handle detaches
            // the thread, which stops on its own via the flag set above.
            return;
        }

        // A panic in the worker is deliberately ignored; there is nothing
        // useful to do with it at this point.
        let _ = worker.join();
    }
}
106
107// ============================================================================
108// Callback Context
109// ============================================================================
110
111/// Internal context passed to the background subscription thread.
112struct CallbackContext {
113    /// User data pointer.
114    user_data: *mut c_void,
115    /// Data callback.
116    on_data: LaminarSubscriptionCallback,
117    /// Error callback.
118    on_error: LaminarErrorCallback,
119    /// Cancellation flag.
120    cancelled: Arc<AtomicBool>,
121}
122
123// SAFETY: CallbackContext is only accessed from the subscription thread.
124// The user_data pointer is the caller's responsibility.
125unsafe impl Send for CallbackContext {}
126
127impl CallbackContext {
128    /// Invoke the data callback if set.
129    fn call_on_data(&self, batch: LaminarRecordBatch, event_type: i32) {
130        if let Some(callback) = self.on_data {
131            let batch_ptr = Box::into_raw(Box::new(batch));
132            // SAFETY: Callback is provided by FFI caller, we pass valid pointers.
133            unsafe {
134                callback(self.user_data, batch_ptr, event_type);
135            }
136        }
137    }
138
139    /// Invoke the error callback if set.
140    fn call_on_error(&self, error_code: i32, message: &str) {
141        if let Some(callback) = self.on_error {
142            let c_message = CString::new(message)
143                .unwrap_or_else(|_| CString::new("Error message contained null byte").unwrap());
144            // SAFETY: Callback is provided by FFI caller, we pass valid pointers.
145            unsafe {
146                callback(self.user_data, error_code, c_message.as_ptr());
147            }
148        }
149    }
150
151    /// Check if cancellation has been requested.
152    fn is_cancelled(&self) -> bool {
153        self.cancelled.load(Ordering::SeqCst)
154    }
155}
156
157// ============================================================================
158// FFI Functions
159// ============================================================================
160
161/// Create a callback-based subscription.
162///
163/// The `on_data` callback is invoked from a background thread when new data
164/// arrives. The callback receives ownership of the batch and must free it.
165///
166/// # Arguments
167///
168/// * `conn` - Database connection
169/// * `query` - SQL query string (null-terminated)
170/// * `on_data` - Callback for data batches (may be NULL to ignore data)
171/// * `on_error` - Callback for errors (may be NULL to ignore errors)
172/// * `user_data` - Opaque pointer passed to callbacks
173/// * `out` - Pointer to receive subscription handle
174///
175/// # Returns
176///
177/// `LAMINAR_OK` on success, or an error code.
178///
179/// # Safety
180///
181/// * `conn` must be a valid connection handle
182/// * `query` must be a valid null-terminated UTF-8 string
183/// * `out` must be a valid pointer
184/// * Callbacks must be thread-safe if `user_data` is shared
185#[no_mangle]
186pub unsafe extern "C" fn laminar_subscribe_callback(
187    conn: *mut LaminarConnection,
188    query: *const c_char,
189    on_data: LaminarSubscriptionCallback,
190    on_error: LaminarErrorCallback,
191    user_data: *mut c_void,
192    out: *mut *mut LaminarSubscriptionHandle,
193) -> i32 {
194    clear_last_error();
195
196    if conn.is_null() || query.is_null() || out.is_null() {
197        return LAMINAR_ERR_NULL_POINTER;
198    }
199
200    // Parse query string
201    let Ok(query_str) = (unsafe { CStr::from_ptr(query) }).to_str() else {
202        return LAMINAR_ERR_INVALID_UTF8;
203    };
204
205    // SAFETY: conn is non-null (checked above)
206    let conn_ref = unsafe { &(*conn).inner };
207
208    // Create the subscription (using query_stream for streaming results)
209    let stream = match conn_ref.query_stream(query_str) {
210        Ok(s) => s,
211        Err(e) => {
212            let code = e.code();
213            set_last_error(e);
214            return code;
215        }
216    };
217
218    // Set up cancellation flag
219    let cancelled = Arc::new(AtomicBool::new(false));
220    let cancelled_clone = Arc::clone(&cancelled);
221
222    // Create callback context
223    let ctx = CallbackContext {
224        user_data,
225        on_data,
226        on_error,
227        cancelled: cancelled_clone,
228    };
229
230    // Spawn background thread to drive the subscription
231    let thread_handle = thread::spawn(move || {
232        subscription_thread(stream, ctx);
233    });
234
235    // Create handle
236    let handle = Box::new(LaminarSubscriptionHandle::new(
237        cancelled,
238        thread_handle,
239        user_data,
240    ));
241
242    // SAFETY: out is non-null (checked above)
243    unsafe { *out = Box::into_raw(handle) };
244
245    LAMINAR_OK
246}
247
248/// Background thread function that drives the subscription.
249#[allow(clippy::needless_pass_by_value)] // ctx is owned by the thread
250fn subscription_thread(mut stream: crate::api::QueryStream, ctx: CallbackContext) {
251    loop {
252        // Check for cancellation
253        if ctx.is_cancelled() {
254            break;
255        }
256
257        // Try to get next batch (non-blocking to allow cancellation checks)
258        match stream.try_next() {
259            Ok(Some(batch)) => {
260                // Determine event type (default to insert for query results)
261                let event_type = LAMINAR_EVENT_INSERT;
262                ctx.call_on_data(LaminarRecordBatch::new(batch), event_type);
263            }
264            Ok(None) => {
265                // No data available, check if stream is exhausted
266                if !stream.is_active() {
267                    break;
268                }
269                // Brief sleep to avoid busy-waiting
270                std::thread::sleep(std::time::Duration::from_millis(1));
271            }
272            Err(e) => {
273                ctx.call_on_error(e.code(), e.message());
274                // Continue unless fatal
275                if !stream.is_active() {
276                    break;
277                }
278            }
279        }
280    }
281}
282
283/// Cancel a callback-based subscription.
284///
285/// After this function returns, no more callbacks will be invoked.
286/// It is safe to free resources referenced by `user_data` after this returns.
287///
288/// # Arguments
289///
290/// * `handle` - Subscription handle
291///
292/// # Returns
293///
294/// `LAMINAR_OK` on success, or an error code.
295///
296/// # Safety
297///
298/// `handle` must be a valid subscription handle.
299#[no_mangle]
300pub unsafe extern "C" fn laminar_subscription_cancel(
301    handle: *mut LaminarSubscriptionHandle,
302) -> i32 {
303    clear_last_error();
304
305    if handle.is_null() {
306        return LAMINAR_ERR_NULL_POINTER;
307    }
308
309    // SAFETY: handle is non-null (checked above)
310    let handle_ref = unsafe { &mut *handle };
311    handle_ref.cancel();
312
313    LAMINAR_OK
314}
315
316/// Check if a subscription is still active.
317///
318/// # Arguments
319///
320/// * `handle` - Subscription handle
321/// * `out` - Pointer to receive result (true if active)
322///
323/// # Returns
324///
325/// `LAMINAR_OK` on success, or an error code.
326///
327/// # Safety
328///
329/// * `handle` must be a valid subscription handle
330/// * `out` must be a valid pointer
331#[no_mangle]
332pub unsafe extern "C" fn laminar_subscription_is_active(
333    handle: *mut LaminarSubscriptionHandle,
334    out: *mut bool,
335) -> i32 {
336    clear_last_error();
337
338    if handle.is_null() || out.is_null() {
339        return LAMINAR_ERR_NULL_POINTER;
340    }
341
342    // SAFETY: handle and out are non-null (checked above)
343    let handle_ref = unsafe { &*handle };
344    let active = !handle_ref.cancelled.load(Ordering::SeqCst) && handle_ref.thread_handle.is_some();
345
346    unsafe { *out = active };
347
348    LAMINAR_OK
349}
350
351/// Get the user data pointer from a subscription handle.
352///
353/// # Arguments
354///
355/// * `handle` - Subscription handle
356///
357/// # Returns
358///
359/// The user data pointer passed to `laminar_subscribe_callback`, or NULL.
360///
361/// # Safety
362///
363/// `handle` must be a valid subscription handle or NULL.
364#[no_mangle]
365pub unsafe extern "C" fn laminar_subscription_user_data(
366    handle: *mut LaminarSubscriptionHandle,
367) -> *mut c_void {
368    if handle.is_null() {
369        return std::ptr::null_mut();
370    }
371
372    // SAFETY: handle is non-null (checked above)
373    unsafe { (*handle).user_data }
374}
375
376/// Free a subscription handle.
377///
378/// If the subscription is still active, it will be cancelled first.
379///
380/// # Arguments
381///
382/// * `handle` - Subscription handle to free
383///
384/// # Safety
385///
386/// `handle` must be a valid handle from `laminar_subscribe_callback`, or NULL.
387#[no_mangle]
388pub unsafe extern "C" fn laminar_subscription_free(handle: *mut LaminarSubscriptionHandle) {
389    if !handle.is_null() {
390        // SAFETY: handle is non-null and was allocated by Box
391        let mut boxed = unsafe { Box::from_raw(handle) };
392        // Cancel if still active
393        boxed.cancel();
394        drop(boxed);
395    }
396}
397
398// Note: Async write operations (laminar_writer_write_async) are deferred to a
399// future release. They require careful synchronization (Arc<Mutex<Writer>>)
400// that is beyond the scope of this initial callback implementation.
401// For now, use the synchronous laminar_writer_write() API.
402
#[cfg(test)]
// Lints relaxed for FFI-style test code: `&mut x` passed where a raw pointer is
// expected, C strings spelled as `b"...\0"` byte literals, and statics declared
// after statements inside a test body.
#[allow(
    clippy::borrow_as_ptr,
    clippy::manual_c_str_literals,
    clippy::items_after_statements
)]
mod tests {
    use super::*;
    use crate::ffi::connection::{laminar_close, laminar_execute, laminar_open};
    use std::ptr;
    use std::sync::atomic::AtomicUsize;

    // Pins the numeric values of the event constants, which are part of the C ABI.
    #[test]
    fn test_event_type_constants() {
        assert_eq!(LAMINAR_EVENT_INSERT, 0);
        assert_eq!(LAMINAR_EVENT_DELETE, 1);
        assert_eq!(LAMINAR_EVENT_UPDATE, 2);
        assert_eq!(LAMINAR_EVENT_WATERMARK, 3);
        assert_eq!(LAMINAR_EVENT_SNAPSHOT, 4);
    }

    // Each of `conn`, `query` and `out` being NULL must be rejected with
    // LAMINAR_ERR_NULL_POINTER.
    #[test]
    fn test_subscribe_null_pointer() {
        let mut out: *mut LaminarSubscriptionHandle = ptr::null_mut();

        // Null connection
        let rc = unsafe {
            laminar_subscribe_callback(
                ptr::null_mut(),
                b"SELECT 1\0".as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                &mut out,
            )
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        // Null query (needs a real connection so only `query` is NULL)
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        unsafe { laminar_open(&mut conn) };

        let rc = unsafe {
            laminar_subscribe_callback(conn, ptr::null(), None, None, ptr::null_mut(), &mut out)
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        // Null out
        let rc = unsafe {
            laminar_subscribe_callback(
                conn,
                b"SELECT 1\0".as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                ptr::null_mut(),
            )
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        unsafe { laminar_close(conn) };
    }

    // Cancelling a NULL handle reports an error instead of dereferencing it.
    #[test]
    fn test_subscription_cancel_null() {
        let rc = unsafe { laminar_subscription_cancel(ptr::null_mut()) };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
    }

    // Freeing a NULL handle is a no-op.
    #[test]
    fn test_subscription_free_null() {
        // Should not crash
        unsafe { laminar_subscription_free(ptr::null_mut()) };
    }

    // Asking a NULL handle for its user data yields NULL.
    #[test]
    fn test_subscription_user_data_null() {
        let result = unsafe { laminar_subscription_user_data(ptr::null_mut()) };
        assert!(result.is_null());
    }

    // Full lifecycle without callbacks: subscribe, query state, cancel, free.
    #[test]
    fn test_subscribe_and_cancel() {
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);

            // Create a table for querying
            let sql = b"CREATE TABLE callback_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            // Subscribe (no callbacks, just test lifecycle)
            let query = b"SELECT * FROM callback_test\0";
            let rc = laminar_subscribe_callback(
                conn,
                query.as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                &mut sub,
            );
            assert_eq!(rc, LAMINAR_OK);
            assert!(!sub.is_null());

            // Check active
            let mut active = false;
            let rc = laminar_subscription_is_active(sub, &mut active);
            assert_eq!(rc, LAMINAR_OK);
            // Only the return code is checked here; the value of `active` is
            // deliberately not asserted before cancellation.

            // Cancel
            let rc = laminar_subscription_cancel(sub);
            assert_eq!(rc, LAMINAR_OK);

            // Should no longer be active
            let rc = laminar_subscription_is_active(sub, &mut active);
            assert_eq!(rc, LAMINAR_OK);
            assert!(!active);

            laminar_subscription_free(sub);
            laminar_close(conn);
        }
    }

    // The `user_data` pointer given at subscribe time is returned unchanged.
    #[test]
    fn test_subscribe_with_user_data() {
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        // Use a counter as user data (only its address matters; it is never
        // read through the pointer in this test)
        static COUNTER: AtomicUsize = AtomicUsize::new(42);

        unsafe {
            laminar_open(&mut conn);

            let sql = b"CREATE TABLE userdata_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            let query = b"SELECT * FROM userdata_test\0";
            let user_data = std::ptr::addr_of!(COUNTER) as *mut c_void;

            let rc = laminar_subscribe_callback(
                conn,
                query.as_ptr().cast(),
                None,
                None,
                user_data,
                &mut sub,
            );
            assert_eq!(rc, LAMINAR_OK);

            // Verify user data is preserved
            let retrieved = laminar_subscription_user_data(sub);
            assert_eq!(retrieved, user_data);

            laminar_subscription_cancel(sub);
            laminar_subscription_free(sub);
            laminar_close(conn);
        }
    }

    // Static counters for callback tests (statics because `extern "C"`
    // callbacks cannot capture local state)
    static DATA_CALLBACK_COUNT: AtomicUsize = AtomicUsize::new(0);
    static ERROR_CALLBACK_COUNT: AtomicUsize = AtomicUsize::new(0);

    // Data callback used by the tests: counts the invocation and releases the
    // batch, as the callback contract requires.
    unsafe extern "C" fn test_data_callback(
        _user_data: *mut c_void,
        batch: *mut LaminarRecordBatch,
        _event_type: i32,
    ) {
        DATA_CALLBACK_COUNT.fetch_add(1, Ordering::SeqCst);
        // Must free the batch
        if !batch.is_null() {
            crate::ffi::query::laminar_batch_free(batch);
        }
    }

    // Error callback used by the tests: only counts the invocation.
    unsafe extern "C" fn test_error_callback(
        _user_data: *mut c_void,
        _error_code: i32,
        _error_message: *const c_char,
    ) {
        ERROR_CALLBACK_COUNT.fetch_add(1, Ordering::SeqCst);
    }

    // Smoke test with both callbacks installed. The counters are reset but
    // never asserted on: the test only checks that the lifecycle completes.
    #[test]
    fn test_subscribe_with_callbacks() {
        // Reset counters
        DATA_CALLBACK_COUNT.store(0, Ordering::SeqCst);
        ERROR_CALLBACK_COUNT.store(0, Ordering::SeqCst);

        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);

            let sql = b"CREATE TABLE callback_data_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            let query = b"SELECT * FROM callback_data_test\0";
            let rc = laminar_subscribe_callback(
                conn,
                query.as_ptr().cast(),
                Some(test_data_callback),
                Some(test_error_callback),
                ptr::null_mut(),
                &mut sub,
            );
            assert_eq!(rc, LAMINAR_OK);

            // Give a moment for the subscription thread to run
            std::thread::sleep(std::time::Duration::from_millis(50));

            laminar_subscription_cancel(sub);
            laminar_subscription_free(sub);
            laminar_close(conn);
        }

        // Note: callbacks may or may not have fired depending on timing
        // The important thing is no crashes occurred
    }

    // `laminar_subscription_is_active` must reject a NULL handle and a NULL
    // `out` pointer.
    #[test]
    fn test_subscription_is_active_null_pointer() {
        let mut active = true;
        let rc = unsafe { laminar_subscription_is_active(ptr::null_mut(), &mut active) };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);
            let sql = b"CREATE TABLE active_test (id BIGINT)\0";
            laminar_execute(conn, sql.as_ptr().cast(), ptr::null_mut());

            // A real subscription is needed so that only `out` is NULL below.
            let query = b"SELECT * FROM active_test\0";
            laminar_subscribe_callback(
                conn,
                query.as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                &mut sub,
            );

            // Null out pointer
            let rc = laminar_subscription_is_active(sub, ptr::null_mut());
            assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

            laminar_subscription_cancel(sub);
            laminar_subscription_free(sub);
            laminar_close(conn);
        }
    }
}