remove tokio_util::block_on (#3388)

This PR removes tokio_util::block_on - refactored compiler and file 
fetcher slightly so that we can safely block there - that's because 
only blocking path consist of only synchronous operations.

Additionally I removed excessive use of tokio_util::panic_on_error 
and tokio_util::run_in_task and moved both functions to cli/worker.rs, 
to tests module.

Closes #2960
This commit is contained in:
Bartek Iwańczuk 2019-11-22 18:46:57 +01:00 committed by Ry Dahl
parent 363b968bfc
commit c6bb3d5a10
5 changed files with 116 additions and 143 deletions

View file

@ -473,7 +473,7 @@ impl TsCompiler {
let source_file = self
.file_fetcher
.fetch_source_file(&module_specifier)
.fetch_cached_source_file(&module_specifier)
.expect("Source file not found");
let version_hash = source_code_version_hash(
@ -581,10 +581,9 @@ impl TsCompiler {
script_name: &str,
) -> Option<SourceFile> {
if let Some(module_specifier) = self.try_to_resolve(script_name) {
return match self.file_fetcher.fetch_source_file(&module_specifier) {
Ok(out) => Some(out),
Err(_) => None,
};
return self
.file_fetcher
.fetch_cached_source_file(&module_specifier);
}
None

View file

@ -8,7 +8,6 @@ use crate::http_util;
use crate::http_util::FetchOnceResult;
use crate::msg;
use crate::progress::Progress;
use crate::tokio_util;
use deno::ErrBox;
use deno::ModuleSpecifier;
use futures::future::Either;
@ -111,12 +110,25 @@ impl SourceFileFetcher {
Ok(())
}
/// Required for TS compiler.
pub fn fetch_source_file(
/// Required for TS compiler and source maps.
pub fn fetch_cached_source_file(
self: &Self,
specifier: &ModuleSpecifier,
) -> Result<SourceFile, ErrBox> {
tokio_util::block_on(self.fetch_source_file_async(specifier))
) -> Option<SourceFile> {
let maybe_source_file = self.source_file_cache.get(specifier.to_string());
if maybe_source_file.is_some() {
return maybe_source_file;
}
// If file is not in memory cache check if it can be found
// in local cache - which effectively means trying to fetch
// using "--no-fetch" flag. We can safely block on this
// future, because it doesn't do any asynchronous action
// it that path.
let fut = self.get_source_file_async(specifier.as_url(), true, true);
futures::executor::block_on(fut).ok()
}
pub fn fetch_source_file_async(
@ -663,6 +675,7 @@ impl SourceCodeHeaders {
mod tests {
use super::*;
use crate::fs as deno_fs;
use crate::tokio_util;
use tempfile::TempDir;
fn setup_file_fetcher(dir_path: &Path) -> SourceFileFetcher {
@ -987,45 +1000,45 @@ mod tests {
fn test_get_source_code_multiple_downloads_of_same_file() {
let http_server_guard = crate::test_util::http_server();
let (_temp_dir, fetcher) = test_setup();
// http_util::fetch_sync_string requires tokio
tokio_util::init(|| {
let specifier = ModuleSpecifier::resolve_url(
"http://localhost:4545/tests/subdir/mismatch_ext.ts",
)
.unwrap();
let headers_file_name = fetcher.deps_cache.location.join(
fetcher.deps_cache.get_cache_filename_with_extension(
specifier.as_url(),
"headers.json",
),
);
let specifier = ModuleSpecifier::resolve_url(
"http://localhost:4545/tests/subdir/mismatch_ext.ts",
)
.unwrap();
let headers_file_name = fetcher.deps_cache.location.join(
fetcher
.deps_cache
.get_cache_filename_with_extension(specifier.as_url(), "headers.json"),
);
// first download
let result = fetcher.fetch_source_file(&specifier);
assert!(result.is_ok());
// first download
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_ok());
futures::future::ok(())
}));
let result = fs::File::open(&headers_file_name);
assert!(result.is_ok());
let headers_file = result.unwrap();
// save modified timestamp for headers file
let headers_file_metadata = headers_file.metadata().unwrap();
let headers_file_modified = headers_file_metadata.modified().unwrap();
let result = fs::File::open(&headers_file_name);
assert!(result.is_ok());
let headers_file = result.unwrap();
// save modified timestamp for headers file
let headers_file_metadata = headers_file.metadata().unwrap();
let headers_file_modified = headers_file_metadata.modified().unwrap();
// download file again, it should use already fetched file even though `use_disk_cache` is set to
// false, this can be verified using source header file creation timestamp (should be
// the same as after first download)
let result = fetcher.fetch_source_file(&specifier);
assert!(result.is_ok());
// download file again, it should use already fetched file even though `use_disk_cache` is set to
// false, this can be verified using source header file creation timestamp (should be
// the same as after first download)
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_ok());
futures::future::ok(())
}));
let result = fs::File::open(&headers_file_name);
assert!(result.is_ok());
let headers_file_2 = result.unwrap();
// save modified timestamp for headers file
let headers_file_metadata_2 = headers_file_2.metadata().unwrap();
let headers_file_modified_2 = headers_file_metadata_2.modified().unwrap();
let result = fs::File::open(&headers_file_name);
assert!(result.is_ok());
let headers_file_2 = result.unwrap();
// save modified timestamp for headers file
let headers_file_metadata_2 = headers_file_2.metadata().unwrap();
let headers_file_modified_2 = headers_file_metadata_2.modified().unwrap();
assert_eq!(headers_file_modified, headers_file_modified_2);
});
assert_eq!(headers_file_modified, headers_file_modified_2);
drop(http_server_guard);
}
@ -1427,21 +1440,23 @@ mod tests {
fn test_fetch_source_file() {
let (_temp_dir, fetcher) = test_setup();
tokio_util::init(|| {
// Test failure case.
let specifier =
ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap();
let r = fetcher.fetch_source_file(&specifier);
// Test failure case.
let specifier =
ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap();
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_err());
futures::future::ok(())
}));
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("js/main.ts")
.to_owned();
let specifier =
ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap();
let r = fetcher.fetch_source_file(&specifier);
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("js/main.ts")
.to_owned();
let specifier =
ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap();
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_ok());
})
futures::future::ok(())
}));
}
#[test]
@ -1449,21 +1464,23 @@ mod tests {
/*recompile ts file*/
let (_temp_dir, fetcher) = test_setup();
tokio_util::init(|| {
// Test failure case.
let specifier =
ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap();
let r = fetcher.fetch_source_file(&specifier);
// Test failure case.
let specifier =
ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap();
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_err());
futures::future::ok(())
}));
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("js/main.ts")
.to_owned();
let specifier =
ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap();
let r = fetcher.fetch_source_file(&specifier);
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("js/main.ts")
.to_owned();
let specifier =
ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap();
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_ok());
})
futures::future::ok(())
}));
}
#[test]

