First pass at setTimeout with Tokio (#434)

This commit is contained in:
Ryan Dahl 2018-08-09 12:17:08 -07:00 committed by GitHub
parent 0e96125260
commit fb87cb38ec
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 291 additions and 37 deletions

View file

@ -250,6 +250,7 @@ run_node("gen_declarations") {
"js/deno.ts",
"js/globals.ts",
"js/os.ts",
"js/timers.ts",
"js/tsconfig.generated.json",
"js/util.ts",
]
@ -258,6 +259,7 @@ run_node("gen_declarations") {
out_dir + "/js/deno.d.ts",
out_dir + "/js/globals.d.ts",
out_dir + "/js/os.d.ts",
out_dir + "/js/timers.d.ts",
out_dir + "/js/util.d.ts",
]
deps = [
@ -285,6 +287,7 @@ run_node("bundle") {
"js/os.ts",
"js/plugins.d.ts",
"js/runtime.ts",
"js/timers.ts",
"js/types.d.ts",
"js/util.ts",
"js/v8_source_maps.ts",

View file

@ -11,6 +11,7 @@ import consoleDts from "gen/js/console.d.ts!string";
import denoDts from "gen/js/deno.d.ts!string";
import globalsDts from "gen/js/globals.d.ts!string";
import osDts from "gen/js/os.d.ts!string";
import timersDts from "gen/js/timers.d.ts!string";
import utilDts from "gen/js/util.d.ts!string";
// Static libraries
@ -56,6 +57,7 @@ export const assetSourceCode: { [key: string]: string } = {
"deno.d.ts": denoDts,
"globals.d.ts": globalsDts,
"os.d.ts": osDts,
"timers.d.ts": timersDts,
"util.d.ts": utilDts,
// Static libraries
@ -92,4 +94,7 @@ export const assetSourceCode: { [key: string]: string } = {
// Static definitions
"typescript.d.ts": typescriptDts,
"types.d.ts": typesDts,
// TODO(ry) Remove the following when possible. It's a workaround.
"msg_generated.d.ts": "",
};

View file

@ -2,12 +2,18 @@
import { Console } from "./console";
import { RawSourceMap } from "./types";
import * as timers from "./timers";
declare global {
interface Window {
console: Console;
}
const clearTimeout: typeof timers.clearTimer;
const clearInterval: typeof timers.clearTimer;
const setTimeout: typeof timers.setTimeout;
const setInterval: typeof timers.setInterval;
const console: Console;
const window: Window;
}
@ -37,11 +43,10 @@ window.libdeno = null;
// import "./url";
// import * as timer from "./timers";
// window["setTimeout"] = timer.setTimeout;
// window["setInterval"] = timer.setInterval;
// window["clearTimeout"] = timer.clearTimer;
// window["clearInterval"] = timer.clearTimer;
window.setTimeout = timers.setTimeout;
window.setInterval = timers.setInterval;
window.clearTimeout = timers.clearTimer;
window.clearInterval = timers.clearTimer;
window.console = new Console(libdeno.print);

View file

@ -4,6 +4,7 @@ import { deno as fbs } from "gen/msg_generated";
import { assert, log, assignCmdId } from "./util";
import * as runtime from "./runtime";
import { libdeno } from "./globals";
import * as timers from "./timers";
function startMsg(cmdId: number): Uint8Array {
const builder = new flatbuffers.Builder();
@ -17,8 +18,26 @@ function startMsg(cmdId: number): Uint8Array {
return builder.asUint8Array();
}
function onMessage(ui8: Uint8Array) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = fbs.Base.getRootAsBase(bb);
switch (base.msgType()) {
case fbs.Any.TimerReady: {
const msg = new fbs.TimerReady();
assert(base.msg(msg) != null);
timers.onMessage(msg);
break;
}
default: {
assert(false, "Unhandled message type");
break;
}
}
}
/* tslint:disable-next-line:no-default-export */
export default function denoMain() {
libdeno.recv(onMessage);
runtime.setup();
// First we send an empty "Start" message to let the privlaged side know we

View file

@ -1,7 +1,9 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
import { deno as pb } from "./msg.pb";
import { pubInternal, sub } from "./dispatch";
import { assert } from "./util";
import * as util from "./util";
import { deno as fbs } from "gen/msg_generated";
import { flatbuffers } from "flatbuffers";
import { libdeno } from "./globals";
let nextTimerId = 1;
@ -19,14 +21,9 @@ interface Timer {
const timers = new Map<number, Timer>();
export function initTimers() {
sub("timers", onMessage);
}
function onMessage(payload: Uint8Array) {
const msg = pb.Msg.decode(payload);
assert(msg.command === pb.Msg.Command.TIMER_READY);
const { timerReadyId, timerReadyDone } = msg;
export function onMessage(msg: fbs.TimerReady) {
const timerReadyId = msg.id();
const timerReadyDone = msg.done();
const timer = timers.get(timerReadyId);
if (!timer) {
return;
@ -37,7 +34,7 @@ function onMessage(payload: Uint8Array) {
}
}
function setTimer(
function startTimer(
cb: TimerCallback,
delay: number,
interval: boolean,
@ -52,12 +49,23 @@ function setTimer(
cb
};
timers.set(timer.id, timer);
pubInternal("timers", {
command: pb.Msg.Command.TIMER_START,
timerStartId: timer.id,
timerStartInterval: timer.interval,
timerStartDelay: timer.delay
});
util.log("timers.ts startTimer");
// Send TimerStart message
const builder = new flatbuffers.Builder();
fbs.TimerStart.startTimerStart(builder);
fbs.TimerStart.addId(builder, timer.id);
fbs.TimerStart.addInterval(builder, timer.interval);
fbs.TimerStart.addDelay(builder, timer.delay);
const msg = fbs.TimerStart.endTimerStart(builder);
fbs.Base.startBase(builder);
fbs.Base.addMsg(builder, msg);
fbs.Base.addMsgType(builder, fbs.Any.TimerStart);
builder.finish(fbs.Base.endBase(builder));
const resBuf = libdeno.send(builder.asUint8Array());
assert(resBuf == null);
return timer.id;
}
@ -67,7 +75,7 @@ export function setTimeout(
// tslint:disable-next-line:no-any
...args: any[]
): number {
return setTimer(cb, delay, false, args);
return startTimer(cb, delay, false, args);
}
export function setInterval(
@ -76,13 +84,20 @@ export function setInterval(
// tslint:disable-next-line:no-any
...args: any[]
): number {
return setTimer(cb, delay, true, args);
return startTimer(cb, delay, true, args);
}
export function clearTimer(id: number) {
timers.delete(id);
pubInternal("timers", {
command: pb.Msg.Command.TIMER_CLEAR,
timerClearId: id
});
const builder = new flatbuffers.Builder();
fbs.TimerClear.startTimerClear(builder);
fbs.TimerClear.addId(builder, id);
const msg = fbs.TimerClear.endTimerClear(builder);
fbs.Base.startBase(builder);
fbs.Base.addMsg(builder, msg);
fbs.Base.addMsgType(builder, fbs.Any.TimerClear);
builder.finish(fbs.Base.endBase(builder));
const resBuf = libdeno.send(builder.asUint8Array());
assert(resBuf == null);
}

View file

@ -29,6 +29,7 @@ extern "C" {
pub fn deno_last_exception(d: *const DenoC) -> *const c_char;
pub fn deno_get_data(d: *const DenoC) -> *const c_void;
pub fn deno_set_response(d: *const DenoC, buf: deno_buf);
pub fn deno_send(d: *const DenoC, buf: deno_buf);
pub fn deno_execute(
d: *const DenoC,
js_filename: *const c_char,

View file

@ -10,5 +10,9 @@ void handle_code_fetch(Deno* d, uint32_t cmd_id, const char* module_specifier,
const char* containing_file);
void handle_code_cache(Deno* d, uint32_t cmd_id, const char* filename,
const char* source_code, const char* output_code);
void handle_timer_start(Deno* d, uint32_t cmd_id, uint32_t timer_id,
bool interval, uint32_t delay);
void handle_timer_clear(Deno* d, uint32_t cmd_id, uint32_t timer_id);
} // extern "C"
#endif // HANDLERS_H_

View file

@ -1,7 +1,10 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
use binding;
use binding::{deno_buf, deno_set_response, DenoC};
use flatbuffers;
use from_c;
use futures;
use futures::sync::oneshot;
use libc::c_char;
use msg_generated::deno as msg;
use std::ffi::CStr;
@ -24,6 +27,7 @@ pub fn deno_handle_msg_from_js(d: *const DenoC, buf: deno_buf) {
}
*/
// TODO(ry) Use Deno instead of DenoC as first arg.
fn reply_error(d: *const DenoC, cmd_id: u32, msg: &String) {
let mut builder = flatbuffers::FlatBufferBuilder::new();
// println!("reply_error{}", msg);
@ -35,16 +39,14 @@ fn reply_error(d: *const DenoC, cmd_id: u32, msg: &String) {
set_response_base(d, &mut builder, &args)
}
fn set_response_base(
d: *const DenoC,
fn create_msg(
builder: &mut flatbuffers::FlatBufferBuilder,
args: &msg::BaseArgs,
) {
) -> deno_buf {
let base = msg::CreateBase(builder, &args);
builder.finish(base);
let data = builder.get_active_buf_slice();
// println!("buf slice {} {} {} {} {}", data[0], data[1], data[2], data[3], data[4]);
let buf = deno_buf {
deno_buf {
// TODO(ry)
// The deno_buf / ImportBuf / ExportBuf semantics should be such that we do not need to yield
// ownership. Temporarally there is a hack in ImportBuf that when alloc_ptr is null, it will
@ -53,12 +55,29 @@ fn set_response_base(
alloc_len: 0,
data_ptr: data.as_ptr() as *mut u8,
data_len: data.len(),
};
// println!("data_ptr {:p}", data_ptr);
// println!("data_len {}", data.len());
}
}
// TODO(ry) Use Deno instead of DenoC as first arg.
fn set_response_base(
d: *const DenoC,
builder: &mut flatbuffers::FlatBufferBuilder,
args: &msg::BaseArgs,
) {
let buf = create_msg(builder, args);
unsafe { deno_set_response(d, buf) }
}
// TODO(ry) Use Deno instead of DenoC as first arg.
fn send_base(
d: *const DenoC,
builder: &mut flatbuffers::FlatBufferBuilder,
args: &msg::BaseArgs,
) {
let buf = create_msg(builder, args);
unsafe { binding::deno_send(d, buf) }
}
// https://github.com/denoland/deno/blob/golang/os.go#L100-L154
#[no_mangle]
pub extern "C" fn handle_code_fetch(
@ -131,3 +150,136 @@ pub extern "C" fn handle_code_cache(
}
// null response indicates success.
}
fn set_timeout<F>(
cb: F,
delay: u32,
) -> (
impl Future<Item = (), Error = ()>,
futures::sync::oneshot::Sender<()>,
)
where
F: FnOnce() -> (),
{
let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
let when = Instant::now() + Duration::from_millis(delay.into());
let delay_task = Delay::new(when)
.map_err(|e| panic!("timer failed; err={:?}", e))
.and_then(|_| {
cb();
Ok(())
})
.select(cancel_rx)
.map(|_| ())
.map_err(|_| ());
(delay_task, cancel_tx)
}
fn set_interval<F>(
cb: F,
delay: u32,
) -> (
impl Future<Item = (), Error = ()>,
futures::sync::oneshot::Sender<()>,
)
where
F: Fn() -> (),
{
let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
let delay = Duration::from_millis(delay.into());
let interval_task = future::lazy(move || {
Interval::new(Instant::now() + delay, delay)
.for_each(move |_| {
cb();
future::ok(())
})
.into_future()
.map_err(|_| panic!())
}).select(cancel_rx)
.map(|_| ())
.map_err(|_| ());
(interval_task, cancel_tx)
}
// TODO(ry) Use Deno instead of DenoC as first arg.
fn send_timer_ready(d: *const DenoC, timer_id: u32, done: bool) {
let mut builder = flatbuffers::FlatBufferBuilder::new();
let msg = msg::CreateTimerReady(
&mut builder,
&msg::TimerReadyArgs {
id: timer_id,
done,
..Default::default()
},
);
builder.finish(msg);
send_base(
d,
&mut builder,
&msg::BaseArgs {
msg: Some(msg.union()),
msg_type: msg::Any::TimerReady,
..Default::default()
},
);
}
// TODO(ry) Use Deno instead of DenoC as first arg.
fn remove_timer(d: *const DenoC, timer_id: u32) {
let deno = from_c(d);
deno.timers.remove(&timer_id);
}
use std::time::{Duration, Instant};
use tokio::prelude::*;
use tokio::timer::{Delay, Interval};
// Prototype: https://github.com/ry/deno/blob/golang/timers.go#L25-L39
#[no_mangle]
pub extern "C" fn handle_timer_start(
d: *const DenoC,
cmd_id: u32,
timer_id: u32,
interval: bool,
delay: u32,
) {
assert!(cmd_id == 0);
debug!("handle_timer_start");
let deno = from_c(d);
if interval {
let (interval_task, cancel_interval) = set_interval(
move || {
send_timer_ready(d, timer_id, false);
},
delay,
);
deno.timers.insert(timer_id, cancel_interval);
deno.rt.spawn(interval_task);
} else {
let (delay_task, cancel_delay) = set_timeout(
move || {
remove_timer(d, timer_id);
send_timer_ready(d, timer_id, true);
},
delay,
);
deno.timers.insert(timer_id, cancel_delay);
deno.rt.spawn(delay_task);
}
}
// Prototype: https://github.com/ry/deno/blob/golang/timers.go#L40-L43
#[no_mangle]
pub extern "C" fn handle_timer_clear(
d: *const DenoC,
cmd_id: u32,
timer_id: u32,
) {
assert!(cmd_id == 0);
debug!("handle_timer_clear");
remove_timer(d, timer_id);
}

View file

@ -1,8 +1,11 @@
extern crate flatbuffers;
extern crate futures;
extern crate libc;
extern crate msg_rs as msg_generated;
extern crate sha1;
extern crate tempfile;
extern crate tokio;
extern crate tokio_current_thread;
extern crate url;
#[macro_use]
extern crate log;
@ -14,6 +17,7 @@ pub mod handlers;
use libc::c_int;
use libc::c_void;
use std::collections::HashMap;
use std::env;
use std::ffi::CStr;
use std::ffi::CString;
@ -91,6 +95,8 @@ type DenoException<'a> = &'a str;
pub struct Deno {
ptr: *const binding::DenoC,
dir: deno_dir::DenoDir,
rt: tokio::runtime::current_thread::Runtime,
timers: HashMap<u32, futures::sync::oneshot::Sender<()>>,
}
static DENO_INIT: std::sync::Once = std::sync::ONCE_INIT;
@ -104,6 +110,8 @@ impl Deno {
let deno_box = Box::new(Deno {
ptr: 0 as *const binding::DenoC,
dir: deno_dir::DenoDir::new(None).unwrap(),
rt: tokio::runtime::current_thread::Runtime::new().unwrap(),
timers: HashMap::new(),
});
let deno: &'a mut Deno = Box::leak(deno_box);
let external_ptr = deno as *mut _ as *const c_void;
@ -204,4 +212,7 @@ fn main() {
error!("{}", err);
std::process::exit(1);
});
// Start the Tokio event loop
d.rt.run().expect("err");
}

View file

@ -61,7 +61,7 @@ table Exit {
table TimerStart {
id: uint;
interval: bool;
delay: int;
delay: uint;
}
table TimerReady {

View file

@ -71,6 +71,23 @@ void deno_handle_msg_from_js(Deno* d, deno_buf buf) {
break;
}
case deno::Any_TimerStart: {
auto msg = base->msg_as_TimerStart();
handle_timer_start(d, cmd_id, msg->id(), msg->interval(), msg->delay());
break;
}
case deno::Any_TimerReady: {
CHECK(false && "Privileged side should not receive TimerReady message.");
break;
}
case deno::Any_TimerClear: {
auto msg = base->msg_as_TimerClear();
handle_timer_clear(d, cmd_id, msg->id());
break;
}
case deno::Any_Exit: {
auto msg = base->msg_as_Exit();
uint32_t code = msg->code();

11
tests/004_set_timeout.ts Normal file
View file

@ -0,0 +1,11 @@
setTimeout(() => {
console.log("World");
}, 10);
console.log("Hello");
const id = setTimeout(() => {
console.log("Not printed");
}, 10000);
clearTimeout(id);

View file

@ -0,0 +1,2 @@
Hello
World

View file

@ -0,0 +1,7 @@
const id = setInterval(function() {
console.log("test")
}, 200);
setTimeout(function() {
clearInterval(id)
}, 500)

View file

@ -0,0 +1,2 @@
test
test