From 2debbdacb935cfe1eb7bb8d1f40a5063b339d90b Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Mon, 8 Apr 2019 17:10:00 -0400 Subject: [PATCH] Merge Worker and Isolate types (#2078) Reduces generics. --- cli/cli_behavior.rs | 14 +- cli/compiler.rs | 171 +++++++------------ cli/isolate_state.rs | 38 ++--- cli/main.rs | 18 +- cli/ops.rs | 299 ++++++++++++++++------------------ cli/resources.rs | 17 +- cli/{isolate.rs => worker.rs} | 143 +++++++++++++--- cli/workers.rs | 284 -------------------------------- js/compiler.ts | 1 - js/workers.ts | 5 +- 10 files changed, 359 insertions(+), 631 deletions(-) rename cli/{isolate.rs => worker.rs} (68%) delete mode 100644 cli/workers.rs diff --git a/cli/cli_behavior.rs b/cli/cli_behavior.rs index 860ec76ece..510b2608c8 100644 --- a/cli/cli_behavior.rs +++ b/cli/cli_behavior.rs @@ -17,24 +17,12 @@ impl CliBehavior { } } -impl IsolateStateContainer for &CliBehavior { - fn state(&self) -> Arc { - self.state.clone() - } -} - -impl IsolateStateContainer for CliBehavior { - fn state(&self) -> Arc { - self.state.clone() - } -} - impl Behavior for CliBehavior { fn dispatch( &mut self, control: &[u8], zero_copy: deno_buf, ) -> (bool, Box) { - ops::dispatch_all(self, control, zero_copy, ops::op_selector_std) + ops::dispatch_all(&self.state, control, zero_copy, ops::op_selector_std) } } diff --git a/cli/compiler.rs b/cli/compiler.rs index e4d76c44ca..d01e9fc0b2 100644 --- a/cli/compiler.rs +++ b/cli/compiler.rs @@ -1,22 +1,17 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::flags::DenoFlags; +use crate::cli_behavior::CliBehavior; use crate::isolate_state::*; use crate::js_errors; use crate::js_errors::JSErrorColor; use crate::msg; -use crate::ops; use crate::resources; use crate::resources::ResourceId; use crate::startup_data; use crate::tokio_util; -use crate::workers; -use crate::workers::WorkerBehavior; -use crate::workers::WorkerInit; -use deno::deno_buf; -use deno::Behavior; +use crate::worker::Worker; +use deno::js_check; use deno::Buf; use deno::JSError; -use deno::Op; use futures::future::*; use futures::sync::oneshot; use futures::Future; @@ -44,51 +39,6 @@ lazy_static! { static ref C_RUNTIME: Mutex = Mutex::new(Runtime::new().unwrap()); } -pub struct CompilerBehavior { - pub state: Arc, -} - -impl CompilerBehavior { - pub fn new(flags: DenoFlags, argv_rest: Vec) -> Self { - Self { - state: Arc::new(IsolateState::new(flags, argv_rest, None, true)), - } - } -} - -impl IsolateStateContainer for CompilerBehavior { - fn state(&self) -> Arc { - self.state.clone() - } -} - -impl IsolateStateContainer for &CompilerBehavior { - fn state(&self) -> Arc { - self.state.clone() - } -} - -impl Behavior for CompilerBehavior { - fn dispatch( - &mut self, - control: &[u8], - zero_copy: deno_buf, - ) -> (bool, Box) { - ops::dispatch_all(self, control, zero_copy, ops::op_selector_compiler) - } -} - -impl WorkerBehavior for CompilerBehavior { - fn set_internal_channels(&mut self, worker_channels: WorkerChannels) { - self.state = Arc::new(IsolateState::new( - self.state.flags.clone(), - self.state.argv.clone(), - Some(worker_channels), - true, - )); - } -} - // This corresponds to JS ModuleMetaData. // TODO Rename one or the other so they correspond. #[derive(Debug, Clone)] @@ -142,74 +92,67 @@ fn lazy_start(parent_state: Arc) -> ResourceId { let mut cell = C_RID.lock().unwrap(); cell .get_or_insert_with(|| { - let worker_result = workers::spawn( + let child_state = Arc::new(IsolateState::new( + parent_state.flags.clone(), + parent_state.argv.clone(), + )); + let rid = child_state.resource.rid; + let resource = child_state.resource.clone(); + let behavior = CliBehavior::new(child_state); + + let mut worker = Worker::new( + "TS".to_string(), startup_data::compiler_isolate_init(), - CompilerBehavior::new( - parent_state.flags.clone(), - parent_state.argv.clone(), - ), - "TS", - WorkerInit::Script("compilerMain()".to_string()), + behavior, ); - match worker_result { - Ok(worker) => { - let rid = worker.resource.rid; - let mut runtime = C_RUNTIME.lock().unwrap(); - runtime.spawn(lazy(move || { - let resource = worker.resource.clone(); - worker.then(move |result| -> Result<(), ()> { - // Close resource so the future created by - // handle_worker_message_stream exits - resource.close(); - debug!("Compiler worker exited!"); - if let Err(e) = result { - eprintln!("{}", JSErrorColor(&e).to_string()); - } - std::process::exit(1); - }) - })); - runtime.spawn(lazy(move || { - debug!("Start worker stream handler!"); - let worker_stream = resources::get_message_stream_from_worker(rid); - worker_stream - .for_each(|msg: Buf| { - // All worker responses are handled here first before being sent via - // their respective sender. This system can be compared to the - // promise system used on the js side. This provides a way to - // resolve many futures via the same channel. - let res_json = std::str::from_utf8(&msg).unwrap(); - debug!("Got message from worker: {}", res_json); - // Get the intended receiver's cmd_id from the message. - let cmd_id = parse_cmd_id(res_json); - let mut table = C_RES_SENDER_TABLE.lock().unwrap(); - debug!("Cmd id for get message handler: {}", cmd_id); - // Get the corresponding response sender from the table and - // send a response. - let response_sender = table.remove(&(cmd_id as CmdId)).unwrap(); - response_sender.send(msg).unwrap(); - Ok(()) - }).map_err(|_| ()) - })); - rid - } - Err(err) => { - println!("{}", err.to_string()); + + js_check(worker.execute("denoMain()")); + js_check(worker.execute("workerMain()")); + js_check(worker.execute("compilerMain()")); + + let mut runtime = C_RUNTIME.lock().unwrap(); + runtime.spawn(lazy(move || { + worker.then(move |result| -> Result<(), ()> { + // Close resource so the future created by + // handle_worker_message_stream exits + resource.close(); + debug!("Compiler worker exited!"); + if let Err(e) = result { + eprintln!("{}", JSErrorColor(&e).to_string()); + } std::process::exit(1); - } - } + }) + })); + runtime.spawn(lazy(move || { + debug!("Start worker stream handler!"); + let worker_stream = resources::get_message_stream_from_worker(rid); + worker_stream + .for_each(|msg: Buf| { + // All worker responses are handled here first before being sent via + // their respective sender. This system can be compared to the + // promise system used on the js side. This provides a way to + // resolve many futures via the same channel. + let res_json = std::str::from_utf8(&msg).unwrap(); + debug!("Got message from worker: {}", res_json); + // Get the intended receiver's cmd_id from the message. + let cmd_id = parse_cmd_id(res_json); + let mut table = C_RES_SENDER_TABLE.lock().unwrap(); + debug!("Cmd id for get message handler: {}", cmd_id); + // Get the corresponding response sender from the table and + // send a response. + let response_sender = table.remove(&(cmd_id as CmdId)).unwrap(); + response_sender.send(msg).unwrap(); + Ok(()) + }).map_err(|_| ()) + })); + rid }).clone() } -fn req( - specifier: &str, - referrer: &str, - is_worker_main: bool, - cmd_id: u32, -) -> Buf { +fn req(specifier: &str, referrer: &str, cmd_id: u32) -> Buf { json!({ "specifier": specifier, "referrer": referrer, - "isWorker": is_worker_main, "cmdId": cmd_id, }).to_string() .into_boxed_str() @@ -228,7 +171,7 @@ pub fn compile_async( ); let cmd_id = new_cmd_id(); - let req_msg = req(&specifier, &referrer, parent_state.is_worker, cmd_id); + let req_msg = req(&specifier, &referrer, cmd_id); let module_meta_data_ = module_meta_data.clone(); let compiler_rid = lazy_start(parent_state.clone()); @@ -362,7 +305,7 @@ mod tests { fn test_parse_cmd_id() { let cmd_id = new_cmd_id(); - let msg = req("Hello", "World", false, cmd_id); + let msg = req("Hello", "World", cmd_id); let res_json = std::str::from_utf8(&msg).unwrap(); diff --git a/cli/isolate_state.rs b/cli/isolate_state.rs index 313f4f6cef..a672f5ee2c 100644 --- a/cli/isolate_state.rs +++ b/cli/isolate_state.rs @@ -5,25 +5,23 @@ use crate::flags; use crate::global_timer::GlobalTimer; use crate::modules::Modules; use crate::permissions::DenoPermissions; +use crate::resources; use crate::resources::ResourceId; -use crate::workers::UserWorkerBehavior; -use crate::workers::Worker; +use crate::worker::Worker; use deno::Buf; use futures::future::Shared; -use futures::sync::mpsc as async_mpsc; use std; use std::collections::HashMap; use std::env; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; use std::sync::Mutex; use std::time::Instant; +use tokio::sync::mpsc as async_mpsc; pub type WorkerSender = async_mpsc::Sender; pub type WorkerReceiver = async_mpsc::Receiver; pub type WorkerChannels = (WorkerSender, WorkerReceiver); -pub type UserWorkerTable = - HashMap>>; +pub type UserWorkerTable = HashMap>; // AtomicU64 is currently unstable #[derive(Default)] @@ -48,22 +46,23 @@ pub struct IsolateState { pub flags: flags::DenoFlags, pub metrics: Metrics, pub modules: Mutex, - pub worker_channels: Option>, + pub worker_channels: Mutex, pub global_timer: Mutex, pub workers: Mutex, - pub is_worker: bool, pub start_time: Instant, + pub resource: resources::Resource, } impl IsolateState { - pub fn new( - flags: flags::DenoFlags, - argv_rest: Vec, - worker_channels: Option, - is_worker: bool, - ) -> Self { + pub fn new(flags: flags::DenoFlags, argv_rest: Vec) -> Self { let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok(); + let (worker_in_tx, worker_in_rx) = async_mpsc::channel::(1); + let (worker_out_tx, worker_out_rx) = async_mpsc::channel::(1); + let internal_channels = (worker_out_tx, worker_in_rx); + let external_channels = (worker_in_tx, worker_out_rx); + let resource = resources::add_worker(external_channels); + Self { dir: deno_dir::DenoDir::new(custom_root).unwrap(), argv: argv_rest, @@ -71,11 +70,11 @@ impl IsolateState { flags, metrics: Metrics::default(), modules: Mutex::new(Modules::new()), - worker_channels: worker_channels.map(Mutex::new), + worker_channels: Mutex::new(internal_channels), global_timer: Mutex::new(GlobalTimer::new()), workers: Mutex::new(UserWorkerTable::new()), - is_worker, start_time: Instant::now(), + resource, } } @@ -126,7 +125,7 @@ impl IsolateState { let argv = vec![String::from("./deno"), String::from("hello.js")]; // For debugging: argv.push_back(String::from("-D")); let (flags, rest_argv) = flags::set_flags(argv).unwrap(); - IsolateState::new(flags, rest_argv, None, false) + IsolateState::new(flags, rest_argv) } pub fn metrics_op_dispatched( @@ -153,8 +152,3 @@ impl IsolateState { .fetch_add(bytes_received, Ordering::SeqCst); } } - -/// Provides state getter function -pub trait IsolateStateContainer { - fn state(&self) -> Arc; -} diff --git a/cli/main.rs b/cli/main.rs index bda4f7b8f6..346bbbd890 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -20,7 +20,6 @@ mod fs; mod global_timer; mod http_body; mod http_util; -pub mod isolate; pub mod isolate_state; pub mod js_errors; pub mod modules; @@ -35,12 +34,12 @@ mod startup_data; mod tokio_util; mod tokio_write; pub mod version; -pub mod workers; +pub mod worker; use crate::cli_behavior::CliBehavior; use crate::errors::RustOrJsError; -use crate::isolate::Isolate; use crate::isolate_state::IsolateState; +use crate::worker::Worker; use futures::lazy; use futures::Future; use log::{LevelFilter, Metadata, Record}; @@ -105,27 +104,28 @@ fn main() { let should_prefetch = flags.prefetch || flags.info; let should_display_info = flags.info; - let state = Arc::new(IsolateState::new(flags, rest_argv, None, false)); + let state = Arc::new(IsolateState::new(flags, rest_argv)); let state_ = state.clone(); let cli = CliBehavior::new(state_); - let mut isolate = Isolate::new(startup_data::deno_isolate_init(), cli); + let mut main_worker = + Worker::new("main".to_string(), startup_data::deno_isolate_init(), cli); let main_future = lazy(move || { // Setup runtime. - js_check(isolate.execute("denoMain()")); + js_check(main_worker.execute("denoMain()")); // Execute main module. if let Some(main_module) = state.main_module() { debug!("main_module {}", main_module); - js_check(isolate.execute_mod(&main_module, should_prefetch)); + js_check(main_worker.execute_mod(&main_module, should_prefetch)); if should_display_info { // Display file info and exit. Do not run file - isolate.print_file_info(&main_module); + main_worker.print_file_info(&main_module); std::process::exit(0); } } - isolate.then(|result| { + main_worker.then(|result| { js_check(result); Ok(()) }) diff --git a/cli/ops.rs b/cli/ops.rs index d5656a3b62..3d43b24018 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -1,11 +1,12 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use atty; use crate::ansi; +use crate::cli_behavior::CliBehavior; use crate::errors; use crate::errors::{DenoError, DenoResult, ErrorKind}; use crate::fs as deno_fs; use crate::http_util; -use crate::isolate_state::{IsolateState, IsolateStateContainer}; +use crate::isolate_state::IsolateState; use crate::js_errors::apply_source_map; use crate::js_errors::JSErrorColor; use crate::msg; @@ -19,8 +20,9 @@ use crate::startup_data; use crate::tokio_util; use crate::tokio_write; use crate::version; -use crate::workers; +use crate::worker::Worker; use deno::deno_buf; +use deno::js_check; use deno::Buf; use deno::JSError; use deno::Op; @@ -60,7 +62,7 @@ pub type OpWithError = dyn Future + Send; // TODO Ideally we wouldn't have to box the OpWithError being returned. // The box is just to make it easier to get a prototype refactor working. type OpCreator = - fn(sc: &IsolateStateContainer, base: &msg::Base<'_>, data: deno_buf) + fn(state: &Arc, base: &msg::Base<'_>, data: deno_buf) -> Box; type OpSelector = fn(inner_type: msg::Any) -> Option; @@ -75,7 +77,7 @@ fn empty_buf() -> Buf { /// control corresponds to the first argument of Deno.core.dispatch(). /// data corresponds to the second argument of Deno.core.dispatch(). pub fn dispatch_all( - sc: &IsolateStateContainer, + state: &Arc, control: &[u8], zero_copy: deno_buf, op_selector: OpSelector, @@ -92,10 +94,9 @@ pub fn dispatch_all( None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)), }; - let state = sc.state().clone(); - - let op: Box = op_func(sc, &base, zero_copy); + let op: Box = op_func(state, &base, zero_copy); + let state = state.clone(); state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); let boxed_op = Box::new( @@ -143,23 +144,6 @@ pub fn dispatch_all( (base.sync(), boxed_op) } -/// Superset of op_selector_worker for compiler isolates -pub fn op_selector_compiler(inner_type: msg::Any) -> Option { - match inner_type { - msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data), - _ => op_selector_worker(inner_type), - } -} - -/// Superset of op_selector_std for worker isolates -pub fn op_selector_worker(inner_type: msg::Any) -> Option { - match inner_type { - msg::Any::WorkerGetMessage => Some(op_worker_get_message), - msg::Any::WorkerPostMessage => Some(op_worker_post_message), - _ => op_selector_std(inner_type), - } -} - /// Standard ops set for most isolates pub fn op_selector_std(inner_type: msg::Any) -> Option { match inner_type { @@ -208,6 +192,14 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option { msg::Any::HostGetMessage => Some(op_host_get_message), msg::Any::HostPostMessage => Some(op_host_post_message), msg::Any::Write => Some(op_write), + + // TODO(ry) split these out so that only the appropriate Workers can access + // them. Only the compiler worker should be able to access + // FetchModuleMetaData. + msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data), + msg::Any::WorkerGetMessage => Some(op_worker_get_message), + msg::Any::WorkerPostMessage => Some(op_worker_post_message), + _ => None, } } @@ -217,19 +209,19 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option { // If the High precision flag is not set, the // nanoseconds are rounded on 2ms. fn op_now( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { assert_eq!(data.len(), 0); - let seconds = sc.state().start_time.elapsed().as_secs(); - let mut subsec_nanos = sc.state().start_time.elapsed().subsec_nanos(); + let seconds = state.start_time.elapsed().as_secs(); + let mut subsec_nanos = state.start_time.elapsed().subsec_nanos(); let reduced_time_precision = 2000000; // 2ms in nanoseconds // If the permission is not enabled // Round the nano result on 2 milliseconds // see: https://developer.mozilla.org/en-US/docs/Web/API/DOMHighResTimeStamp#Reduced_time_precision - if !sc.state().permissions.allows_high_precision() { + if !state.permissions.allows_high_precision() { subsec_nanos -= subsec_nanos % reduced_time_precision } @@ -253,7 +245,7 @@ fn op_now( } fn op_is_tty( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, _data: deno_buf, ) -> Box { @@ -278,7 +270,7 @@ fn op_is_tty( } fn op_exit( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, _data: deno_buf, ) -> Box { @@ -287,14 +279,14 @@ fn op_exit( } fn op_start( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { assert_eq!(data.len(), 0); let mut builder = FlatBufferBuilder::new(); - let state = sc.state(); + let state = state; let argv = state.argv.iter().map(|s| s.as_str()).collect::>(); let argv_off = builder.create_vector_of_strings(argv.as_slice()); @@ -311,7 +303,7 @@ fn op_start( let deno_version = version::DENO; let deno_version_off = builder.create_string(deno_version); - let main_module = sc.state().main_module().map(|m| builder.create_string(&m)); + let main_module = state.main_module().map(|m| builder.create_string(&m)); let inner = msg::StartRes::create( &mut builder, @@ -320,9 +312,9 @@ fn op_start( pid: std::process::id(), argv: Some(argv_off), main_module, - debug_flag: sc.state().flags.log_debug, - types_flag: sc.state().flags.types, - version_flag: sc.state().flags.version, + debug_flag: state.flags.log_debug, + types_flag: state.flags.types, + version_flag: state.flags.version, v8_version: Some(v8_version_off), deno_version: Some(deno_version_off), no_color: !ansi::use_color(), @@ -343,7 +335,7 @@ fn op_start( } fn op_format_error( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -352,7 +344,7 @@ fn op_format_error( let orig_error = String::from(inner.error().unwrap()); let js_error = JSError::from_v8_exception(&orig_error).unwrap(); - let js_error_mapped = apply_source_map(&js_error, &sc.state().dir); + let js_error_mapped = apply_source_map(&js_error, &state.dir); let js_error_string = JSErrorColor(&js_error_mapped).to_string(); let mut builder = FlatBufferBuilder::new(); @@ -402,7 +394,7 @@ pub fn odd_future(err: DenoError) -> Box { // https://github.com/denoland/deno/blob/golang/os.go#L100-L154 fn op_fetch_module_meta_data( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -412,19 +404,14 @@ fn op_fetch_module_meta_data( let specifier = inner.specifier().unwrap(); let referrer = inner.referrer().unwrap(); - assert_eq!( - sc.state().dir.root.join("gen"), - sc.state().dir.gen, - "Sanity check" - ); + assert_eq!(state.dir.root.join("gen"), state.dir.gen, "Sanity check"); - let use_cache = !sc.state().flags.reload; + let use_cache = !state.flags.reload; Box::new(futures::future::result(|| -> OpResult { let builder = &mut FlatBufferBuilder::new(); // TODO(ry) Use fetch_module_meta_data_async. - let out = sc - .state() + let out = state .dir .fetch_module_meta_data(specifier, referrer, use_cache)?; let data_off = builder.create_vector(out.source_code.as_slice()); @@ -448,7 +435,7 @@ fn op_fetch_module_meta_data( } fn op_chdir( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -462,20 +449,20 @@ fn op_chdir( } fn op_global_timer_stop( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { assert!(base.sync()); assert_eq!(data.len(), 0); - let state = sc.state(); + let state = state; let mut t = state.global_timer.lock().unwrap(); t.cancel(); ok_future(empty_buf()) } fn op_global_timer( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -486,7 +473,7 @@ fn op_global_timer( let val = inner.timeout(); assert!(val >= 0); - let state = sc.state(); + let state = state; let mut t = state.global_timer.lock().unwrap(); let deadline = Instant::now() + Duration::from_millis(val as u64); let f = t.new_timeout(deadline); @@ -508,7 +495,7 @@ fn op_global_timer( } fn op_set_env( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -516,7 +503,7 @@ fn op_set_env( let inner = base.inner_as_set_env().unwrap(); let key = inner.key().unwrap(); let value = inner.value().unwrap(); - if let Err(e) = sc.state().check_env() { + if let Err(e) = state.check_env() { return odd_future(e); } std::env::set_var(key, value); @@ -524,14 +511,14 @@ fn op_set_env( } fn op_env( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { assert_eq!(data.len(), 0); let cmd_id = base.cmd_id(); - if let Err(e) = sc.state().check_env() { + if let Err(e) = state.check_env() { return odd_future(e); } @@ -556,7 +543,7 @@ fn op_env( } fn op_permissions( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -566,12 +553,12 @@ fn op_permissions( let inner = msg::PermissionsRes::create( builder, &msg::PermissionsResArgs { - run: sc.state().permissions.allows_run(), - read: sc.state().permissions.allows_read(), - write: sc.state().permissions.allows_write(), - net: sc.state().permissions.allows_net(), - env: sc.state().permissions.allows_env(), - high_precision: sc.state().permissions.allows_high_precision(), + run: state.permissions.allows_run(), + read: state.permissions.allows_read(), + write: state.permissions.allows_write(), + net: state.permissions.allows_net(), + env: state.permissions.allows_env(), + high_precision: state.permissions.allows_high_precision(), }, ); ok_future(serialize_response( @@ -586,7 +573,7 @@ fn op_permissions( } fn op_revoke_permission( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -594,12 +581,12 @@ fn op_revoke_permission( let inner = base.inner_as_permission_revoke().unwrap(); let permission = inner.permission().unwrap(); let result = match permission { - "run" => sc.state().permissions.revoke_run(), - "read" => sc.state().permissions.revoke_read(), - "write" => sc.state().permissions.revoke_write(), - "net" => sc.state().permissions.revoke_net(), - "env" => sc.state().permissions.revoke_env(), - "highPrecision" => sc.state().permissions.revoke_high_precision(), + "run" => state.permissions.revoke_run(), + "read" => state.permissions.revoke_read(), + "write" => state.permissions.revoke_write(), + "net" => state.permissions.revoke_net(), + "env" => state.permissions.revoke_env(), + "highPrecision" => state.permissions.revoke_high_precision(), _ => Ok(()), }; if let Err(e) = result { @@ -609,7 +596,7 @@ fn op_revoke_permission( } fn op_fetch( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -632,7 +619,7 @@ fn op_fetch( } let req = maybe_req.unwrap(); - if let Err(e) = sc.state().check_net(url) { + if let Err(e) = state.check_net(url) { return odd_future(e); } @@ -696,7 +683,7 @@ where } fn op_make_temp_dir( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -706,7 +693,7 @@ fn op_make_temp_dir( let cmd_id = base.cmd_id(); // FIXME - if let Err(e) = sc.state().check_write("make_temp") { + if let Err(e) = state.check_write("make_temp") { return odd_future(e); } @@ -745,7 +732,7 @@ fn op_make_temp_dir( } fn op_mkdir( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -755,7 +742,7 @@ fn op_mkdir( let recursive = inner.recursive(); let mode = inner.mode(); - if let Err(e) = sc.state().check_write(&path) { + if let Err(e) = state.check_write(&path) { return odd_future(e); } @@ -767,7 +754,7 @@ fn op_mkdir( } fn op_chmod( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -776,7 +763,7 @@ fn op_chmod( let _mode = inner.mode(); let path = String::from(inner.path().unwrap()); - if let Err(e) = sc.state().check_write(&path) { + if let Err(e) = state.check_write(&path) { return odd_future(e); } @@ -806,7 +793,7 @@ fn op_chmod( } fn op_open( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -855,20 +842,20 @@ fn op_open( match mode { "r" => { - if let Err(e) = sc.state().check_read(&filename_str) { + if let Err(e) = state.check_read(&filename_str) { return odd_future(e); } } "w" | "a" | "x" => { - if let Err(e) = sc.state().check_write(&filename_str) { + if let Err(e) = state.check_write(&filename_str) { return odd_future(e); } } &_ => { - if let Err(e) = sc.state().check_read(&filename_str) { + if let Err(e) = state.check_read(&filename_str) { return odd_future(e); } - if let Err(e) = sc.state().check_write(&filename_str) { + if let Err(e) = state.check_write(&filename_str) { return odd_future(e); } } @@ -896,7 +883,7 @@ fn op_open( } fn op_close( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -913,7 +900,7 @@ fn op_close( } fn op_shutdown( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -939,7 +926,7 @@ fn op_shutdown( } fn op_read( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -977,7 +964,7 @@ fn op_read( } fn op_write( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1014,7 +1001,7 @@ fn op_write( } fn op_seek( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1036,7 +1023,7 @@ fn op_seek( } fn op_remove( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1046,7 +1033,7 @@ fn op_remove( let path = PathBuf::from(path_); let recursive = inner.recursive(); - if let Err(e) = sc.state().check_write(path.to_str().unwrap()) { + if let Err(e) = state.check_write(path.to_str().unwrap()) { return odd_future(e); } @@ -1065,7 +1052,7 @@ fn op_remove( } fn op_copy_file( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1076,10 +1063,10 @@ fn op_copy_file( let to_ = inner.to().unwrap(); let to = PathBuf::from(to_); - if let Err(e) = sc.state().check_read(&from_) { + if let Err(e) = state.check_read(&from_) { return odd_future(e); } - if let Err(e) = sc.state().check_write(&to_) { + if let Err(e) = state.check_write(&to_) { return odd_future(e); } @@ -1121,7 +1108,7 @@ fn get_mode(_perm: &fs::Permissions) -> u32 { } fn op_cwd( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1147,7 +1134,7 @@ fn op_cwd( } fn op_stat( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1158,7 +1145,7 @@ fn op_stat( let filename = PathBuf::from(filename_); let lstat = inner.lstat(); - if let Err(e) = sc.state().check_read(&filename_) { + if let Err(e) = state.check_read(&filename_) { return odd_future(e); } @@ -1199,7 +1186,7 @@ fn op_stat( } fn op_read_dir( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1208,7 +1195,7 @@ fn op_read_dir( let cmd_id = base.cmd_id(); let path = String::from(inner.path().unwrap()); - if let Err(e) = sc.state().check_read(&path) { + if let Err(e) = state.check_read(&path) { return odd_future(e); } @@ -1260,7 +1247,7 @@ fn op_read_dir( } fn op_rename( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1269,7 +1256,7 @@ fn op_rename( let oldpath = PathBuf::from(inner.oldpath().unwrap()); let newpath_ = inner.newpath().unwrap(); let newpath = PathBuf::from(newpath_); - if let Err(e) = sc.state().check_write(&newpath_) { + if let Err(e) = state.check_write(&newpath_) { return odd_future(e); } blocking(base.sync(), move || -> OpResult { @@ -1280,7 +1267,7 @@ fn op_rename( } fn op_link( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1290,7 +1277,7 @@ fn op_link( let newname_ = inner.newname().unwrap(); let newname = PathBuf::from(newname_); - if let Err(e) = sc.state().check_write(&newname_) { + if let Err(e) = state.check_write(&newname_) { return odd_future(e); } @@ -1302,7 +1289,7 @@ fn op_link( } fn op_symlink( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1312,7 +1299,7 @@ fn op_symlink( let newname_ = inner.newname().unwrap(); let newname = PathBuf::from(newname_); - if let Err(e) = sc.state().check_write(&newname_) { + if let Err(e) = state.check_write(&newname_) { return odd_future(e); } // TODO Use type for Windows. @@ -1331,7 +1318,7 @@ fn op_symlink( } fn op_read_link( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1341,7 +1328,7 @@ fn op_read_link( let name_ = inner.name().unwrap(); let name = PathBuf::from(name_); - if let Err(e) = sc.state().check_read(&name_) { + if let Err(e) = state.check_read(&name_) { return odd_future(e); } @@ -1369,7 +1356,7 @@ fn op_read_link( } fn op_repl_start( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1379,7 +1366,7 @@ fn op_repl_start( let history_file = String::from(inner.history_file().unwrap()); debug!("op_repl_start {}", history_file); - let history_path = repl::history_path(&sc.state().dir, &history_file); + let history_path = repl::history_path(&state.dir, &history_file); let repl = repl::Repl::new(history_path); let resource = resources::add_repl(repl); @@ -1400,7 +1387,7 @@ fn op_repl_start( } fn op_repl_readline( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1436,7 +1423,7 @@ fn op_repl_readline( } fn op_truncate( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1446,7 +1433,7 @@ fn op_truncate( let filename = String::from(inner.name().unwrap()); let len = inner.len(); - if let Err(e) = sc.state().check_write(&filename) { + if let Err(e) = state.check_write(&filename) { return odd_future(e); } @@ -1459,12 +1446,12 @@ fn op_truncate( } fn op_listen( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { assert_eq!(data.len(), 0); - if let Err(e) = sc.state().check_net("listen") { + if let Err(e) = state.check_net("listen") { return odd_future(e); } @@ -1521,12 +1508,12 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult { } fn op_accept( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { assert_eq!(data.len(), 0); - if let Err(e) = sc.state().check_net("accept") { + if let Err(e) = state.check_net("accept") { return odd_future(e); } let cmd_id = base.cmd_id(); @@ -1547,12 +1534,12 @@ fn op_accept( } fn op_dial( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { assert_eq!(data.len(), 0); - if let Err(e) = sc.state().check_net("dial") { + if let Err(e) = state.check_net("dial") { return odd_future(e); } let cmd_id = base.cmd_id(); @@ -1573,7 +1560,7 @@ fn op_dial( } fn op_metrics( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1583,7 +1570,7 @@ fn op_metrics( let builder = &mut FlatBufferBuilder::new(); let inner = msg::MetricsRes::create( builder, - &msg::MetricsResArgs::from(&sc.state().metrics), + &msg::MetricsResArgs::from(&state.metrics), ); ok_future(serialize_response( cmd_id, @@ -1597,7 +1584,7 @@ fn op_metrics( } fn op_resources( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1649,14 +1636,14 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio { } fn op_run( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { assert!(base.sync()); let cmd_id = base.cmd_id(); - if let Err(e) = sc.state().check_run() { + if let Err(e) = state.check_run() { return odd_future(e); } @@ -1722,7 +1709,7 @@ fn op_run( } fn op_run_status( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1731,7 +1718,7 @@ fn op_run_status( let inner = base.inner_as_run_status().unwrap(); let rid = inner.rid(); - if let Err(e) = sc.state().check_run() { + if let Err(e) = state.check_run() { return odd_future(e); } @@ -1786,20 +1773,16 @@ impl Future for GetMessageFuture { type Error = (); fn poll(&mut self) -> Result, Self::Error> { - assert!(self.state.worker_channels.is_some()); - match self.state.worker_channels { - None => panic!("expected worker_channels"), - Some(ref wc) => { - let mut wc = wc.lock().unwrap(); - wc.1.poll() - } - } + let mut wc = self.state.worker_channels.lock().unwrap(); + wc.1 + .poll() + .map_err(|err| panic!("worker_channel recv err {:?}", err)) } } /// Get message from host as guest worker fn op_worker_get_message( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1807,7 +1790,7 @@ fn op_worker_get_message( let cmd_id = base.cmd_id(); let op = GetMessageFuture { - state: sc.state().clone(), + state: state.clone(), }; let op = op.map_err(move |_| -> DenoError { unimplemented!() }); let op = op.and_then(move |maybe_buf| -> DenoResult { @@ -1834,7 +1817,7 @@ fn op_worker_get_message( /// Post message to host as guest worker fn op_worker_post_message( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1842,13 +1825,9 @@ fn op_worker_post_message( let d = Vec::from(data.as_ref()).into_boxed_slice(); - assert!(sc.state().worker_channels.is_some()); - let tx = match sc.state().worker_channels { - None => panic!("expected worker_channels"), - Some(ref wc) => { - let wc = wc.lock().unwrap(); - wc.0.clone() - } + let tx = { + let wc = state.worker_channels.lock().unwrap(); + wc.0.clone() }; let op = tx.send(d); let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); @@ -1868,7 +1847,7 @@ fn op_worker_post_message( /// Create worker as the host fn op_create_worker( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1878,20 +1857,24 @@ fn op_create_worker( let specifier = inner.specifier().unwrap(); Box::new(futures::future::result(move || -> OpResult { - let parent_state = sc.state().clone(); - let behavior = workers::UserWorkerBehavior::new( + let parent_state = state.clone(); + + let child_state = Arc::new(IsolateState::new( parent_state.flags.clone(), parent_state.argv.clone(), - ); - match workers::spawn( - startup_data::deno_isolate_init(), - behavior, - &format!("USER-WORKER-{}", specifier), - workers::WorkerInit::Module(specifier.to_string()), - ) { - Ok(worker) => { + )); + let rid = child_state.resource.rid; + let behavior = CliBehavior::new(child_state); + let name = format!("USER-WORKER-{}", specifier); + + let mut worker = + Worker::new(name, startup_data::deno_isolate_init(), behavior); + js_check(worker.execute("denoMain()")); + js_check(worker.execute("workerMain()")); + let result = worker.execute_mod(specifier, false); + match result { + Ok(_) => { let mut workers_tl = parent_state.workers.lock().unwrap(); - let rid = worker.resource.rid; workers_tl.insert(rid, worker.shared()); let builder = &mut FlatBufferBuilder::new(); let msg_inner = msg::CreateWorkerRes::create( @@ -1916,7 +1899,7 @@ fn op_create_worker( /// Return when the worker closes fn op_host_get_worker_closed( - sc: &IsolateStateContainer, + state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1924,7 +1907,7 @@ fn op_host_get_worker_closed( let cmd_id = base.cmd_id(); let inner = base.inner_as_host_get_worker_closed().unwrap(); let rid = inner.rid(); - let state = sc.state().clone(); + let state = state.clone(); let shared_worker_future = { let workers_tl = state.workers.lock().unwrap(); @@ -1947,7 +1930,7 @@ fn op_host_get_worker_closed( /// Get message from guest worker as host fn op_host_get_message( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1981,7 +1964,7 @@ fn op_host_get_message( /// Post message to guest worker as host fn op_host_post_message( - _sc: &IsolateStateContainer, + _state: &Arc, base: &msg::Base<'_>, data: deno_buf, ) -> Box { diff --git a/cli/resources.rs b/cli/resources.rs index 701d5a937e..b1689c8905 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -35,6 +35,7 @@ use std::sync::{Arc, Mutex}; use tokio; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio_process; pub type ResourceId = u32; // Sometimes referred to RID. @@ -309,7 +310,7 @@ pub fn add_worker(wc: WorkerChannels) -> Resource { pub fn post_message_to_worker( rid: ResourceId, buf: Buf, -) -> futures::sink::Send> { +) -> futures::sink::Send> { let mut table = RESOURCE_TABLE.lock().unwrap(); let maybe_repr = table.get_mut(&rid); match maybe_repr { @@ -334,9 +335,10 @@ impl Future for WorkerReceiver { let mut table = RESOURCE_TABLE.lock().unwrap(); let maybe_repr = table.get_mut(&self.rid); match maybe_repr { - Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(|()| { - errors::new(errors::ErrorKind::Other, "recv msg error".to_string()) - }), + Some(Repr::Worker(ref mut wc)) => wc + .1 + .poll() + .map_err(|err| errors::new(errors::ErrorKind::Other, err.to_string())), _ => Err(bad_resource()), } } @@ -359,9 +361,10 @@ impl Stream for WorkerReceiverStream { let mut table = RESOURCE_TABLE.lock().unwrap(); let maybe_repr = table.get_mut(&self.rid); match maybe_repr { - Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(|()| { - errors::new(errors::ErrorKind::Other, "recv msg error".to_string()) - }), + Some(Repr::Worker(ref mut wc)) => wc + .1 + .poll() + .map_err(|err| errors::new(errors::ErrorKind::Other, err.to_string())), _ => Err(bad_resource()), } } diff --git a/cli/isolate.rs b/cli/worker.rs similarity index 68% rename from cli/isolate.rs rename to cli/worker.rs index 2e6d69d87a..6609c50059 100644 --- a/cli/isolate.rs +++ b/cli/worker.rs @@ -1,17 +1,16 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::cli_behavior::CliBehavior; use crate::compiler::compile_async; use crate::compiler::ModuleMetaData; use crate::errors::DenoError; use crate::errors::RustOrJsError; use crate::isolate_state::IsolateState; -use crate::isolate_state::IsolateStateContainer; use crate::js_errors; use crate::js_errors::JSErrorColor; use crate::msg; use crate::tokio_util; use deno; use deno::deno_mod; -use deno::Behavior; use deno::JSError; use deno::StartupData; use futures::future::Either; @@ -20,23 +19,22 @@ use futures::Future; use std::sync::atomic::Ordering; use std::sync::Arc; -pub trait DenoBehavior: Behavior + IsolateStateContainer + Send {} -impl DenoBehavior for T where T: Behavior + IsolateStateContainer + Send {} - -type CoreIsolate = deno::Isolate; - /// Wraps deno::Isolate to provide source maps, ops for the CLI, and /// high-level module loading -pub struct Isolate { - inner: CoreIsolate, +pub struct Worker { + inner: deno::Isolate, state: Arc, } -impl Isolate { - pub fn new(startup_data: StartupData, behavior: B) -> Isolate { - let state = behavior.state().clone(); +impl Worker { + pub fn new( + _name: String, + startup_data: StartupData, + behavior: CliBehavior, + ) -> Worker { + let state = behavior.state.clone(); Self { - inner: CoreIsolate::new(startup_data, behavior), + inner: deno::Isolate::new(startup_data, behavior), state, } } @@ -196,7 +194,7 @@ impl Isolate { } } -impl Future for Isolate { +impl Future for Worker { type Item = (); type Error = JSError; @@ -255,8 +253,14 @@ mod tests { use super::*; use crate::cli_behavior::CliBehavior; use crate::flags; + use crate::isolate_state::IsolateState; + use crate::resources; + use crate::startup_data; + use crate::tokio_util; + use deno::js_check; use futures::future::lazy; use std::sync::atomic::Ordering; + use std::thread; #[test] fn execute_mod() { @@ -268,15 +272,15 @@ mod tests { let argv = vec![String::from("./deno"), filename.clone()]; let (flags, rest_argv) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv, None, false)); + let state = Arc::new(IsolateState::new(flags, rest_argv)); let state_ = state.clone(); tokio_util::run(lazy(move || { let cli = CliBehavior::new(state.clone()); - let mut isolate = Isolate::new(StartupData::None, cli); - if let Err(err) = isolate.execute_mod(&filename, false) { + let mut worker = Worker::new("TEST".to_string(), StartupData::None, cli); + if let Err(err) = worker.execute_mod(&filename, false) { eprintln!("execute_mod err {:?}", err); } - tokio_util::panic_on_error(isolate) + tokio_util::panic_on_error(worker) })); let metrics = &state_.metrics; @@ -291,18 +295,113 @@ mod tests { let argv = vec![String::from("./deno"), filename.clone()]; let (flags, rest_argv) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv, None, false)); + let state = Arc::new(IsolateState::new(flags, rest_argv)); let state_ = state.clone(); tokio_util::run(lazy(move || { let cli = CliBehavior::new(state.clone()); - let mut isolate = Isolate::new(StartupData::None, cli); - if let Err(err) = isolate.execute_mod(&filename, false) { + let mut worker = Worker::new("TEST".to_string(), StartupData::None, cli); + if let Err(err) = worker.execute_mod(&filename, false) { eprintln!("execute_mod err {:?}", err); } - tokio_util::panic_on_error(isolate) + tokio_util::panic_on_error(worker) })); let metrics = &state_.metrics; assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2); } + + fn create_test_worker() -> Worker { + let state = Arc::new(IsolateState::mock()); + let cli = CliBehavior::new(state.clone()); + let mut worker = + Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), cli); + js_check(worker.execute("denoMain()")); + js_check(worker.execute("workerMain()")); + worker + } + + #[test] + fn test_worker_messages() { + tokio_util::init(|| { + let mut worker = create_test_worker(); + let source = r#" + onmessage = function(e) { + console.log("msg from main script", e.data); + if (e.data == "exit") { + close(); + return; + } else { + console.assert(e.data === "hi"); + } + postMessage([1, 2, 3]); + console.log("after postMessage"); + } + "#; + js_check(worker.execute(source)); + + let resource = worker.state.resource.clone(); + let resource_ = resource.clone(); + + tokio::spawn(lazy(move || { + worker.then(move |r| -> Result<(), ()> { + resource_.close(); + js_check(r); + Ok(()) + }) + })); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + + let maybe_msg = resources::get_message_from_worker(resource.rid) + .wait() + .unwrap(); + assert!(maybe_msg.is_some()); + // Check if message received is [1, 2, 3] in json + assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + }) + } + + #[test] + fn removed_from_resource_table_on_close() { + tokio_util::init(|| { + let mut worker = create_test_worker(); + js_check( + worker.execute("onmessage = () => { delete window['onmessage']; }"), + ); + + let resource = worker.state.resource.clone(); + let rid = resource.rid; + + tokio::spawn(lazy(move || { + worker.then(move |r| -> Result<(), ()> { + resource.close(); + println!("workers.rs after resource close"); + js_check(r); + Ok(()) + }) + })); + + assert_eq!(resources::get_type(rid), Some("worker".to_string())); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = resources::post_message_to_worker(rid, msg).wait(); + assert!(r.is_ok()); + debug!("rid {:?}", rid); + + // TODO Need a way to get a future for when a resource closes. + // For now, just sleep for a bit. + thread::sleep(std::time::Duration::from_millis(1000)); + assert_eq!(resources::get_type(rid), None); + }) + } } diff --git a/cli/workers.rs b/cli/workers.rs deleted file mode 100644 index 7c39a5d2e8..0000000000 --- a/cli/workers.rs +++ /dev/null @@ -1,284 +0,0 @@ -// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::errors::*; -use crate::flags::DenoFlags; -use crate::isolate::{DenoBehavior, Isolate}; -use crate::isolate_state::IsolateState; -use crate::isolate_state::IsolateStateContainer; -use crate::isolate_state::WorkerChannels; -use crate::ops; -use crate::resources; -use deno::deno_buf; -use deno::Behavior; -use deno::Buf; -use deno::JSError; -use deno::Op; -use deno::StartupData; -use futures::sync::mpsc; -use futures::Future; -use futures::Poll; -use std::sync::Arc; - -pub struct UserWorkerBehavior { - pub state: Arc, -} - -impl UserWorkerBehavior { - pub fn new(flags: DenoFlags, argv_rest: Vec) -> Self { - Self { - state: Arc::new(IsolateState::new(flags, argv_rest, None, true)), - } - } -} - -impl IsolateStateContainer for UserWorkerBehavior { - fn state(&self) -> Arc { - self.state.clone() - } -} - -impl IsolateStateContainer for &UserWorkerBehavior { - fn state(&self) -> Arc { - self.state.clone() - } -} - -impl Behavior for UserWorkerBehavior { - fn dispatch( - &mut self, - control: &[u8], - zero_copy: deno_buf, - ) -> (bool, Box) { - ops::dispatch_all(self, control, zero_copy, ops::op_selector_worker) - } -} - -impl WorkerBehavior for UserWorkerBehavior { - fn set_internal_channels(&mut self, worker_channels: WorkerChannels) { - self.state = Arc::new(IsolateState::new( - self.state.flags.clone(), - self.state.argv.clone(), - Some(worker_channels), - true, - )); - } -} - -/// Behavior trait specific to workers -pub trait WorkerBehavior: DenoBehavior { - /// Used to setup internal channels at worker creation. - /// This is intended to be temporary fix. - /// TODO(afinch7) come up with a better solution to set worker channels - fn set_internal_channels(&mut self, worker_channels: WorkerChannels); -} - -/// Rust interface for WebWorkers. -pub struct Worker { - isolate: Isolate, - pub resource: resources::Resource, -} - -impl Worker { - pub fn new(startup_data: StartupData, mut behavior: B) -> Self { - let (worker_in_tx, worker_in_rx) = mpsc::channel::(1); - let (worker_out_tx, worker_out_rx) = mpsc::channel::(1); - - let internal_channels = (worker_out_tx, worker_in_rx); - let external_channels = (worker_in_tx, worker_out_rx); - - behavior.set_internal_channels(internal_channels); - - let isolate = Isolate::new(startup_data, behavior); - - Worker { - isolate, - resource: resources::add_worker(external_channels), - } - } - - pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> { - self.isolate.execute(js_source) - } - - pub fn execute_mod( - &mut self, - js_filename: &str, - is_prefetch: bool, - ) -> Result<(), RustOrJsError> { - self.isolate.execute_mod(js_filename, is_prefetch) - } -} - -impl Future for Worker { - type Item = (); - type Error = JSError; - - fn poll(&mut self) -> Poll<(), JSError> { - self.isolate.poll() - } -} - -/// Method and data used to initalize a worker -pub enum WorkerInit { - Script(String), - Module(String), -} - -pub fn spawn( - startup_data: StartupData, - behavior: B, - worker_debug_name: &str, - init: WorkerInit, -) -> Result, RustOrJsError> { - let state = behavior.state().clone(); - let mut worker = Worker::new(startup_data, behavior); - - worker - .execute(&format!("denoMain('{}')", worker_debug_name)) - .expect("worker workerInit failed"); - - worker - .execute("workerMain()") - .expect("worker workerMain failed"); - - let init_result = match init { - WorkerInit::Script(script) => match worker.execute(&script) { - Ok(_) => Ok(()), - Err(e) => Err(RustOrJsError::Js(e)), - }, - WorkerInit::Module(specifier) => { - let should_prefetch = state.flags.prefetch || state.flags.info; - match state.dir.resolve_module_url(&specifier, ".") { - Err(err) => Err(RustOrJsError::Rust(DenoError::from(err))), - Ok(module_url) => { - worker.execute_mod(&module_url.to_string(), should_prefetch) - } - } - } - }; - - match init_result { - Ok(_) => Ok(worker), - Err(err) => Err(err), - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::compiler::CompilerBehavior; - use crate::isolate_state::IsolateState; - use crate::js_errors::JSErrorColor; - use crate::startup_data; - use crate::tokio_util; - use futures::future::lazy; - use std::thread; - - #[test] - fn test_spawn() { - tokio_util::init(|| { - let worker_result = spawn( - startup_data::compiler_isolate_init(), - CompilerBehavior::new( - IsolateState::mock().flags.clone(), - IsolateState::mock().argv.clone(), - ), - "TEST", - WorkerInit::Script( - r#" - onmessage = function(e) { - console.log("msg from main script", e.data); - if (e.data == "exit") { - close(); - return; - } else { - console.assert(e.data === "hi"); - } - postMessage([1, 2, 3]); - console.log("after postMessage"); - } - "#.into(), - ), - ); - assert!(worker_result.is_ok()); - let worker = worker_result.unwrap(); - let resource = worker.resource.clone(); - let resource_ = resource.clone(); - - tokio::spawn(lazy(move || { - worker.then(move |r| -> Result<(), ()> { - resource_.close(); - debug!("workers.rs after resource close"); - if let Err(err) = r { - panic!("{}", JSErrorColor(&err).to_string()); - } - Ok(()) - }) - })); - - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - - let r = resources::post_message_to_worker(resource.rid, msg).wait(); - assert!(r.is_ok()); - - let maybe_msg = resources::get_message_from_worker(resource.rid) - .wait() - .unwrap(); - assert!(maybe_msg.is_some()); - // Check if message received is [1, 2, 3] in json - assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); - - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = resources::post_message_to_worker(resource.rid, msg).wait(); - assert!(r.is_ok()); - }) - } - - #[test] - fn removed_from_resource_table_on_close() { - tokio_util::init(|| { - let worker_result = spawn( - startup_data::compiler_isolate_init(), - CompilerBehavior::new( - IsolateState::mock().flags.clone(), - IsolateState::mock().argv.clone(), - ), - "TEST", - WorkerInit::Script("onmessage = () => close();".into()), - ); - assert!(worker_result.is_ok()); - let worker = worker_result.unwrap(); - let resource = worker.resource.clone(); - let resource_ = resource.clone(); - - tokio::spawn(lazy(move || { - worker.then(move |r| -> Result<(), ()> { - resource_.close(); - debug!("workers.rs after resource close"); - if let Err(err) = r { - panic!("{}", JSErrorColor(&err).to_string()); - } - Ok(()) - }) - })); - - assert_eq!( - resources::get_type(resource.rid), - Some("worker".to_string()) - ); - - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = resources::post_message_to_worker(resource.rid, msg).wait(); - assert!(r.is_ok()); - println!("rid {:?}", resource.rid); - - // TODO Need a way to get a future for when a resource closes. - // For now, just sleep for a bit. - // resource.close(); - thread::sleep(std::time::Duration::from_millis(1000)); - assert_eq!(resources::get_type(resource.rid), None); - }) - } -} diff --git a/js/compiler.ts b/js/compiler.ts index 6172b614f9..1a85de5f55 100644 --- a/js/compiler.ts +++ b/js/compiler.ts @@ -46,7 +46,6 @@ type SourceMap = string; interface CompilerLookup { specifier: ModuleSpecifier; referrer: ContainingFile; - isWorker: boolean; cmdId: number; } diff --git a/js/workers.ts b/js/workers.ts index 6b768bf087..456449d483 100644 --- a/js/workers.ts +++ b/js/workers.ts @@ -129,10 +129,13 @@ export async function workerMain(): Promise { log("workerMain got null message. quitting."); break; } + if (window["onmessage"]) { const event = { data }; window.onmessage(event); - } else { + } + + if (!window["onmessage"]) { break; } }