View file

@ -8,7 +8,7 @@
//! only need to be able to start and cancel a single timer (or Delay, as Tokio
//! calls it) for an entire Isolate. This is what is implemented here.
use crate::tokio_util::panic_on_error;
use crate::futures::TryFutureExt;
use futures::channel::oneshot;
use futures::future::FutureExt;
use std::future::Future;
@ -43,9 +43,10 @@ impl GlobalTimer {
let (tx, rx) = oneshot::channel();
self.tx = Some(tx);
let delay =
panic_on_error(futures::compat::Compat01As03::new(Delay::new(deadline)));
let rx = panic_on_error(rx);
let delay = futures::compat::Compat01As03::new(Delay::new(deadline))
.map_err(|err| panic!("Unexpected error in timeout {:?}", err));
let rx = rx
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));
futures::future::select(delay, rx).then(|_| futures::future::ok(()))
}

View file

@ -1,5 +1,4 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use deno::ErrBox;
use futures;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
@ -29,66 +28,3 @@ where
{
tokio::runtime::current_thread::run(future.boxed().compat());
}
/// THIS IS A HACK AND SHOULD BE AVOIDED.
///
/// This spawns a new thread and creates a single-threaded tokio runtime on that thread,
/// to execute the given future.
///
/// This is useful when we want to block the main runtime to
/// resolve a future without worrying that we'll use up all the threads in the
/// main runtime.
pub fn block_on<F, R>(future: F) -> Result<R, ErrBox>
where
F: Send + 'static + Future<Output = Result<R, ErrBox>> + Unpin,
R: Send + 'static,
{
use std::sync::mpsc::channel;
use std::thread;
let (sender, receiver) = channel();
// Create a new runtime to evaluate the future asynchronously.
thread::spawn(move || {
let r = tokio::runtime::current_thread::block_on_all(future.compat());
sender
.send(r)
.expect("Unable to send blocking future result")
});
receiver
.recv()
.expect("Unable to receive blocking future result")
}
// Set the default executor so we can use tokio::spawn(). It's difficult to
// pass around mut references to the runtime, so using with_default is
// preferable. Ideally Tokio would provide this function.
#[cfg(test)]
pub fn init<F>(f: F)
where
F: FnOnce(),
{
let rt = create_threadpool_runtime().expect("Unable to create Tokio runtime");
let mut executor = rt.executor();
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f());
}
pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Output = Result<I, ()>>
where
F: Future<Output = Result<I, E>>,
E: std::fmt::Debug,
{
f.map_err(|err| panic!("Future got unexpected error: {:?}", err))
}
#[cfg(test)]
pub fn run_in_task<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
let fut = futures::future::lazy(move |_cx| {
f();
Ok(())
});
run(fut)
}

