// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use deno_core::futures::channel::mpsc; use deno_runtime::deno_napi::*; use once_cell::sync::Lazy; use std::mem::forget; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::channel; use std::sync::Arc; static TS_FN_ID_COUNTER: Lazy = Lazy::new(|| AtomicUsize::new(0)); pub struct TsFn { pub id: usize, pub env: *mut Env, pub maybe_func: Option>, pub maybe_call_js_cb: Option, pub context: *mut c_void, pub thread_counter: usize, pub ref_counter: Arc, sender: mpsc::UnboundedSender, tsfn_sender: mpsc::UnboundedSender, } impl Drop for TsFn { fn drop(&mut self) { let env = unsafe { self.env.as_mut().unwrap() }; env.remove_threadsafe_function_ref_counter(self.id) } } impl TsFn { pub fn acquire(&mut self) -> Result { self.thread_counter += 1; Ok(()) } pub fn release(mut self) -> Result { self.thread_counter -= 1; if self.thread_counter == 0 { self .tsfn_sender .unbounded_send(ThreadSafeFunctionStatus::Dead) .map_err(|_| Error::GenericFailure)?; drop(self); } else { forget(self); } Ok(()) } pub fn ref_(&mut self) -> Result { self .ref_counter .fetch_add(1, std::sync::atomic::Ordering::SeqCst); Ok(()) } pub fn unref(&mut self) -> Result { let _ = self.ref_counter.fetch_update( std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst, |x| { if x == 0 { None } else { Some(x - 1) } }, ); Ok(()) } pub fn call(&self, data: *mut c_void, is_blocking: bool) { let js_func = self.maybe_func.clone(); let (tx, rx) = channel(); if let Some(call_js_cb) = self.maybe_call_js_cb { let context = self.context; let env = self.env; let call = Box::new(move || { let scope = &mut unsafe { (*env).scope() }; match js_func { Some(func) => { let func: v8::Local = func.open(scope).to_object(scope).unwrap().into(); unsafe { call_js_cb(env as *mut c_void, func.into(), context, data) }; } None => { unsafe { call_js_cb(env as *mut c_void, std::mem::zeroed(), context, data) }; } } // Receiver might have been already dropped let _ = tx.send(()); }); // This call should never fail self.sender.unbounded_send(call).unwrap(); } else if let Some(_js_func) = js_func { let call = Box::new(move || { // TODO: func.call // let func = js_func.open(scope); // Receiver might have been already dropped let _ = tx.send(()); }); // This call should never fail self.sender.unbounded_send(call).unwrap(); } if is_blocking { rx.recv().unwrap(); } } } #[napi_sym::napi_sym] fn napi_create_threadsafe_function( env: *mut Env, func: napi_value, _async_resource: napi_value, _async_resource_name: napi_value, _max_queue_size: usize, initial_thread_count: usize, _thread_finialize_data: *mut c_void, _thread_finalize_cb: napi_finalize, context: *mut c_void, maybe_call_js_cb: Option, result: *mut napi_threadsafe_function, ) -> Result { let env_ref = env.as_mut().ok_or(Error::GenericFailure)?; if initial_thread_count == 0 { return Err(Error::InvalidArg); } let maybe_func = func .map(|value| { let func = v8::Local::::try_from(value) .map_err(|_| Error::FunctionExpected)?; Ok(v8::Global::new(&mut env_ref.scope(), func)) }) .transpose()?; let id = TS_FN_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); let tsfn = TsFn { id, maybe_func, maybe_call_js_cb, context, thread_counter: initial_thread_count, sender: env_ref.async_work_sender.clone(), tsfn_sender: env_ref.threadsafe_function_sender.clone(), ref_counter: Arc::new(AtomicUsize::new(1)), env, }; env_ref .add_threadsafe_function_ref_counter(tsfn.id, tsfn.ref_counter.clone()); env_ref .threadsafe_function_sender .unbounded_send(ThreadSafeFunctionStatus::Alive) .map_err(|_| Error::GenericFailure)?; *result = transmute::, _>(Box::new(tsfn)); Ok(()) } #[napi_sym::napi_sym] fn napi_acquire_threadsafe_function( tsfn: napi_threadsafe_function, _mode: napi_threadsafe_function_release_mode, ) -> Result { let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn); tsfn.acquire()?; Ok(()) } #[napi_sym::napi_sym] fn napi_unref_threadsafe_function( _env: &mut Env, tsfn: napi_threadsafe_function, ) -> Result { let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn); tsfn.unref()?; Ok(()) } /// Maybe called from any thread. #[napi_sym::napi_sym] pub fn napi_get_threadsafe_function_context( func: napi_threadsafe_function, result: *mut *const c_void, ) -> Result { let tsfn: &TsFn = &*(func as *const TsFn); *result = tsfn.context; Ok(()) } #[napi_sym::napi_sym] fn napi_call_threadsafe_function( func: napi_threadsafe_function, data: *mut c_void, is_blocking: napi_threadsafe_function_call_mode, ) -> Result { let tsfn: &TsFn = &*(func as *const TsFn); tsfn.call(data, is_blocking != 0); Ok(()) } #[napi_sym::napi_sym] fn napi_ref_threadsafe_function( _env: &mut Env, func: napi_threadsafe_function, ) -> Result { let tsfn: &mut TsFn = &mut *(func as *mut TsFn); tsfn.ref_()?; Ok(()) } #[napi_sym::napi_sym] fn napi_release_threadsafe_function( tsfn: napi_threadsafe_function, _mode: napi_threadsafe_function_release_mode, ) -> Result { let tsfn: Box = Box::from_raw(tsfn as *mut TsFn); tsfn.release()?; Ok(()) }