perf(ext/ffi): switch from middleware to tasks (#21239)

Deno-side changes for https://github.com/denoland/deno_core/pull/350

---------

Co-authored-by: Aapo Alasuutari <aapo.alasuutari@gmail.com>
This commit is contained in:
Matt Mastracci 2023-12-11 20:10:33 -07:00 committed by GitHub
parent d13e45f2b3
commit a4f45f7092
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 300 additions and 97 deletions

View file

@ -1,6 +1,6 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { assertEquals, assertThrows } from "./test_util.ts";
import { assertEquals, assertRejects, assertThrows } from "./test_util.ts";
Deno.test({ permissions: { ffi: true } }, function dlopenInvalidArguments() {
const filename = "/usr/lib/libc.so.6";
@ -98,3 +98,40 @@ Deno.test({ permissions: { ffi: true } }, function pointerOf() {
);
assertEquals(Number(baseAddress) + 80, float64AddressOffset);
});
Deno.test({ permissions: { ffi: true } }, function callWithError() {
const throwCb = () => {
throw new Error("Error");
};
const cb = new Deno.UnsafeCallback({
parameters: [],
result: "void",
}, throwCb);
const fnPointer = new Deno.UnsafeFnPointer(cb.pointer, {
parameters: [],
result: "void",
});
assertThrows(() => fnPointer.call());
cb.close();
});
Deno.test(
{ permissions: { ffi: true }, ignore: true },
async function callNonBlockingWithError() {
const throwCb = () => {
throw new Error("Error");
};
const cb = new Deno.UnsafeCallback({
parameters: [],
result: "void",
}, throwCb);
const fnPointer = new Deno.UnsafeFnPointer(cb.pointer, {
parameters: [],
result: "void",
nonblocking: true,
});
// TODO(mmastrac): currently ignored as we do not thread callback exceptions through nonblocking pointers
await assertRejects(async () => await fnPointer.call());
cb.close();
},
);

View file

@ -368,7 +368,7 @@ pub fn op_ffi_call_nonblocking(
})
}
#[op2]
#[op2(reentrant)]
#[serde]
pub fn op_ffi_call_ptr<FP>(
scope: &mut v8::HandleScope,

View file

@ -3,21 +3,19 @@
use crate::check_unstable;
use crate::symbol::NativeType;
use crate::FfiPermissions;
use crate::FfiState;
use crate::ForeignFunction;
use crate::PendingFfiAsyncWork;
use crate::MAX_SAFE_INTEGER;
use crate::MIN_SAFE_INTEGER;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::task::AtomicWaker;
use deno_core::op2;
use deno_core::v8;
use deno_core::v8::TryCatch;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::V8CrossThreadTaskSpawner;
use libffi::middle::Cif;
use serde::Deserialize;
use std::borrow::Cow;
@ -31,8 +29,6 @@ use std::ptr::NonNull;
use std::rc::Rc;
use std::sync::atomic;
use std::sync::atomic::AtomicU32;
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::task::Poll;
static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
@ -93,13 +89,12 @@ impl Resource for UnsafeCallbackResource {
}
struct CallbackInfo {
pub async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
pub async_work_sender: V8CrossThreadTaskSpawner,
pub callback: NonNull<v8::Function>,
pub context: NonNull<v8::Context>,
pub parameters: Box<[NativeType]>,
pub result: NativeType,
pub thread_id: u32,
pub waker: Arc<AtomicWaker>,
}
impl Future for CallbackInfo {
@ -113,6 +108,32 @@ impl Future for CallbackInfo {
}
}
struct TaskArgs {
cif: NonNull<libffi::low::ffi_cif>,
result: NonNull<c_void>,
args: *const *const c_void,
info: NonNull<CallbackInfo>,
}
// SAFETY: we know these are valid Send-safe pointers as they are for FFI
unsafe impl Send for TaskArgs {}
impl TaskArgs {
fn run(&mut self, scope: &mut v8::HandleScope) {
// SAFETY: making a call using Send-safe pointers turned back into references. We know the
// lifetime of these will last because we block on the result of the spawn call.
unsafe {
do_ffi_callback(
scope,
self.cif.as_ref(),
self.info.as_ref(),
self.result.as_mut(),
self.args,
)
}
}
}
unsafe extern "C" fn deno_ffi_callback(
cif: &libffi::low::ffi_cif,
result: &mut c_void,
@ -121,51 +142,55 @@ unsafe extern "C" fn deno_ffi_callback(
) {
LOCAL_THREAD_ID.with(|s| {
if *s.borrow() == info.thread_id {
// Own isolate thread, okay to call directly
do_ffi_callback(cif, info, result, args);
// Call from main thread. If this callback is being triggered due to a
// function call coming from Deno itself, then this callback will build
// ontop of that stack.
// If this callback is being triggered outside of Deno (for example from a
// signal handler) then this will either create an empty new stack if
// Deno currently has nothing running and is waiting for promises to resolve,
// or will (very incorrectly) build ontop of whatever stack exists.
// The callback will even be called through from a `while (true)` liveloop, but
// it somehow cannot change the values that the loop sees, even if they both
// refer the same `let bool_value`.
let context: NonNull<v8::Context> = info.context;
let context = std::mem::transmute::<
NonNull<v8::Context>,
v8::Local<v8::Context>,
>(context);
let mut cb_scope = v8::CallbackScope::new(context);
let scope = &mut v8::HandleScope::new(&mut cb_scope);
do_ffi_callback(scope, cif, info, result, args);
} else {
let async_work_sender = &info.async_work_sender;
// SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received.
let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif);
let result: &'static mut c_void = std::mem::transmute(result);
let info: &'static CallbackInfo = std::mem::transmute(info);
let (response_sender, response_receiver) = sync_channel::<()>(0);
let fut = Box::new(move || {
do_ffi_callback(cif, info, result, args);
response_sender.send(()).unwrap();
let mut args = TaskArgs {
cif: NonNull::from(cif),
result: NonNull::from(result),
args,
info: NonNull::from(info),
};
async_work_sender.spawn_blocking(move |scope| {
// We don't have a lot of choice here, so just print an unhandled exception message
let tc_scope = &mut TryCatch::new(scope);
args.run(tc_scope);
if tc_scope.exception().is_some() {
eprintln!("Illegal unhandled exception in nonblocking callback.");
}
});
async_work_sender.unbounded_send(fut).unwrap();
// Make sure event loop wakes up to receive our message before we start waiting for a response.
info.waker.wake();
response_receiver.recv().unwrap();
}
});
}
unsafe fn do_ffi_callback(
scope: &mut v8::HandleScope,
cif: &libffi::low::ffi_cif,
info: &CallbackInfo,
result: &mut c_void,
args: *const *const c_void,
) {
let callback: NonNull<v8::Function> = info.callback;
let context: NonNull<v8::Context> = info.context;
let context = std::mem::transmute::<
NonNull<v8::Context>,
v8::Local<v8::Context>,
>(context);
// Call from main thread. If this callback is being triggered due to a
// function call coming from Deno itself, then this callback will build
// ontop of that stack.
// If this callback is being triggered outside of Deno (for example from a
// signal handler) then this will either create an empty new stack if
// Deno currently has nothing running and is waiting for promises to resolve,
// or will (very incorrectly) build ontop of whatever stack exists.
// The callback will even be called through from a `while (true)` liveloop, but
// it somehow cannot change the values that the loop sees, even if they both
// refer the same `let bool_value`.
let mut cb_scope = v8::CallbackScope::new(context);
let scope = &mut v8::HandleScope::new(&mut cb_scope);
let func = std::mem::transmute::<
NonNull<v8::Function>,
v8::Local<v8::Function>,
@ -562,25 +587,12 @@ where
panic!("Isolate ID counter overflowed u32");
}
let async_work_sender =
if let Some(ffi_state) = state.try_borrow_mut::<FfiState>() {
ffi_state.async_work_sender.clone()
} else {
let (async_work_sender, async_work_receiver) =
mpsc::unbounded::<PendingFfiAsyncWork>();
let async_work_sender = state.borrow::<V8CrossThreadTaskSpawner>().clone();
state.put(FfiState {
async_work_receiver,
async_work_sender: async_work_sender.clone(),
});
async_work_sender
};
let callback = v8::Global::new(scope, cb).into_raw();
let current_context = scope.get_current_context();
let context = v8::Global::new(scope, current_context).into_raw();
let waker = state.waker.clone();
let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo {
async_work_sender,
callback,
@ -588,7 +600,6 @@ where
parameters: args.parameters.clone().into(),
result: args.result.clone(),
thread_id,
waker,
}));
let cif = Cif::new(
args

View file

@ -1,15 +1,12 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::OpState;
use std::cell::RefCell;
use std::mem::size_of;
use std::os::raw::c_char;
use std::os::raw::c_short;
use std::path::Path;
use std::rc::Rc;
mod call;
mod callback;
@ -59,13 +56,6 @@ pub trait FfiPermissions {
fn check_partial(&mut self, path: Option<&Path>) -> Result<(), AnyError>;
}
pub(crate) type PendingFfiAsyncWork = Box<dyn FnOnce()>;
pub(crate) struct FfiState {
pub(crate) async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
pub(crate) async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>,
}
deno_core::extension!(deno_ffi,
deps = [ deno_web ],
parameters = [P: FfiPermissions],
@ -101,35 +91,4 @@ deno_core::extension!(deno_ffi,
op_ffi_unsafe_callback_ref,
],
esm = [ "00_ffi.js" ],
event_loop_middleware = event_loop_middleware,
);
fn event_loop_middleware(
op_state_rc: Rc<RefCell<OpState>>,
_cx: &mut std::task::Context,
) -> bool {
// FFI callbacks coming in from other threads will call in and get queued.
let mut maybe_scheduling = false;
let mut op_state = op_state_rc.borrow_mut();
if let Some(ffi_state) = op_state.try_borrow_mut::<FfiState>() {
// TODO(mmastrac): This should be a SmallVec to avoid allocations in most cases
let mut work_items = Vec::with_capacity(1);
while let Ok(Some(async_work_fut)) =
ffi_state.async_work_receiver.try_next()
{
// Move received items to a temporary vector so that we can drop the `op_state` borrow before we do the work.
work_items.push(async_work_fut);
maybe_scheduling = true;
}
// Drop the op_state and ffi_state borrows
drop(op_state);
for async_work_fut in work_items.into_iter() {
async_work_fut();
}
}
maybe_scheduling
}

View file

@ -258,6 +258,20 @@ pub extern "C" fn call_stored_function_thread_safe_and_log() {
});
}
#[no_mangle]
pub extern "C" fn call_stored_function_2_thread_safe(arg: u8) {
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(1500));
unsafe {
if STORED_FUNCTION_2.is_none() {
return;
}
println!("Calling");
STORED_FUNCTION_2.unwrap()(arg);
}
});
}
#[no_mangle]
pub extern "C" fn log_many_parameters(
a: u8,

View file

@ -0,0 +1,141 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
const targetDir = Deno.execPath().replace(/[^\/\\]+$/, "");
const [libPrefix, libSuffix] = {
darwin: ["lib", "dylib"],
linux: ["lib", "so"],
windows: ["", "dll"],
}[Deno.build.os];
const libPath = `${targetDir}/${libPrefix}test_ffi.${libSuffix}`;
const dylib = Deno.dlopen(
libPath,
{
store_function_2: {
parameters: ["function"],
result: "void",
},
call_stored_function_2: {
parameters: ["u8"],
result: "void",
},
call_stored_function_2_from_other_thread: {
name: "call_stored_function_2",
parameters: ["u8"],
result: "void",
nonblocking: true,
},
call_stored_function_2_thread_safe: {
parameters: ["u8"],
result: "void",
},
} as const,
);
globalThis.addEventListener("error", (data) => {
console.log("Unhandled error");
data.preventDefault();
});
globalThis.onerror = (data) => {
console.log("Unhandled error");
if (typeof data !== "string") {
data.preventDefault();
}
};
globalThis.addEventListener("unhandledrejection", (data) => {
console.log("Unhandled rejection");
data.preventDefault();
});
const timer = setTimeout(() => {
console.error(
"Test failed, final callback did not get picked up by Deno event loop",
);
Deno.exit(-1);
}, 5_000);
Deno.unrefTimer(timer);
enum CallCase {
SyncSelf,
SyncFfi,
AsyncSelf,
AsyncSyncFfi,
AsyncFfi,
}
type U8CallCase = Deno.NativeU8Enum<CallCase>;
const throwCb = (c: CallCase): number => {
console.log("CallCase:", CallCase[c]);
if (c === CallCase.AsyncFfi) {
cb.unref();
}
throw new Error("Error");
};
const THROW_CB_DEFINITION = {
parameters: ["u8" as U8CallCase],
result: "u8",
} as const;
const cb = new Deno.UnsafeCallback(THROW_CB_DEFINITION, throwCb);
try {
const fnPointer = new Deno.UnsafeFnPointer(cb.pointer, THROW_CB_DEFINITION);
fnPointer.call(CallCase.SyncSelf);
} catch (_err) {
console.log(
"Throwing errors from an UnsafeCallback called from a synchronous UnsafeFnPointer works. Terribly excellent.",
);
}
dylib.symbols.store_function_2(cb.pointer);
try {
dylib.symbols.call_stored_function_2(CallCase.SyncFfi);
} catch (_err) {
console.log(
"Throwing errors from an UnsafeCallback called from a synchronous FFI symbol works. Terribly excellent.",
);
}
try {
const fnPointer = new Deno.UnsafeFnPointer(cb.pointer, {
...THROW_CB_DEFINITION,
nonblocking: true,
});
await fnPointer.call(CallCase.AsyncSelf);
} catch (err) {
throw new Error(
"Nonblocking UnsafeFnPointer should not be threading through a JS error thrown on the other side of the call",
{
cause: err,
},
);
}
try {
await dylib.symbols.call_stored_function_2_from_other_thread(
CallCase.AsyncSyncFfi,
);
} catch (err) {
throw new Error(
"Nonblocking symbol call should not be threading through a JS error thrown on the other side of the call",
{
cause: err,
},
);
}
try {
// Ref the callback to make sure we do not exit before the call is done.
cb.ref();
dylib.symbols.call_stored_function_2_thread_safe(CallCase.AsyncFfi);
} catch (err) {
throw new Error(
"Blocking symbol call should not be travelling 1.5 seconds forward in time to figure out that it call will trigger a JS error to be thrown",
{
cause: err,
},
);
}

View file

@ -237,3 +237,44 @@ fn event_loop_integration() {
assert_eq!(stdout, expected);
assert_eq!(stderr, "");
}
#[test]
fn ffi_callback_errors_test() {
build();
let output = deno_cmd()
.arg("run")
.arg("--allow-ffi")
.arg("--allow-read")
.arg("--unstable")
.arg("--quiet")
.arg("tests/ffi_callback_errors.ts")
.env("NO_COLOR", "1")
.output()
.unwrap();
let stdout = std::str::from_utf8(&output.stdout).unwrap();
let stderr = std::str::from_utf8(&output.stderr).unwrap();
if !output.status.success() {
println!("stdout {stdout}");
println!("stderr {stderr}");
}
println!("{:?}", output.status);
assert!(output.status.success());
let expected = "\
CallCase: SyncSelf\n\
Throwing errors from an UnsafeCallback called from a synchronous UnsafeFnPointer works. Terribly excellent.\n\
CallCase: SyncFfi\n\
0\n\
Throwing errors from an UnsafeCallback called from a synchronous FFI symbol works. Terribly excellent.\n\
CallCase: AsyncSelf\n\
CallCase: AsyncSyncFfi\n\
0\n\
Calling\n\
CallCase: AsyncFfi\n";
assert_eq!(stdout, expected);
assert_eq!(
stderr,
"Illegal unhandled exception in nonblocking callback.\n".repeat(3)
);
}