Use async at places, use &self instead of self: &Self (#3594)

This commit is contained in:
Gurwinder Singh 2020-01-04 15:50:52 +05:30 committed by Ry Dahl
parent 70b1be6ff4
commit 9f6bab6010
14 changed files with 160 additions and 197 deletions

View file

@ -10,7 +10,7 @@ pub struct JsCompiler {}
impl JsCompiler {
pub fn compile_async(
self: &Self,
&self,
source_file: &SourceFile,
) -> Pin<Box<CompiledModuleFuture>> {
let module = CompiledModule {

View file

@ -15,7 +15,7 @@ pub struct JsonCompiler {}
impl JsonCompiler {
pub fn compile_async(
self: &Self,
&self,
source_file: &SourceFile,
) -> Pin<Box<CompiledModuleFuture>> {
let maybe_json_value: serde_json::Result<serde_json::Value> =

View file

@ -142,7 +142,7 @@ impl CompiledFileMetadata {
None
}
pub fn to_json_string(self: &Self) -> Result<String, serde_json::Error> {
pub fn to_json_string(&self) -> Result<String, serde_json::Error> {
let mut value_map = serde_json::map::Map::new();
value_map.insert(SOURCE_PATH.to_owned(), json!(&self.source_path));
@ -246,7 +246,7 @@ impl TsCompiler {
}
pub fn bundle_async(
self: &Self,
&self,
global_state: ThreadSafeGlobalState,
module_name: String,
out_file: Option<String>,
@ -305,7 +305,7 @@ impl TsCompiler {
///
/// If compilation is required then new V8 worker is spawned with fresh TS compiler.
pub fn compile_async(
self: &Self,
&self,
global_state: ThreadSafeGlobalState,
source_file: &SourceFile,
) -> Pin<Box<CompiledModuleFuture>> {
@ -389,7 +389,7 @@ impl TsCompiler {
}
/// Get associated `CompiledFileMetadata` for given module if it exists.
pub fn get_metadata(self: &Self, url: &Url) -> Option<CompiledFileMetadata> {
pub fn get_metadata(&self, url: &Url) -> Option<CompiledFileMetadata> {
// Try to load cached version:
// 1. check if there's 'meta' file
let cache_key = self
@ -409,7 +409,7 @@ impl TsCompiler {
}
pub fn get_compiled_module(
self: &Self,
&self,
module_url: &Url,
) -> Result<CompiledModule, ErrBox> {
let compiled_source_file = self.get_compiled_source_file(module_url)?;
@ -428,7 +428,7 @@ impl TsCompiler {
// TODO: ideally we shouldn't construct SourceFile by hand, but it should be delegated to
// SourceFileFetcher
pub fn get_compiled_source_file(
self: &Self,
&self,
module_url: &Url,
) -> Result<SourceFile, ErrBox> {
let cache_key = self
@ -453,7 +453,7 @@ impl TsCompiler {
/// Along compiled file a special metadata file is saved as well containing
/// hash that can be validated to avoid unnecessary recompilation.
fn cache_compiled_file(
self: &Self,
&self,
module_specifier: &ModuleSpecifier,
contents: &str,
) -> std::io::Result<()> {
@ -495,7 +495,7 @@ impl TsCompiler {
// TODO: ideally we shouldn't construct SourceFile by hand, but it should be delegated to
// SourceFileFetcher
pub fn get_source_map_file(
self: &Self,
&self,
module_specifier: &ModuleSpecifier,
) -> Result<SourceFile, ErrBox> {
let cache_key = self
@ -517,7 +517,7 @@ impl TsCompiler {
/// Save source map file for given TS module to on-disk cache.
fn cache_source_map(
self: &Self,
&self,
module_specifier: &ModuleSpecifier,
contents: &str,
) -> std::io::Result<()> {
@ -529,7 +529,7 @@ impl TsCompiler {
/// This method is called by TS compiler via an "op".
pub fn cache_compiler_output(
self: &Self,
&self,
module_specifier: &ModuleSpecifier,
extension: &str,
contents: &str,
@ -564,7 +564,7 @@ impl SourceMapGetter for TsCompiler {
// `SourceMapGetter` related methods
impl TsCompiler {
fn try_to_resolve(self: &Self, script_name: &str) -> Option<ModuleSpecifier> {
fn try_to_resolve(&self, script_name: &str) -> Option<ModuleSpecifier> {
// if `script_name` can't be resolved to ModuleSpecifier it's probably internal
// script (like `gen/cli/bundle/compiler.js`) so we won't be
// able to get source for it anyway

View file

@ -6,9 +6,7 @@ use crate::global_state::ThreadSafeGlobalState;
use crate::startup_data;
use crate::state::*;
use crate::worker::Worker;
use deno::Buf;
use futures::FutureExt;
use futures::TryFutureExt;
use serde_derive::Deserialize;
use serde_json;
use std::collections::HashMap;
@ -69,7 +67,7 @@ impl WasmCompiler {
}
pub fn compile_async(
self: &Self,
&self,
global_state: ThreadSafeGlobalState,
source_file: &SourceFile,
) -> Pin<Box<CompiledModuleFuture>> {
@ -86,47 +84,45 @@ impl WasmCompiler {
let worker_ = worker.clone();
let url = source_file.url.clone();
let fut = worker
.post_message(
serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes(),
)
.then(|_| worker)
.then(move |result| {
if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting.
eprintln!("{}", err.to_string());
std::process::exit(1);
}
debug!("Sent message to worker");
worker_.get_message()
})
.map_err(|_| panic!("not handled"))
.and_then(move |maybe_msg: Option<Buf>| {
debug!("Received message from worker");
let json_msg = maybe_msg.unwrap();
let module_info: WasmModuleInfo =
serde_json::from_slice(&json_msg).unwrap();
debug!("WASM module info: {:#?}", &module_info);
let code = wrap_wasm_code(
&base64_data,
&module_info.import_list,
&module_info.export_list,
);
debug!("Generated code: {}", &code);
let module = CompiledModule {
code,
name: url.to_string(),
};
{
cache_.lock().unwrap().insert(url.clone(), module.clone());
}
debug!("<<<<< wasm_compile_async END");
futures::future::ok(module)
});
fut.boxed()
Box::pin(async move {
let _ = worker
.post_message(
serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes(),
)
.await;
if let Err(err) = worker.await {
// TODO(ry) Need to forward the error instead of exiting.
eprintln!("{}", err.to_string());
std::process::exit(1);
}
debug!("Sent message to worker");
let maybe_msg = worker_.get_message().await.expect("not handled");
debug!("Received message from worker");
let json_msg = maybe_msg.unwrap();
let module_info: WasmModuleInfo =
serde_json::from_slice(&json_msg).unwrap();
debug!("WASM module info: {:#?}", &module_info);
let code = wrap_wasm_code(
&base64_data,
&module_info.import_list,
&module_info.export_list,
);
debug!("Generated code: {}", &code);
let module = CompiledModule {
code,
name: url.to_string(),
};
{
cache_.lock().unwrap().insert(url.clone(), module.clone());
}
debug!("<<<<< wasm_compile_async END");
Ok(module)
})
}
}

View file

@ -21,7 +21,7 @@ impl DiskCache {
}
}
pub fn get_cache_filename(self: &Self, url: &Url) -> PathBuf {
pub fn get_cache_filename(&self, url: &Url) -> PathBuf {
let mut out = PathBuf::new();
let scheme = url.scheme();
@ -83,7 +83,7 @@ impl DiskCache {
}
pub fn get_cache_filename_with_extension(
self: &Self,
&self,
url: &Url,
extension: &str,
) -> PathBuf {
@ -99,12 +99,12 @@ impl DiskCache {
}
}
pub fn get(self: &Self, filename: &Path) -> std::io::Result<Vec<u8>> {
pub fn get(&self, filename: &Path) -> std::io::Result<Vec<u8>> {
let path = self.location.join(filename);
fs::read(&path)
}
pub fn set(self: &Self, filename: &Path, data: &[u8]) -> std::io::Result<()> {
pub fn set(&self, filename: &Path, data: &[u8]) -> std::io::Result<()> {
let path = self.location.join(filename);
match path.parent() {
Some(ref parent) => fs::create_dir_all(parent),
@ -113,7 +113,7 @@ impl DiskCache {
deno_fs::write_file(&path, data, 0o666)
}
pub fn remove(self: &Self, filename: &Path) -> std::io::Result<()> {
pub fn remove(&self, filename: &Path) -> std::io::Result<()> {
let path = self.location.join(filename);
fs::remove_file(path)
}

View file

@ -12,7 +12,6 @@ use deno::ErrBox;
use deno::ModuleSpecifier;
use futures::future::Either;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use serde_json;
use std;
use std::collections::HashMap;
@ -115,7 +114,7 @@ impl SourceFileFetcher {
/// Required for TS compiler and source maps.
pub fn fetch_cached_source_file(
self: &Self,
&self,
specifier: &ModuleSpecifier,
) -> Option<SourceFile> {
let maybe_source_file = self.source_file_cache.get(specifier.to_string());
@ -211,7 +210,7 @@ impl SourceFileFetcher {
/// If `cached_only` is true then this method will fail for remote files
/// not already cached.
fn get_source_file_async(
self: &Self,
&self,
module_url: &Url,
use_disk_cache: bool,
no_remote: bool,
@ -261,10 +260,7 @@ impl SourceFileFetcher {
}
/// Fetch local source file.
fn fetch_local_file(
self: &Self,
module_url: &Url,
) -> Result<SourceFile, ErrBox> {
fn fetch_local_file(&self, module_url: &Url) -> Result<SourceFile, ErrBox> {
let filepath = module_url.to_file_path().map_err(|()| {
ErrBox::from(DenoError::new(
ErrorKind::InvalidPath,
@ -299,7 +295,7 @@ impl SourceFileFetcher {
/// that user provides, and the final module_name is the resolved path
/// after following all redirections.
fn fetch_cached_remote_source(
self: &Self,
&self,
module_url: &Url,
) -> Result<Option<SourceFile>, ErrBox> {
let source_code_headers = self.get_source_code_headers(&module_url);
@ -351,7 +347,7 @@ impl SourceFileFetcher {
/// Asynchronously fetch remote source file specified by the URL following redirects.
fn fetch_remote_source_async(
self: &Self,
&self,
module_url: &Url,
use_disk_cache: bool,
cached_only: bool,
@ -399,8 +395,8 @@ impl SourceFileFetcher {
let module_url = module_url.clone();
// Single pass fetch, either yields code or yields redirect.
let f = http_util::fetch_string_once(&module_url).and_then(move |r| {
match r {
let f = async move {
match http_util::fetch_string_once(&module_url).await? {
FetchOnceResult::Redirect(new_module_url) => {
// If redirects, update module_name and filename for next looped call.
dir
@ -415,12 +411,14 @@ impl SourceFileFetcher {
drop(download_job);
// Recurse
Either::Left(dir.fetch_remote_source_async(
&new_module_url,
use_disk_cache,
cached_only,
redirect_limit - 1,
))
dir
.fetch_remote_source_async(
&new_module_url,
use_disk_cache,
cached_only,
redirect_limit - 1,
)
.await
}
FetchOnceResult::Code(source, maybe_content_type) => {
// We land on the code.
@ -454,10 +452,10 @@ impl SourceFileFetcher {
// Explicit drop to keep reference alive until future completes.
drop(download_job);
Either::Right(futures::future::ok(source_file))
Ok(source_file)
}
}
});
};
f.boxed()
}
@ -467,7 +465,7 @@ impl SourceFileFetcher {
/// NOTE: chances are that the source file was downloaded due to redirects.
/// In this case, the headers file provides info about where we should go and get
/// the file that redirect eventually points to.
fn get_source_code_headers(self: &Self, url: &Url) -> SourceCodeHeaders {
fn get_source_code_headers(&self, url: &Url) -> SourceCodeHeaders {
let cache_key = self
.deps_cache
.get_cache_filename_with_extension(url, "headers.json");
@ -482,11 +480,7 @@ impl SourceFileFetcher {
}
/// Save contents of downloaded remote file in on-disk cache for subsequent access.
fn save_source_code(
self: &Self,
url: &Url,
source: &str,
) -> std::io::Result<()> {
fn save_source_code(&self, url: &Url, source: &str) -> std::io::Result<()> {
let cache_key = self.deps_cache.get_cache_filename(url);
// May not exist. DON'T unwrap.
@ -503,7 +497,7 @@ impl SourceFileFetcher {
///
/// If nothing needs to be saved, the headers file is not created.
fn save_source_code_headers(
self: &Self,
&self,
url: &Url,
mime_type: Option<String>,
redirect_to: Option<String>,
@ -667,7 +661,7 @@ impl SourceCodeHeaders {
// TODO: remove this nonsense `cache_filename` param, this should be
// done when instantiating SourceCodeHeaders
pub fn to_json_string(
self: &Self,
&self,
cache_filename: &Path,
) -> Result<Option<String>, serde_json::Error> {
// TODO(kevinkassimo): consider introduce serde::Deserialize to make things simpler.

View file

@ -205,7 +205,7 @@ impl ThreadSafeGlobalState {
}
pub fn check_dyn_import(
self: &Self,
&self,
module_specifier: &ModuleSpecifier,
) -> Result<(), ErrBox> {
let u = module_specifier.as_url();

View file

@ -1,8 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::futures::future::try_join_all;
use crate::futures::future::FutureExt;
use crate::futures::future::TryFutureExt;
use crate::msg;
use crate::ops::json_op;
use crate::state::ThreadSafeState;
@ -87,52 +85,39 @@ fn op_fetch_source_files(
let global_state = state.global_state.clone();
let future = try_join_all(futures)
.map_err(ErrBox::from)
.and_then(move |files| {
// We want to get an array of futures that resolves to
let v: Vec<_> = files
.into_iter()
.map(|file| {
// Special handling of Wasm files:
// compile them into JS first!
// This allows TS to do correct export types.
if file.media_type == msg::MediaType::Wasm {
return futures::future::Either::Left(
global_state
.wasm_compiler
.compile_async(global_state.clone(), &file)
.and_then(|compiled_mod| {
futures::future::ok((file, Some(compiled_mod.code)))
}),
);
}
futures::future::Either::Right(futures::future::ok((file, None)))
})
.collect();
try_join_all(v)
})
.and_then(move |files_with_code| {
let res = files_with_code
.into_iter()
.map(|(file, maybe_code)| {
json!({
"url": file.url.to_string(),
"filename": file.filename.to_str().unwrap(),
"mediaType": file.media_type as i32,
"sourceCode": if let Some(code) = maybe_code {
code
} else {
String::from_utf8(file.source_code).unwrap()
},
})
})
.collect();
let future = Box::pin(async move {
let files = try_join_all(futures).await?;
futures::future::ok(res)
// We want to get an array of futures that resolves to
let v = files.into_iter().map(|file| {
async {
// Special handling of Wasm files:
// compile them into JS first!
// This allows TS to do correct export types.
let source_code = match file.media_type {
msg::MediaType::Wasm => {
global_state
.wasm_compiler
.compile_async(global_state.clone(), &file)
.await?
.code
}
_ => String::from_utf8(file.source_code).unwrap(),
};
Ok::<_, ErrBox>(json!({
"url": file.url.to_string(),
"filename": file.filename.to_str().unwrap(),
"mediaType": file.media_type as i32,
"sourceCode": source_code,
}))
}
});
Ok(JsonOp::Async(future.boxed()))
let v = try_join_all(v).await?;
Ok(v.into())
});
Ok(JsonOp::Async(future))
}
#[derive(Deserialize)]

View file

@ -138,21 +138,23 @@ where
let min_op = d(rid, zero_copy);
// Convert to CoreOp
let fut = Box::new(min_op.then(move |result| match result {
Ok(r) => {
record.result = r;
futures::future::ok(record.into())
let fut = async move {
match min_op.await {
Ok(r) => {
record.result = r;
Ok(record.into())
}
Err(err) => {
let error_record = ErrorRecord {
promise_id: record.promise_id,
arg: -1,
error_code: err.kind() as i32,
error_message: err.to_string().as_bytes().to_owned(),
};
Ok(error_record.into())
}
}
Err(err) => {
let error_record = ErrorRecord {
promise_id: record.promise_id,
arg: -1,
error_code: err.kind() as i32,
error_message: err.to_string().as_bytes().to_owned(),
};
futures::future::ok(error_record.into())
}
}));
};
if is_sync {
// Warning! Possible deadlocks can occur if we try to wait for a future

View file

@ -8,7 +8,6 @@ use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use std;
use std::convert::From;
use std::future::Future;
@ -39,18 +38,18 @@ pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept {
Accept {
accept_state: AcceptState::Pending,
rid,
state: state.clone(),
state,
}
}
/// A future representing state of accepting a TCP connection.
pub struct Accept {
pub struct Accept<'a> {
accept_state: AcceptState,
rid: ResourceId,
state: ThreadSafeState,
state: &'a ThreadSafeState,
}
impl Future for Accept {
impl Future for Accept<'_> {
type Output = Result<(TcpStream, SocketAddr), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@ -109,29 +108,19 @@ fn op_accept(
.get::<TcpListenerResource>(rid)
.ok_or_else(bad_resource)?;
let op = accept(state, rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = match tcp_stream.local_addr() {
Ok(v) => v,
Err(e) => return futures::future::err(ErrBox::from(e)),
};
let remote_addr = match tcp_stream.peer_addr() {
Ok(v) => v,
Err(e) => return futures::future::err(ErrBox::from(e)),
};
let mut table = state_.lock_resource_table();
let rid =
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
futures::future::ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
.and_then(move |(rid, local_addr, remote_addr)| {
futures::future::ok(json!({
"rid": rid,
"localAddr": local_addr.to_string(),
"remoteAddr": remote_addr.to_string(),
}))
});
let op = async move {
let (tcp_stream, _socket_addr) = accept(&state_, rid).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut table = state_.lock_resource_table();
let rid =
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok(json!({
"rid": rid,
"localAddr": local_addr.to_string(),
"remoteAddr": remote_addr.to_string(),
}))
};
Ok(JsonOp::Async(op.boxed()))
}

View file

@ -220,7 +220,8 @@ fn op_run_status(
state: state.clone(),
};
let future = future.and_then(move |run_status| {
let future = async move {
let run_status = future.await?;
let code = run_status.code();
#[cfg(unix)]
@ -233,12 +234,12 @@ fn op_run_status(
.expect("Should have either an exit code or a signal.");
let got_signal = signal.is_some();
futures::future::ok(json!({
Ok(json!({
"gotSignal": got_signal,
"exitCode": code.unwrap_or(-1),
"exitSignal": signal.unwrap_or(-1),
}))
});
};
let pool = futures::executor::ThreadPool::new().unwrap();
let handle = pool.spawn_with_handle(future).unwrap();

View file

@ -77,13 +77,11 @@ fn op_worker_get_message(
state: state.clone(),
};
let op = op.then(move |maybe_buf| {
let op = async move {
let maybe_buf = op.await;
debug!("op_worker_get_message");
futures::future::ok(json!({
"data": maybe_buf.map(|buf| buf)
}))
});
Ok(json!({ "data": maybe_buf }))
};
Ok(JsonOp::Async(op.boxed()))
}
@ -255,14 +253,12 @@ fn op_host_get_message(
let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
let op = worker
.get_message()
.map_err(move |_| -> ErrBox { unimplemented!() })
.and_then(move |maybe_buf| {
futures::future::ok(json!({
"data": maybe_buf.map(|buf| buf)
}))
});
let fut = worker.get_message();
let op = async move {
let maybe_buf = fut.await.unwrap();
Ok(json!({ "data": maybe_buf }))
};
Ok(JsonOp::Async(op.boxed()))
}

View file

@ -301,7 +301,7 @@ impl ThreadSafeState {
}
pub fn check_dyn_import(
self: &Self,
&self,
module_specifier: &ModuleSpecifier,
) -> Result<(), ErrBox> {
let u = module_specifier.as_url();

View file

@ -157,7 +157,7 @@ impl Worker {
///
/// This method blocks current thread.
pub fn post_message(
self: &Self,
&self,
buf: Buf,
) -> impl Future<Output = Result<(), ErrBox>> {
let channels = self.external_channels.lock().unwrap();
@ -170,7 +170,7 @@ impl Worker {
}
/// Get message from worker as a host.
pub fn get_message(self: &Self) -> WorkerReceiver {
pub fn get_message(&self) -> WorkerReceiver {
WorkerReceiver {
channels: self.external_channels.clone(),
}