Port readSync/writeSync ops to minimal

This removes dispatch_flatbuffers as it is now unused. There are still a
few places where msg_generated is used: ErrorKind and MediaType. These
will be dealt with later.
This commit is contained in:
Ryan Dahl 2019-08-26 10:58:44 -04:00
parent 7ff67017f2
commit d8ada4d3fc
16 changed files with 46 additions and 494 deletions

View file

@ -82,7 +82,6 @@ ts_sources = [
"../js/diagnostics.ts",
"../js/dir.ts",
"../js/dispatch.ts",
"../js/dispatch_flatbuffers.ts",
"../js/dispatch_json.ts",
"../js/dispatch_minimal.ts",
"../js/dom_file.ts",

View file

@ -1,11 +1,3 @@
union Any {
Read,
ReadRes,
Seek,
Write,
WriteRes,
}
enum ErrorKind: byte {
NoError = 0,
@ -73,62 +65,9 @@ enum ErrorKind: byte {
JSError,
}
table Cwd {}
table CwdRes {
cwd: string;
}
enum MediaType: byte {
JavaScript = 0,
TypeScript,
Json,
Unknown
}
table Base {
cmd_id: uint32;
sync: bool = false;
error_kind: ErrorKind = NoError;
error: string;
inner: Any;
}
table FormatError {
error: string;
}
table FormatErrorRes {
error: string;
}
table KeyValue {
key: string;
value: string;
}
table Read {
rid: uint32;
// (ptr, len) is passed as second parameter to Deno.core.send().
}
table ReadRes {
nread: uint;
eof: bool;
}
table Write {
rid: uint32;
}
table WriteRes {
nbyte: uint;
}
table Seek {
rid: uint32;
offset: int;
whence: uint;
}
root_type Base;

View file

@ -1,128 +0,0 @@
use super::utils::CliOpResult;
use crate::deno_error::GetErrorKind;
use crate::msg;
use crate::state::ThreadSafeState;
use deno::*;
use flatbuffers::FlatBufferBuilder;
use hyper::rt::Future;
type CliDispatchFn = fn(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
) -> CliOpResult;
use super::files::{op_read, op_write};
/// Processes raw messages from JavaScript.
/// This functions invoked every time Deno.core.dispatch() is called.
/// control corresponds to the first argument of Deno.core.dispatch().
/// data corresponds to the second argument of Deno.core.dispatch().
pub fn dispatch(
state: &ThreadSafeState,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let base = msg::get_root_as_base(&control);
let inner_type = base.inner_type();
let is_sync = base.sync();
let cmd_id = base.cmd_id();
debug!(
"msg_from_js {} sync {}",
msg::enum_name_any(inner_type),
is_sync
);
let op_func: CliDispatchFn = match op_selector_std(inner_type) {
Some(v) => v,
None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)),
};
let op_result = op_func(state, &base, zero_copy);
match op_result {
Ok(Op::Sync(buf)) => Op::Sync(buf),
Ok(Op::Async(fut)) => {
let result_fut = Box::new(
fut
.or_else(move |err: ErrBox| -> Result<Buf, ()> {
debug!("op err {}", err);
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a Buf.
let builder = &mut FlatBufferBuilder::new();
let errmsg_offset = builder.create_string(&format!("{}", err));
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
error: Some(errmsg_offset),
error_kind: err.kind(),
..Default::default()
},
))
})
.and_then(move |buf: Buf| -> Result<Buf, ()> {
// Handle empty responses. For sync responses we just want
// to send null. For async we want to send a small message
// with the cmd_id.
let buf = if buf.len() > 0 {
buf
} else {
let builder = &mut FlatBufferBuilder::new();
serialize_response(
cmd_id,
builder,
msg::BaseArgs {
..Default::default()
},
)
};
Ok(buf)
})
.map_err(|err| panic!("unexpected error {:?}", err)),
);
Op::Async(result_fut)
}
Err(err) => {
debug!("op err {}", err);
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a Buf.
let builder = &mut FlatBufferBuilder::new();
let errmsg_offset = builder.create_string(&format!("{}", err));
let response_buf = serialize_response(
cmd_id,
builder,
msg::BaseArgs {
error: Some(errmsg_offset),
error_kind: err.kind(),
..Default::default()
},
);
Op::Sync(response_buf)
}
}
}
pub fn serialize_response(
cmd_id: u32,
builder: &mut FlatBufferBuilder<'_>,
mut args: msg::BaseArgs<'_>,
) -> Buf {
args.cmd_id = cmd_id;
let base = msg::Base::create(builder, &args);
msg::finish_base_buffer(builder, base);
let data = builder.finished_data();
// println!("serialize_response {:x?}", data);
data.into()
}
/// Standard ops set for most isolates
pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
match inner_type {
msg::Any::Read => Some(op_read),
msg::Any::Write => Some(op_write),
_ => None,
}
}

