A new low-level crate with focus on speed. 
This doesn't yet hook into the existing code base.
This commit is contained in:
Ryan Dahl 2019-02-26 17:36:05 -05:00 committed by GitHub
parent 5dfbbbb07a
commit b8a537d020
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 1478 additions and 120 deletions

View file

@ -13,6 +13,8 @@ group("default") {
":deno",
":hyper_hello",
":test_rs",
"core:deno_core_http_bench",
"core:deno_core_test",
"libdeno:test_cc",
]
}

37
core/BUILD.gn Normal file
View file

@ -0,0 +1,37 @@
import("//build_extra/rust/rust.gni")
# deno_core does not depend on flatbuffers nor tokio.
main_extern = [
"$rust_build:futures",
"$rust_build:libc",
"$rust_build:serde_json",
"$rust_build:log",
]
rust_crate("deno_core") {
source_root = "lib.rs"
extern = main_extern
deps = [
"../libdeno:libdeno_static_lib",
]
}
rust_test("deno_core_test") {
source_root = "lib.rs"
extern = main_extern
deps = [
"../libdeno:libdeno_static_lib",
]
}
rust_executable("deno_core_http_bench") {
source_root = "http_bench.rs"
extern = [
"$rust_build:futures",
"$rust_build:lazy_static",
"$rust_build:libc",
"$rust_build:log",
"$rust_build:tokio",
":deno_core"
]
}

150
core/http_bench.js Normal file
View file

