refactor: split worker and worker host logic (#3722)

* split ops/worker.rs into ops/worker_host.rs and ops/web_worker.rs

* refactor js/workers.ts and factor out js/worker_main.ts - entry point for WebWorker runtime

* BREAKING CHANGE: remove support for blob: URL in Worker

* BREAKING CHANGE: remove Deno namespace support and noDenoNamespace option in Worker constructor

* introduce WebWorker struct which is a stripped down version of cli::Worker
This commit is contained in:
Bartek Iwańczuk 2020-01-21 09:49:47 +01:00 committed by GitHub
parent c90036ab88
commit 7966bf14c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 439 additions and 263 deletions

View file

@ -231,7 +231,7 @@ impl TsCompiler {
fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
let (int, ext) = ThreadSafeState::create_channels();
let worker_state =
ThreadSafeState::new(global_state.clone(), None, None, true, int)
ThreadSafeState::new(global_state.clone(), None, None, int)
.expect("Unable to create worker state");
// Count how many times we start the compiler worker.

View file

@ -45,7 +45,7 @@ impl WasmCompiler {
fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
let (int, ext) = ThreadSafeState::create_channels();
let worker_state =
ThreadSafeState::new(global_state.clone(), None, None, true, int)
ThreadSafeState::new(global_state.clone(), None, None, int)
.expect("Unable to create worker state");
// Count how many times we start the compiler worker.

View file

@ -32,7 +32,7 @@ import { fromTypeScriptDiagnostic } from "./diagnostics_util.ts";
import * as os from "./os.ts";
import { assert } from "./util.ts";
import * as util from "./util.ts";
import { postMessage, workerClose, workerMain } from "./workers.ts";
import { postMessage, workerClose, workerMain } from "./worker_main.ts";
const self = globalThis;

View file

@ -21,6 +21,7 @@ import * as textEncoding from "./text_encoding.ts";
import * as timers from "./timers.ts";
import * as url from "./url.ts";
import * as urlSearchParams from "./url_search_params.ts";
import * as workerRuntime from "./worker_main.ts";
import * as workers from "./workers.ts";
import * as performanceUtil from "./performance.ts";
import * as request from "./request.ts";
@ -194,12 +195,12 @@ const globalProperties = {
Response: nonEnumerable(fetchTypes.Response),
performance: writable(new performanceUtil.Performance()),
onmessage: writable(workers.onmessage),
onerror: writable(workers.onerror),
onmessage: writable(workerRuntime.onmessage),
onerror: writable(workerRuntime.onerror),
workerMain: nonEnumerable(workers.workerMain),
workerClose: nonEnumerable(workers.workerClose),
postMessage: writable(workers.postMessage),
workerMain: nonEnumerable(workerRuntime.workerMain),
workerClose: nonEnumerable(workerRuntime.workerClose),
postMessage: writable(workerRuntime.postMessage),
Worker: nonEnumerable(workers.WorkerImpl),
[domTypes.eventTargetHost]: nonEnumerable(null),

View file

@ -2128,9 +2128,9 @@ declare interface Window {
performance: __performanceUtil.Performance;
onmessage: (e: { data: any }) => void;
onerror: undefined | typeof onerror;
workerMain: typeof __workers.workerMain;
workerClose: typeof __workers.workerClose;
postMessage: typeof __workers.postMessage;
workerMain: typeof __workerMain.workerMain;
workerClose: typeof __workerMain.workerClose;
postMessage: typeof __workerMain.postMessage;
Worker: typeof __workers.WorkerImpl;
addEventListener: (
type: string,
@ -2187,9 +2187,9 @@ declare let onerror:
e: Event
) => boolean | void)
| undefined;
declare const workerMain: typeof __workers.workerMain;
declare const workerClose: typeof __workers.workerClose;
declare const postMessage: typeof __workers.postMessage;
declare const workerMain: typeof __workerMain.workerMain;
declare const workerClose: typeof __workerMain.workerClose;
declare const postMessage: typeof __workerMain.postMessage;
declare const Worker: typeof __workers.WorkerImpl;
declare const addEventListener: (
type: string,
@ -3437,31 +3437,25 @@ declare namespace __url {
};
}
declare namespace __workers {
// @url js/workers.d.ts
export function encodeMessage(data: any): Uint8Array;
export function decodeMessage(dataIntArray: Uint8Array): any;
declare namespace __workerMain {
export let onmessage: (e: { data: any }) => void;
export function postMessage(data: any): void;
export function getMessage(): Promise<any>;
export let isClosing: boolean;
export function workerClose(): void;
export function workerMain(): Promise<void>;
}
declare namespace __workers {
// @url js/workers.d.ts
export interface Worker {
onerror?: (e: Event) => void;
onmessage?: (e: { data: any }) => void;
onmessageerror?: () => void;
postMessage(data: any): void;
closed: Promise<void>;
}
export interface WorkerOptions {}
/** Extended Deno Worker initialization options.
* `noDenoNamespace` hides global `window.Deno` namespace for
* spawned worker and nested workers spawned by it (default: false).
*/
export interface DenoWorkerOptions extends WorkerOptions {
noDenoNamespace?: boolean;
export interface WorkerOptions {
type?: "classic" | "module";
}
export class WorkerImpl implements Worker {
private readonly id;
@ -3470,8 +3464,7 @@ declare namespace __workers {
onerror?: (e: Event) => void;
onmessage?: (data: any) => void;
onmessageerror?: () => void;
constructor(specifier: string, options?: DenoWorkerOptions);
readonly closed: Promise<void>;
constructor(specifier: string, options?: WorkerOptions);
postMessage(data: any): void;
private run;
}

98
cli/js/worker_main.ts Normal file
View file

@ -0,0 +1,98 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
/* eslint-disable @typescript-eslint/no-explicit-any */
import { core } from "./core.ts";
import * as dispatch from "./dispatch.ts";
import { sendAsync, sendSync } from "./dispatch_json.ts";
import { log } from "./util.ts";
import { TextDecoder, TextEncoder } from "./text_encoding.ts";
const encoder = new TextEncoder();
const decoder = new TextDecoder();
function encodeMessage(data: any): Uint8Array {
const dataJson = JSON.stringify(data);
return encoder.encode(dataJson);
}
function decodeMessage(dataIntArray: Uint8Array): any {
const dataJson = decoder.decode(dataIntArray);
return JSON.parse(dataJson);
}
// Stuff for workers
export const onmessage: (e: { data: any }) => void = (): void => {};
export const onerror: (e: { data: any }) => void = (): void => {};
export function postMessage(data: any): void {
const dataIntArray = encodeMessage(data);
sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray);
}
export async function getMessage(): Promise<any> {
log("getMessage");
const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE);
if (res.data != null) {
return decodeMessage(new Uint8Array(res.data));
} else {
return null;
}
}
export let isClosing = false;
export function workerClose(): void {
isClosing = true;
}
export async function workerMain(): Promise<void> {
const ops = core.ops();
// TODO(bartlomieju): this is a prototype, we should come up with
// something a bit more sophisticated
for (const [name, opId] of Object.entries(ops)) {
const opName = `OP_${name.toUpperCase()}`;
// Assign op ids to actual variables
// TODO(ry) This type casting is gross and should be fixed.
((dispatch as unknown) as { [key: string]: number })[opName] = opId;
core.setAsyncHandler(opId, dispatch.getAsyncHandler(opName));
}
log("workerMain");
while (!isClosing) {
const data = await getMessage();
if (data == null) {
log("workerMain got null message. quitting.");
break;
}
let result: void | Promise<void>;
const event = { data };
try {
if (!globalThis["onmessage"]) {
break;
}
result = globalThis.onmessage!(event);
if (result && "then" in result) {
await result;
}
if (!globalThis["onmessage"]) {
break;
}
} catch (e) {
if (globalThis["onerror"]) {
const result = globalThis.onerror(
e.message,
e.fileName,
e.lineNumber,
e.columnNumber,
e
);
if (result === true) {
continue;
}
}
throw e;
}
}
}

View file

@ -2,35 +2,35 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import * as dispatch from "./dispatch.ts";
import { sendAsync, sendSync } from "./dispatch_json.ts";
import { log, createResolvable, Resolvable } from "./util.ts";
import { log } from "./util.ts";
import { TextDecoder, TextEncoder } from "./text_encoding.ts";
/*
import { blobURLMap } from "./url.ts";
import { blobBytesWeakMap } from "./blob.ts";
*/
import { Event } from "./event.ts";
import { EventTarget } from "./event_target.ts";
const encoder = new TextEncoder();
const decoder = new TextDecoder();
export function encodeMessage(data: any): Uint8Array {
function encodeMessage(data: any): Uint8Array {
const dataJson = JSON.stringify(data);
return encoder.encode(dataJson);
}
export function decodeMessage(dataIntArray: Uint8Array): any {
function decodeMessage(dataIntArray: Uint8Array): any {
const dataJson = decoder.decode(dataIntArray);
return JSON.parse(dataJson);
}
function createWorker(
specifier: string,
includeDenoNamespace: boolean,
hasSourceCode: boolean,
sourceCode: Uint8Array
): { id: number; loaded: boolean } {
return sendSync(dispatch.OP_CREATE_WORKER, {
specifier,
includeDenoNamespace,
hasSourceCode,
sourceCode: new TextDecoder().decode(sourceCode)
});
@ -67,92 +67,15 @@ async function hostGetMessage(id: number): Promise<any> {
}
}
// Stuff for workers
export const onmessage: (e: { data: any }) => void = (): void => {};
export const onerror: (e: { data: any }) => void = (): void => {};
export function postMessage(data: any): void {
const dataIntArray = encodeMessage(data);
sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray);
}
export async function getMessage(): Promise<any> {
log("getMessage");
const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE);
if (res.data != null) {
return decodeMessage(new Uint8Array(res.data));
} else {
return null;
}
}
export let isClosing = false;
export function workerClose(): void {
isClosing = true;
}
export async function workerMain(): Promise<void> {
log("workerMain");
while (!isClosing) {
const data = await getMessage();
if (data == null) {
log("workerMain got null message. quitting.");
break;
}
let result: void | Promise<void>;
const event = { data };
try {
if (!globalThis["onmessage"]) {
break;
}
result = globalThis.onmessage!(event);
if (result && "then" in result) {
await result;
}
if (!globalThis["onmessage"]) {
break;
}
} catch (e) {
if (globalThis["onerror"]) {
const result = globalThis.onerror(
e.message,
e.fileName,
e.lineNumber,
e.columnNumber,
e
);
if (result === true) {
continue;
}
}
throw e;
}
}
}
export interface Worker {
onerror?: (e: any) => void;
onmessage?: (e: { data: any }) => void;
onmessageerror?: () => void;
postMessage(data: any): void;
// TODO(bartlomieju): remove this
closed: Promise<void>;
}
// TODO(kevinkassimo): Maybe implement reasonable web worker options?
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface WorkerOptions {}
/** Extended Deno Worker initialization options.
* `noDenoNamespace` hides global `globalThis.Deno` namespace for
* spawned worker and nested workers spawned by it (default: false).
*/
export interface DenoWorkerOptions extends WorkerOptions {
noDenoNamespace?: boolean;
export interface WorkerOptions {
type?: "classic" | "module";
}
export class WorkerImpl extends EventTarget implements Worker {
@ -160,20 +83,29 @@ export class WorkerImpl extends EventTarget implements Worker {
private isClosing = false;
private messageBuffer: any[] = [];
private ready = false;
private readonly isClosedPromise: Resolvable<void>;
public onerror?: (e: any) => void;
public onmessage?: (data: any) => void;
public onmessageerror?: () => void;
constructor(specifier: string, options?: DenoWorkerOptions) {
constructor(specifier: string, options?: WorkerOptions) {
super();
let hasSourceCode = false;
let sourceCode = new Uint8Array();
let includeDenoNamespace = true;
if (options && options.noDenoNamespace) {
includeDenoNamespace = false;
let type = "classic";
if (options?.type) {
type = options.type;
}
if (type !== "module") {
throw new Error(
'Not yet implemented: only "module" type workers are supported'
);
}
const hasSourceCode = false;
const sourceCode = new Uint8Array();
/* TODO(bartlomieju):
// Handle blob URL.
if (specifier.startsWith("blob:")) {
hasSourceCode = true;
@ -187,23 +119,14 @@ export class WorkerImpl extends EventTarget implements Worker {
}
sourceCode = blobBytes!;
}
*/
const { id, loaded } = createWorker(
specifier,
includeDenoNamespace,
hasSourceCode,
sourceCode
);
const { id, loaded } = createWorker(specifier, hasSourceCode, sourceCode);
this.id = id;
this.ready = loaded;
this.isClosedPromise = createResolvable();
this.poll();
}
get closed(): Promise<void> {
return this.isClosedPromise;
}
private handleError(e: any): boolean {
// TODO: this is being handled in a type unsafe way, it should be type safe
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@ -259,7 +182,6 @@ export class WorkerImpl extends EventTarget implements Worker {
} else {
this.isClosing = true;
hostCloseWorker(this.id);
this.isClosedPromise.resolve();
break;
}
}

View file

@ -51,6 +51,7 @@ pub mod state;
pub mod test_util;
mod tokio_util;
pub mod version;
mod web_worker;
pub mod worker;
use crate::deno_error::js_check;
@ -120,7 +121,6 @@ fn create_worker_and_state(
global_state.clone(),
None,
global_state.main_module.clone(),
true,
int,
)
.map_err(deno_error::print_err_and_exit)
@ -346,16 +346,15 @@ fn bundle_command(flags: DenoFlags) {
fn run_repl(flags: DenoFlags) {
let (mut worker, _state) = create_worker_and_state(flags);
// Make repl continue to function under uncaught async errors.
worker.set_error_handler(Box::new(|err| {
eprintln!("{}", err.to_string());
Ok(())
}));
// Setup runtime.
js_check(worker.execute("denoMain()"));
let main_future = async move {
let result = worker.await;
js_check(result);
loop {
let result = worker.clone().await;
if let Err(err) = result {
eprintln!("{}", err.to_string());
worker.clear_exception();
}
}
};
tokio_util::run(main_future);
}

View file

@ -23,4 +23,5 @@ pub mod repl;
pub mod resources;
pub mod timers;
pub mod tls;
pub mod workers;
pub mod web_worker;
pub mod worker_host;

77
cli/ops/web_worker.rs Normal file
View file

@ -0,0 +1,77 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno_core::*;
use futures;
use futures::future::FutureExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std;
use std::convert::From;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op(
"worker_post_message",
s.core_op(json_op(s.stateful_op(op_worker_post_message))),
);
i.register_op(
"worker_get_message",
s.core_op(json_op(s.stateful_op(op_worker_get_message))),
);
}
struct GetMessageFuture {
state: ThreadSafeState,
}
impl Future for GetMessageFuture {
type Output = Option<Buf>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let mut channels = inner.state.worker_channels.lock().unwrap();
let receiver = &mut channels.receiver;
receiver.poll_next_unpin(cx)
}
}
/// Get message from host as guest worker
fn op_worker_get_message(
state: &ThreadSafeState,
_args: Value,
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let op = GetMessageFuture {
state: state.clone(),
};
let op = async move {
let maybe_buf = op.await;
debug!("op_worker_get_message");
Ok(json!({ "data": maybe_buf }))
};
Ok(JsonOp::Async(op.boxed()))
}
/// Post message to host as guest worker
fn op_worker_post_message(
state: &ThreadSafeState,
_args: Value,
data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let mut channels = state.worker_channels.lock().unwrap();
let sender = &mut channels.sender;
futures::executor::block_on(sender.send(d))
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
}

View file

@ -9,7 +9,7 @@ use crate::fmt_errors::JSError;
use crate::ops::json_op;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
use crate::web_worker::WebWorker;
use deno_core::*;
use futures;
use futures::channel::mpsc;
@ -54,15 +54,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
"host_get_message",
s.core_op(json_op(s.stateful_op(op_host_get_message))),
);
// TODO: make sure these two ops are only accessible to appropriate Worker
i.register_op(
"worker_post_message",
s.core_op(json_op(s.stateful_op(op_worker_post_message))),
);
i.register_op(
"worker_get_message",
s.core_op(json_op(s.stateful_op(op_worker_get_message))),
);
i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
}
@ -81,45 +72,10 @@ impl Future for GetMessageFuture {
}
}
/// Get message from host as guest worker
fn op_worker_get_message(
state: &ThreadSafeState,
_args: Value,
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let op = GetMessageFuture {
state: state.clone(),
};
let op = async move {
let maybe_buf = op.await;
debug!("op_worker_get_message");
Ok(json!({ "data": maybe_buf }))
};
Ok(JsonOp::Async(op.boxed()))
}
/// Post message to host as guest worker
fn op_worker_post_message(
state: &ThreadSafeState,
_args: Value,
data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let mut channels = state.worker_channels.lock().unwrap();
let sender = &mut channels.sender;
futures::executor::block_on(sender.send(d))
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateWorkerArgs {
specifier: String,
include_deno_namespace: bool,
has_source_code: bool,
source_code: String,
}
@ -133,10 +89,6 @@ fn op_create_worker(
let args: CreateWorkerArgs = serde_json::from_value(args)?;
let specifier = args.specifier.as_ref();
// Only include deno namespace if requested AND current worker
// has included namespace (to avoid escalation).
let include_deno_namespace =
args.include_deno_namespace && state.include_deno_namespace;
let has_source_code = args.has_source_code;
let source_code = args.source_code;
@ -156,16 +108,13 @@ fn op_create_worker(
state.global_state.clone(),
Some(parent_state.permissions.clone()), // by default share with parent
Some(module_specifier.clone()),
include_deno_namespace,
int,
)?;
// TODO: add a new option to make child worker not sharing permissions
// with parent (aka .clone(), requests from child won't reflect in parent)
let name = format!("USER-WORKER-{}", specifier);
let deno_main_call = format!("denoMain({})", include_deno_namespace);
let mut worker =
Worker::new(name, startup_data::deno_isolate_init(), child_state, ext);
js_check(worker.execute(&deno_main_call));
WebWorker::new(name, startup_data::deno_isolate_init(), child_state, ext);
js_check(worker.execute("workerMain()"));
let worker_id = parent_state.add_child_worker(worker.clone());

View file

@ -7,7 +7,7 @@ use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::worker::Worker;
use crate::web_worker::WebWorker;
use crate::worker::WorkerChannels;
use deno_core::Buf;
use deno_core::CoreOp;
@ -44,7 +44,6 @@ pub struct ThreadSafeState(Arc<State>);
#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
pub struct State {
pub global_state: ThreadSafeGlobalState,
pub modules: Arc<Mutex<deno_core::Modules>>,
pub permissions: Arc<Mutex<DenoPermissions>>,
pub main_module: Option<ModuleSpecifier>,
pub worker_channels: Mutex<WorkerChannels>,
@ -53,12 +52,11 @@ pub struct State {
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: Mutex<GlobalTimer>,
pub workers: Mutex<HashMap<u32, Worker>>,
pub workers: Mutex<HashMap<u32, WebWorker>>,
pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
pub seeded_rng: Option<Mutex<StdRng>>,
pub include_deno_namespace: bool,
pub resource_table: Mutex<ResourceTable>,
}
@ -219,7 +217,6 @@ impl ThreadSafeState {
// If Some(perm), use perm. Else copy from global_state.
shared_permissions: Option<Arc<Mutex<DenoPermissions>>>,
main_module: Option<ModuleSpecifier>,
include_deno_namespace: bool,
internal_channels: WorkerChannels,
) -> Result<Self, ErrBox> {
let import_map: Option<ImportMap> =
@ -233,7 +230,6 @@ impl ThreadSafeState {
None => None,
};
let modules = Arc::new(Mutex::new(deno_core::Modules::new()));
let permissions = if let Some(perm) = shared_permissions {
perm
} else {
@ -242,7 +238,6 @@ impl ThreadSafeState {
let state = State {
global_state,
modules,
main_module,
permissions,
import_map,
@ -254,14 +249,14 @@ impl ThreadSafeState {
next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
seeded_rng,
include_deno_namespace,
resource_table: Mutex::new(ResourceTable::default()),
};
Ok(ThreadSafeState(Arc::new(state)))
}
pub fn add_child_worker(&self, worker: Worker) -> u32 {
pub fn add_child_worker(&self, worker: WebWorker) -> u32 {
let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;
let mut workers_tl = self.workers.lock().unwrap();
workers_tl.insert(worker_id, worker);
@ -344,7 +339,6 @@ impl ThreadSafeState {
ThreadSafeGlobalState::mock(argv),
None,
module_specifier,
true,
internal_channels,
)
.unwrap()

View file

@ -1,5 +1,5 @@
const jsWorker = new Worker("./subdir/test_worker.js");
const tsWorker = new Worker("./subdir/test_worker.ts");
const jsWorker = new Worker("./subdir/test_worker.js", { type: "module" });
const tsWorker = new Worker("./subdir/test_worker.ts", { type: "module" });
tsWorker.onmessage = (e): void => {
console.log("Received ts: " + e.data);

View file

@ -1,7 +1,5 @@
const w1 = new Worker("./039_worker_deno_ns/has_ns.ts");
const w2 = new Worker("./039_worker_deno_ns/no_ns.ts", {
noDenoNamespace: true
});
const w1 = new Worker("./039_worker_deno_ns/has_ns.ts", { type: "module" });
const w2 = new Worker("./039_worker_deno_ns/no_ns.ts", { type: "module" });
let w1MsgCount = 0;
let w2MsgCount = 0;
w1.onmessage = (msg): void => {

View file

@ -301,6 +301,7 @@ itest!(_038_checkjs {
output: "038_checkjs.js.out",
});
/* TODO(bartlomieju):
itest!(_039_worker_deno_ns {
args: "run --reload 039_worker_deno_ns.ts",
output: "039_worker_deno_ns.ts.out",
@ -310,6 +311,7 @@ itest!(_040_worker_blob {
args: "run --reload 040_worker_blob.ts",
output: "040_worker_blob.ts.out",
});
*/
itest!(_041_dyn_import_eval {
args: "eval import('./subdir/mod4.js').then(console.log)",
@ -567,12 +569,14 @@ itest!(error_type_definitions {
output: "error_type_definitions.ts.out",
});
/* TODO(bartlomieju)
itest!(error_worker_dynamic {
args: "run --reload error_worker_dynamic.ts",
check_stderr: true,
exit_code: 1,
output: "error_worker_dynamic.ts.out",
});
*/
itest!(exit_error42 {
exit_code: 42,

View file

@ -14,6 +14,7 @@ onmessage = function(e): void {
postMessage({ cmdId });
break;
case 3: // Close
postMessage({ cmdId: 3 });
workerClose();
break;
}

View file

@ -37,12 +37,11 @@ function handleAsyncMsgFromWorker(
async function main(): Promise<void> {
const workers: Array<[Map<number, Resolvable<string>>, Worker]> = [];
for (let i = 1; i <= workerCount; ++i) {
const worker = new Worker("./subdir/bench_worker.ts");
const promise = new Promise((resolve): void => {
worker.onmessage = (e): void => {
if (e.data.cmdId === 0) resolve();
};
});
const worker = new Worker("./subdir/bench_worker.ts", { type: "module" });
const promise = createResolvable<void>();
worker.onmessage = (e): void => {
if (e.data.cmdId === 0) promise.resolve();
};
worker.postMessage({ cmdId: 0, action: 2 });
await promise;
workers.push([new Map(), worker]);
@ -66,8 +65,12 @@ async function main(): Promise<void> {
}
}
for (const [, worker] of workers) {
const promise = createResolvable<void>();
worker.onmessage = (e): void => {
if (e.data.cmdId === 3) promise.resolve();
};
worker.postMessage({ action: 3 });
await worker.closed; // Required to avoid a cmdId not in table error.
await promise;
}
console.log("Finished!");
}

View file

@ -4,7 +4,7 @@ const workerCount = 50;
async function bench(): Promise<void> {
const workers: Worker[] = [];
for (let i = 1; i <= workerCount; ++i) {
const worker = new Worker("./subdir/bench_worker.ts");
const worker = new Worker("./subdir/bench_worker.ts", { type: "module" });
const promise = new Promise((resolve): void => {
worker.onmessage = (e): void => {
if (e.data.cmdId === 0) resolve();
@ -16,8 +16,13 @@ async function bench(): Promise<void> {
}
console.log("Done creating workers closing workers!");
for (const worker of workers) {
const promise = new Promise((resolve): void => {
worker.onmessage = (e): void => {
if (e.data.cmdId === 3) resolve();
};
});
worker.postMessage({ action: 3 });
await worker.closed; // Required to avoid a cmdId not in table error.
await promise;
}
console.log("Finished!");
}

145
cli/web_worker.rs Normal file
View file

@ -0,0 +1,145 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::fmt_errors::JSError;
use crate::ops;
use crate::state::ThreadSafeState;
use crate::worker::WorkerChannels;
use crate::worker::WorkerReceiver;
use deno_core;
use deno_core::Buf;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
use deno_core::StartupData;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::sink::SinkExt;
use futures::task::AtomicWaker;
use std::env;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use tokio::sync::Mutex as AsyncMutex;
use url::Url;
#[derive(Clone)]
pub struct WebWorker {
pub name: String,
pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>,
pub state: ThreadSafeState,
external_channels: Arc<Mutex<WorkerChannels>>,
}
impl WebWorker {
pub fn new(
name: String,
startup_data: StartupData,
state: ThreadSafeState,
external_channels: WorkerChannels,
) -> Self {
let mut isolate =
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
ops::web_worker::init(&mut isolate, &state);
ops::worker_host::init(&mut isolate, &state);
let global_state_ = state.global_state.clone();
isolate.set_js_error_create(move |v8_exception| {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
});
Self {
name,
isolate: Arc::new(AsyncMutex::new(isolate)),
state,
external_channels: Arc::new(Mutex::new(external_channels)),
}
}
/// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> {
let path = env::current_dir().unwrap().join("__anonymous__");
let url = Url::from_file_path(path).unwrap();
self.execute2(url.as_str(), js_source)
}
/// Executes the provided JavaScript source code. The js_filename argument is
/// provided only for debugging purposes.
fn execute2(
&mut self,
js_filename: &str,
js_source: &str,
) -> Result<(), ErrBox> {
let mut isolate = self.isolate.try_lock().unwrap();
isolate.execute(js_filename, js_source)
}
/// Executes the provided JavaScript module.
///
/// Takes ownership of the isolate behind mutex.
pub async fn execute_mod_async(
&mut self,
module_specifier: &ModuleSpecifier,
maybe_code: Option<String>,
is_prefetch: bool,
) -> Result<(), ErrBox> {
let specifier = module_specifier.to_string();
let worker = self.clone();
let mut isolate = self.isolate.lock().await;
let id = isolate.load_module(&specifier, maybe_code).await?;
worker.state.global_state.progress.done();
if !is_prefetch {
return isolate.mod_evaluate(id);
}
Ok(())
}
/// Post message to worker as a host.
///
/// This method blocks current thread.
pub fn post_message(
&self,
buf: Buf,
) -> impl Future<Output = Result<(), ErrBox>> {
let channels = self.external_channels.lock().unwrap();
let mut sender = channels.sender.clone();
async move {
let result = sender.send(buf).map_err(ErrBox::from).await;
drop(sender);
result
}
}
/// Get message from worker as a host.
pub fn get_message(&self) -> WorkerReceiver {
WorkerReceiver {
channels: self.external_channels.clone(),
}
}
pub fn clear_exception(&mut self) {
let mut isolate = self.isolate.try_lock().unwrap();
isolate.clear_exception();
}
}
impl Future for WebWorker {
type Output = Result<(), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let waker = AtomicWaker::new();
waker.register(cx.waker());
match inner.isolate.try_lock() {
Ok(mut isolate) => isolate.poll_unpin(cx),
Err(_) => {
waker.wake();
Poll::Pending
}
}
}
}

View file

@ -48,55 +48,42 @@ impl Worker {
state: ThreadSafeState,
external_channels: WorkerChannels,
) -> Self {
let isolate = Arc::new(AsyncMutex::new(deno_core::EsIsolate::new(
Box::new(state.clone()),
startup_data,
false,
)));
{
let mut i = isolate.try_lock().unwrap();
let op_registry = i.op_registry.clone();
let mut isolate =
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
let op_registry = isolate.op_registry.clone();
ops::compiler::init(&mut i, &state);
ops::errors::init(&mut i, &state);
ops::fetch::init(&mut i, &state);
ops::files::init(&mut i, &state);
ops::fs::init(&mut i, &state);
ops::io::init(&mut i, &state);
ops::plugins::init(&mut i, &state, op_registry);
ops::net::init(&mut i, &state);
ops::tls::init(&mut i, &state);
ops::os::init(&mut i, &state);
ops::permissions::init(&mut i, &state);
ops::process::init(&mut i, &state);
ops::random::init(&mut i, &state);
ops::repl::init(&mut i, &state);
ops::resources::init(&mut i, &state);
ops::timers::init(&mut i, &state);
ops::workers::init(&mut i, &state);
ops::compiler::init(&mut isolate, &state);
ops::errors::init(&mut isolate, &state);
ops::fetch::init(&mut isolate, &state);
ops::files::init(&mut isolate, &state);
ops::fs::init(&mut isolate, &state);
ops::io::init(&mut isolate, &state);
ops::plugins::init(&mut isolate, &state, op_registry);
ops::net::init(&mut isolate, &state);
ops::tls::init(&mut isolate, &state);
ops::os::init(&mut isolate, &state);
ops::permissions::init(&mut isolate, &state);
ops::process::init(&mut isolate, &state);
ops::random::init(&mut isolate, &state);
ops::repl::init(&mut isolate, &state);
ops::resources::init(&mut isolate, &state);
ops::timers::init(&mut isolate, &state);
ops::worker_host::init(&mut isolate, &state);
ops::web_worker::init(&mut isolate, &state);
let global_state_ = state.global_state.clone();
i.set_js_error_create(move |v8_exception| {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
})
}
let global_state_ = state.global_state.clone();
isolate.set_js_error_create(move |v8_exception| {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
});
Self {
name,
isolate,
isolate: Arc::new(AsyncMutex::new(isolate)),
state,
external_channels: Arc::new(Mutex::new(external_channels)),
}
}
pub fn set_error_handler(
&mut self,
handler: Box<dyn FnMut(ErrBox) -> Result<(), ErrBox>>,
) {
let mut i = self.isolate.try_lock().unwrap();
i.set_error_handler(handler);
}
/// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> {
let path = env::current_dir().unwrap().join("__anonymous__");
@ -188,7 +175,7 @@ impl Future for Worker {
/// that will return message received from worker or None
/// if worker's channel has been closed.
pub struct WorkerReceiver {
channels: Arc<Mutex<WorkerChannels>>,
pub channels: Arc<Mutex<WorkerChannels>>,
}
impl Future for WorkerReceiver {
@ -255,7 +242,6 @@ mod tests {
global_state,
None,
Some(module_specifier.clone()),
true,
int,
)
.unwrap();
@ -299,7 +285,6 @@ mod tests {
global_state,
None,
Some(module_specifier.clone()),
true,
int,
)
.unwrap();
@ -342,7 +327,6 @@ mod tests {
global_state.clone(),
None,
Some(module_specifier.clone()),
true,
int,
)
.unwrap();

View file

@ -440,6 +440,8 @@ impl Isolate {
isolate.exit();
}
// TODO(bartlomieju): `error_handler` should be removed
#[allow(dead_code)]
pub fn set_error_handler(&mut self, handler: Box<IsolateErrorHandleFn>) {
self.error_handler = Some(handler);
}