Skip to main content

laminar_db/ffi/
callback.rs

1//! Async FFI callbacks for push-based notifications.
2//!
3//! Provides callback-based APIs for subscriptions where the database notifies
4//! the caller when new data arrives, rather than requiring polling.
5//!
6//! # Thread Safety
7//!
8//! - Callbacks are invoked from a background thread
9//! - Callbacks for a single subscription are serialized (never concurrent)
10//! - After `laminar_subscription_cancel()` returns, no more callbacks will fire
11//!
12//! # Example
13//!
14//! ```c
15//! #include "laminar.h"
16//!
17//! void on_data(void* ctx, LaminarRecordBatch* batch, int32_t event_type) {
18//!     // Process the batch...
19//!     laminar_batch_free(batch);  // Must free
20//! }
21//!
22//! void on_error(void* ctx, int32_t code, const char* msg) {
23//!     fprintf(stderr, "Error %d: %s\n", code, msg);
24//! }
25//!
26//! int main() {
27//!     LaminarConnection* conn;
28//!     laminar_open(&conn);
29//!
30//!     LaminarSubscriptionHandle* sub;
31//!     laminar_subscribe_callback(conn, "SELECT * FROM trades",
32//!                                on_data, on_error, NULL, &sub);
33//!
34//!     // Callbacks fire in background...
35//!     sleep(60);
36//!
37//!     laminar_subscription_cancel(sub);
38//!     laminar_subscription_free(sub);
39//!     laminar_close(conn);
40//! }
41//! ```
42
43use std::ffi::{c_char, c_void, CStr, CString};
44use std::sync::atomic::{AtomicBool, Ordering};
45use std::sync::Arc;
46use std::thread::{self, JoinHandle};
47
48use super::connection::LaminarConnection;
49use super::error::{
50    clear_last_error, set_last_error, LAMINAR_ERR_INVALID_UTF8, LAMINAR_ERR_NULL_POINTER,
51    LAMINAR_OK,
52};
53use super::query::LaminarRecordBatch;
54
55// ============================================================================
56// Event Type Constants
57// ============================================================================
58
/// Event code reported for an insert (+1 weight).
pub const LAMINAR_EVENT_INSERT: i32 = 0;
/// Event code reported for a delete (-1 weight).
pub const LAMINAR_EVENT_DELETE: i32 = 1;
/// Event code reported for an update (decomposed into a delete plus an insert).
pub const LAMINAR_EVENT_UPDATE: i32 = 2;
/// Event code reported for watermark progress; carries no data.
pub const LAMINAR_EVENT_WATERMARK: i32 = 3;
/// Event code reported for a snapshot (initial state load).
pub const LAMINAR_EVENT_SNAPSHOT: i32 = 4;
69
70// ============================================================================
71// Callback Function Types
72// ============================================================================
73
74/// Callback function type for subscription data.
75///
76/// # Arguments
77///
78/// * `user_data` - Opaque pointer passed to `laminar_subscribe_callback`
79/// * `batch` - Record batch (ownership transferred to callback, must free)
80/// * `event_type` - One of `LAMINAR_EVENT_*` constants
81///
82/// # Safety
83///
84/// The callback must free the batch with `laminar_batch_free`.
85pub type LaminarSubscriptionCallback = Option<
86    unsafe extern "C" fn(user_data: *mut c_void, batch: *mut LaminarRecordBatch, event_type: i32),
87>;
88
/// C function pointer invoked when the subscription reports an error.
///
/// `None` corresponds to a NULL function pointer on the C side.
///
/// # Arguments
///
/// * `user_data` - The opaque pointer that was given to `laminar_subscribe_callback`
/// * `error_code` - One of the `LAMINAR_ERR_*` constants
/// * `error_message` - NUL-terminated message; only valid for the duration of the call
pub type LaminarErrorCallback = Option<
    unsafe extern "C" fn(
        user_data: *mut c_void,
        error_code: i32,
        error_message: *const c_char,
    ),
