refactor dispatch take 2 (#2533)

This commit is contained in:
andy finch 2019-06-17 21:02:08 -04:00 committed by Ryan Dahl
parent 9ad5b0653e
commit 76d51b0f9a
8 changed files with 470 additions and 497 deletions

View file

@ -6,6 +6,7 @@
//! message or a "minimal" message.
use crate::state::ThreadSafeState;
use deno::Buf;
use deno::CoreOp;
use deno::Op;
use deno::PinnedBuf;
use futures::Future;
@ -89,7 +90,7 @@ pub fn dispatch_minimal(
state: &ThreadSafeState,
mut record: Record,
zero_copy: Option<PinnedBuf>,
) -> Op {
) -> CoreOp {
let is_sync = record.promise_id == 0;
let min_op = match record.op_id {
OP_READ => ops::read(record.arg, zero_copy),

View file

@ -243,6 +243,20 @@ pub fn no_buffer_specified() -> DenoError {
new(ErrorKind::InvalidInput, String::from("no buffer specified"))
}
pub fn no_async_support() -> DenoError {
new(
ErrorKind::NoAsyncSupport,
String::from("op doesn't support async calls"),
)
}
pub fn no_sync_support() -> DenoError {
new(
ErrorKind::NoSyncSupport,
String::from("op doesn't support sync calls"),
)
}
#[derive(Debug)]
pub enum RustOrJsError {
Rust(DenoError),

View file

@ -136,6 +136,8 @@ enum ErrorKind: byte {
OpNotAvaiable,
WorkerInitFailed,
UnixError,
NoAsyncSupport,
NoSyncSupport,
ImportMapError,
}

File diff suppressed because it is too large Load diff

View file

@ -15,9 +15,9 @@ use crate::resources;
use crate::resources::ResourceId;
use crate::worker::Worker;
use deno::Buf;
use deno::CoreOp;
use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
use futures::future::Either;
use futures::future::Shared;
@ -106,7 +106,11 @@ impl Deref for ThreadSafeState {
}
impl ThreadSafeState {
pub fn dispatch(&self, control: &[u8], zero_copy: Option<PinnedBuf>) -> Op {
pub fn dispatch(
&self,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
}
}

View file

@ -111,7 +111,7 @@ fn test_record_from() {
pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> Op {
fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {

View file

@ -29,13 +29,19 @@ use std::sync::{Arc, Mutex, Once, ONCE_INIT};
pub type Buf = Box<[u8]>;
pub type OpAsyncFuture = Box<dyn Future<Item = Buf, Error = ()> + Send>;
pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;
pub enum Op {
pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture),
Async(OpAsyncFuture<E>),
}
pub type CoreError = ();
type CoreOpAsyncFuture = OpAsyncFuture<CoreError>;
pub type CoreOp = Op<CoreError>;
/// Stores a script used to initalize a Isolate
pub struct Script<'a> {
pub source: &'a str,
@ -68,7 +74,9 @@ pub enum StartupData<'a> {
None,
}
type DispatchFn = Fn(&[u8], Option<PinnedBuf>) -> Op;
pub type OpResult<E> = Result<Op<E>, E>;
type CoreDispatchFn = Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
pub type DynImportFuture = Box<dyn Future<Item = deno_mod, Error = ()> + Send>;
type DynImportFn = Fn(&str, &str) -> DynImportFuture;
@ -104,11 +112,11 @@ impl Future for DynImport {
pub struct Isolate {
libdeno_isolate: *const libdeno::isolate,
shared_libdeno_isolate: Arc<Mutex<Option<*const libdeno::isolate>>>,
dispatch: Option<Arc<DispatchFn>>,
dispatch: Option<Arc<CoreDispatchFn>>,
dyn_import: Option<Arc<DynImportFn>>,
needs_init: bool,
shared: SharedQueue,
pending_ops: FuturesUnordered<OpAsyncFuture>,
pending_ops: FuturesUnordered<CoreOpAsyncFuture>,
pending_dyn_imports: FuturesUnordered<DynImport>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
@ -184,7 +192,7 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch().
pub fn set_dispatch<F>(&mut self, f: F)
where
F: Fn(&[u8], Option<PinnedBuf>) -> Op + Send + Sync + 'static,
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}
@ -664,7 +672,7 @@ pub mod tests {
let dispatch_count_ = dispatch_count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
isolate.set_dispatch(move |control, _| -> Op {
isolate.set_dispatch(move |control, _| -> CoreOp {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::AsyncImmediate => {

View file

@ -56,13 +56,27 @@ function sendInternal(
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
sync = true
): [number, null | Uint8Array] {
isSync: true
): Uint8Array;
function sendInternal(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
isSync: false
): Promise<msg.Base>;
function sendInternal(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
isSync: boolean
): Promise<msg.Base> | Uint8Array {
const cmdId = nextPromiseId();
msg.Base.startBase(builder);
msg.Base.addInner(builder, inner);
msg.Base.addInnerType(builder, innerType);
msg.Base.addSync(builder, sync);
msg.Base.addSync(builder, isSync);
msg.Base.addCmdId(builder, cmdId);
builder.finish(msg.Base.endBase(builder));
@ -74,7 +88,27 @@ function sendInternal(
);
builder.inUse = false;
return [cmdId, response];
if (response == null) {
util.assert(!isSync);
const promise = util.createResolvable<msg.Base>();
promiseTable.set(cmdId, promise);
return promise;
} else {
if (!isSync) {
// We can easily and correctly allow for sync responses to async calls
// by creating and returning a promise from the sync response.
const bb = new flatbuffers.ByteBuffer(response);
const base = msg.Base.getRootAsBase(bb);
const err = errors.maybeError(base);
if (err != null) {
return Promise.reject(err);
} else {
return Promise.resolve(base);
}
}
return response;
}
}
// @internal
@ -84,17 +118,7 @@ export function sendAsync(
inner: flatbuffers.Offset,
data?: ArrayBufferView
): Promise<msg.Base> {
const [cmdId, response] = sendInternal(
builder,
innerType,
inner,
data,
false
);
util.assert(response == null); // null indicates async.
const promise = util.createResolvable<msg.Base>();
promiseTable.set(cmdId, promise);
return promise;
return sendInternal(builder, innerType, inner, data, false);
}
// @internal
@ -104,9 +128,7 @@ export function sendSync(
inner: flatbuffers.Offset,
data?: ArrayBufferView
): null | msg.Base {
const [cmdId, response] = sendInternal(builder, innerType, inner, data, true);
util.assert(cmdId >= 0);
util.assert(response != null); // null indicates async.
const response = sendInternal(builder, innerType, inner, data, true);
if (response!.length === 0) {
return null;
} else {