View file

@ -80,9 +80,6 @@ pub fn dispatch(
) -> CoreOp {
let mut record = parse_min_record(control).unwrap();
let is_sync = record.promise_id == 0;
// TODO(ry) Currently there aren't any sync minimal ops. This is just a sanity
// check. Remove later.
assert!(!is_sync);
let rid = record.arg;
let min_op = d(rid, zero_copy);
@ -102,6 +99,11 @@ pub fn dispatch(
}));
if is_sync {
// Warning! Possible deadlocks can occur if we try to wait for a future
// while in a future. The safe but expensive alternative is to use
// tokio_util::block_on.
// This block is only exercised for readSync and writeSync, which I think
// works since they're simple polling futures.
Op::Sync(fut.wait().unwrap())
} else {
Op::Async(fut)

View file

@ -1,15 +1,10 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_flatbuffers::serialize_response;
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::utils::*;
use crate::deno_error;
use crate::fs as deno_fs;
use crate::msg;
use crate::resources;
use crate::state::ThreadSafeState;
use crate::tokio_write;
use deno::*;
use flatbuffers::FlatBufferBuilder;
use futures::Future;
use std;
use std::convert::From;
@ -118,91 +113,6 @@ pub fn op_close(
}
}
pub fn op_read(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
) -> CliOpResult {
let cmd_id = base.cmd_id();
let inner = base.inner_as_read().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
None => Err(deno_error::bad_resource()),
Some(resource) => {
let op = tokio::io::read(resource, data.unwrap())
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nread)| {
let builder = &mut FlatBufferBuilder::new();
let inner = msg::ReadRes::create(
builder,
&msg::ReadResArgs {
nread: nread as u32,
eof: nread == 0,
},
);
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::ReadRes,
..Default::default()
},
))
});
if base.sync() {
let buf = op.wait()?;
Ok(Op::Sync(buf))
} else {
Ok(Op::Async(Box::new(op)))
}
}
}
}
pub fn op_write(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
) -> CliOpResult {
let cmd_id = base.cmd_id();
let inner = base.inner_as_write().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
None => Err(deno_error::bad_resource()),
Some(resource) => {
let op = tokio_write::write(resource, data.unwrap())
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nwritten)| {
let builder = &mut FlatBufferBuilder::new();
let inner = msg::WriteRes::create(
builder,
&msg::WriteResArgs {
nbyte: nwritten as u32,
},
);
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::WriteRes,
..Default::default()
},
))
});
if base.sync() {
let buf = op.wait()?;
Ok(Op::Sync(buf))
} else {
Ok(Op::Async(Box::new(op)))
}
}
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct SeekArgs {

View file

@ -3,7 +3,6 @@ use crate::state::ThreadSafeState;
use deno::*;
mod compiler;
mod dispatch_flatbuffers;
mod dispatch_json;
mod dispatch_minimal;
mod errors;
@ -26,7 +25,6 @@ mod workers;
// Warning! These values are duplicated in the TypeScript code (js/dispatch.ts),
// update with care.
pub const OP_FLATBUFFER: OpId = 100;
pub const OP_READ: OpId = 1;
pub const OP_WRITE: OpId = 2;
pub const OP_EXIT: OpId = 3;
@ -296,7 +294,6 @@ pub fn dispatch(
dispatch_json::dispatch(fs::op_make_temp_dir, state, control, zero_copy)
}
OP_CWD => dispatch_json::dispatch(fs::op_cwd, state, control, zero_copy),
OP_FLATBUFFER => dispatch_flatbuffers::dispatch(state, control, zero_copy),
_ => panic!("bad op_id"),
};

View file