>;
99
100// ============================================================================
101// Subscription Handle
102// ============================================================================
103
/// Opaque handle representing one callback-based subscription.
///
/// Produced by `laminar_subscribe_callback` and released with
/// `laminar_subscription_free`.
#[repr(C)]
pub struct LaminarSubscriptionHandle {
    /// Shared flag the background thread polls to learn it should stop.
    cancelled: Arc<AtomicBool>,
    /// Join handle of the background thread; `None` once it has been joined.
    thread_handle: Option<JoinHandle<()>>,
    /// Caller-supplied pointer, kept only so it can be handed back; never owned.
    user_data: *mut c_void,
}

// SAFETY: The only field that is not automatically `Send` is the raw
// `user_data` pointer, which this type never dereferences. Making whatever it
// points at safe to touch from several threads is the caller's responsibility.
unsafe impl Send for LaminarSubscriptionHandle {}

impl LaminarSubscriptionHandle {
    /// Build a handle around an already-running subscription thread.
    fn new(
        cancelled: Arc<AtomicBool>,
        thread_handle: JoinHandle<()>,
        user_data: *mut c_void,
    ) -> Self {
        let thread_handle = Some(thread_handle);
        Self {
            cancelled,
            thread_handle,
            user_data,
        }
    }

    /// Request cancellation and block until the background thread has exited.
    ///
    /// Calling this more than once is harmless: the flag is simply stored
    /// again and there is no thread left to join.
    fn cancel(&mut self) {
        self.cancelled.store(true, Ordering::SeqCst);

        let Some(worker) = self.thread_handle.take() else {
            return;
        };
        // A panic inside the worker surfaces as `Err` here; it is deliberately
        // ignored so that cancellation itself never panics.
        drop(worker.join());
    }
}
146
147// ============================================================================
148// Callback Context
149// ============================================================================
150
151/// Internal context passed to the background subscription thread.
152struct CallbackContext {
153    /// User data pointer.
154    user_data: *mut c_void,
155    /// Data callback.
156    on_data: LaminarSubscriptionCallback,
157    /// Error callback.
158    on_error: LaminarErrorCallback,
159    /// Cancellation flag.
160    cancelled: Arc<AtomicBool>,
161}
162
163// SAFETY: CallbackContext is only accessed from the subscription thread.
164// The user_data pointer is the caller's responsibility.
165unsafe impl Send for CallbackContext {}
166
167impl CallbackContext {
168    /// Invoke the data callback if set.
169    fn call_on_data(&self, batch: LaminarRecordBatch, event_type: i32) {
170        if let Some(callback) = self.on_data {
171            let batch_ptr = Box::into_raw(Box::new(batch));
172            // SAFETY: Callback is provided by FFI caller, we pass valid pointers.
173            unsafe {
174                callback(self.user_data, batch_ptr, event_type);
175            }
176        }
177    }
178
179    /// Invoke the error callback if set.
180    fn call_on_error(&self, error_code: i32, message: &str) {
181        if let Some(callback) = self.on_error {
182            let c_message = CString::new(message)
183                .unwrap_or_else(|_| CString::new("Error message contained null byte").unwrap());
184            // SAFETY: Callback is provided by FFI caller, we pass valid pointers.
185            unsafe {
186                callback(self.user_data, error_code, c_message.as_ptr());
187            }
188        }
189    }
190
191    /// Check if cancellation has been requested.
192    fn is_cancelled(&self) -> bool {
193        self.cancelled.load(Ordering::SeqCst)
194    }
195}
196
197// ============================================================================
198// FFI Functions
199// ============================================================================
200
201/// Create a callback-based subscription.
202///
203/// The `on_data` callback is invoked from a background thread when new data
204/// arrives. The callback receives ownership of the batch and must free it.
205///
206/// # Arguments
207///
208/// * `conn` - Database connection
209/// * `query` - SQL query string (null-terminated)
210/// * `on_data` - Callback for data batches (may be NULL to ignore data)
211/// * `on_error` - Callback for errors (may be NULL to ignore errors)
212/// * `user_data` - Opaque pointer passed to callbacks
213/// * `out` - Pointer to receive subscription handle
214///
215/// # Returns
216///
217/// `LAMINAR_OK` on success, or an error code.
218///
219/// # Safety
220///
221/// * `conn` must be a valid connection handle
222/// * `query` must be a valid null-terminated UTF-8 string
223/// * `out` must be a valid pointer
224/// * Callbacks must be thread-safe if `user_data` is shared
225#[no_mangle]
226pub unsafe extern "C" fn laminar_subscribe_callback(
227    conn: *mut LaminarConnection,
228    query: *const c_char,
229    on_data: LaminarSubscriptionCallback,
230    on_error: LaminarErrorCallback,
231    user_data: *mut c_void,
232    out: *mut *mut LaminarSubscriptionHandle,
233) -> i32 {
234    clear_last_error();
235
236    if conn.is_null() || query.is_null() || out.is_null() {
237        return LAMINAR_ERR_NULL_POINTER;
238    }
239
240    // Parse query string
241    let Ok(query_str) = (unsafe { CStr::from_ptr(query) }).to_str() else {
242        return LAMINAR_ERR_INVALID_UTF8;
243    };
244
245    // SAFETY: conn is non-null (checked above)
246    let conn_ref = unsafe { &(*conn).inner };
247
248    // Create the subscription (using query_stream for streaming results)
249    let stream = match conn_ref.query_stream(query_str) {
250        Ok(s) => s,
251        Err(e) => {
252            let code = e.code();
253            set_last_error(e);
254            return code;
255        }
256    };
257
258    // Set up cancellation flag
259    let cancelled = Arc::new(AtomicBool::new(false));
260    let cancelled_clone = Arc::clone(&cancelled);
261
262    // Create callback context
263    let ctx = CallbackContext {
264        user_data,
265        on_data,
266        on_error,
267        cancelled: cancelled_clone,
268    };
269
270    // Spawn background thread to drive the subscription
271    let thread_handle = thread::spawn(move || {
272        subscription_thread(stream, ctx);
273    });
274
275    // Create handle
276    let handle = Box::new(LaminarSubscriptionHandle::new(
277        cancelled,
278        thread_handle,
279        user_data,
280    ));
281
282    // SAFETY: out is non-null (checked above)
283    unsafe { *out = Box::into_raw(handle) };
284
285    LAMINAR_OK
286}
287
288/// Background thread function that drives the subscription.
289#[allow(clippy::needless_pass_by_value)] // ctx is owned by the thread
290fn subscription_thread(mut stream: crate::api::QueryStream, ctx: CallbackContext) {
291    loop {
292        // Check for cancellation
293        if ctx.is_cancelled() {
294            break;
295        }
296
297        // Try to get next batch (non-blocking to allow cancellation checks)
298        match stream.try_next() {
299            Ok(Some(batch)) => {
300                // Determine event type (default to insert for query results)
301                let event_type = LAMINAR_EVENT_INSERT;
302                ctx.call_on_data(LaminarRecordBatch::new(batch), event_type);
303            }
304            Ok(None) => {
305                // No data available, check if stream is exhausted
306                if !stream.is_active() {
307                    break;
308                }
309                // Brief sleep to avoid busy-waiting
310                std::thread::sleep(std::time::Duration::from_millis(1));
311            }
312            Err(e) => {
313                ctx.call_on_error(e.code(), e.message());
314                // Continue unless fatal
315                if !stream.is_active() {
316                    break;
317                }
318            }
319        }
320    }
321}
322
323/// Cancel a callback-based subscription.
324///
325/// After this function returns, no more callbacks will be invoked.
326/// It is safe to free resources referenced by `user_data` after this returns.
327///
328/// # Arguments
329///
330/// * `handle` - Subscription handle
331///
332/// # Returns
333///
334/// `LAMINAR_OK` on success, or an error code.
335///
336/// # Safety
337///
338/// `handle` must be a valid subscription handle.
339#[no_mangle]
340pub unsafe extern "C" fn laminar_subscription_cancel(
341    handle: *mut LaminarSubscriptionHandle,
342) -> i32 {
343    clear_last_error();
344
345    if handle.is_null() {
346        return LAMINAR_ERR_NULL_POINTER;
347    }
348
349    // SAFETY: handle is non-null (checked above)
350    let handle_ref = unsafe { &mut *handle };
351    handle_ref.cancel();
352
353    LAMINAR_OK
354}
355
356/// Check if a subscription is still active.
357///
358/// # Arguments
359///
360/// * `handle` - Subscription handle
361/// * `out` - Pointer to receive result (true if active)
362///
363/// # Returns
364///
365/// `LAMINAR_OK` on success, or an error code.
366///
367/// # Safety
368///
369/// * `handle` must be a valid subscription handle
370/// * `out` must be a valid pointer
371#[no_mangle]
372pub unsafe extern "C" fn laminar_subscription_is_active(
373    handle: *mut LaminarSubscriptionHandle,
374    out: *mut bool,
375) -> i32 {
376    clear_last_error();
377
378    if handle.is_null() || out.is_null() {
379        return LAMINAR_ERR_NULL_POINTER;
380    }
381
382    // SAFETY: handle and out are non-null (checked above)
383    let handle_ref = unsafe { &*handle };
384    let active = !handle_ref.cancelled.load(Ordering::SeqCst) && handle_ref.thread_handle.is_some();
385
386    unsafe { *out = active };
387
388    LAMINAR_OK
389}
390
391/// Get the user data pointer from a subscription handle.
392///
393/// # Arguments
394///
395/// * `handle` - Subscription handle
396///
397/// # Returns
398///
399/// The user data pointer passed to `laminar_subscribe_callback`, or NULL.
400///
401/// # Safety
402///
403/// `handle` must be a valid subscription handle or NULL.
404#[no_mangle]
405pub unsafe extern "C" fn laminar_subscription_user_data(
406    handle: *mut LaminarSubscriptionHandle,
407) -> *mut c_void {
408    if handle.is_null() {
409        return std::ptr::null_mut();
410    }
411
412    // SAFETY: handle is non-null (checked above)
413    unsafe { (*handle).user_data }
414}
415
416/// Free a subscription handle.
417///
418/// If the subscription is still active, it will be cancelled first.
419///
420/// # Arguments
421///
422/// * `handle` - Subscription handle to free
423///
424/// # Safety
425///
426/// `handle` must be a valid handle from `laminar_subscribe_callback`, or NULL.
427#[no_mangle]
428pub unsafe extern "C" fn laminar_subscription_free(handle: *mut LaminarSubscriptionHandle) {
429    if !handle.is_null() {
430        // SAFETY: handle is non-null and was allocated by Box
431        let mut boxed = unsafe { Box::from_raw(handle) };
432        // Cancel if still active
433        boxed.cancel();
434        drop(boxed);
435    }
436}
437
438// Note: Async write operations (laminar_writer_write_async) are deferred to a
439// future release. They require careful synchronization (Arc<Mutex<Writer>>)
440// that is beyond the scope of this initial callback implementation.
441// For now, use the synchronous laminar_writer_write() API.
442
#[cfg(test)]
#[allow(
    clippy::borrow_as_ptr,
    clippy::manual_c_str_literals,
    clippy::items_after_statements
)]
mod tests {
    //! Lifecycle and argument-validation tests for the callback FFI surface.

