add compile_async (#2053)

This commit is contained in:
andy finch 2019-04-05 00:04:06 -04:00 committed by Ryan Dahl
parent 744e56cb58
commit 07f0d077c7
2 changed files with 85 additions and 125 deletions

View file

@ -1,13 +1,14 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use core::ops::Deref;
use crate::flags::DenoFlags;
use crate::isolate_state::*;
use crate::js_errors;
use crate::js_errors::JSErrorColor;
use crate::msg;
use crate::ops;
use crate::resources;
use crate::resources::ResourceId;
use crate::startup_data;
use crate::tokio_util;
use crate::workers;
use crate::workers::WorkerBehavior;
use crate::workers::WorkerInit;
@ -30,30 +31,15 @@ use std::sync::Arc;
use std::sync::Mutex;
use tokio::runtime::Runtime;
/// Used for normalization of types on internal future completions
type CompilerInnerResult = Result<ModuleMetaData, Option<JSError>>;
type WorkerErrReceiver = oneshot::Receiver<CompilerInnerResult>;
type CmdId = u32;
type ResponseSenderTable = HashMap<CmdId, oneshot::Sender<Buf>>;
/// Shared resources for used to complete compiler operations.
/// rid is the resource id for compiler worker resource used for sending it
/// compile requests
/// worker_err_receiver is a shared future that will compelete when the
/// compiler worker future completes, and send back an error if present
/// or a None if not
#[derive(Clone)]
struct CompilerShared {
pub rid: ResourceId,
pub worker_err_receiver: Shared<WorkerErrReceiver>,
}
lazy_static! {
static ref C_NEXT_CMD_ID: AtomicUsize = AtomicUsize::new(1);
// Map of response senders
static ref C_RES_SENDER_TABLE: Mutex<ResponseSenderTable> = Mutex::new(ResponseSenderTable::new());
// Shared worker resources so we can spawn
static ref C_SHARED: Mutex<Option<CompilerShared>> = Mutex::new(None);
static ref C_RID: Mutex<Option<ResourceId>> = Mutex::new(None);
// tokio runtime specifically for spawning logic that is dependent on
// completetion of the compiler worker future
static ref C_RUNTIME: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap());
@ -157,8 +143,8 @@ fn parse_cmd_id(res_json: &str) -> CmdId {
}
}
fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
let mut cell = C_SHARED.lock().unwrap();
fn lazy_start(parent_state: Arc<IsolateState>) -> ResourceId {
let mut cell = C_RID.lock().unwrap();
cell
.get_or_insert_with(|| {
let worker_result = workers::spawn(
@ -172,10 +158,6 @@ fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
match worker_result {
Ok(worker) => {
let rid = worker.resource.rid;
// create oneshot channels and use the sender to pass back
// results from worker future
let (err_sender, err_receiver) =
oneshot::channel::<CompilerInnerResult>();
let mut runtime = C_RUNTIME.lock().unwrap();
runtime.spawn(lazy(move || {
let resource = worker.resource.clone();
@ -183,11 +165,11 @@ fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
// Close resource so the future created by
// handle_worker_message_stream exits
resource.close();
match result {
Err(err) => err_sender.send(Err(Some(err))).unwrap(),
_ => err_sender.send(Err(None)).unwrap(),
};
Ok(())
debug!("Compiler worker exited!");
if let Err(e) = result {
eprintln!("{}", JSErrorColor(&e).to_string());
}
std::process::exit(1);
})
}));
runtime.spawn(lazy(move || {
@ -212,10 +194,7 @@ fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
Ok(())
}).map_err(|_| ())
}));
CompilerShared {
rid,
worker_err_receiver: err_receiver.shared(),
}
rid
}
Err(err) => {
println!("{}", err.to_string());
@ -241,23 +220,22 @@ fn req(
.into_boxed_bytes()
}
pub fn compile_sync(
pub fn compile_async(
parent_state: Arc<IsolateState>,
specifier: &str,
referrer: &str,
module_meta_data: &ModuleMetaData,
) -> Result<ModuleMetaData, JSError> {
) -> impl Future<Item = ModuleMetaData, Error = JSError> {
debug!(
"Running rust part of compile_sync. specifier: {}, referrer: {}",
&specifier, &referrer
);
let cmd_id = new_cmd_id();
let req_msg = req(specifier, referrer, parent_state.is_worker, cmd_id);
let req_msg = req(&specifier, &referrer, parent_state.is_worker, cmd_id);
let module_meta_data_ = module_meta_data.clone();
let shared = lazy_start(parent_state.clone());
let compiler_rid = shared.rid;
let compiler_rid = lazy_start(parent_state.clone());
let (local_sender, local_receiver) =
oneshot::channel::<Result<ModuleMetaData, Option<JSError>>>();
@ -317,58 +295,31 @@ pub fn compile_sync(
}));
}
let worker_receiver = shared.worker_err_receiver.clone();
local_receiver
.map_err(|e| {
panic!(
"Local channel canceled before compile request could be completed: {}",
e
)
}).and_then(move |result| match result {
Ok(v) => futures::future::result(Ok(v)),
Err(Some(err)) => futures::future::result(Err(err)),
Err(None) => panic!("Failed to communicate with the compiler worker."),
})
}
let union =
futures::future::select_all(vec![worker_receiver, local_receiver.shared()]);
match union.wait() {
Ok((result, i, rest)) => {
// We got a sucessful finish before any recivers where canceled
let mut rest_mut = rest;
match ((*result.deref()).clone(), i) {
// Either receiver was completed with success.
(Ok(v), _) => Ok(v),
// Either receiver was completed with a valid error
// this should be fatal for now since it is not intended
// to be possible to recover from a uncaught error in a isolate
(Err(Some(err)), _) => Err(err),
// local_receiver finished first with a none error. This is intended
// to catch when the local logic can't complete because it is unable
// to send and/or receive messages from the compiler worker.
// Due to the way that scheduling works it is very likely that the
// compiler worker future has already or will in the near future
// complete with a valid JSError or a None.
(Err(None), 1) => {
debug!("Compiler local exited with None error!");
// While technically possible to get stuck here indefinately
// in theory it is highly unlikely.
debug!(
"Waiting on compiler worker result specifier: {} referrer: {}!",
specifier, referrer
);
let worker_result =
(*rest_mut.remove(0).wait().unwrap().deref()).clone();
debug!(
"Finished waiting on worker result specifier: {} referrer: {}!",
specifier, referrer
);
match worker_result {
Err(Some(err)) => Err(err),
Err(None) => panic!("Compiler exit for an unknown reason!"),
Ok(v) => Ok(v),
}
}
// While possible beccause the compiler worker can exit without error
// this shouldn't occurr normally and I don't intend to attempt to
// handle it right now
(_, i) => panic!("Odd compiler result for future {}!", i),
}
}
// This should always a result of a reciver being cancled
// in theory but why not give a print out just in case
Err((err, i, _)) => panic!("compile_sync {} failed: {}", i, err),
}
pub fn compile_sync(
parent_state: Arc<IsolateState>,
specifier: &str,
referrer: &str,
module_meta_data: &ModuleMetaData,
) -> Result<ModuleMetaData, JSError> {
tokio_util::block_on(compile_async(
parent_state,
specifier,
referrer,
module_meta_data,
))
}
#[cfg(test)]
@ -377,33 +328,38 @@ mod tests {
#[test]
fn test_compile_sync() {
let cwd = std::env::current_dir().unwrap();
let cwd_string = cwd.to_str().unwrap().to_owned();
tokio_util::init(|| {
let cwd = std::env::current_dir().unwrap();
let cwd_string = cwd.to_str().unwrap().to_owned();
let specifier = "./tests/002_hello.ts";
let referrer = cwd_string + "/";
let specifier = "./tests/002_hello.ts";
let referrer = cwd_string + "/";
let mut out = ModuleMetaData {
module_name: "xxx".to_owned(),
module_redirect_source_name: None,
filename: "/tests/002_hello.ts".to_owned(),
media_type: msg::MediaType::TypeScript,
source_code: include_bytes!("../tests/002_hello.ts").to_vec(),
maybe_output_code_filename: None,
maybe_output_code: None,
maybe_source_map_filename: None,
maybe_source_map: None,
};
let mut out = ModuleMetaData {
module_name: "xxx".to_owned(),
module_redirect_source_name: None,
filename: "/tests/002_hello.ts".to_owned(),
media_type: msg::MediaType::TypeScript,
source_code: include_bytes!("../tests/002_hello.ts").to_vec(),
maybe_output_code_filename: None,
maybe_output_code: None,
maybe_source_map_filename: None,
maybe_source_map: None,
};
out =
compile_sync(Arc::new(IsolateState::mock()), specifier, &referrer, &out)
.unwrap();
assert!(
out
.maybe_output_code
.unwrap()
.starts_with("console.log(\"Hello World\");".as_bytes())
);
out = compile_sync(
Arc::new(IsolateState::mock()),
specifier,
&referrer,
&out,
).unwrap();
assert!(
out
.maybe_output_code
.unwrap()
.starts_with("console.log(\"Hello World\");".as_bytes())
);
})
}
#[test]

View file

@ -1,5 +1,5 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::compiler::compile_sync;
use crate::compiler::compile_async;
use crate::compiler::ModuleMetaData;
use crate::errors::DenoError;
use crate::errors::RustOrJsError;
@ -13,6 +13,7 @@ use deno;
use deno::deno_mod;
use deno::Behavior;
use deno::JSError;
use futures::future::Either;
use futures::Async;
use futures::Future;
use std::sync::atomic::Ordering;
@ -215,23 +216,26 @@ fn fetch_module_meta_data_and_maybe_compile_async(
state
.dir
.fetch_module_meta_data_async(&specifier, &referrer, use_cache)
.and_then(move |mut out| {
.and_then(move |out| {
if out.media_type == msg::MediaType::TypeScript
&& !out.has_output_code_and_source_map()
{
debug!(">>>>> compile_sync START");
out = match compile_sync(state_.clone(), &specifier, &referrer, &out) {
Ok(v) => v,
Err(e) => {
debug!("compiler error exiting!");
eprintln!("{}", JSErrorColor(&e).to_string());
std::process::exit(1);
}
};
debug!(">>>>> compile_sync END");
state_.dir.code_cache(&out)?;
Either::A(
compile_async(state_.clone(), &specifier, &referrer, &out)
.map_err(|e| {
debug!("compiler error exiting!");
eprintln!("{}", JSErrorColor(&e).to_string());
std::process::exit(1);
}).and_then(move |out| {
debug!(">>>>> compile_sync END");
state_.dir.code_cache(&out)?;
Ok(out)
}),
)
} else {
Either::B(futures::future::ok(out))
}
Ok(out)
})
}