@ -1,6 +1,5 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import * as ts from "typescript";
import { assetSourceCode } from "./assets";
import { bold, cyan, yellow } from "./colors";
import { Console } from "./console";
@ -9,7 +8,6 @@ import { Diagnostic, fromTypeScriptDiagnostic } from "./diagnostics";
import { cwd } from "./dir";
import * as dispatch from "./dispatch";
import { sendSync } from "./dispatch_json";
import { msg } from "./dispatch_flatbuffers";
import * as os from "./os";
import { TextEncoder } from "./text_encoding";
import { getMappedModuleName, parseTypeDirectives } from "./type_directives";
@ -19,6 +17,9 @@ import { window } from "./window";
import { postMessage, workerClose, workerMain } from "./workers";
import { writeFileSync } from "./write_file";
// TODO(ry) msg_generated import will be removed soon.
import * as msg from "gen/cli/msg_generated";
// Startup boilerplate. This is necessary because the compiler has its own
// snapshot. (It would be great if we could remove these things or centralize
// them somewhere else.)

View file

@ -1,10 +1,8 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import * as minimal from "./dispatch_minimal";
import * as flatbuffers from "./dispatch_flatbuffers";
import * as json from "./dispatch_json";
// These consts are shared with Rust. Update with care.
export const OP_FLATBUFFER = 100;
export const OP_READ = 1;
export const OP_WRITE = 2;
export const OP_EXIT = 3;
@ -64,9 +62,6 @@ export const OP_CWD = 56;
export function asyncMsgFromRust(opId: number, ui8: Uint8Array): void {
switch (opId) {
case OP_FLATBUFFER:
flatbuffers.asyncMsgFromRust(opId, ui8);
break;
case OP_WRITE:
case OP_READ:
minimal.asyncMsgFromRust(opId, ui8);

View file

@ -1,151 +0,0 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import * as flatbuffers from "./flatbuffers";
import { DenoError } from "./errors";
import { core } from "./core";
import * as msg from "gen/cli/msg_generated";
import * as util from "./util";
import { OP_FLATBUFFER } from "./dispatch";
export { msg, flatbuffers };
const promiseTable = new Map<number, util.Resolvable<msg.Base>>();
let _nextPromiseId = 1;
export function nextPromiseId(): number {
return _nextPromiseId++;
}
interface FlatbufferRecord {
promiseId: number;
base: msg.Base;
}
export function asyncMsgFromRust(opId: number, ui8: Uint8Array): void {
let { promiseId, base } = flatbufferRecordFromBuf(ui8);
const promise = promiseTable.get(promiseId);
util.assert(promise != null, `Expecting promise in table. ${promiseId}`);
promiseTable.delete(promiseId);
const err = maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
}
function flatbufferRecordFromBuf(buf: Uint8Array): FlatbufferRecord {
const bb = new flatbuffers.ByteBuffer(buf);
const base = msg.Base.getRootAsBase(bb);
return {
promiseId: base.cmdId(),
base
};
}
function ui8FromArrayBufferView(abv: ArrayBufferView): Uint8Array {
return new Uint8Array(abv.buffer, abv.byteOffset, abv.byteLength);
}
function sendInternal(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
isSync: true
): Uint8Array;
function sendInternal(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
isSync: false
): Promise<msg.Base>;
function sendInternal(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
isSync: boolean
): Promise<msg.Base> | Uint8Array {
const cmdId = nextPromiseId();
msg.Base.startBase(builder);
msg.Base.addInner(builder, inner);
msg.Base.addInnerType(builder, innerType);
msg.Base.addSync(builder, isSync);
msg.Base.addCmdId(builder, cmdId);
builder.finish(msg.Base.endBase(builder));
const control = builder.asUint8Array();
const response = core.dispatch(
OP_FLATBUFFER, // TODO(ry) Use actual opId later.
control,
zeroCopy ? ui8FromArrayBufferView(zeroCopy) : undefined
);
builder.inUse = false;
if (response == null) {
util.assert(!isSync);
const promise = util.createResolvable<msg.Base>();
promiseTable.set(cmdId, promise);
return promise;
} else {
if (!isSync) {
// We can easily and correctly allow for sync responses to async calls
// by creating and returning a promise from the sync response.
const bb = new flatbuffers.ByteBuffer(response);
const base = msg.Base.getRootAsBase(bb);
const err = maybeError(base);
if (err != null) {
return Promise.reject(err);
} else {
return Promise.resolve(base);
}
}
return response;
}
}
// @internal
export function sendAsync(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
data?: ArrayBufferView
): Promise<msg.Base> {
return sendInternal(builder, innerType, inner, data, false);
}
// @internal
export function sendSync(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
data?: ArrayBufferView
): null | msg.Base {
const response = sendInternal(builder, innerType, inner, data, true);
if (response!.length === 0) {
return null;
} else {
const bb = new flatbuffers.ByteBuffer(response!);
const baseRes = msg.Base.getRootAsBase(bb);
maybeThrowError(baseRes);
return baseRes;
}
}
function maybeError(base: msg.Base): null | DenoError<msg.ErrorKind> {
const kind = base.errorKind();
if (kind === msg.ErrorKind.NoError) {
return null;
} else {
return new DenoError(kind, base.error()!);
}
}
function maybeThrowError(base: msg.Base): void {
const err = maybeError(base);
if (err != null) {
throw err;
}
}

View file

@ -4,6 +4,9 @@ import * as util from "./util";
import { core } from "./core";
const promiseTableMin = new Map<number, util.Resolvable<number>>();
// Note it's important that promiseId starts at 1 instead of 0, because sync
// messages are indicated with promiseId 0. If we ever add wrap around logic for
// overflows, this should be taken into account.
let _nextPromiseId = 1;
function nextPromiseId(): number {
@ -63,3 +66,16 @@ export function sendAsyncMinimal(
core.dispatch(opId, scratchBytes, zeroCopy);
return promise;
}
export function sendSyncMinimal(
opId: number,
arg: number,
zeroCopy: Uint8Array
): number {
scratch32[0] = 0; // promiseId 0 indicates sync
scratch32[1] = arg;
const res = core.dispatch(opId, scratchBytes, zeroCopy)!;
const res32 = new Int32Array(res.buffer, res.byteOffset, 3);
const resRecord = recordFromBufMinimal(opId, res32);
return resRecord.result;
}

View file

@ -10,15 +10,12 @@ import {
SyncWriter,
SyncSeeker
} from "./io";
import { sendAsyncMinimal } from "./dispatch_minimal";
import { assert } from "./util";
import { sendAsyncMinimal, sendSyncMinimal } from "./dispatch_minimal";
import * as dispatch from "./dispatch";
import {
sendSync as sendSyncJson,
sendAsync as sendAsyncJson
} from "./dispatch_json";
import { sendSync, msg, flatbuffers } from "./dispatch_flatbuffers";
import { OP_READ, OP_WRITE } from "./dispatch";
/** Open a file and return an instance of the `File` object
* synchronously.
@ -44,26 +41,6 @@ export async function open(
return new File(rid);
}
function reqRead(
rid: number,
p: Uint8Array
): [flatbuffers.Builder, msg.Any, flatbuffers.Offset, Uint8Array] {
const builder = flatbuffers.createBuilder();
const inner = msg.Read.createRead(builder, rid);
return [builder, msg.Any.Read, inner, p];
}
function resRead(baseRes: null | msg.Base): number | EOF {
assert(baseRes != null);
assert(msg.Any.ReadRes === baseRes!.innerType());
const res = new msg.ReadRes();
assert(baseRes!.inner(res) != null);
if (res.eof()) {
return EOF;
}
return res.nread();
}
/** Read synchronously from a file ID into an array buffer.
*
* Return `number | EOF` for the operation.
@ -75,7 +52,14 @@ function resRead(baseRes: null | msg.Base): number | EOF {
*
*/
export function readSync(rid: number, p: Uint8Array): number | EOF {
return resRead(sendSync(...reqRead(rid, p)));
const nread = sendSyncMinimal(dispatch.OP_READ, rid, p);
if (nread < 0) {
throw new Error("read error");
} else if (nread == 0) {
return EOF;
} else {
return nread;
}
}
/** Read from a file ID into an array buffer.
@ -90,7 +74,7 @@ export function readSync(rid: number, p: Uint8Array): number | EOF {
* })();
*/
export async function read(rid: number, p: Uint8Array): Promise<number | EOF> {
const nread = await sendAsyncMinimal(OP_READ, rid, p);
const nread = await sendAsyncMinimal(dispatch.OP_READ, rid, p);
if (nread < 0) {
throw new Error("read error");
} else if (nread == 0) {
@ -100,23 +84,6 @@ export async function read(rid: number, p: Uint8Array): Promise<number | EOF> {
}
}
function reqWrite(
rid: number,
p: Uint8Array
): [flatbuffers.Builder, msg.Any, flatbuffers.Offset, Uint8Array] {
const builder = flatbuffers.createBuilder();
const inner = msg.Write.createWrite(builder, rid);
return [builder, msg.Any.Write, inner, p];
}
function resWrite(baseRes: null | msg.Base): number {
assert(baseRes != null);
assert(msg.Any.WriteRes === baseRes!.innerType());
const res = new msg.WriteRes();
assert(baseRes!.inner(res) != null);
return res.nbyte();
}
/** Write synchronously to the file ID the contents of the array buffer.
*
* Resolves with the number of bytes written.
@ -127,7 +94,12 @@ function resWrite(baseRes: null | msg.Base): number {
* Deno.writeSync(file.rid, data);
*/
export function writeSync(rid: number, p: Uint8Array): number {
return resWrite(sendSync(...reqWrite(rid, p)));
const result = sendSyncMinimal(dispatch.OP_WRITE, rid, p);
if (result < 0) {
throw new Error("write error");
} else {
return result;
}
}
/** Write to the file ID the contents of the array buffer.
@ -143,7 +115,7 @@ export function writeSync(rid: number, p: Uint8Array): number {
*
*/
export async function write(rid: number, p: Uint8Array): Promise<number> {
let result = await sendAsyncMinimal(OP_WRITE, rid, p);
let result = await sendAsyncMinimal(dispatch.OP_WRITE, rid, p);
if (result < 0) {
throw new Error("write error");
} else {

View file

@ -2,7 +2,7 @@
[WILDCARD] js/dispatch_json.ts:[WILDCARD]
at DenoError (js/errors.ts:[WILDCARD])
at toDenoError (js/dispatch_json.ts:[WILDCARD])
at sendSync$1 (js/dispatch_json.ts:[WILDCARD])
at sendSync[WILDCARD] (js/dispatch_json.ts:[WILDCARD])
at fetchSourceFile (js/compiler.ts:[WILDCARD])
at _resolveModule (js/compiler.ts:[WILDCARD])
at js/compiler.ts:[WILDCARD]

View file

@ -2,7 +2,7 @@
[WILDCARD] js/dispatch_json.ts:[WILDCARD]
at DenoError (js/errors.ts:[WILDCARD])
at toDenoError (js/dispatch_json.ts:[WILDCARD])
at sendSync$1 (js/dispatch_json.ts:[WILDCARD])
at sendSync[WILDCARD] (js/dispatch_json.ts:[WILDCARD])
at fetchSourceFile (js/compiler.ts:[WILDCARD])
at _resolveModule (js/compiler.ts:[WILDCARD])
at js/compiler.ts:[WILDCARD]

View file

@ -2,7 +2,7 @@
[WILDCARD] js/dispatch_json.ts:[WILDCARD]
at DenoError (js/errors.ts:[WILDCARD])
at toDenoError (js/dispatch_json.ts:[WILDCARD])
at sendSync$1 (js/dispatch_json.ts:[WILDCARD])
at sendSync[WILDCARD] (js/dispatch_json.ts:[WILDCARD])
at fetchSourceFile (js/compiler.ts:[WILDCARD])
at _resolveModule (js/compiler.ts:[WILDCARD])
at js/compiler.ts:[WILDCARD]

View file

@ -2,7 +2,7 @@
[WILDCARD] js/dispatch_json.ts:[WILDCARD]
at DenoError (js/errors.ts:[WILDCARD])
at toDenoError (js/dispatch_json.ts:[WILDCARD])
at sendSync$1 (js/dispatch_json.ts:[WILDCARD])
at sendSync[WILDCARD] (js/dispatch_json.ts:[WILDCARD])
at fetchSourceFile (js/compiler.ts:[WILDCARD])
at _resolveModule (js/compiler.ts:[WILDCARD])
at js/compiler.ts:[WILDCARD]

View file

@ -2,7 +2,7 @@
[WILDCARD] js/dispatch_json.ts:[WILDCARD]
at DenoError (js/errors.ts:[WILDCARD])
at toDenoError (js/dispatch_json.ts:[WILDCARD])
at sendSync$1 (js/dispatch_json.ts:[WILDCARD])
at sendSync[WILDCARD] (js/dispatch_json.ts:[WILDCARD])
at fetchSourceFile (js/compiler.ts:[WILDCARD])
at _resolveModule (js/compiler.ts:[WILDCARD])
at js/compiler.ts:[WILDCARD]