    use super::*;
    use crate::ffi::connection::{laminar_close, laminar_execute, laminar_open};
    use std::ptr;
    use std::sync::atomic::AtomicUsize;

    /// The numeric event codes are part of the C ABI and must not drift.
    #[test]
    fn test_event_type_constants() {
        let expected = [
            (LAMINAR_EVENT_INSERT, 0),
            (LAMINAR_EVENT_DELETE, 1),
            (LAMINAR_EVENT_UPDATE, 2),
            (LAMINAR_EVENT_WATERMARK, 3),
            (LAMINAR_EVENT_SNAPSHOT, 4),
        ];
        for (actual, wanted) in expected {
            assert_eq!(actual, wanted);
        }
    }

    /// Each of `conn`, `query` and `out` being NULL is rejected.
    #[test]
    fn test_subscribe_null_pointer() {
        let select_one: *const c_char = b"SELECT 1\0".as_ptr().cast();
        let mut out: *mut LaminarSubscriptionHandle = ptr::null_mut();

        // NULL connection.
        let rc = unsafe {
            laminar_subscribe_callback(
                ptr::null_mut(),
                select_one,
                None,
                None,
                ptr::null_mut(),
                &mut out,
            )
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        // The remaining cases need a real connection.
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        unsafe { laminar_open(&mut conn) };

        // NULL query.
        let rc = unsafe {
            laminar_subscribe_callback(conn, ptr::null(), None, None, ptr::null_mut(), &mut out)
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        // NULL out-parameter.
        let rc = unsafe {
            laminar_subscribe_callback(
                conn,
                select_one,
                None,
                None,
                ptr::null_mut(),
                ptr::null_mut(),
            )
        };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        unsafe { laminar_close(conn) };
    }

    /// Cancelling a NULL handle reports a null-pointer error.
    #[test]
    fn test_subscription_cancel_null() {
        let rc = unsafe { laminar_subscription_cancel(ptr::null_mut()) };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);
    }

    /// Freeing a NULL handle must be a harmless no-op.
    #[test]
    fn test_subscription_free_null() {
        unsafe { laminar_subscription_free(ptr::null_mut()) };
    }

    /// Asking a NULL handle for its user data yields NULL.
    #[test]
    fn test_subscription_user_data_null() {
        let user_data = unsafe { laminar_subscription_user_data(ptr::null_mut()) };
        assert!(user_data.is_null());
    }

    /// Full lifecycle without callbacks: subscribe, query state, cancel, free.
    #[test]
    fn test_subscribe_and_cancel() {
        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);

            // A table for the subscription to read from.
            let create = b"CREATE TABLE callback_test (id BIGINT)\0";
            laminar_execute(conn, create.as_ptr().cast(), ptr::null_mut());

            // Subscribe with no callbacks; only the lifecycle is under test.
            let select = b"SELECT * FROM callback_test\0";
            let rc = laminar_subscribe_callback(
                conn,
                select.as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                &mut sub,
            );
            assert_eq!(rc, LAMINAR_OK);
            assert!(!sub.is_null());

            // The state query itself must succeed; the value is not asserted
            // because the stream may already have completed.
            let mut active = false;
            let rc = laminar_subscription_is_active(sub, &mut active);
            assert_eq!(rc, LAMINAR_OK);

            let rc = laminar_subscription_cancel(sub);
            assert_eq!(rc, LAMINAR_OK);

            // After cancellation the subscription must report inactive.
            let rc = laminar_subscription_is_active(sub, &mut active);
            assert_eq!(rc, LAMINAR_OK);
            assert!(!active);

            laminar_subscription_free(sub);
            laminar_close(conn);
        }
    }

    /// The `user_data` pointer given at subscribe time is returned unchanged.
    #[test]
    fn test_subscribe_with_user_data() {
        // Any stable address will do as the opaque pointer.
        static COUNTER: AtomicUsize = AtomicUsize::new(42);

        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);

            let create = b"CREATE TABLE userdata_test (id BIGINT)\0";
            laminar_execute(conn, create.as_ptr().cast(), ptr::null_mut());

            let select = b"SELECT * FROM userdata_test\0";
            let user_data = ptr::addr_of!(COUNTER).cast::<c_void>().cast_mut();

            let rc = laminar_subscribe_callback(
                conn,
                select.as_ptr().cast(),
                None,
                None,
                user_data,
                &mut sub,
            );
            assert_eq!(rc, LAMINAR_OK);

            let round_tripped = laminar_subscription_user_data(sub);
            assert_eq!(round_tripped, user_data);

            laminar_subscription_cancel(sub);
            laminar_subscription_free(sub);
            laminar_close(conn);
        }
    }

    // Invocation counters shared by the callback test below.
    static DATA_CALLBACK_COUNT: AtomicUsize = AtomicUsize::new(0);
    static ERROR_CALLBACK_COUNT: AtomicUsize = AtomicUsize::new(0);

    /// Data callback that counts invocations and frees the batch it is given.
    unsafe extern "C" fn test_data_callback(
        _user_data: *mut c_void,
        batch: *mut LaminarRecordBatch,
        _event_type: i32,
    ) {
        DATA_CALLBACK_COUNT.fetch_add(1, Ordering::SeqCst);
        // Ownership of the batch was transferred to us, so release it.
        if !batch.is_null() {
            crate::ffi::query::laminar_batch_free(batch);
        }
    }

    /// Error callback that only counts invocations.
    unsafe extern "C" fn test_error_callback(
        _user_data: *mut c_void,
        _error_code: i32,
        _error_message: *const c_char,
    ) {
        ERROR_CALLBACK_COUNT.fetch_add(1, Ordering::SeqCst);
    }

    /// Subscribing with real callbacks and tearing down must not crash.
    #[test]
    fn test_subscribe_with_callbacks() {
        DATA_CALLBACK_COUNT.store(0, Ordering::SeqCst);
        ERROR_CALLBACK_COUNT.store(0, Ordering::SeqCst);

        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);

            let create = b"CREATE TABLE callback_data_test (id BIGINT)\0";
            laminar_execute(conn, create.as_ptr().cast(), ptr::null_mut());

            let select = b"SELECT * FROM callback_data_test\0";
            let rc = laminar_subscribe_callback(
                conn,
                select.as_ptr().cast(),
                Some(test_data_callback),
                Some(test_error_callback),
                ptr::null_mut(),
                &mut sub,
            );
            assert_eq!(rc, LAMINAR_OK);

            // Let the subscription thread run for a moment.
            std::thread::sleep(std::time::Duration::from_millis(50));

            laminar_subscription_cancel(sub);
            laminar_subscription_free(sub);
            laminar_close(conn);
        }

        // Whether any callback fired depends on timing, so the counters are
        // not asserted; getting here without a crash is the check.
    }

    /// `laminar_subscription_is_active` rejects a NULL handle and a NULL `out`.
    #[test]
    fn test_subscription_is_active_null_pointer() {
        // NULL handle.
        let mut active = true;
        let rc = unsafe { laminar_subscription_is_active(ptr::null_mut(), &mut active) };
        assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

        let mut conn: *mut LaminarConnection = ptr::null_mut();
        let mut sub: *mut LaminarSubscriptionHandle = ptr::null_mut();

        unsafe {
            laminar_open(&mut conn);

            let create = b"CREATE TABLE active_test (id BIGINT)\0";
            laminar_execute(conn, create.as_ptr().cast(), ptr::null_mut());

            let select = b"SELECT * FROM active_test\0";
            laminar_subscribe_callback(
                conn,
                select.as_ptr().cast(),
                None,
                None,
                ptr::null_mut(),
                &mut sub,
            );

            // Valid handle, NULL out-parameter.
            let rc = laminar_subscription_is_active(sub, ptr::null_mut());
            assert_eq!(rc, LAMINAR_ERR_NULL_POINTER);

            laminar_subscription_cancel(sub);
            laminar_subscription_free(sub);
            laminar_close(conn);
        }
    }
}