View file

@ -209,6 +209,26 @@ mod tests {
use futures::executor::block_on;
use std::sync::atomic::Ordering;
pub fn run_in_task<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
let fut = futures::future::lazy(move |_cx| {
f();
Ok(())
});
tokio_util::run(fut)
}
pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Output = Result<I, ()>>
where
F: Future<Output = Result<I, E>>,
E: std::fmt::Debug,
{
f.map_err(|err| panic!("Future got unexpected error: {:?}", err))
}
#[test]
fn execute_mod_esm_imports_a() {
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
@ -243,7 +263,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
tokio_util::panic_on_error(worker).await
panic_on_error(worker).await
});
let metrics = &state_.metrics;
@ -283,7 +303,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
tokio_util::panic_on_error(worker).await
panic_on_error(worker).await
});
let metrics = &state_.metrics;
@ -333,7 +353,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
tokio_util::panic_on_error(worker).await
panic_on_error(worker).await
});
assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3);
@ -364,7 +384,7 @@ mod tests {
#[test]
fn test_worker_messages() {
tokio_util::run_in_task(|| {
run_in_task(|| {
let mut worker = create_test_worker();
let source = r#"
onmessage = function(e) {
@ -412,7 +432,7 @@ mod tests {
#[test]
fn removed_from_resource_table_on_close() {
tokio_util::run_in_task(|| {
run_in_task(|| {
let mut worker = create_test_worker();
worker
.execute("onmessage = () => { delete window.onmessage; }")
@ -444,7 +464,7 @@ mod tests {
#[test]
fn execute_mod_resolve_error() {
tokio_util::run_in_task(|| {
run_in_task(|| {
// "foo" is not a valid module specifier so this should return an error.
let mut worker = create_test_worker();
let module_specifier =
@ -457,7 +477,7 @@ mod tests {
#[test]
fn execute_mod_002_hello() {
tokio_util::run_in_task(|| {
run_in_task(|| {
// This assumes cwd is project root (an assumption made throughout the
// tests).
let mut worker = create_test_worker();