Refactor dispatch handling (#2452)

Promise id is now created in core and passed back to JS.
andy finch 2019-06-13 23:43:54 -04:00 committed by Ryan Dahl
parent fdd2eb5383
commit dc60fe9f30
18 changed files with 667 additions and 709 deletions

View File

@ -6,6 +6,7 @@
//! message or a "minimal" message.
use crate::state::ThreadSafeState;
use deno::Buf;
use deno::CoreOp;
use deno::Op;
use deno::PinnedBuf;
use futures::Future;
@ -17,7 +18,6 @@ const OP_WRITE: i32 = 2;
#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
pub struct Record {
pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
@ -25,15 +25,9 @@ pub struct Record {
impl Into<Buf> for Record {
fn into(self) -> Buf {
let vec = vec![
DISPATCH_MINIMAL_TOKEN,
self.promise_id,
self.op_id,
self.arg,
self.result,
];
let vec = vec![DISPATCH_MINIMAL_TOKEN, self.op_id, self.arg, self.result];
let buf32 = vec.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 5 * 4];
let ptr = Box::into_raw(buf32) as *mut [u8; 4 * 4];
unsafe { Box::from_raw(ptr) }
}
}
@ -45,36 +39,32 @@ pub fn parse_min_record(bytes: &[u8]) -> Option<Record> {
let p = bytes.as_ptr();
#[allow(clippy::cast_ptr_alignment)]
let p32 = p as *const i32;
let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) };
if s.len() < 5 {
if s.len() < 4 {
return None;
}
let ptr = s.as_ptr();
let ints = unsafe { std::slice::from_raw_parts(ptr, 5) };
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
if ints[0] != DISPATCH_MINIMAL_TOKEN {
return None;
}
Some(Record {
promise_id: ints[1],
op_id: ints[2],
arg: ints[3],
result: ints[4],
op_id: ints[1],
arg: ints[2],
result: ints[3],
})
}
#[test]
fn test_parse_min_record() {
let buf = vec![
0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0,
];
let buf = vec![0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0];
assert_eq!(
parse_min_record(&buf),
Some(Record {
promise_id: 1,
op_id: 2,
arg: 3,
result: 4,
op_id: 1,
arg: 2,
result: 3,
})
);
@ -89,8 +79,7 @@ pub fn dispatch_minimal(
state: &ThreadSafeState,
mut record: Record,
zero_copy: Option<PinnedBuf>,
) -> Op {
let is_sync = record.promise_id == 0;
) -> CoreOp {
let min_op = match record.op_id {
OP_READ => ops::read(record.arg, zero_copy),
OP_WRITE => ops::write(record.arg, zero_copy),
@ -115,11 +104,7 @@ pub fn dispatch_minimal(
state.metrics_op_completed(buf.len());
Ok(buf)
}));
if is_sync {
Op::Sync(fut.wait().unwrap())
} else {
Op::Async(fut)
}
Op::Async(fut)
}
mod ops {
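
For reference, here is a self-contained sketch of the minimal wire format after this change, assuming the little-endian layout implied by the test above: four i32s -- token, op_id, arg, result -- with the promise id no longer carried in the record (core now assigns it and hands it back from dispatch). The encode/decode helpers are illustrative stand-ins, not the crate's Into<Buf> and parse_min_record.

use std::convert::TryInto;

// Sketch only: the 16-byte minimal record is [token, op_id, arg, result] as i32 LE.
const DISPATCH_MINIMAL_TOKEN: i32 = 0xCAFE;

fn encode(op_id: i32, arg: i32, result: i32) -> Vec<u8> {
    [DISPATCH_MINIMAL_TOKEN, op_id, arg, result]
        .iter()
        .flat_map(|n| n.to_le_bytes())
        .collect()
}

fn decode(bytes: &[u8]) -> Option<(i32, i32, i32)> {
    if bytes.len() < 16 {
        return None;
    }
    let read = |i: usize| i32::from_le_bytes(bytes[i..i + 4].try_into().unwrap());
    if read(0) != DISPATCH_MINIMAL_TOKEN {
        return None;
    }
    Some((read(4), read(8), read(12)))
}

fn main() {
    let buf = encode(1, 2, 3);
    assert_eq!(&buf[..4], &[0xFE, 0xCA, 0, 0]); // same token bytes as the test vector above
    assert_eq!(decode(&buf), Some((1, 2, 3)));
}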

View File

@ -243,6 +243,20 @@ pub fn no_buffer_specified() -> DenoError {
new(ErrorKind::InvalidInput, String::from("no buffer specified"))
}
pub fn no_async_support() -> DenoError {
new(
ErrorKind::NoAsyncSupport,
String::from("op doesn't support async calls"),
)
}
pub fn no_sync_support() -> DenoError {
new(
ErrorKind::NoSyncSupport,
String::from("op doesn't support sync calls"),
)
}
#[derive(Debug)]
pub enum RustOrJsError {
Rust(DenoError),

View File

@ -136,6 +136,8 @@ enum ErrorKind: byte {
OpNotAvaiable,
WorkerInitFailed,
UnixError,
NoAsyncSupport,
NoSyncSupport,
ImportMapError,
}
@ -153,7 +155,6 @@ enum MediaType: byte {
}
table Base {
cmd_id: uint32;
sync: bool = false;
error_kind: ErrorKind = NoError;
error: string;

File diff suppressed because it is too large

View File

@ -15,9 +15,9 @@ use crate::resources;
use crate::resources::ResourceId;
use crate::worker::Worker;
use deno::Buf;
use deno::CoreOp;
use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
use futures::future::Either;
use futures::future::Shared;
@ -106,7 +106,11 @@ impl Deref for ThreadSafeState {
}
impl ThreadSafeState {
pub fn dispatch(&self, control: &[u8], zero_copy: Option<PinnedBuf>) -> Op {
pub fn dispatch(
&self,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
}
}
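
The only behavioral point here is the return type: a dispatcher is now any Fn(&[u8], Option<PinnedBuf>) -> CoreOp. A minimal sketch of wiring one into core, mirroring the pattern used in core's own tests later in this diff (the closure body is a stand-in, not cli's real dispatch logic):

use deno::{CoreOp, Isolate, Op, PinnedBuf, StartupData};

fn main() {
    let mut isolate = Isolate::new(StartupData::None, false);
    isolate.set_dispatch(|_control: &[u8], _zero_copy: Option<PinnedBuf>| -> CoreOp {
        // A synchronous op still hands its response buffer back directly;
        // async ops return Op::Async and get a promise id assigned by core.
        Op::Sync(vec![42u8].into_boxed_slice())
    });
}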

core/core.d.ts
View File

@ -4,15 +4,13 @@
// Deno core. These are not intended to be used directly by runtime users of
// Deno and therefore do not flow through to the runtime type library.
declare interface MessageCallback {
(msg: Uint8Array): void;
}
declare type MessageCallback = (promiseId: number, msg: Uint8Array) => void;
declare interface DenoCore {
dispatch(
control: Uint8Array,
zeroCopy?: ArrayBufferView | null
): Uint8Array | null;
): Uint8Array | null | number;
setAsyncHandler(cb: MessageCallback): void;
sharedQueue: {
head(): number;

View File

@ -13,7 +13,6 @@ const responseBuf = new Uint8Array(
.map(c => c.charCodeAt(0))
);
const promiseMap = new Map();
let nextPromiseId = 1;
function assert(cond) {
if (!cond) {
@ -37,8 +36,8 @@ const scratchBytes = new Uint8Array(
);
assert(scratchBytes.byteLength === 4 * 4);
function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
function send(isSync, opId, arg, zeroCopy = null) {
scratch32[0] = isSync;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
@ -47,10 +46,9 @@ function send(promiseId, opId, arg, zeroCopy = null) {
/** Returns Promise<number> */
function sendAsync(opId, arg, zeroCopy = null) {
const promiseId = nextPromiseId++;
const promiseId = send(false, opId, arg, zeroCopy);
const p = createResolvable();
promiseMap.set(promiseId, p);
send(promiseId, opId, arg, zeroCopy);
return p;
}
@ -58,7 +56,7 @@ function recordFromBuf(buf) {
assert(buf.byteLength === 16);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
return {
promiseId: buf32[0],
isSync: !!buf32[0],
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
@ -67,14 +65,14 @@ function recordFromBuf(buf) {
/** Returns i32 number */
function sendSync(opId, arg) {
const buf = send(0, opId, arg);
const buf = send(true, opId, arg);
const record = recordFromBuf(buf);
return record.result;
}
function handleAsyncMsgFromRust(buf) {
function handleAsyncMsgFromRust(promiseId, buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
const { result } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);

View File

@ -44,7 +44,7 @@ const OP_CLOSE: i32 = 5;
#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
pub is_sync: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
@ -52,8 +52,8 @@ pub struct Record {
impl Into<Buf> for Record {
fn into(self) -> Buf {
let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
.into_boxed_slice();
let buf32 =
vec![self.is_sync, self.op_id, self.arg, self.result].into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 16];
unsafe { Box::from_raw(ptr) }
}
@ -65,7 +65,7 @@ impl From<&[u8]> for Record {
let ptr = s.as_ptr() as *const i32;
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
Record {
promise_id: ints[0],
is_sync: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
@ -81,7 +81,7 @@ impl From<Buf> for Record {
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
assert_eq!(ints.len(), 4);
Record {
promise_id: ints[0],
is_sync: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
@ -92,7 +92,7 @@ impl From<Buf> for Record {
#[test]
fn test_record_from() {
let r = Record {
promise_id: 1,
is_sync: 1,
op_id: 2,
arg: 3,
result: 4,
@ -111,9 +111,9 @@ fn test_record_from() {
pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> Op {
fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let is_sync = record.is_sync == 1;
let http_bench_op = match record.op_id {
OP_LISTEN => {
assert!(is_sync);

View File

@ -21,21 +21,29 @@ use futures::Async::*;
use futures::Future;
use futures::Poll;
use libc::c_char;
use libc::c_int;
use libc::c_void;
use std::ffi::CStr;
use std::ffi::CString;
use std::ptr::null;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
pub type Buf = Box<[u8]>;
pub type OpAsyncFuture = Box<dyn Future<Item = Buf, Error = ()> + Send>;
pub type OpAsyncFuture<I, E> = Box<dyn Future<Item = I, Error = E> + Send>;
pub enum Op {
pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture),
Async(OpAsyncFuture<Buf, E>),
}
pub type CoreError = ();
type CoreOpAsyncFuture = OpAsyncFuture<(c_int, Buf), CoreError>;
pub type CoreOp = Op<CoreError>;
/// Stores a script used to initialize an Isolate
pub struct Script<'a> {
pub source: &'a str,
@ -68,7 +76,9 @@ pub enum StartupData<'a> {
None,
}
type DispatchFn = Fn(&[u8], Option<PinnedBuf>) -> Op;
pub type OpResult<E> = Result<Op<E>, E>;
type CoreDispatchFn = Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
pub type DynImportFuture = Box<dyn Future<Item = deno_mod, Error = ()> + Send>;
type DynImportFn = Fn(&str, &str) -> DynImportFuture;
@ -93,6 +103,12 @@ impl Future for DynImport {
}
}
enum ResponseData {
None,
Buffer(deno_buf),
PromiseId(c_int),
}
/// A single execution context of JavaScript. Corresponds roughly to the "Web
/// Worker" concept in the DOM. An Isolate is a Future that can be used with
/// Tokio. The Isolate future completes when there is an error or when all
@ -104,14 +120,15 @@ impl Future for DynImport {
pub struct Isolate {
libdeno_isolate: *const libdeno::isolate,
shared_libdeno_isolate: Arc<Mutex<Option<*const libdeno::isolate>>>,
dispatch: Option<Arc<DispatchFn>>,
dispatch: Option<Arc<CoreDispatchFn>>,
dyn_import: Option<Arc<DynImportFn>>,
needs_init: bool,
shared: SharedQueue,
pending_ops: FuturesUnordered<OpAsyncFuture>,
pending_ops: FuturesUnordered<CoreOpAsyncFuture>,
pending_dyn_imports: FuturesUnordered<DynImport>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
next_promise_id: AtomicI32,
}
unsafe impl Send for Isolate {}
@ -176,6 +193,7 @@ impl Isolate {
have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(),
startup_script,
next_promise_id: AtomicI32::new(1),
}
}
@ -184,7 +202,7 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch().
pub fn set_dispatch<F>(&mut self, f: F)
where
F: Fn(&[u8], Option<PinnedBuf>) -> Op + Send + Sync + 'static,
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}
@ -239,6 +257,10 @@ impl Isolate {
}
}
pub fn get_next_promise_id(&self) -> i32 {
self.next_promise_id.fetch_add(1, Ordering::SeqCst)
}
extern "C" fn pre_dispatch(
user_data: *mut c_void,
control_argv0: deno_buf,
@ -279,9 +301,17 @@ impl Isolate {
// return value.
// TODO(ry) check that if JSError thrown during respond(), that it will be
// picked up.
let _ = isolate.respond(Some(&buf));
let _ =
isolate.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())));
}
Op::Async(fut) => {
let promise_id = isolate.get_next_promise_id();
let _ = isolate.respond(ResponseData::PromiseId(promise_id));
let fut = Box::new(fut.and_then(
move |buf| -> Result<(c_int, Buf), CoreError> {
Ok((promise_id, buf))
},
));
isolate.pending_ops.push(fut);
isolate.have_unpolled_ops = true;
}
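
A compact model of the async half above, with simplified stand-in types (no futures crate; ToyIsolate and dispatch_async are invented for this sketch): core now owns the promise-id counter, answers the dispatching call immediately with the fresh id, and keeps the id alongside the pending op so the eventual result buffer can be paired with it, which is what the and_then closure does.

use std::sync::atomic::{AtomicI32, Ordering};

struct ToyIsolate {
    next_promise_id: AtomicI32,
    pending: Vec<(i32, Box<dyn FnOnce() -> Box<[u8]>>)>,
}

impl ToyIsolate {
    fn dispatch_async(&mut self, op: Box<dyn FnOnce() -> Box<[u8]>>) -> i32 {
        // Same scheme as Isolate::get_next_promise_id: fetch_add returns the
        // previous value, so ids start at 1.
        let promise_id = self.next_promise_id.fetch_add(1, Ordering::SeqCst);
        // Park the op with its id; the caller (JS) gets the id back synchronously.
        self.pending.push((promise_id, op));
        promise_id
    }
}

fn main() {
    let mut iso = ToyIsolate { next_promise_id: AtomicI32::new(1), pending: Vec::new() };
    let id = iso.dispatch_async(Box::new(|| vec![42u8].into_boxed_slice()));
    assert_eq!(id, 1);
    // On completion, the (promise_id, buf) pair is what the poll loop pushes
    // to the shared queue (or the overflow path) further down.
    let (pid, op) = iso.pending.remove(0);
    let _completed: (i32, Box<[u8]>) = (pid, op());
}
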
@ -340,14 +370,34 @@ impl Isolate {
}
}
fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), JSError> {
let buf = match maybe_buf {
None => deno_buf::empty(),
Some(r) => deno_buf::from(r),
// the result type is a placeholder for a more specific enum type
fn respond(&mut self, data: ResponseData) -> Result<(), JSError> {
match data {
ResponseData::PromiseId(pid) => unsafe {
libdeno::deno_respond(
self.libdeno_isolate,
self.as_raw_ptr(),
deno_buf::empty(),
&pid,
)
},
ResponseData::Buffer(r) => unsafe {
libdeno::deno_respond(
self.libdeno_isolate,
self.as_raw_ptr(),
r,
null(),
)
},
ResponseData::None => unsafe {
libdeno::deno_respond(
self.libdeno_isolate,
self.as_raw_ptr(),
deno_buf::empty(),
null(),
)
},
};
unsafe {
libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf)
}
if let Some(err) = self.last_exception() {
Err(err)
} else {
@ -525,7 +575,7 @@ impl Future for Isolate {
self.shared_init();
let mut overflow_response: Option<Buf> = None;
let mut overflow_response: Option<(c_int, Buf)> = None;
loop {
// If there are any pending dyn_import futures, do those first.
@ -546,13 +596,13 @@ impl Future for Isolate {
Err(_) => panic!("unexpected op error"),
Ok(Ready(None)) => break,
Ok(NotReady) => break,
Ok(Ready(Some(buf))) => {
let successful_push = self.shared.push(&buf);
Ok(Ready(Some(op))) => {
let successful_push = self.shared.push(op.0, &op.1);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some(buf);
overflow_response = Some(op);
break;
}
}
@ -560,14 +610,16 @@ impl Future for Isolate {
}
if self.shared.size() > 0 {
self.respond(None)?;
self.respond(ResponseData::None)?;
// The other side should have shifted off all the messages.
assert_eq!(self.shared.size(), 0);
}
if overflow_response.is_some() {
let buf = overflow_response.take().unwrap();
self.respond(Some(&buf))?;
let op = overflow_response.take().unwrap();
let promise_id_bytes = op.0.to_be_bytes();
let buf: Buf = [&promise_id_bytes, &op.1[..]].concat().into();
self.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())))?;
}
self.check_promise_errors();
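
A rough model of the routing decision the poll loop now makes (ToyQueue is an invented stand-in for SharedQueue, tracked in bytes only): each completed op is a (promise_id, buf) pair; it goes into the shared queue when it fits, and at most one oversized response per poll falls back to the legacy deno_respond buffer argument with the promise id prepended.

struct ToyQueue {
    bytes_left: usize,
}

impl ToyQueue {
    // A record of n bytes now costs n + 4: the promise id rides in front of it.
    fn push(&mut self, _promise_id: i32, record: &[u8]) -> bool {
        let needed = record.len() + 4;
        if needed > self.bytes_left {
            return false; // caller falls back to the overflow response
        }
        self.bytes_left -= needed;
        true
    }
}

fn main() {
    let mut queue = ToyQueue { bytes_left: 16 };
    let completed: Vec<(i32, Vec<u8>)> = vec![(1, vec![0; 8]), (2, vec![0; 64])];
    let mut overflow: Option<(i32, Vec<u8>)> = None;
    for (promise_id, buf) in completed {
        if !queue.push(promise_id, &buf) {
            overflow = Some((promise_id, buf));
            break; // the real loop also stops after one overflowing response
        }
    }
    assert_eq!(overflow.map(|(id, _)| id), Some(2));
}
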
@ -664,7 +716,7 @@ pub mod tests {
let dispatch_count_ = dispatch_count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
isolate.set_dispatch(move |control, _| -> Op {
isolate.set_dispatch(move |control, _| -> CoreOp {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::AsyncImmediate => {
@ -834,7 +886,7 @@ pub mod tests {
"setup2.js",
r#"
let nrecv = 0;
Deno.core.setAsyncHandler((buf) => {
Deno.core.setAsyncHandler((promiseId, buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
nrecv++;
@ -1025,7 +1077,7 @@ pub mod tests {
"overflow_req_sync.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(control);
@ -1047,7 +1099,7 @@ pub mod tests {
"overflow_res_sync.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
@ -1068,7 +1120,7 @@ pub mod tests {
"overflow_req_async.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler((buf) => {
Deno.core.setAsyncHandler((cmdId, buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
asyncRecv++;
@ -1076,8 +1128,8 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(control);
// Async messages always have null response.
assert(response == null);
// Async messages always have number type response.
assert(typeof response == "number");
assert(asyncRecv == 0);
"#,
));
@ -1097,7 +1149,7 @@ pub mod tests {
"overflow_res_async.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler((buf) => {
Deno.core.setAsyncHandler((cmdId, buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
@ -1105,7 +1157,7 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
assert(response == null);
assert(typeof response == "number");
assert(asyncRecv == 0);
"#,
));
@ -1125,7 +1177,7 @@ pub mod tests {
"overflow_res_multiple_dispatch_async.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler((buf) => {
Deno.core.setAsyncHandler((cmdId, buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
@ -1133,7 +1185,7 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
assert(response == null);
assert(typeof response == "number");
assert(asyncRecv == 0);
// Dispatch another message to verify that pending ops
// are done even if shared space overflows

View File

@ -267,6 +267,7 @@ extern "C" {
i: *const isolate,
user_data: *const c_void,
buf: deno_buf,
promise_id: *const c_int,
);
pub fn deno_pinned_buf_delete(buf: &mut deno_pinned_buf);
pub fn deno_execute(

View File

@ -153,11 +153,15 @@ void deno_pinned_buf_delete(deno_pinned_buf* buf) {
auto _ = deno::PinnedBuf(buf);
}
void deno_respond(Deno* d_, void* user_data, deno_buf buf) {
void deno_respond(Deno* d_, void* user_data, deno_buf buf, int* promise_id) {
auto* d = unwrap(d_);
if (d->current_args_ != nullptr) {
// Synchronous response.
if (buf.data_ptr != nullptr) {
if (promise_id != nullptr) {
auto number = v8::Number::New(d->isolate_, *promise_id);
d->current_args_->GetReturnValue().Set(number);
} else {
CHECK_NOT_NULL(buf.data_ptr);
auto ab = deno::ImportBuf(d, buf);
d->current_args_->GetReturnValue().Set(ab);
}

View File

@ -81,8 +81,10 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
// deno_respond sends up to one message back for every deno_recv_cb made.
//
// If this is called during deno_recv_cb, the issuing libdeno.send() in
// javascript will synchronously return the specified buf as an ArrayBuffer (or
// null if buf is empty).
// javascript will synchronously return the specified promise_id(number)
// or buf(Uint8Array) (or null if buf and promise_id are both null/empty).
// Calling with non-null for both buf and promise_id will result in the
// promise_id being returned.
//
// If this is called after deno_recv_cb has returned, the deno_respond
// will call into the JS callback specified by libdeno.recv().
@ -92,7 +94,7 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
// releasing its memory.)
//
// If a JS exception was encountered, deno_last_exception() will be non-NULL.
void deno_respond(Deno* d, void* user_data, deno_buf buf);
void deno_respond(Deno* d, void* user_data, deno_buf buf, int* promise_id);
// consumes zero_copy
void deno_pinned_buf_delete(deno_pinned_buf* buf);
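
To summarize the synchronous return contract spelled out above, an illustrative sketch (JsReturn and sync_return are invented names for this summary, not libdeno symbols): a non-null promise_id takes precedence and surfaces in JS as a number, otherwise a non-empty buf surfaces as the response buffer, and null means neither was supplied.

enum JsReturn {
    PromiseId(i32),  // JS sees a number: the op went async
    Buffer(Vec<u8>), // JS sees the synchronous response bytes
    Null,            // neither a buffer nor a promise id was given
}

fn sync_return(promise_id: Option<i32>, buf: Option<Vec<u8>>) -> JsReturn {
    match (promise_id, buf) {
        (Some(id), _) => JsReturn::PromiseId(id), // promise_id wins when both are set
        (None, Some(b)) => JsReturn::Buffer(b),
        (None, None) => JsReturn::Null,
    }
}

fn main() {
    assert!(matches!(sync_return(Some(1), Some(vec![42])), JsReturn::PromiseId(1)));
    assert!(matches!(sync_return(None, None), JsReturn::Null));
}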

View File

@ -12,14 +12,13 @@ interface EvalErrorInfo {
thrown: any;
}
declare interface MessageCallback {
(msg: Uint8Array): void;
}
declare type MessageCallbackInternal = (msg: Uint8Array) => void;
declare interface DenoCore {
recv(cb: MessageCallback): void;
recv(cb: MessageCallbackInternal): void;
send(
cmdId: number,
control: null | ArrayBufferView,
data?: ArrayBufferView
): null | Uint8Array;

View File

@ -75,7 +75,7 @@ TEST(LibDenoTest, RecvReturnBar) {
EXPECT_EQ(buf.data_ptr[1], 'b');
EXPECT_EQ(buf.data_ptr[2], 'c');
uint8_t response[] = {'b', 'a', 'r'};
deno_respond(d, user_data, {response, sizeof response});
deno_respond(d, user_data, {response, sizeof response}, nullptr);
};
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr});
deno_execute(d, d, "a.js", "RecvReturnBar()");

View File

@ -151,14 +151,27 @@ SharedQueue Binary Layout
function handleAsyncMsgFromRust(buf) {
if (buf) {
asyncHandler(buf);
handleAsyncMsgFromRustInner(buf);
} else {
while ((buf = shift()) != null) {
asyncHandler(buf);
handleAsyncMsgFromRustInner(buf);
}
}
}
function handleAsyncMsgFromRustInner(buf) {
// DataView to extract the promise id value.
const dataView = new DataView(buf.buffer, buf.byteOffset, 4);
const promiseId = dataView.getInt32(0);
// Uint8 buffer view shifted and shortened by 4 bytes to remove the promise id from the view window.
const bufViewFinal = new Uint8Array(
buf.buffer,
buf.byteOffset + 4,
buf.byteLength - 4
);
asyncHandler(promiseId, bufViewFinal);
}
function dispatch(control, zeroCopy = null) {
maybeInit();
// First try to push control to shared.

View File

@ -17,6 +17,7 @@ SharedQueue Binary Layout
*/
use crate::libdeno::deno_buf;
use libc::c_int;
const MAX_RECORDS: usize = 100;
/// Total number of records added.
@ -152,17 +153,19 @@ impl SharedQueue {
Some(&self.bytes[off..end])
}
pub fn push(&mut self, record: &[u8]) -> bool {
pub fn push(&mut self, promise_id: c_int, record: &[u8]) -> bool {
let off = self.head();
let end = off + record.len();
let end = off + record.len() + 4;
let index = self.num_records();
if end > self.bytes.len() || index >= MAX_RECORDS {
debug!("WARNING the sharedQueue overflowed");
return false;
}
self.set_end(index, end);
assert_eq!(end - off, record.len());
self.bytes[off..end].copy_from_slice(record);
assert_eq!(end - off, record.len() + 4);
let pid_bytes = promise_id.to_be_bytes();
self.bytes[off..off + 4].copy_from_slice(&pid_bytes);
self.bytes[off + 4..end].copy_from_slice(record);
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_RECORDS] += 1;
u32_slice[INDEX_HEAD] = end as u32;
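
The framing this push produces, and what shared_queue.js undoes on the other side, in one self-contained sketch (frame and unframe are illustrative names, not crate functions): a 4-byte big-endian promise id followed by the record bytes. Big-endian is what makes the DataView.getInt32(0) read in handleAsyncMsgFromRustInner come out right, since getInt32 defaults to big-endian.

fn frame(promise_id: i32, record: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + record.len());
    out.extend_from_slice(&promise_id.to_be_bytes()); // what push() writes at off..off + 4
    out.extend_from_slice(record);                     // the original record follows
    out
}

fn unframe(buf: &[u8]) -> (i32, &[u8]) {
    // Mirrors the JS side: id in the first four bytes, payload after byte 4.
    let id = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
    (id, &buf[4..])
}

fn main() {
    let framed = frame(7, &[1, 2, 3, 4, 5]);
    assert_eq!(unframe(&framed), (7, &[1u8, 2, 3, 4, 5][..]));
}
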
@ -189,30 +192,30 @@ mod tests {
assert!(h > 0);
let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
let len = r.len() + h;
assert!(q.push(&r));
let len = r.len() + h + 4;
assert!(q.push(1, &r));
assert_eq!(q.head(), len);
let r = vec![6, 7].into_boxed_slice();
assert!(q.push(&r));
assert!(q.push(1, &r));
let r = vec![8, 9, 10, 11].into_boxed_slice();
assert!(q.push(&r));
assert!(q.push(1, &r));
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 3);
let r = q.shift().unwrap();
assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice());
assert_eq!(&r[4..], vec![1, 2, 3, 4, 5].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 2);
let r = q.shift().unwrap();
assert_eq!(r, vec![6, 7].as_slice());
assert_eq!(&r[4..], vec![6, 7].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 1);
let r = q.shift().unwrap();
assert_eq!(r, vec![8, 9, 10, 11].as_slice());
assert_eq!(&r[4..], vec![8, 9, 10, 11].as_slice());
assert_eq!(q.num_records(), 0);
assert_eq!(q.size(), 0);
@ -232,19 +235,19 @@ mod tests {
#[test]
fn overflow() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1)));
assert!(q.push(1, &alloc_buf(RECOMMENDED_SIZE - 1 - (4 * 2))));
assert_eq!(q.size(), 1);
assert!(!q.push(&alloc_buf(2)));
assert!(!q.push(1, &alloc_buf(2)));
assert_eq!(q.size(), 1);
assert!(q.push(&alloc_buf(1)));
assert!(q.push(1, &alloc_buf(1)));
assert_eq!(q.size(), 2);
assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1);
assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1 - 4);
assert_eq!(q.size(), 1);
assert!(!q.push(&alloc_buf(1)));
assert!(!q.push(1, &alloc_buf(1)));
assert_eq!(q.shift().unwrap().len(), 1);
assert_eq!(q.shift().unwrap().len(), 1 + 4);
assert_eq!(q.size(), 0);
}
@ -252,11 +255,11 @@ mod tests {
fn full_records() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
for _ in 0..MAX_RECORDS {
assert!(q.push(&alloc_buf(1)))
assert!(q.push(1, &alloc_buf(1)))
}
assert_eq!(q.push(&alloc_buf(1)), false);
assert_eq!(q.push(1, &alloc_buf(1)), false);
// Even if we shift one off, we still cannot push a new record.
assert_eq!(q.shift().unwrap().len(), 1);
assert_eq!(q.push(&alloc_buf(1)), false);
assert_eq!(q.shift().unwrap().len(), 1 + 4);
assert_eq!(q.push(1, &alloc_buf(1)), false);
}
}

View File

@ -5,36 +5,30 @@ import * as msg from "gen/cli/msg_generated";
import * as errors from "./errors";
import * as util from "./util";
import {
nextPromiseId,
recordFromBufMinimal,
handleAsyncMsgFromRustMinimal
} from "./dispatch_minimal";
const promiseTable = new Map<number, util.Resolvable<msg.Base>>();
interface FlatbufferRecord {
promiseId: number;
base: msg.Base;
}
function flatbufferRecordFromBuf(buf: Uint8Array): FlatbufferRecord {
function flatbufferRecordFromBuf(buf: Uint8Array): msg.Base {
const bb = new flatbuffers.ByteBuffer(buf);
const base = msg.Base.getRootAsBase(bb);
return {
promiseId: base.cmdId(),
base
};
return base;
}
export function handleAsyncMsgFromRust(ui8: Uint8Array): void {
export function handleAsyncMsgFromRust(
promiseId: number,
ui8: Uint8Array
): void {
const buf32 = new Int32Array(ui8.buffer, ui8.byteOffset, ui8.byteLength / 4);
const recordMin = recordFromBufMinimal(buf32);
if (recordMin) {
// Fast and new
handleAsyncMsgFromRustMinimal(ui8, recordMin);
handleAsyncMsgFromRustMinimal(promiseId, ui8, recordMin);
} else {
// Legacy
let { promiseId, base } = flatbufferRecordFromBuf(ui8);
let base = flatbufferRecordFromBuf(ui8);
const promise = promiseTable.get(promiseId);
util.assert(promise != null, `Expecting promise in table. ${promiseId}`);
promiseTable.delete(promiseId);
@ -56,14 +50,26 @@ function sendInternal(
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
sync = true
): [number, null | Uint8Array] {
const cmdId = nextPromiseId();
isSync: true
): Uint8Array | null;
function sendInternal(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
isSync: false
): Promise<msg.Base>;
function sendInternal(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
isSync: boolean
): Promise<msg.Base> | Uint8Array | null {
msg.Base.startBase(builder);
msg.Base.addSync(builder, isSync);
msg.Base.addInner(builder, inner);
msg.Base.addInnerType(builder, innerType);
msg.Base.addSync(builder, sync);
msg.Base.addCmdId(builder, cmdId);
builder.finish(msg.Base.endBase(builder));
const control = builder.asUint8Array();
@ -74,7 +80,25 @@ function sendInternal(
);
builder.inUse = false;
return [cmdId, response];
if (typeof response === "number") {
const promise = util.createResolvable<msg.Base>();
promiseTable.set(response, promise);
util.assert(!isSync);
return promise;
} else {
if (!isSync) {
util.assert(response !== null);
const base = flatbufferRecordFromBuf(response as Uint8Array);
const err = errors.maybeError(base);
if (err != null) {
return Promise.reject(err);
} else {
return Promise.resolve(base);
}
}
return response;
}
}
// @internal
@ -84,16 +108,7 @@ export function sendAsync(
inner: flatbuffers.Offset,
data?: ArrayBufferView
): Promise<msg.Base> {
const [cmdId, response] = sendInternal(
builder,
innerType,
inner,
data,
false
);
util.assert(response == null); // null indicates async.
const promise = util.createResolvable<msg.Base>();
promiseTable.set(cmdId, promise);
const promise = sendInternal(builder, innerType, inner, data, false);
return promise;
}
@ -104,10 +119,8 @@ export function sendSync(
inner: flatbuffers.Offset,
data?: ArrayBufferView
): null | msg.Base {
const [cmdId, response] = sendInternal(builder, innerType, inner, data, true);
util.assert(cmdId >= 0);
util.assert(response != null); // null indicates async.
if (response!.length === 0) {
const response = sendInternal(builder, innerType, inner, data, true);
if (response == null || response.length === 0) {
return null;
} else {
const bb = new flatbuffers.ByteBuffer(response!);

View File

@ -5,14 +5,8 @@ import { core } from "./core";
const DISPATCH_MINIMAL_TOKEN = 0xcafe;
const promiseTableMin = new Map<number, util.Resolvable<number>>();
let _nextPromiseId = 0;
export function nextPromiseId(): number {
return _nextPromiseId++;
}
export interface RecordMinimal {
promiseId: number;
opId: number;
arg: number;
result: number;
@ -28,10 +22,9 @@ export function hasMinimalToken(i32: Int32Array): boolean {
export function recordFromBufMinimal(buf32: Int32Array): null | RecordMinimal {
if (hasMinimalToken(buf32)) {
return {
promiseId: buf32[1],
opId: buf32[2],
arg: buf32[3],
result: buf32[4]
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
};
}
return null;
@ -46,12 +39,13 @@ const scratchBytes = new Uint8Array(
util.assert(scratchBytes.byteLength === scratch32.length * 4);
export function handleAsyncMsgFromRustMinimal(
promiseId: number,
ui8: Uint8Array,
record: RecordMinimal
): void {
// Fast and new
util.log("minimal handleAsyncMsgFromRust ", ui8.length);
const { promiseId, result } = record;
const { result } = record;
const promise = promiseTableMin.get(promiseId);
promiseTableMin.delete(promiseId);
promise!.resolve(result);
@ -62,16 +56,16 @@ export function sendAsyncMinimal(
arg: number,
zeroCopy: Uint8Array
): Promise<number> {
const promiseId = nextPromiseId(); // AKA cmdId
scratch32[0] = DISPATCH_MINIMAL_TOKEN;
scratch32[1] = promiseId;
scratch32[2] = opId;
scratch32[3] = arg;
scratch32[1] = opId;
scratch32[2] = arg;
const promiseId = core.dispatch(scratchBytes, zeroCopy);
util.assert(typeof promiseId == "number");
const promise = util.createResolvable<number>();
promiseTableMin.set(promiseId, promise);
promiseTableMin.set(promiseId as number, promise);
core.dispatch(scratchBytes, zeroCopy);
return promise;
}