@ -0,0 +1,150 @@
// This is not a real HTTP server. We read blindly one time into 'requestBuf',
// then write this fixed 'responseBuf'. The point of this benchmark is to
// exercise the event loop in a simple yet semi-realistic way.
const shared32 = new Int32Array(libdeno.shared);
const INDEX_NUM_RECORDS = 0;
const INDEX_RECORDS = 1;
const RECORD_OFFSET_PROMISE_ID = 0;
const RECORD_OFFSET_OP = 1;
const RECORD_OFFSET_ARG = 2;
const RECORD_OFFSET_RESULT = 3;
const RECORD_SIZE = 4;
const OP_LISTEN = 1;
const OP_ACCEPT = 2;
const OP_READ = 3;
const OP_WRITE = 4;
const OP_CLOSE = 5;
const NUM_RECORDS = (shared32.length - INDEX_RECORDS) / RECORD_SIZE;
if (NUM_RECORDS != 100) {
throw Error("expected 100 entries");
}
const requestBuf = new Uint8Array(64 * 1024);
const responseBuf = new Uint8Array(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
.split("")
.map(c => c.charCodeAt(0))
);
const promiseMap = new Map();
let nextPromiseId = 1;
function createResolvable() {
let methods;
const promise = new Promise((resolve, reject) => {
methods = { resolve, reject };
});
return Object.assign(promise, methods);
}
/** Returns Promise<number> */
function sendAsync(op, arg, zeroCopyData) {
const id = nextPromiseId++;
const p = createResolvable();
shared32[INDEX_NUM_RECORDS] = 1;
setRecord(0, RECORD_OFFSET_PROMISE_ID, id);
setRecord(0, RECORD_OFFSET_OP, op);
setRecord(0, RECORD_OFFSET_ARG, arg);
setRecord(0, RECORD_OFFSET_RESULT, -1);
promiseMap.set(id, p);
libdeno.send(null, zeroCopyData);
return p;
}
/** Returns u32 number */
function sendSync(op, arg) {
shared32[INDEX_NUM_RECORDS] = 1;
setRecord(0, RECORD_OFFSET_PROMISE_ID, 0);
setRecord(0, RECORD_OFFSET_OP, op);
setRecord(0, RECORD_OFFSET_ARG, arg);
setRecord(0, RECORD_OFFSET_RESULT, -1);
libdeno.send();
return getRecord(0, RECORD_OFFSET_RESULT);
}
function setRecord(i, off, value) {
if (i >= NUM_RECORDS) {
throw Error("out of range");
}
shared32[INDEX_RECORDS + RECORD_SIZE * i + off] = value;
}
function getRecord(i, off) {
if (i >= NUM_RECORDS) {
throw Error("out of range");
}
return shared32[INDEX_RECORDS + RECORD_SIZE * i + off];
}
function handleAsyncMsgFromRust() {
for (let i = 0; i < shared32[INDEX_NUM_RECORDS]; i++) {
let id = getRecord(i, RECORD_OFFSET_PROMISE_ID);
const p = promiseMap.get(id);
promiseMap.delete(id);
p.resolve(getRecord(i, RECORD_OFFSET_RESULT));
}
}
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
return sendSync(OP_LISTEN, -1);
}
/** Accepts a connection, returns rid. */
async function accept(rid) {
return await sendAsync(OP_ACCEPT, rid);
}
/**
* Reads a packet from the rid, presumably an http request. data is ignored.
* Returns bytes read.
*/
async function read(rid, data) {
return await sendAsync(OP_READ, rid, data);
}
/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
async function write(rid, data) {
return await sendAsync(OP_WRITE, rid, data);
}
function close(rid) {
return sendSync(OP_CLOSE, rid);
}
async function serve(rid) {
while (true) {
const nread = await read(rid, requestBuf);
if (nread <= 0) {
break;
}
const nwritten = await write(rid, responseBuf);
if (nwritten < 0) {
break;
}
}
close(rid);
}
async function main() {
libdeno.recv(handleAsyncMsgFromRust);
libdeno.print("http_bench.js start");
const listener_rid = listen();
libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listener_rid}`);
while (true) {
const rid = await accept(listener_rid);
// libdeno.print(`accepted ${rid}`);
if (rid < 0) {
libdeno.print(`accept error ${rid}`);
return;
}
serve(rid);
}
}
main();

210
core/http_bench.rs Normal file
View file

@ -0,0 +1,210 @@
/// To run this benchmark:
///
/// > DENO_BUILD_MODE=release ./tools/build.py && \
/// ./target/release/deno_core_http_bench --multi-thread
extern crate deno_core;
extern crate futures;
extern crate libc;
extern crate tokio;
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
use deno_core::deno_buf;
use deno_core::AsyncResult;
use deno_core::Isolate;
use deno_core::JSError;
use deno_core::Op;
use deno_core::RECORD_OFFSET_ARG;
use deno_core::RECORD_OFFSET_OP;
use deno_core::RECORD_OFFSET_PROMISE_ID;
use deno_core::RECORD_OFFSET_RESULT;
use futures::future::lazy;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use tokio::prelude::*;
const OP_LISTEN: i32 = 1;
const OP_ACCEPT: i32 = 2;
const OP_READ: i32 = 3;
const OP_WRITE: i32 = 4;
const OP_CLOSE: i32 = 5;
fn main() {
let js_source = include_str!("http_bench.js");
let isolate = deno_core::Isolate::new(recv_cb);
let main_future = lazy(move || {
// TODO currently isolate.execute() must be run inside tokio, hence the
// lazy(). It would be nice to not have that contraint. Probably requires
// using v8::MicrotasksPolicy::kExplicit
js_check(isolate.execute("http_bench.js", js_source));
isolate.then(|r| {
js_check(r);
Ok(())
})
});
let args: Vec<String> = env::args().collect();
if args.len() > 1 && args[1] == "--multi-thread" {
println!("multi-thread");
tokio::run(main_future);
} else {
println!("single-thread");
tokio::runtime::current_thread::run(main_future);
}
}
enum Repr {
TcpListener(tokio::net::TcpListener),
TcpStream(tokio::net::TcpStream),
}
type ResourceTable = HashMap<i32, Repr>;
lazy_static! {
static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new(HashMap::new());
static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3);
}
fn new_rid() -> i32 {
let rid = NEXT_RID.fetch_add(1, Ordering::SeqCst);
rid as i32
}
fn recv_cb(isolate: &mut Isolate, zero_copy_buf: deno_buf) {
isolate.test_send_counter += 1; // TODO ideally store this in isolate.state?
let promise_id = isolate.shared.get_record(0, RECORD_OFFSET_PROMISE_ID);
let op_id = isolate.shared.get_record(0, RECORD_OFFSET_OP);
let arg = isolate.shared.get_record(0, RECORD_OFFSET_ARG);
// dbg!(promise_id);
// dbg!(op_id);
// dbg!(arg);
let is_sync = promise_id == 0;
if is_sync {
// sync ops
match op_id {
OP_CLOSE => {
debug!("close");
assert!(is_sync);
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&arg);
isolate.shared.set_record(
0,
RECORD_OFFSET_RESULT,
if r.is_some() { 0 } else { -1 },
);
}
OP_LISTEN => {
debug!("listen");
assert!(is_sync);
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
let rid = new_rid();
isolate.shared.set_record(0, RECORD_OFFSET_RESULT, rid);
let mut guard = RESOURCE_TABLE.lock().unwrap();
guard.insert(rid, Repr::TcpListener(listener));
}
_ => panic!("bad op"),
}
} else {
// async ops
let zero_copy_id = zero_copy_buf.zero_copy_id;
let op = match op_id {
OP_ACCEPT => {
let listener_rid = arg;
op_accept(listener_rid)
}
OP_READ => {
let rid = arg;
op_read(rid, zero_copy_buf)
}
OP_WRITE => {
let rid = arg;
op_write(rid, zero_copy_buf)
}
_ => panic!("bad op"),
};
isolate.add_op(promise_id, op, zero_copy_id);
}
}
fn op_accept(listener_rid: i32) -> Box<Op> {
debug!("accept {}", listener_rid);
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&listener_rid);
match maybe_repr {
Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(),
_ => panic!("bad rid"),
}
}).and_then(move |(stream, addr)| {
debug!("accept success {}", addr);
let rid = new_rid();
let mut guard = RESOURCE_TABLE.lock().unwrap();
guard.insert(rid, Repr::TcpStream(stream));
Ok(AsyncResult { result: rid })
}),
)
}
fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<Op> {
debug!("read rid={}", rid);
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&rid);
match maybe_repr {
Some(Repr::TcpStream(ref mut stream)) => {
stream.poll_read(&mut zero_copy_buf)
}
_ => panic!("bad rid"),
}
}).and_then(move |nread| {
debug!("read success {}", nread);
Ok(AsyncResult {
result: nread as i32,
})
}),
)
}
fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<Op> {
debug!("write rid={}", rid);
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&rid);
match maybe_repr {
Some(Repr::TcpStream(ref mut stream)) => {
stream.poll_write(&zero_copy_buf)
}
_ => panic!("bad rid"),
}
}).and_then(move |nwritten| {
debug!("write success {}", nwritten);
Ok(AsyncResult {
result: nwritten as i32,
})
}),
)
}
fn js_check(r: Result<(), JSError>) {
if let Err(e) = r {
panic!(e.to_string());
}
}

416
core/js_errors.rs Normal file
View file

@ -0,0 +1,416 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
// Note that source_map_mappings requires 0-indexed line and column numbers but
// V8 Exceptions are 1-indexed.
// TODO: This currently only applies to uncaught exceptions. It would be nice to
// also have source maps for situations like this:
// const err = new Error("Boo!");
// console.log(err.stack);
// It would require calling into Rust from Error.prototype.prepareStackTrace.
use serde_json;
use std::fmt;
use std::str;
#[derive(Debug, PartialEq)]
pub struct StackFrame {
pub line: i64, // zero indexed
pub column: i64, // zero indexed
pub script_name: String,
pub function_name: String,
pub is_eval: bool,
pub is_constructor: bool,
pub is_wasm: bool,
}
#[derive(Debug, PartialEq)]
pub struct JSError {
pub message: String,
pub source_line: Option<String>,
pub script_resource_name: Option<String>,
pub line_number: Option<i64>,
pub start_position: Option<i64>,
pub end_position: Option<i64>,
pub error_level: Option<i64>,
pub start_column: Option<i64>,
pub end_column: Option<i64>,
pub frames: Vec<StackFrame>,
}
impl fmt::Display for StackFrame {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// Note when we print to string, we change from 0-indexed to 1-indexed.
let function_name = self.function_name.clone();
let script_line_column =
format_script_line_column(&self.script_name, self.line, self.column);
if !self.function_name.is_empty() {
write!(f, " at {} ({})", function_name, script_line_column)
} else if self.is_eval {
write!(f, " at eval ({})", script_line_column)
} else {
write!(f, " at {}", script_line_column)
}
}
}
fn format_script_line_column(
script_name: &str,
line: i64,
column: i64,
) -> String {
// TODO match this style with how typescript displays errors.
let line = (1 + line).to_string();
let column = (1 + column).to_string();
let script_name = script_name.to_string();
format!("{}:{}:{}", script_name, line, column)
}
impl fmt::Display for JSError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.script_resource_name.is_some() {
let script_resource_name = self.script_resource_name.as_ref().unwrap();
// Avoid showing internal code from gen/bundle/main.js
if script_resource_name != "gen/bundle/main.js"
&& script_resource_name != "gen/bundle/compiler.js"
{
if self.line_number.is_some() && self.start_column.is_some() {
assert!(self.line_number.is_some());
assert!(self.start_column.is_some());
let script_line_column = format_script_line_column(
script_resource_name,
self.line_number.unwrap() - 1,
self.start_column.unwrap() - 1,
);
write!(f, "{}", script_line_column)?;
}
if self.source_line.is_some() {
write!(f, "\n{}\n", self.source_line.as_ref().unwrap())?;
let mut s = String::new();
for i in 0..self.end_column.unwrap() {
if i >= self.start_column.unwrap() {
s.push('^');
} else {
s.push(' ');
}
}
writeln!(f, "{}", s)?;
}
}
}
write!(f, "{}", self.message.clone())?;
for frame in &self.frames {
write!(f, "\n{}", &frame.to_string())?;
}
Ok(())
}
}
impl StackFrame {
// TODO Maybe use serde_derive?
fn from_json_value(v: &serde_json::Value) -> Option<Self> {
if !v.is_object() {
return None;
}
let obj = v.as_object().unwrap();
let line_v = &obj["line"];
if !line_v.is_u64() {
return None;
}
let line = line_v.as_u64().unwrap() as i64;
let column_v = &obj["column"];
if !column_v.is_u64() {
return None;
}
let column = column_v.as_u64().unwrap() as i64;
let script_name_v = &obj["scriptName"];
if !script_name_v.is_string() {
return None;
}
let script_name = String::from(script_name_v.as_str().unwrap());
// Optional fields. See EncodeExceptionAsJSON() in libdeno.
// Sometimes V8 doesn't provide all the frame information.
let mut function_name = String::from(""); // default
if obj.contains_key("functionName") {
let function_name_v = &obj["functionName"];
if function_name_v.is_string() {
function_name = String::from(function_name_v.as_str().unwrap());
}
}
let mut is_eval = false; // default
if obj.contains_key("isEval") {
let is_eval_v = &obj["isEval"];
if is_eval_v.is_boolean() {
is_eval = is_eval_v.as_bool().unwrap();
}
}
let mut is_constructor = false; // default
if obj.contains_key("isConstructor") {
let is_constructor_v = &obj["isConstructor"];
if is_constructor_v.is_boolean() {
is_constructor = is_constructor_v.as_bool().unwrap();
}
}
let mut is_wasm = false; // default
if obj.contains_key("isWasm") {
let is_wasm_v = &obj["isWasm"];
if is_wasm_v.is_boolean() {
is_wasm = is_wasm_v.as_bool().unwrap();
}
}
Some(StackFrame {
line: line - 1,
column: column - 1,
script_name,
function_name,
is_eval,
is_constructor,
is_wasm,
})
}
}
impl JSError {
/// Creates a new JSError by parsing the raw exception JSON string from V8.
pub fn from_v8_exception(json_str: &str) -> Option<Self> {
let v = serde_json::from_str::<serde_json::Value>(json_str);
if v.is_err() {
return None;
}
let v = v.unwrap();
if !v.is_object() {
return None;
}
let obj = v.as_object().unwrap();
let message_v = &obj["message"];
if !message_v.is_string() {
return None;
}
let message = String::from(message_v.as_str().unwrap());
let source_line = obj
.get("sourceLine")
.and_then(|v| v.as_str().map(String::from));
let script_resource_name = obj
.get("scriptResourceName")
.and_then(|v| v.as_str().map(String::from));
let line_number = obj.get("lineNumber").and_then(|v| v.as_i64());
let start_position = obj.get("startPosition").and_then(|v| v.as_i64());
let end_position = obj.get("endPosition").and_then(|v| v.as_i64());
let error_level = obj.get("errorLevel").and_then(|v| v.as_i64());
let start_column = obj.get("startColumn").and_then(|v| v.as_i64());
let end_column = obj.get("endColumn").and_then(|v| v.as_i64());
let frames_v = &obj["frames"];
if !frames_v.is_array() {
return None;
}
let frame_values = frames_v.as_array().unwrap();
let mut frames = Vec::<StackFrame>::new();
for frame_v in frame_values {
match StackFrame::from_json_value(frame_v) {
None => return None,
Some(frame) => frames.push(frame),
}
}
Some(JSError {
message,
source_line,
script_resource_name,
line_number,
start_position,
end_position,
error_level,
start_column,
end_column,
frames,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
fn error1() -> JSError {
JSError {
message: "Error: foo bar".to_string(),
source_line: None,
script_resource_name: None,
line_number: None,
start_position: None,
end_position: None,
error_level: None,
start_column: None,
end_column: None,
frames: vec![
StackFrame {
line: 4,
column: 16,
script_name: "foo_bar.ts".to_string(),
function_name: "foo".to_string(),
is_eval: false,
is_constructor: false,
is_wasm: false,
},
StackFrame {
line: 5,
column: 20,
script_name: "bar_baz.ts".to_string(),
function_name: "qat".to_string(),
is_eval: false,
is_constructor: false,
is_wasm: false,
},
StackFrame {
line: 1,
column: 1,
script_name: "deno_main.js".to_string(),
function_name: "".to_string(),
is_eval: false,
is_constructor: false,
is_wasm: false,
},
],
}
}
#[test]
fn stack_frame_from_json_value_1() {
let v = serde_json::from_str::<serde_json::Value>(
r#"{
"line":2,
"column":11,
"functionName":"foo",
"scriptName":"/Users/rld/src/deno/tests/error_001.ts",
"isEval":true,
"isConstructor":false,
"isWasm":false
}"#,
).unwrap();
let r = StackFrame::from_json_value(&v);
assert_eq!(
r,
Some(StackFrame {
line: 1,
column: 10,
script_name: "/Users/rld/src/deno/tests/error_001.ts".to_string(),
function_name: "foo".to_string(),
is_eval: true,
is_constructor: false,
is_wasm: false,
})
);
}
#[test]
fn stack_frame_from_json_value_2() {
let v = serde_json::from_str::<serde_json::Value>(
r#"{
"scriptName": "/Users/rld/src/deno/tests/error_001.ts",
"line": 2,
"column": 11
}"#,
).unwrap();
let r = StackFrame::from_json_value(&v);
assert!(r.is_some());
let f = r.unwrap();
assert_eq!(f.line, 1);
assert_eq!(f.column, 10);
assert_eq!(f.script_name, "/Users/rld/src/deno/tests/error_001.ts");
}
#[test]
fn js_error_from_v8_exception() {
let r = JSError::from_v8_exception(
r#"{
"message":"Uncaught Error: bad",
"frames":[
{
"line":2,
"column":11,
"functionName":"foo",
"scriptName":"/Users/rld/src/deno/tests/error_001.ts",
"isEval":true,
"isConstructor":false,
"isWasm":false
}, {
"line":5,
"column":5,
"functionName":"bar",
"scriptName":"/Users/rld/src/deno/tests/error_001.ts",
"isEval":true,
"isConstructor":false,
"isWasm":false
}
]}"#,
);
assert!(r.is_some());
let e = r.unwrap();
assert_eq!(e.message, "Uncaught Error: bad");
assert_eq!(e.frames.len(), 2);
assert_eq!(
e.frames[0],
StackFrame {
line: 1,
column: 10,
script_name: "/Users/rld/src/deno/tests/error_001.ts".to_string(),
function_name: "foo".to_string(),
is_eval: true,
is_constructor: false,
is_wasm: false,
}
)
}
#[test]
fn js_error_from_v8_exception2() {
let r = JSError::from_v8_exception(
"{\"message\":\"Error: boo\",\"sourceLine\":\"throw Error('boo');\",\"scriptResourceName\":\"a.js\",\"lineNumber\":3,\"startPosition\":8,\"endPosition\":9,\"errorLevel\":8,\"startColumn\":6,\"endColumn\":7,\"isSharedCrossOrigin\":false,\"isOpaque\":false,\"frames\":[{\"line\":3,\"column\":7,\"functionName\":\"\",\"scriptName\":\"a.js\",\"isEval\":false,\"isConstructor\":false,\"isWasm\":false}]}"
);
assert!(r.is_some());
let e = r.unwrap();
assert_eq!(e.message, "Error: boo");
assert_eq!(e.source_line, Some("throw Error('boo');".to_string()));
assert_eq!(e.script_resource_name, Some("a.js".to_string()));
assert_eq!(e.line_number, Some(3));
assert_eq!(e.start_position, Some(8));
assert_eq!(e.end_position, Some(9));
assert_eq!(e.error_level, Some(8));
assert_eq!(e.start_column, Some(6));
assert_eq!(e.end_column, Some(7));
assert_eq!(e.frames.len(), 1);
}
#[test]
fn stack_frame_to_string() {
let e = error1();
assert_eq!(" at foo (foo_bar.ts:5:17)", &e.frames[0].to_string());
assert_eq!(" at qat (bar_baz.ts:6:21)", &e.frames[1].to_string());
}
#[test]
fn js_error_to_string() {
let e = error1();
let expected = "Error: foo bar\n at foo (foo_bar.ts:5:17)\n at qat (bar_baz.ts:6:21)\n at deno_main.js:2:2";
assert_eq!(expected, &e.to_string());
}
}

364
core/lib.rs Normal file
View file

@ -0,0 +1,364 @@
#[macro_use]
extern crate log;
extern crate futures;
extern crate libc;
mod js_errors;
mod libdeno;
mod shared;
pub use crate::js_errors::JSError;
pub use crate::libdeno::deno_buf;
pub use crate::shared::*;
use futures::Async;
use futures::Future;
use futures::Poll;
use libc::c_void;
use std::collections::HashMap;
use std::ffi::CStr;
use std::ffi::CString;
use std::sync::{Once, ONCE_INIT};
pub struct Isolate {
libdeno_isolate: *const libdeno::isolate,
pending_ops: HashMap<i32, PendingOp>, // promise_id -> op
polled_recently: bool,
recv_cb: RecvCallback,
pub shared: Shared,
pub test_send_counter: u32, // TODO only used for testing- REMOVE.
}
pub type RecvCallback = fn(isolate: &mut Isolate, zero_copy_buf: deno_buf);
pub const NUM_RECORDS: usize = 100;
// TODO rename to AsyncResult
pub struct AsyncResult {
pub result: i32,
}
pub type Op = dyn Future<Item = AsyncResult, Error = std::io::Error> + Send;
struct PendingOp {
op: Box<Op>,
polled_recently: bool,
zero_copy_id: usize, // non-zero if associated zero-copy buffer.
}
static DENO_INIT: Once = ONCE_INIT;
unsafe impl Send for Isolate {}
impl Isolate {
pub fn new(recv_cb: RecvCallback) -> Self {
DENO_INIT.call_once(|| {
unsafe { libdeno::deno_init() };
});
// Allocate unmanaged memory for the shared buffer by creating a Vec<u8>,
// grabbing the raw pointer, and then leaking the Vec so it is never freed.
let mut shared = Shared::new();
let shared_deno_buf = shared.as_deno_buf();
let config = libdeno::deno_config {
will_snapshot: 0,
load_snapshot: deno_buf::empty(), // TODO
shared: shared_deno_buf,
recv_cb: pre_dispatch,
};
let libdeno_isolate = unsafe { libdeno::deno_new(config) };
Self {
pending_ops: HashMap::new(),
polled_recently: false,
libdeno_isolate,
test_send_counter: 0,
recv_cb,
shared,
}
}
fn zero_copy_release(&self, zero_copy_id: usize) {
unsafe {
libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id)
}
}
pub fn add_op(
self: &mut Self,
promise_id: i32,
op: Box<Op>,
zero_copy_id: usize,
) {
debug!("add_op {}", zero_copy_id);
self.pending_ops.insert(
promise_id,
PendingOp {
op,
polled_recently: false,
zero_copy_id,
},
);
self.polled_recently = false;
}
#[inline]
pub unsafe fn from_raw_ptr<'a>(ptr: *const c_void) -> &'a mut Self {
let ptr = ptr as *mut _;
&mut *ptr
}
#[inline]
pub fn as_raw_ptr(&self) -> *const c_void {
self as *const _ as *const c_void
}
pub fn execute(
&self,
js_filename: &str,
js_source: &str,
) -> Result<(), JSError> {
let filename = CString::new(js_filename).unwrap();
let source = CString::new(js_source).unwrap();
unsafe {
libdeno::deno_execute(
self.libdeno_isolate,
self.as_raw_ptr(),
filename.as_ptr(),
source.as_ptr(),
)
};
if let Some(err) = self.last_exception() {
return Err(err);
}
Ok(())
}
pub fn last_exception(&self) -> Option<JSError> {
let ptr = unsafe { libdeno::deno_last_exception(self.libdeno_isolate) };
if ptr.is_null() {
None
} else {
let cstr = unsafe { CStr::from_ptr(ptr) };
let v8_exception = cstr.to_str().unwrap();
debug!("v8_exception\n{}\n", v8_exception);
let js_error = JSError::from_v8_exception(v8_exception).unwrap();
Some(js_error)
}
}
fn check_promise_errors(&self) {
unsafe {
libdeno::deno_check_promise_errors(self.libdeno_isolate);
}
}
fn respond(&mut self) -> Result<(), JSError> {
let buf = deno_buf::empty();
unsafe {
libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf)
}
if let Some(err) = self.last_exception() {
Err(err)
} else {
Ok(())
}
}
}
struct LockerScope {
libdeno_isolate: *const libdeno::isolate,
}
impl LockerScope {
fn new(isolate: &Isolate) -> LockerScope {
let libdeno_isolate = isolate.libdeno_isolate;
unsafe { libdeno::deno_lock(libdeno_isolate) }
LockerScope { libdeno_isolate }
}
}
impl Drop for LockerScope {
fn drop(&mut self) {
unsafe { libdeno::deno_unlock(self.libdeno_isolate) }
}
}
impl Future for Isolate {
type Item = ();
type Error = JSError;
fn poll(&mut self) -> Poll<(), JSError> {
// Lock the current thread for V8.
let _locker = LockerScope::new(self);
// Clear
self.polled_recently = false;
for (_, pending) in self.pending_ops.iter_mut() {
pending.polled_recently = false;
}
while !self.polled_recently {
let mut complete = HashMap::<i32, AsyncResult>::new();
self.polled_recently = true;
for (promise_id, pending) in self.pending_ops.iter_mut() {
// Do not call poll on futures we've already polled this turn.
if pending.polled_recently {
continue;
}
pending.polled_recently = true;
let promise_id = *promise_id;
let op = &mut pending.op;
match op.poll() {
Err(op_err) => {
eprintln!("op err {:?}", op_err);
complete.insert(promise_id, AsyncResult { result: -1 });
debug!("pending op {} complete err", promise_id);
}
Ok(Async::Ready(async_result)) => {
complete.insert(promise_id, async_result);
debug!("pending op {} complete ready", promise_id);
}
Ok(Async::NotReady) => {
debug!("pending op {} not ready", promise_id);
continue;
}
}
}
self.shared.set_num_records(complete.len() as i32);
if complete.len() > 0 {
// self.zero_copy_release() and self.respond() need Locker.
let mut i = 0;
for (promise_id, async_result) in complete.iter_mut() {
let pending = self.pending_ops.remove(promise_id).unwrap();
if pending.zero_copy_id > 0 {
self.zero_copy_release(pending.zero_copy_id);
}
self
.shared
.set_record(i, RECORD_OFFSET_PROMISE_ID, *promise_id);
self
.shared
.set_record(i, RECORD_OFFSET_RESULT, async_result.result);
i += 1;
}
self.respond()?;
}
}
self.check_promise_errors();
if let Some(err) = self.last_exception() {
return Err(err);
}
// We're idle if pending_ops is empty.
if self.pending_ops.is_empty() {
Ok(futures::Async::Ready(()))
} else {
Ok(futures::Async::NotReady)
}
}
}
extern "C" fn pre_dispatch(
user_data: *mut c_void,
control_buf: deno_buf,
zero_copy_buf: deno_buf,
) {
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
assert_eq!(control_buf.len(), 0);
(isolate.recv_cb)(isolate, zero_copy_buf);
}
#[cfg(test)]
mod tests {
use super::*;
fn inc_counter(isolate: &mut Isolate, zero_copy_buf: deno_buf) {
assert_eq!(zero_copy_buf.len(), 0);
isolate.test_send_counter += 1; // TODO ideally store this in isolate.state?
}
fn js_check(r: Result<(), JSError>) {
if let Err(e) = r {
panic!(e.to_string());
}
}
#[test]
fn test_execute() {
let isolate = Isolate::new(inc_counter);
js_check(isolate.execute(
"filename.js",
r#"
libdeno.send();
async function main() {
libdeno.send();
}
main();
"#,
));
// We expect that main is executed even tho we didn't poll.
assert_eq!(isolate.test_send_counter, 2);
}
fn async_immediate(isolate: &mut Isolate, zero_copy_buf: deno_buf) {
assert_eq!(zero_copy_buf.len(), 0);
isolate.test_send_counter += 1; // TODO ideally store this in isolate.state?
let promise_id = 0;
let op = Box::new(futures::future::ok(AsyncResult { result: 0 }));
isolate.add_op(promise_id, op, zero_copy_buf.zero_copy_id);
}
#[test]
fn test_poll_async_immediate_ops() {
let mut isolate = Isolate::new(async_immediate);
js_check(isolate.execute(
"setup.js",
r#"
let nrecv = 0;
libdeno.recv(() => {
nrecv++;
});
function assertEq(actual, expected) {
if (expected != actual) {
throw Error(`actual ${actual} expected ${expected} `);
}
}
"#,
));
assert_eq!(isolate.test_send_counter, 0);
js_check(isolate.execute(
"check1.js",
r#"
assertEq(nrecv, 0);
libdeno.send();
assertEq(nrecv, 0);
"#,
));
assert_eq!(isolate.test_send_counter, 1);
assert_eq!(Ok(Async::Ready(())), isolate.poll());
assert_eq!(isolate.test_send_counter, 1);
js_check(isolate.execute(
"check2.js",
r#"
assertEq(nrecv, 1);
libdeno.send();
assertEq(nrecv, 1);
"#,
));
assert_eq!(isolate.test_send_counter, 2);
assert_eq!(Ok(Async::Ready(())), isolate.poll());
js_check(isolate.execute("check3.js", "assertEq(nrecv, 2)"));
assert_eq!(isolate.test_send_counter, 2);
// We are idle, so the next poll should be the last.
assert_eq!(Ok(Async::Ready(())), isolate.poll());
}
}

1
core/libdeno.rs Symbolic link
View file

@ -0,0 +1 @@
../src/libdeno.rs

49
core/shared.rs Normal file
View file

@ -0,0 +1,49 @@
use crate::libdeno::deno_buf;
use std::mem;
// TODO this is where we abstract flatbuffers at.
// TODO make these constants private to this file.
const INDEX_NUM_RECORDS: usize = 0;
const INDEX_RECORDS: usize = 1;
pub const RECORD_OFFSET_PROMISE_ID: usize = 0;
pub const RECORD_OFFSET_OP: usize = 1;
pub const RECORD_OFFSET_ARG: usize = 2;
pub const RECORD_OFFSET_RESULT: usize = 3;
const RECORD_SIZE: usize = 4;
const NUM_RECORDS: usize = 100;
/// Represents the shared buffer between JS and Rust.
/// Used for FFI.
pub struct Shared(Vec<i32>);
impl Shared {
pub fn new() -> Shared {
let mut vec = Vec::<i32>::new();
vec.resize(INDEX_RECORDS + RECORD_SIZE * NUM_RECORDS, 0);
Shared(vec)
}
pub fn set_record(&mut self, i: usize, off: usize, value: i32) {
assert!(i < NUM_RECORDS);
self.0[INDEX_RECORDS + RECORD_SIZE * i + off] = value;
}
pub fn get_record(&self, i: usize, off: usize) -> i32 {
assert!(i < NUM_RECORDS);
return self.0[INDEX_RECORDS + RECORD_SIZE * i + off];
}
pub fn set_num_records(&mut self, num_records: i32) {
self.0[INDEX_NUM_RECORDS] = num_records;
}
pub fn get_num_records(&self) -> i32 {
return self.0[INDEX_NUM_RECORDS];
}
pub fn as_deno_buf(&mut self) -> deno_buf {
let ptr = self.0.as_mut_ptr() as *mut u8;
let len = mem::size_of::<i32>() * self.0.len();
unsafe { deno_buf::from_raw_parts(ptr, len) }
}
}

View file

@ -17,7 +17,7 @@ export function setFireTimersCallback(fn: () => void) {
export function handleAsyncMsgFromRust(ui8: Uint8Array) {
// If a the buffer is empty, recv() on the native side timed out and we
// did not receive a message.
if (ui8.length) {
if (ui8 && ui8.length) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();

View file

@ -78,6 +78,19 @@ deno::DenoIsolate* unwrap(Deno* d_) {
return reinterpret_cast<deno::DenoIsolate*>(d_);
}
void deno_lock(Deno* d_) {
auto* d = unwrap(d_);
CHECK_NULL(d->locker_);
d->locker_ = new v8::Locker(d->isolate_);
}
void deno_unlock(Deno* d_) {
auto* d = unwrap(d_);
CHECK_NOT_NULL(d->locker_);
delete d->locker_;
d->locker_ = nullptr;
}
deno_buf deno_get_snapshot(Deno* d_) {
auto* d = unwrap(d_);
CHECK_NOT_NULL(d->snapshot_creator_);
@ -87,7 +100,7 @@ deno_buf deno_get_snapshot(Deno* d_) {
auto blob = d->snapshot_creator_->CreateBlob(
v8::SnapshotCreator::FunctionCodeHandling::kKeep);
return {nullptr, 0, reinterpret_cast<uint8_t*>(const_cast<char*>(blob.data)),
blob.raw_size};
blob.raw_size, 0};
}
static std::unique_ptr<v8::Platform> platform;
@ -127,12 +140,23 @@ void deno_execute(Deno* d_, void* user_data, const char* js_filename,
deno::Execute(context, js_filename, js_source);
}
void deno_respond(Deno* d_, void* user_data, int32_t req_id, deno_buf buf) {
void deno_zero_copy_release(Deno* d_, size_t zero_copy_id) {
auto* d = unwrap(d_);
v8::Isolate::Scope isolate_scope(d->isolate_);
v8::Locker locker(d->isolate_);
v8::HandleScope handle_scope(d->isolate_);
d->DeleteZeroCopyRef(zero_copy_id);
}
void deno_respond(Deno* d_, void* user_data, deno_buf buf) {
auto* d = unwrap(d_);
if (d->current_args_ != nullptr) {
// Synchronous response.
auto ab = deno::ImportBuf(d, buf);
d->current_args_->GetReturnValue().Set(ab);
if (buf.data_ptr != nullptr) {
DCHECK_EQ(buf.zero_copy_id, 0);
auto ab = deno::ImportBuf(d, buf);
d->current_args_->GetReturnValue().Set(ab);
}
d->current_args_ = nullptr;
return;
}
@ -148,8 +172,6 @@ void deno_respond(Deno* d_, void* user_data, int32_t req_id, deno_buf buf) {
v8::TryCatch try_catch(d->isolate_);
deno::DeleteDataRef(d, req_id);
auto recv_ = d->recv_.Get(d->isolate_);
if (recv_.IsEmpty()) {
d->last_exception_ = "libdeno.recv_ has not been called.";
@ -157,8 +179,17 @@ void deno_respond(Deno* d_, void* user_data, int32_t req_id, deno_buf buf) {
}
v8::Local<v8::Value> args[1];
args[0] = deno::ImportBuf(d, buf);
auto v = recv_->Call(context, context->Global(), 1, args);
int argc = 0;
// You cannot use zero_copy_buf with deno_respond(). Use
// deno_zero_copy_release() instead.
DCHECK_EQ(buf.zero_copy_id, 0);
if (buf.data_ptr != nullptr) {
args[0] = deno::ImportBuf(d, buf);
argc = 1;
}
auto v = recv_->Call(context, context->Global(), argc, args);
if (try_catch.HasCaught()) {
CHECK(v.IsEmpty());

View file

@ -44,20 +44,6 @@ v8::StartupData SerializeInternalFields(v8::Local<v8::Object> holder, int index,
return {payload, size};
}
void AddDataRef(DenoIsolate* d, int32_t req_id, v8::Local<v8::Value> data_v) {
d->async_data_map_.emplace(std::piecewise_construct, std::make_tuple(req_id),
std::make_tuple(d->isolate_, data_v));
}
void DeleteDataRef(DenoIsolate* d, int32_t req_id) {
// Delete persistent reference to data ArrayBuffer.
auto it = d->async_data_map_.find(req_id);
if (it != d->async_data_map_.end()) {
it->second.Reset();
d->async_data_map_.erase(it);
}
}
// Extracts a C string from a v8::V8 Utf8Value.
const char* ToCString(const v8::String::Utf8Value& value) {
return *value ? *value : "<string conversion failed>";
@ -131,6 +117,13 @@ void ErrorToJSON(const v8::FunctionCallbackInfo<v8::Value>& args) {
}
v8::Local<v8::Uint8Array> ImportBuf(DenoIsolate* d, deno_buf buf) {
// Do not use ImportBuf with zero_copy buffers.
DCHECK_EQ(buf.zero_copy_id, 0);
if (buf.data_ptr == nullptr) {
return v8::Local<v8::Uint8Array>();
}
if (buf.alloc_ptr == nullptr) {
// If alloc_ptr isn't set, we memcpy.
// This is currently used for flatbuffers created in Rust.
@ -209,42 +202,44 @@ void Send(const v8::FunctionCallbackInfo<v8::Value>& args) {
DenoIsolate* d = DenoIsolate::FromIsolate(isolate);
DCHECK_EQ(d->isolate_, isolate);
v8::Locker locker(d->isolate_);
deno_buf control = {nullptr, 0u, nullptr, 0u, 0u};
deno_buf zero_copy = {nullptr, 0u, nullptr, 0u, 0u};
v8::HandleScope handle_scope(isolate);
CHECK_NULL(d->current_args_); // libdeno.send re-entry forbidden.
int32_t req_id = d->next_req_id_++;
if (args.Length() > 0) {
v8::Local<v8::Value> control_v = args[0];
if (control_v->IsArrayBufferView()) {
control =
GetContents(isolate, v8::Local<v8::ArrayBufferView>::Cast(control_v));
}
}
v8::Local<v8::Value> control_v = args[0];
CHECK(control_v->IsArrayBufferView());
deno_buf control =
GetContents(isolate, v8::Local<v8::ArrayBufferView>::Cast(control_v));
deno_buf data = {nullptr, 0u, nullptr, 0u};
v8::Local<v8::Value> data_v;
v8::Local<v8::Value> zero_copy_v;
if (args.Length() == 2) {
if (args[1]->IsArrayBufferView()) {
data_v = args[1];
data = GetContents(isolate, v8::Local<v8::ArrayBufferView>::Cast(data_v));
zero_copy_v = args[1];
zero_copy = GetContents(
isolate, v8::Local<v8::ArrayBufferView>::Cast(zero_copy_v));
size_t zero_copy_id = d->next_zero_copy_id_++;
DCHECK_GT(zero_copy_id, 0);
zero_copy.zero_copy_id = zero_copy_id;
// If the zero_copy ArrayBuffer was given, we must maintain a strong
// reference to it until deno_zero_copy_release is called.
d->AddZeroCopyRef(zero_copy_id, zero_copy_v);
}
} else {
CHECK_EQ(args.Length(), 1);
}
DCHECK_NULL(d->current_args_);
d->current_args_ = &args;
d->recv_cb_(d->user_data_, req_id, control, data);
d->recv_cb_(d->user_data_, control, zero_copy);
if (d->current_args_ == nullptr) {
// This indicates that deno_repond() was called already.
} else {
// Asynchronous.
d->current_args_ = nullptr;
// If the data ArrayBuffer was given, we must maintain a strong reference
// to it until deno_respond is called.
if (!data_v.IsEmpty()) {
AddDataRef(d, req_id, data_v);
}
}
}

View file

@ -15,6 +15,7 @@ typedef struct {
size_t alloc_len; // Length of the memory allocation.
uint8_t* data_ptr; // Start of logical contents (within the allocation).
size_t data_len; // Length of logical contents.
size_t zero_copy_id; // 0 = normal, 1 = must call deno_zero_copy_release.
} deno_buf;
typedef struct deno_s Deno;
@ -22,8 +23,8 @@ typedef struct deno_s Deno;
// A callback to receive a message from a libdeno.send() javascript call.
// control_buf is valid for only for the lifetime of this callback.
// data_buf is valid until deno_respond() is called.
typedef void (*deno_recv_cb)(void* user_data, int32_t req_id,
deno_buf control_buf, deno_buf data_buf);
typedef void (*deno_recv_cb)(void* user_data, deno_buf control_buf,
deno_buf zerop_copy_buf);
void deno_init();
const char* deno_v8_version();
@ -47,6 +48,9 @@ deno_buf deno_get_snapshot(Deno* d);
void deno_delete(Deno* d);
void deno_lock(Deno* d);
void deno_unlock(Deno* d);
// Compile and execute a traditional JavaScript script that does not use
// module import statements.
// If it succeeded deno_last_exception() will return NULL.
@ -66,11 +70,13 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
// longer owns `buf` and must not use it; deno_respond() is responsible for
// releasing its memory.)
//
// Calling this function more than once with the same req_id will result in
// an error.
//
// If a JS exception was encountered, deno_last_exception() will be non-NULL.
void deno_respond(Deno* d, void* user_data, int32_t req_id, deno_buf buf);
void deno_respond(Deno* d, void* user_data, deno_buf buf);
// consumes zero_copy
// Calling this function more than once with the same zero_copy_id will result
// in an error.
void deno_zero_copy_release(Deno* d, size_t zero_copy_id);
void deno_check_promise_errors(Deno* d);

View file

@ -30,12 +30,13 @@ class DenoIsolate {
public:
explicit DenoIsolate(deno_config config)
: isolate_(nullptr),
locker_(nullptr),
shared_(config.shared),
current_args_(nullptr),
snapshot_creator_(nullptr),
global_import_buf_ptr_(nullptr),
recv_cb_(config.recv_cb),
next_req_id_(0),
next_zero_copy_id_(1), // zero_copy_id must not be zero.
user_data_(nullptr),
resolve_cb_(nullptr) {
array_buffer_allocator_ = v8::ArrayBuffer::Allocator::NewDefaultAllocator();
@ -48,6 +49,9 @@ class DenoIsolate {
~DenoIsolate() {
shared_ab_.Reset();
if (locker_) {
delete locker_;
}
if (snapshot_creator_) {
delete snapshot_creator_;
} else {
@ -78,14 +82,31 @@ class DenoIsolate {
}
}
void DeleteZeroCopyRef(size_t zero_copy_id) {
DCHECK_NE(zero_copy_id, 0);
// Delete persistent reference to data ArrayBuffer.
auto it = zero_copy_map_.find(zero_copy_id);
if (it != zero_copy_map_.end()) {
it->second.Reset();
zero_copy_map_.erase(it);
}
}
void AddZeroCopyRef(size_t zero_copy_id, v8::Local<v8::Value> zero_copy_v) {
zero_copy_map_.emplace(std::piecewise_construct,
std::make_tuple(zero_copy_id),
std::make_tuple(isolate_, zero_copy_v));
}
v8::Isolate* isolate_;
v8::Locker* locker_;
v8::ArrayBuffer::Allocator* array_buffer_allocator_;
deno_buf shared_;
const v8::FunctionCallbackInfo<v8::Value>* current_args_;
v8::SnapshotCreator* snapshot_creator_;
void* global_import_buf_ptr_;
deno_recv_cb recv_cb_;
int32_t next_req_id_;
size_t next_zero_copy_id_;
void* user_data_;
v8::Persistent<v8::Object> builtin_modules_;
@ -94,7 +115,7 @@ class DenoIsolate {
deno_resolve_cb resolve_cb_;
v8::Persistent<v8::Context> context_;
std::map<int32_t, v8::Persistent<v8::Value>> async_data_map_;
std::map<size_t, v8::Persistent<v8::Value>> zero_copy_map_;
std::map<int, v8::Persistent<v8::Value>> pending_promise_map_;
std::string last_exception_;
v8::Persistent<v8::Function> recv_;
@ -152,7 +173,7 @@ static intptr_t external_references[] = {
reinterpret_cast<intptr_t>(MessageCallback),
0};
static const deno_buf empty_buf = {nullptr, 0, nullptr, 0};
static const deno_buf empty_buf = {nullptr, 0, nullptr, 0, 0};
Deno* NewFromSnapshot(void* user_data, deno_recv_cb cb);
@ -166,8 +187,6 @@ v8::StartupData SerializeInternalFields(v8::Local<v8::Object> holder, int index,
v8::Local<v8::Uint8Array> ImportBuf(DenoIsolate* d, deno_buf buf);
void DeleteDataRef(DenoIsolate* d, int32_t req_id);
bool Execute(v8::Local<v8::Context> context, const char* js_filename,
const char* js_source);
bool ExecuteMod(v8::Local<v8::Context> context, const char* js_filename,

View file

@ -26,9 +26,11 @@ TEST(LibDenoTest, Snapshotter) {
TEST(LibDenoTest, CanCallFunction) {
Deno* d = deno_new(deno_config{0, snapshot, empty, nullptr});
deno_lock(d);
deno_execute(d, nullptr, "a.js",
"if (CanCallFunction() != 'foo') throw Error();");
EXPECT_EQ(nullptr, deno_last_exception(d));
deno_unlock(d);
deno_delete(d);
}
@ -47,6 +49,7 @@ deno_buf strbuf(const char* str) {
buf.alloc_len = len + 1;
buf.data_ptr = buf.alloc_ptr;
buf.data_len = len;
buf.zero_copy_id = 0;
return buf;
}
@ -71,8 +74,8 @@ void assert_null(deno_buf b) {
TEST(LibDenoTest, RecvReturnEmpty) {
static int count = 0;
auto recv_cb = [](auto _, int req_id, auto buf, auto data_buf) {
assert_null(data_buf);
auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) {
assert_null(zero_copy_buf);
count++;
EXPECT_EQ(static_cast<size_t>(3), buf.data_len);
EXPECT_EQ(buf.data_ptr[0], 'a');
@ -88,15 +91,17 @@ TEST(LibDenoTest, RecvReturnEmpty) {
TEST(LibDenoTest, RecvReturnBar) {
static int count = 0;
auto recv_cb = [](auto user_data, int req_id, auto buf, auto data_buf) {
auto recv_cb = [](auto user_data, auto buf, auto zero_copy_buf) {
auto d = reinterpret_cast<Deno*>(user_data);
assert_null(data_buf);
assert_null(zero_copy_buf);
count++;
EXPECT_EQ(static_cast<size_t>(3), buf.data_len);
EXPECT_EQ(buf.data_ptr[0], 'a');
EXPECT_EQ(buf.data_ptr[1], 'b');
EXPECT_EQ(buf.data_ptr[2], 'c');
deno_respond(d, user_data, req_id, strbuf("bar"));
EXPECT_EQ(zero_copy_buf.zero_copy_id, 0u);
EXPECT_EQ(zero_copy_buf.data_ptr, nullptr);
deno_respond(d, user_data, strbuf("bar"));
};
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb});
deno_execute(d, d, "a.js", "RecvReturnBar()");
@ -114,9 +119,9 @@ TEST(LibDenoTest, DoubleRecvFails) {
TEST(LibDenoTest, SendRecvSlice) {
static int count = 0;
auto recv_cb = [](auto user_data, int req_id, auto buf, auto data_buf) {
auto recv_cb = [](auto user_data, auto buf, auto zero_copy_buf) {
auto d = reinterpret_cast<Deno*>(user_data);
assert_null(data_buf);
assert_null(zero_copy_buf);
static const size_t alloc_len = 1024;
size_t i = count++;
// Check the size and offset of the slice.
@ -134,12 +139,12 @@ TEST(LibDenoTest, SendRecvSlice) {
memcpy(alloc_ptr, buf.alloc_ptr, alloc_len);
// Make a slice that is a bit shorter than the original.
deno_buf buf2{alloc_ptr, alloc_len, alloc_ptr + data_offset,
buf.data_len - 19};
buf.data_len - 19, 0};
// Place some values into the buffer for the JS side to verify.
buf2.data_ptr[0] = 200 + i;
buf2.data_ptr[buf2.data_len - 1] = 200 - i;
// Send back.
deno_respond(d, user_data, req_id, buf2);
deno_respond(d, user_data, buf2);
};
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb});
deno_execute(d, d, "a.js", "SendRecvSlice()");
@ -150,8 +155,8 @@ TEST(LibDenoTest, SendRecvSlice) {
TEST(LibDenoTest, JSSendArrayBufferViewTypes) {
static int count = 0;
auto recv_cb = [](auto _, int req_id, auto buf, auto data_buf) {
assert_null(data_buf);
auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) {
assert_null(zero_copy_buf);
count++;
size_t data_offset = buf.data_ptr - buf.alloc_ptr;
EXPECT_EQ(data_offset, 2468u);
@ -197,33 +202,39 @@ TEST(LibDenoTest, GlobalErrorHandling) {
deno_delete(d);
}
TEST(LibDenoTest, DataBuf) {
TEST(LibDenoTest, ZeroCopyBuf) {
static int count = 0;
static deno_buf data_buf_copy;
auto recv_cb = [](auto _, int req_id, deno_buf buf, deno_buf data_buf) {
static deno_buf zero_copy_buf2;
auto recv_cb = [](auto user_data, deno_buf buf, deno_buf zero_copy_buf) {
count++;
data_buf.data_ptr[0] = 4;
data_buf.data_ptr[1] = 2;
data_buf_copy = data_buf;
EXPECT_GT(zero_copy_buf.zero_copy_id, 0u);
zero_copy_buf.data_ptr[0] = 4;
zero_copy_buf.data_ptr[1] = 2;
zero_copy_buf2 = zero_copy_buf;
EXPECT_EQ(2u, buf.data_len);
EXPECT_EQ(2u, data_buf.data_len);
EXPECT_EQ(2u, zero_copy_buf.data_len);
EXPECT_EQ(buf.data_ptr[0], 1);
EXPECT_EQ(buf.data_ptr[1], 2);
// Note zero_copy_buf won't actually be freed here because in
// libdeno_test.js zeroCopyBuf is a rooted global. We just want to exercise
// the API here.
auto d = reinterpret_cast<Deno*>(user_data);
deno_zero_copy_release(d, zero_copy_buf.zero_copy_id);
};
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb});
deno_execute(d, nullptr, "a.js", "DataBuf()");
deno_execute(d, d, "a.js", "ZeroCopyBuf()");
EXPECT_EQ(nullptr, deno_last_exception(d));
EXPECT_EQ(count, 1);
// data_buf was subsequently changed in JS, let's check that our copy reflects
// that.
EXPECT_EQ(data_buf_copy.data_ptr[0], 9);
EXPECT_EQ(data_buf_copy.data_ptr[1], 8);
// zero_copy_buf was subsequently changed in JS, let's check that our copy
// reflects that.
EXPECT_EQ(zero_copy_buf2.data_ptr[0], 9);
EXPECT_EQ(zero_copy_buf2.data_ptr[1], 8);
deno_delete(d);
}
TEST(LibDenoTest, CheckPromiseErrors) {
static int count = 0;
auto recv_cb = [](auto _, int req_id, auto buf, auto data_buf) { count++; };
auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) { count++; };
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb});
EXPECT_EQ(deno_last_exception(d), nullptr);
deno_execute(d, nullptr, "a.js", "CheckPromiseErrors()");
@ -271,7 +282,7 @@ TEST(LibDenoTest, EncodeErrorBug) {
TEST(LibDenoTest, Shared) {
uint8_t s[] = {0, 1, 2};
deno_buf shared = {nullptr, 0, s, 3};
deno_buf shared = {nullptr, 0, s, 3, 0};
Deno* d = deno_new(deno_config{0, snapshot, shared, nullptr});
deno_execute(d, nullptr, "a.js", "Shared()");
EXPECT_EQ(nullptr, deno_last_exception(d));
@ -306,7 +317,7 @@ TEST(LibDenoTest, LibDenoEvalContextError) {
TEST(LibDenoTest, SharedAtomics) {
int32_t s[] = {0, 1, 2};
deno_buf shared = {nullptr, 0, reinterpret_cast<uint8_t*>(s), sizeof s};
deno_buf shared = {nullptr, 0, reinterpret_cast<uint8_t*>(s), sizeof s, 0};
Deno* d = deno_new(deno_config{0, empty, shared, nullptr});
deno_execute(d, nullptr, "a.js",
"Atomics.add(new Int32Array(libdeno.shared), 0, 1)");

View file

@ -103,11 +103,11 @@ global.GlobalErrorHandling = () => {
};
// Allocate this buf at the top level to avoid GC.
const dataBuf = new Uint8Array([3, 4]);
const zeroCopyBuf = new Uint8Array([3, 4]);
global.DataBuf = () => {
global.ZeroCopyBuf = () => {
const a = new Uint8Array([1, 2]);
const b = dataBuf;
const b = zeroCopyBuf;
// The second parameter of send should modified by the
// privileged side.
const r = libdeno.send(a, b);

View file

@ -2,10 +2,12 @@
#include "test.h"
static int exec_count = 0;
void recv_cb(void* user_data, int req_id, deno_buf buf, deno_buf data_buf) {
void recv_cb(void* user_data, deno_buf buf, deno_buf zero_copy_buf) {
// We use this to check that scripts have executed.
EXPECT_EQ(1u, buf.data_len);
EXPECT_EQ(buf.data_ptr[0], 4);
EXPECT_EQ(zero_copy_buf.zero_copy_id, 0u);
EXPECT_EQ(zero_copy_buf.data_ptr, nullptr);
exec_count++;
}

View file

@ -3,7 +3,7 @@
#include <string>
#include "file_util.h"
deno_buf snapshot = {nullptr, 0, nullptr, 0};
deno_buf snapshot = {nullptr, 0, nullptr, 0, 0};
int main(int argc, char** argv) {
// Locate the snapshot.

View file

@ -6,6 +6,6 @@
#include "testing/gtest/include/gtest/gtest.h"
extern deno_buf snapshot; // Loaded in libdeno/test.cc
const deno_buf empty = {nullptr, 0, nullptr, 0};
const deno_buf empty = {nullptr, 0, nullptr, 0, 0};
#endif // TEST_H_

View file

@ -48,15 +48,17 @@ pub type Buf = Box<[u8]>;
pub type Op = dyn Future<Item = Buf, Error = DenoError> + Send;
// Returns (is_sync, op)
pub type Dispatch =
fn(isolate: &Isolate, buf: libdeno::deno_buf, data_buf: libdeno::deno_buf)
-> (bool, Box<Op>);
pub type Dispatch = fn(
isolate: &Isolate,
buf: libdeno::deno_buf,
zero_copy_buf: libdeno::deno_buf,
) -> (bool, Box<Op>);
pub struct Isolate {
libdeno_isolate: *const libdeno::isolate,
dispatch: Dispatch,
rx: mpsc::Receiver<(i32, Buf)>,
tx: mpsc::Sender<(i32, Buf)>,
rx: mpsc::Receiver<(usize, Buf)>,
tx: mpsc::Sender<(usize, Buf)>,
ntasks: Cell<i32>,
timeout_due: Cell<Option<Instant>>,
pub modules: RefCell<Modules>,
@ -204,7 +206,7 @@ impl Isolate {
};
let libdeno_isolate = unsafe { libdeno::deno_new(config) };
// This channel handles sending async messages back to the runtime.
let (tx, rx) = mpsc::channel::<(i32, Buf)>();
let (tx, rx) = mpsc::channel::<(usize, Buf)>();
Self {
libdeno_isolate,
@ -404,37 +406,39 @@ impl Isolate {
Ok(())
}
pub fn respond(&self, req_id: i32, buf: Buf) {
pub fn respond(&self, zero_copy_id: usize, buf: Buf) {
self.state.metrics_op_completed(buf.len());
// This will be cleaned up in the future.
if zero_copy_id > 0 {
unsafe {
libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id)
}
}
// deno_respond will memcpy the buf into V8's heap,
// so borrowing a reference here is sufficient.
unsafe {
libdeno::deno_respond(
self.libdeno_isolate,
self.as_raw_ptr(),
req_id,
buf.as_ref().into(),
)
}
}
fn complete_op(&self, req_id: i32, buf: Buf) {
fn complete_op(&self, zero_copy_id: usize, buf: Buf) {
// Receiving a message on rx exactly corresponds to an async task
// completing.
self.ntasks_decrement();
// Call into JS with the buf.
self.respond(req_id, buf);
self.respond(zero_copy_id, buf);
}
fn timeout(&self) {
let dummy_buf = libdeno::deno_buf::empty();
unsafe {
libdeno::deno_respond(
self.libdeno_isolate,
self.as_raw_ptr(),
-1,
dummy_buf,
)
libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), dummy_buf)
}
}
@ -450,7 +454,7 @@ impl Isolate {
// Main thread event loop.
while !self.is_idle() {
match recv_deadline(&self.rx, self.get_timeout_due()) {
Ok((req_id, buf)) => self.complete_op(req_id, buf),
Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf),
Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
Err(e) => panic!("recv_deadline() failed: {:?}", e),
}
@ -532,23 +536,24 @@ extern "C" fn resolve_cb(
// Dereferences the C pointer into the Rust Isolate object.
extern "C" fn pre_dispatch(
user_data: *mut c_void,
req_id: i32,
control_buf: libdeno::deno_buf,
data_buf: libdeno::deno_buf,
zero_copy_buf: libdeno::deno_buf,
) {
// for metrics
let bytes_sent_control = control_buf.len();
let bytes_sent_data = data_buf.len();
let bytes_sent_zero_copy = zero_copy_buf.len();
let zero_copy_id = zero_copy_buf.zero_copy_id;
// We should ensure that there is no other `&mut Isolate` exists.
// And also, it should be in the same thread with other `&Isolate`s.
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
let dispatch = isolate.dispatch;
let (is_sync, op) = dispatch(isolate, control_buf, data_buf);
let (is_sync, op) = dispatch(isolate, control_buf, zero_copy_buf);
isolate
.state
.metrics_op_dispatched(bytes_sent_control, bytes_sent_data);
.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);
if is_sync {
// Execute op synchronously.
@ -560,7 +565,7 @@ extern "C" fn pre_dispatch(
isolate.state.metrics_op_completed(buf.len());
} else {
// Set the synchronous response, the value returned from isolate.send().
isolate.respond(req_id, buf);
isolate.respond(zero_copy_id, buf);
}
} else {
// Execute op asynchronously.
@ -574,7 +579,7 @@ extern "C" fn pre_dispatch(
let task = op
.and_then(move |buf| {
let sender = tx; // tx is moved to new thread
sender.send((req_id, buf)).expect("tx.send error");
sender.send((zero_copy_id, buf)).expect("tx.send error");
Ok(())
}).map_err(|_| ());
tokio::spawn(task);

View file

@ -1,4 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
// TODO Remove. While core is being developed, it may not use the complete
// libdeno API. Thus we allow dead code until things settle.
#![allow(dead_code)]
use libc::c_char;
use libc::c_int;
use libc::c_void;
@ -25,6 +30,7 @@ pub struct deno_buf {
alloc_len: usize,
data_ptr: *const u8,
data_len: usize,
pub zero_copy_id: usize,
}
/// `deno_buf` can not clone, and there is no interior mutability.
@ -39,6 +45,7 @@ impl deno_buf {
alloc_len: 0,
data_ptr: null(),
data_len: 0,
zero_copy_id: 0,
}
}
@ -49,6 +56,7 @@ impl deno_buf {
alloc_len: 0,
data_ptr: ptr,
data_len: len,
zero_copy_id: 0,
}
}
}
@ -62,6 +70,7 @@ impl<'a> From<&'a [u8]> for deno_buf {
alloc_len: 0,
data_ptr: x.as_ref().as_ptr(),
data_len: x.len(),
zero_copy_id: 0,
}
}
}
@ -106,9 +115,8 @@ impl AsMut<[u8]> for deno_buf {
#[allow(non_camel_case_types)]
type deno_recv_cb = unsafe extern "C" fn(
user_data: *mut c_void,
req_id: i32,
buf: deno_buf,
data_buf: deno_buf,
control_buf: deno_buf, // deprecated
zero_copy_buf: deno_buf,
);
#[allow(non_camel_case_types)]
@ -137,12 +145,14 @@ extern "C" {
pub fn deno_delete(i: *const isolate);
pub fn deno_last_exception(i: *const isolate) -> *const c_char;
pub fn deno_check_promise_errors(i: *const isolate);
pub fn deno_lock(i: *const isolate);
pub fn deno_unlock(i: *const isolate);
pub fn deno_respond(
i: *const isolate,
user_data: *const c_void,
req_id: i32,
buf: deno_buf,
);
pub fn deno_zero_copy_release(i: *const isolate, zero_copy_id: usize);
pub fn deno_execute(
i: *const isolate,
user_data: *const c_void,

View file

@ -204,8 +204,10 @@ def main(argv):
# pipe.
if os.name != 'nt':
hyper_hello_path = os.path.join(build_dir, "hyper_hello")
core_http_bench_exe = os.path.join(build_dir, "deno_core_http_bench")
new_data["throughput"] = run_throughput(deno_path)
new_data["req_per_sec"] = http_benchmark(deno_path, hyper_hello_path)
new_data["req_per_sec"] = http_benchmark(deno_path, hyper_hello_path,
core_http_bench_exe)
if "linux" in sys.platform:
# Thread count test, only on linux
new_data["thread_count"] = run_thread_count_benchmark(deno_path)

View file

@ -39,7 +39,7 @@ qrun(
print "prettier"
qrun(["node", prettier, "--write", "--loglevel=error"] + ["rollup.config.js"] +
glob("*.json") + glob("*.md") +
find_exts([".github", "js", "tests", "tools", "website"],
find_exts([".github", "js", "tests", "tools", "website", "core"],
[".js", ".json", ".ts", ".md"],
skip=["tools/clang", "js/deps"]))
@ -47,4 +47,4 @@ print "rustfmt"
qrun([
"third_party/rustfmt/" + platform() +
"/rustfmt", "--config-path", rustfmt_config, "build.rs"
] + find_exts(["src"], [".rs"]))
] + find_exts(["src", "core"], [".rs"]))

View file

@ -30,6 +30,16 @@ def deno_net_http_benchmark(deno_exe):
})
def deno_core_single(exe):
print "http_benchmark testing deno_core_single"
return run([exe, "--single-thread"])
def deno_core_multi(exe):
print "http_benchmark testing deno_core_multi"
return run([exe, "--multi-thread"])
def node_http_benchmark():
node_cmd = ["node", "tools/node_http.js", ADDR.split(":")[1]]
print "http_benchmark testing NODE."
@ -48,11 +58,13 @@ def hyper_http_benchmark(hyper_hello_exe):
return run(hyper_cmd)
def http_benchmark(deno_exe, hyper_hello_exe):
def http_benchmark(deno_exe, hyper_hello_exe, core_http_bench_exe):
r = {}
# TODO Rename to "deno_tcp"
r["deno"] = deno_http_benchmark(deno_exe)
r["deno_net_http"] = deno_net_http_benchmark(deno_exe)
r["deno_core_single"] = deno_core_single(core_http_bench_exe)
r["deno_core_multi"] = deno_core_multi(core_http_bench_exe)
r["node"] = node_http_benchmark()
r["node_tcp"] = node_tcp_benchmark()
r["hyper"] = hyper_http_benchmark(hyper_hello_exe)
@ -68,8 +80,14 @@ def run(server_cmd, merge_env=None):
for key, value in merge_env.iteritems():
env[key] = value
# Wait for port 4544 to become available.
# TODO Need to use SO_REUSEPORT with tokio::net::TcpListener.
time.sleep(5)
server = subprocess.Popen(server_cmd, env=env)
time.sleep(5) # wait for server to wake up. TODO racy.
try:
cmd = "third_party/wrk/%s/wrk -d %s http://%s/" % (util.platform(),
DURATION, ADDR)

View file

@ -21,8 +21,8 @@ run([
run(["node", tslint, "-p", ".", "--exclude", "**/gen/**/*.ts"])
run([
"node", tslint, "./js/**/*_test.ts", "./tests/**/*.ts", "--exclude",
"**/gen/**/*.ts", "--project", "tsconfig.json"
"node", tslint, "./js/**/*_test.ts", "./tests/**/*.ts", "./core/*.js",
"--exclude", "**/gen/**/*.ts", "--project", "tsconfig.json"
])
run([sys.executable, "third_party/depot_tools/pylint.py"] +

25
tools/node_tcp_promise.js Normal file
View file

@ -0,0 +1,25 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
// Note: this is a keep-alive server.
const { Server } = require("net");
const port = process.argv[2] || "4544";
console.log("port", port);
const response = Buffer.from(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
);
async function write(socket, buffer) {
let p = new Promise((resolve, reject) => {
socket.write(buffer, resolve);
});
return p;
}
Server(async socket => {
socket.on("error", _ => {
socket.destroy();
});
for await (const data of socket) {
write(socket, response);
}
}).listen(port);

View file

@ -85,6 +85,11 @@ def main(argv):
check_exists(test_rs)
run([test_rs])
deno_core_test = os.path.join(build_dir,
"deno_core_test" + executable_suffix)
check_exists(deno_core_test)
run([deno_core_test])
unit_tests(deno_exe)
prefetch_test(deno_exe)