diff --git a/cli/dispatch_minimal.rs b/cli/dispatch_minimal.rs index 643fdaed9a..0e20d3b1ae 100644 --- a/cli/dispatch_minimal.rs +++ b/cli/dispatch_minimal.rs @@ -6,7 +6,6 @@ //! message or a "minimal" message. use crate::state::ThreadSafeState; use deno::Buf; -use deno::CoreOp; use deno::Op; use deno::PinnedBuf; use futures::Future; @@ -18,6 +17,7 @@ const OP_WRITE: i32 = 2; #[derive(Copy, Clone, Debug, PartialEq)] // This corresponds to RecordMinimal on the TS side. pub struct Record { + pub promise_id: i32, pub op_id: i32, pub arg: i32, pub result: i32, @@ -25,9 +25,15 @@ pub struct Record { impl Into for Record { fn into(self) -> Buf { - let vec = vec![DISPATCH_MINIMAL_TOKEN, self.op_id, self.arg, self.result]; + let vec = vec![ + DISPATCH_MINIMAL_TOKEN, + self.promise_id, + self.op_id, + self.arg, + self.result, + ]; let buf32 = vec.into_boxed_slice(); - let ptr = Box::into_raw(buf32) as *mut [u8; 4 * 4]; + let ptr = Box::into_raw(buf32) as *mut [u8; 5 * 4]; unsafe { Box::from_raw(ptr) } } } @@ -39,32 +45,36 @@ pub fn parse_min_record(bytes: &[u8]) -> Option { let p = bytes.as_ptr(); #[allow(clippy::cast_ptr_alignment)] let p32 = p as *const i32; - let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 3) }; + let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) }; - if s.len() < 4 { + if s.len() < 5 { return None; } let ptr = s.as_ptr(); - let ints = unsafe { std::slice::from_raw_parts(ptr, 4) }; + let ints = unsafe { std::slice::from_raw_parts(ptr, 5) }; if ints[0] != DISPATCH_MINIMAL_TOKEN { return None; } Some(Record { - op_id: ints[1], - arg: ints[2], - result: ints[3], + promise_id: ints[1], + op_id: ints[2], + arg: ints[3], + result: ints[4], }) } #[test] fn test_parse_min_record() { - let buf = vec![0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]; + let buf = vec![ + 0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0, + ]; assert_eq!( parse_min_record(&buf), Some(Record { - op_id: 1, - arg: 2, - result: 3, + promise_id: 1, + op_id: 2, + arg: 3, + result: 4, }) ); @@ -79,7 +89,8 @@ pub fn dispatch_minimal( state: &ThreadSafeState, mut record: Record, zero_copy: Option, -) -> CoreOp { +) -> Op { + let is_sync = record.promise_id == 0; let min_op = match record.op_id { OP_READ => ops::read(record.arg, zero_copy), OP_WRITE => ops::write(record.arg, zero_copy), @@ -104,7 +115,11 @@ pub fn dispatch_minimal( state.metrics_op_completed(buf.len()); Ok(buf) })); - Op::Async(fut) + if is_sync { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } } mod ops { diff --git a/cli/errors.rs b/cli/errors.rs index 67eb54ea79..eb0fc7d276 100644 --- a/cli/errors.rs +++ b/cli/errors.rs @@ -243,20 +243,6 @@ pub fn no_buffer_specified() -> DenoError { new(ErrorKind::InvalidInput, String::from("no buffer specified")) } -pub fn no_async_support() -> DenoError { - new( - ErrorKind::NoAsyncSupport, - String::from("op doesn't support async calls"), - ) -} - -pub fn no_sync_support() -> DenoError { - new( - ErrorKind::NoSyncSupport, - String::from("op doesn't support sync calls"), - ) -} - #[derive(Debug)] pub enum RustOrJsError { Rust(DenoError), diff --git a/cli/msg.fbs b/cli/msg.fbs index e034aa687a..56410097c3 100644 --- a/cli/msg.fbs +++ b/cli/msg.fbs @@ -136,8 +136,6 @@ enum ErrorKind: byte { OpNotAvaiable, WorkerInitFailed, UnixError, - NoAsyncSupport, - NoSyncSupport, ImportMapError, } @@ -155,6 +153,7 @@ enum MediaType: byte { } table Base { + cmd_id: uint32; sync: bool = false; error_kind: ErrorKind = NoError; error: string; diff --git a/cli/ops.rs b/cli/ops.rs index b14b6b1c64..e8fa47aada 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -27,14 +27,13 @@ use crate::version; use crate::worker::Worker; use deno::js_check; use deno::Buf; -use deno::CoreOp; use deno::JSError; use deno::ModuleSpecifier; use deno::Op; -use deno::OpResult; use deno::PinnedBuf; use flatbuffers::FlatBufferBuilder; use futures; +use futures::future; use futures::Async; use futures::Poll; use futures::Sink; @@ -62,13 +61,17 @@ use std::os::unix::fs::PermissionsExt; #[cfg(unix)] use std::os::unix::process::ExitStatusExt; -type CliOpResult = OpResult; +type OpResult = DenoResult; -type CliDispatchFn = +pub type OpWithError = dyn Future + Send; + +// TODO Ideally we wouldn't have to box the OpWithError being returned. +// The box is just to make it easier to get a prototype refactor working. +type OpCreator = fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: Option) - -> CliOpResult; + -> Box; -pub type OpSelector = fn(inner_type: msg::Any) -> Option; +pub type OpSelector = fn(inner_type: msg::Any) -> Option; #[inline] fn empty_buf() -> Buf { @@ -80,7 +83,7 @@ pub fn dispatch_all( control: &[u8], zero_copy: Option, op_selector: OpSelector, -) -> CoreOp { +) -> Op { let bytes_sent_control = control.len(); let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); let op = if let Some(min_record) = parse_min_record(control) { @@ -101,90 +104,81 @@ pub fn dispatch_all_legacy( control: &[u8], zero_copy: Option, op_selector: OpSelector, -) -> CoreOp { +) -> Op { let base = msg::get_root_as_base(&control); - let inner_type = base.inner_type(); let is_sync = base.sync(); + let inner_type = base.inner_type(); + let cmd_id = base.cmd_id(); - debug!( - "msg_from_js {} sync {}", - msg::enum_name_any(inner_type), - is_sync - ); - - let op_func: CliDispatchFn = match op_selector(inner_type) { + let op_func: OpCreator = match op_selector(inner_type) { Some(v) => v, None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)), }; - let op_result = op_func(state, &base, zero_copy); + let op: Box = op_func(state, &base, zero_copy); let state = state.clone(); - match op_result { - Ok(Op::Sync(buf)) => { - state.metrics_op_completed(buf.len()); - Op::Sync(buf) - } - Ok(Op::Async(fut)) => { - let result_fut = Box::new( - fut.or_else(move |err: DenoError| -> Result { - debug!("op err {}", err); - // No matter whether we got an Err or Ok, we want a serialized message to - // send back. So transform the DenoError into a Buf. - let builder = &mut FlatBufferBuilder::new(); - let errmsg_offset = builder.create_string(&format!("{}", err)); - Ok(serialize_response( - builder, - msg::BaseArgs { - error: Some(errmsg_offset), - error_kind: err.kind(), - ..Default::default() - }, - )) - }).and_then(move |buf: Buf| -> Result { - // Handle empty responses. For sync responses we just want - // to send null. For async we want to send a small message - // with the cmd_id. - let buf = if buf.len() > 0 { - buf - } else { - let builder = &mut FlatBufferBuilder::new(); - serialize_response( - builder, - msg::BaseArgs { - ..Default::default() - }, - ) - }; - state.metrics_op_completed(buf.len()); - Ok(buf) - }).map_err(|err| panic!("unexpected error {:?}", err)), - ); - Op::Async(result_fut) - } - Err(err) => { + let fut = Box::new( + op.or_else(move |err: DenoError| -> Result { debug!("op err {}", err); // No matter whether we got an Err or Ok, we want a serialized message to // send back. So transform the DenoError into a Buf. let builder = &mut FlatBufferBuilder::new(); let errmsg_offset = builder.create_string(&format!("{}", err)); - let response_buf = serialize_response( + Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { error: Some(errmsg_offset), error_kind: err.kind(), ..Default::default() }, - ); - state.metrics_op_completed(response_buf.len()); - Op::Sync(response_buf) - } + )) + }).and_then(move |buf: Buf| -> Result { + // Handle empty responses. For sync responses we just want + // to send null. For async we want to send a small message + // with the cmd_id. + let buf = if is_sync || buf.len() > 0 { + buf + } else { + let builder = &mut FlatBufferBuilder::new(); + serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + ) + }; + state.metrics_op_completed(buf.len()); + Ok(buf) + }).map_err(|err| panic!("unexpected error {:?}", err)), + ); + + debug!( + "msg_from_js {} sync {}", + msg::enum_name_any(inner_type), + base.sync() + ); + + if base.sync() { + // TODO(ry) This is not correct! If the sync op is not actually synchronous + // (like in the case of op_fetch_module_meta_data) this wait() will block + // a thread in the Tokio runtime. Depending on the size of the runtime's + // thread pool, this may result in a dead lock! + // + // The solution is that ops should return an Op directly. Op::Sync contains + // the result value, so if its returned directly from the OpCreator, we + // know it has actually be evaluated synchronously. + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) } } /// Standard ops set for most isolates -pub fn op_selector_std(inner_type: msg::Any) -> Option { +pub fn op_selector_std(inner_type: msg::Any) -> Option { match inner_type { msg::Any::Accept => Some(op_accept), msg::Any::Cache => Some(op_cache), @@ -253,9 +247,9 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option { // nanoseconds are rounded on 2ms. fn op_now( state: &ThreadSafeState, - _base: &msg::Base<'_>, + base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let seconds = state.start_time.elapsed().as_secs(); let mut subsec_nanos = state.start_time.elapsed().subsec_nanos(); @@ -276,8 +270,8 @@ fn op_now( subsec_nanos, }, ); - - ok_buf(serialize_response( + ok_future(serialize_response( + base.cmd_id(), builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -289,10 +283,9 @@ fn op_now( fn op_is_tty( _state: &ThreadSafeState, - - _base: &msg::Base<'_>, + base: &msg::Base<'_>, _data: Option, -) -> CliOpResult { +) -> Box { let builder = &mut FlatBufferBuilder::new(); let inner = msg::IsTTYRes::create( builder, @@ -302,7 +295,8 @@ fn op_is_tty( stderr: atty::is(atty::Stream::Stderr), }, ); - ok_buf(serialize_response( + ok_future(serialize_response( + base.cmd_id(), builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -314,19 +308,18 @@ fn op_is_tty( fn op_exit( _state: &ThreadSafeState, - base: &msg::Base<'_>, _data: Option, -) -> CliOpResult { +) -> Box { let inner = base.inner_as_exit().unwrap(); std::process::exit(inner.code()) } fn op_start( state: &ThreadSafeState, - _base: &msg::Base<'_>, + base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let mut builder = FlatBufferBuilder::new(); @@ -375,7 +368,8 @@ fn op_start( }, ); - ok_buf(serialize_response( + ok_future(serialize_response( + base.cmd_id(), &mut builder, msg::BaseArgs { inner_type: msg::Any::StartRes, @@ -389,7 +383,7 @@ fn op_format_error( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_format_error().unwrap(); let orig_error = String::from(inner.error().unwrap()); @@ -408,22 +402,23 @@ fn op_format_error( }, ); - let response_buf = serialize_response( + ok_future(serialize_response( + base.cmd_id(), &mut builder, msg::BaseArgs { inner_type: msg::Any::FormatErrorRes, inner: Some(inner.as_union_value()), ..Default::default() }, - ); - - ok_buf(response_buf) + )) } fn serialize_response( + cmd_id: u32, builder: &mut FlatBufferBuilder<'_>, - args: msg::BaseArgs<'_>, + mut args: msg::BaseArgs<'_>, ) -> Buf { + args.cmd_id = cmd_id; let base = msg::Base::create(builder, &args); msg::finish_base_buffer(builder, base); let data = builder.finished_data(); @@ -432,20 +427,21 @@ fn serialize_response( } #[inline] -pub fn ok_future(buf: Buf) -> CliOpResult { - Ok(Op::Async(Box::new(futures::future::ok(buf)))) +pub fn ok_future(buf: Buf) -> Box { + Box::new(futures::future::ok(buf)) } +// Shout out to Earl Sweatshirt. #[inline] -pub fn ok_buf(buf: Buf) -> CliOpResult { - Ok(Op::Sync(buf)) +pub fn odd_future(err: DenoError) -> Box { + Box::new(futures::future::err(err)) } fn op_cache( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_cache().unwrap(); let extension = inner.extension().unwrap(); @@ -459,9 +455,11 @@ fn op_cache( // cache path. In the future, checksums will not be used in the cache // filenames and this requirement can be removed. See // https://github.com/denoland/deno/issues/2057 - let module_meta_data = state - .dir - .fetch_module_meta_data(module_id, ".", true, true)?; + let r = state.dir.fetch_module_meta_data(module_id, ".", true, true); + if let Err(err) = r { + return odd_future(err); + } + let module_meta_data = r.unwrap(); let (js_cache_path, source_map_path) = state .dir @@ -469,15 +467,21 @@ fn op_cache( if extension == ".map" { debug!("cache {:?}", source_map_path); - fs::write(source_map_path, contents).map_err(DenoError::from)?; + let r = fs::write(source_map_path, contents); + if let Err(err) = r { + return odd_future(err.into()); + } } else if extension == ".js" { debug!("cache {:?}", js_cache_path); - fs::write(js_cache_path, contents).map_err(DenoError::from)?; + let r = fs::write(js_cache_path, contents); + if let Err(err) = r { + return odd_future(err.into()); + } } else { unreachable!(); } - ok_buf(empty_buf()) + ok_future(empty_buf()) } // https://github.com/denoland/deno/blob/golang/os.go#L100-L154 @@ -485,13 +489,10 @@ fn op_fetch_module_meta_data( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { - if !base.sync() { - return Err(errors::no_async_support()); - } +) -> Box { assert!(data.is_none()); let inner = base.inner_as_fetch_module_meta_data().unwrap(); - + let cmd_id = base.cmd_id(); let specifier = inner.specifier().unwrap(); let referrer = inner.referrer().unwrap(); @@ -509,7 +510,7 @@ fn op_fetch_module_meta_data( Some(module_specifier) => module_specifier.to_string(), None => specifier.to_string(), }, - Err(err) => return Err(DenoError::from(err)), + Err(err) => return odd_future(DenoError::from(err)), }, None => specifier.to_string(), }; @@ -532,6 +533,7 @@ fn op_fetch_module_meta_data( }; let inner = msg::FetchModuleMetaDataRes::create(builder, &msg_args); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -541,50 +543,52 @@ fn op_fetch_module_meta_data( )) }); + // Unfortunately TypeScript's CompilerHost interface does not leave room for + // asynchronous source code fetching. This complicates things greatly and + // requires us to use tokio_util::block_on() below. + assert!(base.sync()); + // WARNING: Here we use tokio_util::block_on() which starts a new Tokio // runtime for executing the future. This is so we don't inadvernently run // out of threads in the main runtime. - let result_buf = tokio_util::block_on(fut)?; - Ok(Op::Sync(result_buf)) + Box::new(futures::future::result(tokio_util::block_on(fut))) } fn op_chdir( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_chdir().unwrap(); let directory = inner.directory().unwrap(); - std::env::set_current_dir(&directory)?; - ok_buf(empty_buf()) + Box::new(futures::future::result(|| -> OpResult { + std::env::set_current_dir(&directory)?; + Ok(empty_buf()) + }())) } fn op_global_timer_stop( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { - if !base.sync() { - return Err(errors::no_async_support()); - } +) -> Box { + assert!(base.sync()); assert!(data.is_none()); let state = state; let mut t = state.global_timer.lock().unwrap(); t.cancel(); - Ok(Op::Sync(empty_buf())) + ok_future(empty_buf()) } fn op_global_timer( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { - if base.sync() { - return Err(errors::no_sync_support()); - } +) -> Box { + assert!(!base.sync()); assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let inner = base.inner_as_global_timer().unwrap(); let val = inner.timeout(); assert!(val >= 0); @@ -594,11 +598,12 @@ fn op_global_timer( let deadline = Instant::now() + Duration::from_millis(val as u64); let f = t.new_timeout(deadline); - Ok(Op::Async(Box::new(f.then(move |_| { + Box::new(f.then(move |_| { let builder = &mut FlatBufferBuilder::new(); let inner = msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {}); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -606,31 +611,36 @@ fn op_global_timer( ..Default::default() }, )) - })))) + })) } fn op_set_env( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_set_env().unwrap(); let key = inner.key().unwrap(); let value = inner.value().unwrap(); - state.check_env()?; + if let Err(e) = state.check_env() { + return odd_future(e); + } std::env::set_var(key, value); - ok_buf(empty_buf()) + ok_future(empty_buf()) } fn op_env( state: &ThreadSafeState, - _base: &msg::Base<'_>, + base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); + let cmd_id = base.cmd_id(); - state.check_env()?; + if let Err(e) = state.check_env() { + return odd_future(e); + } let builder = &mut FlatBufferBuilder::new(); let vars: Vec<_> = std::env::vars() @@ -641,24 +651,24 @@ fn op_env( builder, &msg::EnvironResArgs { map: Some(tables) }, ); - let response_buf = serialize_response( + ok_future(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), inner_type: msg::Any::EnvironRes, ..Default::default() }, - ); - ok_buf(response_buf) + )) } fn op_permissions( state: &ThreadSafeState, - _base: &msg::Base<'_>, + base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let builder = &mut FlatBufferBuilder::new(); let inner = msg::PermissionsRes::create( builder, @@ -671,26 +681,26 @@ fn op_permissions( hrtime: state.permissions.allows_hrtime(), }, ); - let response_buf = serialize_response( + ok_future(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), inner_type: msg::Any::PermissionsRes, ..Default::default() }, - ); - ok_buf(response_buf) + )) } fn op_revoke_permission( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_permission_revoke().unwrap(); let permission = inner.permission().unwrap(); - match permission { + let result = match permission { "run" => state.permissions.revoke_run(), "read" => state.permissions.revoke_read(), "write" => state.permissions.revoke_write(), @@ -698,16 +708,20 @@ fn op_revoke_permission( "env" => state.permissions.revoke_env(), "hrtime" => state.permissions.revoke_hrtime(), _ => Ok(()), - }?; - ok_buf(empty_buf()) + }; + if let Err(e) = result { + return odd_future(e); + } + ok_future(empty_buf()) } fn op_fetch( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { let inner = base.inner_as_fetch().unwrap(); + let cmd_id = base.cmd_id(); let header = inner.header().unwrap(); assert!(header.is_request()); @@ -718,10 +732,19 @@ fn op_fetch( Some(buf) => hyper::Body::from(Vec::from(&*buf)), }; - let req = msg_util::deserialize_request(header, body)?; + let maybe_req = msg_util::deserialize_request(header, body); + if let Err(e) = maybe_req { + return odd_future(e); + } + let req = maybe_req.unwrap(); - let url_ = url::Url::parse(url).map_err(DenoError::from)?; - state.check_net_url(url_)?; + let url_ = match url::Url::parse(url) { + Err(err) => return odd_future(DenoError::from(err)), + Ok(v) => v, + }; + if let Err(e) = state.check_net_url(url_) { + return odd_future(e); + } let client = http_util::get_client(); @@ -744,6 +767,7 @@ fn op_fetch( ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -752,12 +776,7 @@ fn op_fetch( }, )) }); - if base.sync() { - let result_buf = future.wait()?; - Ok(Op::Sync(result_buf)) - } else { - Ok(Op::Async(Box::new(future))) - } + Box::new(future) } // This is just type conversion. Implement From trait? @@ -775,17 +794,14 @@ where } } -fn blocking(is_sync: bool, f: F) -> CliOpResult +fn blocking(is_sync: bool, f: F) -> Box where F: 'static + Send + FnOnce() -> DenoResult, { if is_sync { - let result_buf = f()?; - Ok(Op::Sync(result_buf)) + Box::new(futures::future::result(f())) } else { - Ok(Op::Async(Box::new(tokio_util::poll_fn(move || { - convert_blocking(f) - })))) + Box::new(tokio_util::poll_fn(move || convert_blocking(f))) } } @@ -793,19 +809,22 @@ fn op_make_temp_dir( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let base = Box::new(*base); let inner = base.inner_as_make_temp_dir().unwrap(); + let cmd_id = base.cmd_id(); // FIXME - state.check_write("make_temp")?; + if let Err(e) = state.check_write("make_temp") { + return odd_future(e); + } let dir = inner.dir().map(PathBuf::from); let prefix = inner.prefix().map(String::from); let suffix = inner.suffix().map(String::from); - blocking(base.sync(), move || { + blocking(base.sync(), move || -> OpResult { // TODO(piscisaureus): use byte vector for paths, not a string. // See https://github.com/denoland/deno/issues/627. // We can't assume that paths are always valid utf8 strings. @@ -824,6 +843,7 @@ fn op_make_temp_dir( }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -838,14 +858,19 @@ fn op_mkdir( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_mkdir().unwrap(); - let (path, path_) = resolve_path(inner.path().unwrap())?; + let (path, path_) = match resolve_path(inner.path().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; let recursive = inner.recursive(); let mode = inner.mode(); - state.check_write(&path_)?; + if let Err(e) = state.check_write(&path_) { + return odd_future(e); + } blocking(base.sync(), move || { debug!("op_mkdir {}", path_); @@ -858,13 +883,18 @@ fn op_chmod( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_chmod().unwrap(); let _mode = inner.mode(); - let (path, path_) = resolve_path(inner.path().unwrap())?; + let (path, path_) = match resolve_path(inner.path().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; - state.check_write(&path_)?; + if let Err(e) = state.check_write(&path_) { + return odd_future(e); + } blocking(base.sync(), move || { debug!("op_chmod {}", &path_); @@ -884,14 +914,16 @@ fn op_chown( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_chown().unwrap(); let path = String::from(inner.path().unwrap()); let uid = inner.uid(); let gid = inner.gid(); - state.check_write(&path)?; + if let Err(e) = state.check_write(&path) { + return odd_future(e); + } blocking(base.sync(), move || { debug!("op_chown {}", &path); @@ -906,11 +938,14 @@ fn op_open( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let inner = base.inner_as_open().unwrap(); - let (filename, filename_) = resolve_path(inner.filename().unwrap())?; + let (filename, filename_) = match resolve_path(inner.filename().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; let mode = inner.mode().unwrap(); let mut open_options = tokio::fs::OpenOptions::new(); @@ -951,26 +986,35 @@ fn op_open( match mode { "r" => { - state.check_read(&filename_)?; + if let Err(e) = state.check_read(&filename_) { + return odd_future(e); + } } "w" | "a" | "x" => { - state.check_write(&filename_)?; + if let Err(e) = state.check_write(&filename_) { + return odd_future(e); + } } &_ => { - state.check_read(&filename_)?; - state.check_write(&filename_)?; + if let Err(e) = state.check_read(&filename_) { + return odd_future(e); + } + if let Err(e) = state.check_write(&filename_) { + return odd_future(e); + } } } let op = open_options .open(filename) .map_err(DenoError::from) - .and_then(move |fs_file| { + .and_then(move |fs_file| -> OpResult { let resource = resources::add_fs_file(fs_file); let builder = &mut FlatBufferBuilder::new(); let inner = msg::OpenRes::create(builder, &msg::OpenResArgs { rid: resource.rid }); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -979,27 +1023,22 @@ fn op_open( }, )) }); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + Box::new(op) } fn op_close( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_close().unwrap(); let rid = inner.rid(); match resources::lookup(rid) { - None => Err(errors::bad_resource()), + None => odd_future(errors::bad_resource()), Some(resource) => { resource.close(); - ok_buf(empty_buf()) + ok_future(empty_buf()) } } } @@ -1008,14 +1047,14 @@ fn op_kill( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_kill().unwrap(); let pid = inner.pid(); let signo = inner.signo(); match kill(pid, signo) { - Ok(_) => ok_buf(empty_buf()), - Err(e) => Err(e), + Ok(_) => ok_future(empty_buf()), + Err(e) => odd_future(e), } } @@ -1023,13 +1062,13 @@ fn op_shutdown( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_shutdown().unwrap(); let rid = inner.rid(); let how = inner.how(); match resources::lookup(rid) { - None => Err(errors::bad_resource()), + None => odd_future(errors::bad_resource()), Some(mut resource) => { let shutdown_mode = match how { 0 => Shutdown::Read, @@ -1049,12 +1088,13 @@ fn op_read( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { + let cmd_id = base.cmd_id(); let inner = base.inner_as_read().unwrap(); let rid = inner.rid(); match resources::lookup(rid) { - None => Err(errors::bad_resource()), + None => odd_future(errors::bad_resource()), Some(resource) => { let op = tokio::io::read(resource, data.unwrap()) .map_err(DenoError::from) @@ -1068,6 +1108,7 @@ fn op_read( }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1076,12 +1117,7 @@ fn op_read( }, )) }); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + Box::new(op) } } } @@ -1090,12 +1126,13 @@ fn op_write( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { + let cmd_id = base.cmd_id(); let inner = base.inner_as_write().unwrap(); let rid = inner.rid(); match resources::lookup(rid) { - None => Err(errors::bad_resource()), + None => odd_future(errors::bad_resource()), Some(resource) => { let op = tokio_write::write(resource, data.unwrap()) .map_err(DenoError::from) @@ -1108,6 +1145,7 @@ fn op_write( }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1116,12 +1154,7 @@ fn op_write( }, )) }); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + Box::new(op) } } } @@ -1130,24 +1163,20 @@ fn op_seek( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); + let _cmd_id = base.cmd_id(); let inner = base.inner_as_seek().unwrap(); let rid = inner.rid(); let offset = inner.offset(); let whence = inner.whence(); match resources::lookup(rid) { - None => Err(errors::bad_resource()), + None => odd_future(errors::bad_resource()), Some(resource) => { let op = resources::seek(resource, offset, whence) .and_then(move |_| Ok(empty_buf())); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + Box::new(op) } } } @@ -1156,13 +1185,18 @@ fn op_remove( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_remove().unwrap(); - let (path, path_) = resolve_path(inner.path().unwrap())?; + let (path, path_) = match resolve_path(inner.path().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; let recursive = inner.recursive(); - state.check_write(&path_)?; + if let Err(e) = state.check_write(&path_) { + return odd_future(e); + } blocking(base.sync(), move || { debug!("op_remove {}", path.display()); @@ -1182,14 +1216,24 @@ fn op_copy_file( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_copy_file().unwrap(); - let (from, from_) = resolve_path(inner.from().unwrap())?; - let (to, to_) = resolve_path(inner.to().unwrap())?; + let (from, from_) = match resolve_path(inner.from().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; + let (to, to_) = match resolve_path(inner.to().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; - state.check_read(&from_)?; - state.check_write(&to_)?; + if let Err(e) = state.check_read(&from_) { + return odd_future(e); + } + if let Err(e) = state.check_write(&to_) { + return odd_future(e); + } debug!("op_copy_file {} {}", from.display(), to.display()); blocking(base.sync(), move || { @@ -1230,40 +1274,47 @@ fn get_mode(_perm: &fs::Permissions) -> u32 { fn op_cwd( _state: &ThreadSafeState, - - _base: &msg::Base<'_>, + base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); - - let path = std::env::current_dir()?; - let builder = &mut FlatBufferBuilder::new(); - let cwd = - builder.create_string(&path.into_os_string().into_string().unwrap()); - let inner = msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) }); - let response_buf = serialize_response( - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::CwdRes, - ..Default::default() - }, - ); - ok_buf(response_buf) + let cmd_id = base.cmd_id(); + Box::new(futures::future::result(|| -> OpResult { + let path = std::env::current_dir()?; + let builder = &mut FlatBufferBuilder::new(); + let cwd = + builder.create_string(&path.into_os_string().into_string().unwrap()); + let inner = + msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) }); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::CwdRes, + ..Default::default() + }, + )) + }())) } fn op_stat( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_stat().unwrap(); - - let (filename, filename_) = resolve_path(inner.filename().unwrap())?; + let cmd_id = base.cmd_id(); + let (filename, filename_) = match resolve_path(inner.filename().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; let lstat = inner.lstat(); - state.check_read(&filename_)?; + if let Err(e) = state.check_read(&filename_) { + return odd_future(e); + } blocking(base.sync(), move || { let builder = &mut FlatBufferBuilder::new(); @@ -1290,6 +1341,7 @@ fn op_stat( ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1304,14 +1356,20 @@ fn op_read_dir( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_read_dir().unwrap(); - let (path, path_) = resolve_path(inner.path().unwrap())?; + let cmd_id = base.cmd_id(); + let (path, path_) = match resolve_path(inner.path().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; - state.check_read(&path_)?; + if let Err(e) = state.check_read(&path_) { + return odd_future(e); + } - blocking(base.sync(), move || { + blocking(base.sync(), move || -> OpResult { debug!("op_read_dir {}", path.display()); let builder = &mut FlatBufferBuilder::new(); let entries: Vec<_> = fs::read_dir(path)? @@ -1345,6 +1403,7 @@ fn op_read_dir( }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1359,15 +1418,22 @@ fn op_rename( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_rename().unwrap(); - let (oldpath, _) = resolve_path(inner.oldpath().unwrap())?; - let (newpath, newpath_) = resolve_path(inner.newpath().unwrap())?; + let (oldpath, _) = match resolve_path(inner.oldpath().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; + let (newpath, newpath_) = match resolve_path(inner.newpath().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; - state.check_write(&newpath_)?; - - blocking(base.sync(), move || { + if let Err(e) = state.check_write(&newpath_) { + return odd_future(e); + } + blocking(base.sync(), move || -> OpResult { debug!("op_rename {} {}", oldpath.display(), newpath.display()); fs::rename(&oldpath, &newpath)?; Ok(empty_buf()) @@ -1378,15 +1444,23 @@ fn op_link( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_link().unwrap(); - let (oldname, _) = resolve_path(inner.oldname().unwrap())?; - let (newname, newname_) = resolve_path(inner.newname().unwrap())?; + let (oldname, _) = match resolve_path(inner.oldname().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; + let (newname, newname_) = match resolve_path(inner.newname().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; - state.check_write(&newname_)?; + if let Err(e) = state.check_write(&newname_) { + return odd_future(e); + } - blocking(base.sync(), move || { + blocking(base.sync(), move || -> OpResult { debug!("op_link {} {}", oldname.display(), newname.display()); std::fs::hard_link(&oldname, &newname)?; Ok(empty_buf()) @@ -1397,18 +1471,29 @@ fn op_symlink( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_symlink().unwrap(); - let (oldname, _) = resolve_path(inner.oldname().unwrap())?; - let (newname, newname_) = resolve_path(inner.newname().unwrap())?; + let (oldname, _) = match resolve_path(inner.oldname().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; + let (newname, newname_) = match resolve_path(inner.newname().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; - state.check_write(&newname_)?; + if let Err(e) = state.check_write(&newname_) { + return odd_future(e); + } // TODO Use type for Windows. if cfg!(windows) { - return Err(errors::new(ErrorKind::Other, "Not implemented".to_string())); + return odd_future(errors::new( + ErrorKind::Other, + "Not implemented".to_string(), + )); } - blocking(base.sync(), move || { + blocking(base.sync(), move || -> OpResult { debug!("op_symlink {} {}", oldname.display(), newname.display()); #[cfg(any(unix))] std::os::unix::fs::symlink(&oldname, &newname)?; @@ -1420,15 +1505,20 @@ fn op_read_link( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_readlink().unwrap(); + let cmd_id = base.cmd_id(); + let (name, name_) = match resolve_path(inner.name().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; - let (name, name_) = resolve_path(inner.name().unwrap())?; + if let Err(e) = state.check_read(&name_) { + return odd_future(e); + } - state.check_read(&name_)?; - - blocking(base.sync(), move || { + blocking(base.sync(), move || -> OpResult { debug!("op_read_link {}", name.display()); let path = fs::read_link(&name)?; let builder = &mut FlatBufferBuilder::new(); @@ -1440,6 +1530,7 @@ fn op_read_link( }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1454,10 +1545,10 @@ fn op_repl_start( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_repl_start().unwrap(); - + let cmd_id = base.cmd_id(); let history_file = String::from(inner.history_file().unwrap()); debug!("op_repl_start {}", history_file); @@ -1470,7 +1561,8 @@ fn op_repl_start( builder, &msg::ReplStartResArgs { rid: resource.rid }, ); - ok_buf(serialize_response( + ok_future(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1484,15 +1576,15 @@ fn op_repl_readline( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_repl_readline().unwrap(); - + let cmd_id = base.cmd_id(); let rid = inner.rid(); let prompt = inner.prompt().unwrap().to_owned(); debug!("op_repl_readline {} {}", rid, prompt); - blocking(base.sync(), move || { + blocking(base.sync(), move || -> OpResult { let repl = resources::get_repl(rid)?; let line = repl.lock().unwrap().readline(&prompt)?; @@ -1505,6 +1597,7 @@ fn op_repl_readline( }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1519,14 +1612,19 @@ fn op_truncate( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_truncate().unwrap(); - let (filename, filename_) = resolve_path(inner.name().unwrap())?; + let (filename, filename_) = match resolve_path(inner.name().unwrap()) { + Err(err) => return odd_future(err), + Ok(v) => v, + }; let len = inner.len(); - state.check_write(&filename_)?; + if let Err(e) = state.check_write(&filename_) { + return odd_future(e); + } blocking(base.sync(), move || { debug!("op_truncate {} {}", filename_, len); @@ -1540,7 +1638,7 @@ fn op_utime( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); let inner = base.inner_as_utime().unwrap(); @@ -1548,7 +1646,9 @@ fn op_utime( let atime = inner.atime(); let mtime = inner.mtime(); - state.check_write(&filename)?; + if let Err(e) = state.check_write(&filename) { + return odd_future(e); + } blocking(base.sync(), move || { debug!("op_utimes {} {} {}", filename, atime, mtime); @@ -1561,35 +1661,41 @@ fn op_listen( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let inner = base.inner_as_listen().unwrap(); let network = inner.network().unwrap(); assert_eq!(network, "tcp"); let address = inner.address().unwrap(); - state.check_net(&address)?; + if let Err(e) = state.check_net(&address) { + return odd_future(e); + } - let addr = resolve_addr(address).wait()?; - let listener = TcpListener::bind(&addr)?; - let resource = resources::add_tcp_listener(listener); + Box::new(futures::future::result((move || { + let addr = resolve_addr(address).wait()?; + let listener = TcpListener::bind(&addr)?; + let resource = resources::add_tcp_listener(listener); - let builder = &mut FlatBufferBuilder::new(); - let inner = - msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid }); - let response_buf = serialize_response( - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::ListenRes, - ..Default::default() - }, - ); - ok_buf(response_buf) + let builder = &mut FlatBufferBuilder::new(); + let inner = msg::ListenRes::create( + builder, + &msg::ListenResArgs { rid: resource.rid }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::ListenRes, + ..Default::default() + }, + )) + })())) } -fn new_conn(tcp_stream: TcpStream) -> DenoResult { +fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult { let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); // TODO forward socket_addr to client. @@ -1602,6 +1708,7 @@ fn new_conn(tcp_stream: TcpStream) -> DenoResult { }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1615,24 +1722,21 @@ fn op_accept( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let inner = base.inner_as_accept().unwrap(); let server_rid = inner.rid(); match resources::lookup(server_rid) { - None => Err(errors::bad_resource()), + None => odd_future(errors::bad_resource()), Some(server_resource) => { let op = tokio_util::accept(server_resource) .map_err(DenoError::from) - .and_then(move |(tcp_stream, _socket_addr)| new_conn(tcp_stream)); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + .and_then(move |(tcp_stream, _socket_addr)| { + new_conn(cmd_id, tcp_stream) + }); + Box::new(op) } } } @@ -1641,15 +1745,17 @@ fn op_dial( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let inner = base.inner_as_dial().unwrap(); let network = inner.network().unwrap(); assert_eq!(network, "tcp"); // TODO Support others. let address = inner.address().unwrap(); - state.check_net(&address)?; + if let Err(e) = state.check_net(&address) { + return odd_future(e); + } let op = resolve_addr(address) @@ -1657,30 +1763,26 @@ fn op_dial( .and_then(move |addr| { TcpStream::connect(&addr) .map_err(DenoError::from) - .and_then(new_conn) + .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream)) }); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + Box::new(op) } fn op_metrics( state: &ThreadSafeState, - - _base: &msg::Base<'_>, + base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); + let cmd_id = base.cmd_id(); let builder = &mut FlatBufferBuilder::new(); let inner = msg::MetricsRes::create( builder, &msg::MetricsResArgs::from(&state.metrics), ); - ok_buf(serialize_response( + ok_future(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1692,10 +1794,11 @@ fn op_metrics( fn op_resources( _state: &ThreadSafeState, - _base: &msg::Base<'_>, + base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); + let cmd_id = base.cmd_id(); let builder = &mut FlatBufferBuilder::new(); let serialized_resources = table_entries(); @@ -1722,7 +1825,8 @@ fn op_resources( }, ); - ok_buf(serialize_response( + ok_future(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1744,12 +1848,13 @@ fn op_run( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { - if !base.sync() { - return Err(errors::no_async_support()); - } +) -> Box { + assert!(base.sync()); + let cmd_id = base.cmd_id(); - state.check_run()?; + if let Err(e) = state.check_run() { + return odd_future(e); + } assert!(data.is_none()); let inner = base.inner_as_run().unwrap(); @@ -1773,7 +1878,12 @@ fn op_run( c.stderr(subprocess_stdio_map(inner.stderr())); // Spawn the command. - let child = c.spawn_async().map_err(DenoError::from)?; + let child = match c.spawn_async() { + Ok(v) => v, + Err(err) => { + return odd_future(err.into()); + } + }; let pid = child.id(); let resources = resources::add_child(child); @@ -1796,29 +1906,37 @@ fn op_run( let builder = &mut FlatBufferBuilder::new(); let inner = msg::RunRes::create(builder, &res_args); - Ok(Op::Sync(serialize_response( + ok_future(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), inner_type: msg::Any::RunRes, ..Default::default() }, - ))) + )) } fn op_run_status( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let inner = base.inner_as_run_status().unwrap(); let rid = inner.rid(); - state.check_run()?; + if let Err(e) = state.check_run() { + return odd_future(e); + } - let future = resources::child_status(rid)?; + let future = match resources::child_status(rid) { + Err(e) => { + return odd_future(e); + } + Ok(f) => f, + }; let future = future.and_then(move |run_status| { let code = run_status.code(); @@ -1843,6 +1961,7 @@ fn op_run_status( }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1851,12 +1970,7 @@ fn op_run_status( }, )) }); - if base.sync() { - let buf = future.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(future))) - } + Box::new(future) } struct GetMessageFuture { @@ -1880,11 +1994,9 @@ fn op_worker_get_message( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { - if base.sync() { - return Err(errors::no_sync_support()); - } +) -> Box { assert!(data.is_none()); + let cmd_id = base.cmd_id(); let op = GetMessageFuture { state: state.clone(), @@ -1900,6 +2012,7 @@ fn op_worker_get_message( &msg::WorkerGetMessageResArgs { data }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(inner.as_union_value()), @@ -1908,32 +2021,37 @@ fn op_worker_get_message( }, )) }); - Ok(Op::Async(Box::new(op))) + Box::new(op) } /// Post message to host as guest worker fn op_worker_post_message( state: &ThreadSafeState, - _base: &msg::Base<'_>, + base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { + let cmd_id = base.cmd_id(); + let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let tx = { let wc = state.worker_channels.lock().unwrap(); wc.0.clone() }; - tx.send(d) - .wait() - .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?; - let builder = &mut FlatBufferBuilder::new(); + let op = tx.send(d); + let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); + let op = op.and_then(move |_| -> DenoResult { + let builder = &mut FlatBufferBuilder::new(); - ok_buf(serialize_response( - builder, - msg::BaseArgs { - ..Default::default() - }, - )) + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + }); + Box::new(op) } /// Create worker as the host @@ -1941,9 +2059,9 @@ fn op_create_worker( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let inner = base.inner_as_create_worker().unwrap(); let specifier = inner.specifier().unwrap(); @@ -1963,33 +2081,39 @@ fn op_create_worker( js_check(worker.execute("denoMain()")); js_check(worker.execute("workerMain()")); - let module_specifier = ModuleSpecifier::resolve_root(specifier)?; + let op = ModuleSpecifier::resolve_root(specifier) + .and_then(|module_specifier| { + Ok( + worker + .execute_mod_async(&module_specifier, false) + .and_then(move |()| { + let mut workers_tl = parent_state.workers.lock().unwrap(); + workers_tl.insert(rid, worker.shared()); + let builder = &mut FlatBufferBuilder::new(); + let msg_inner = msg::CreateWorkerRes::create( + builder, + &msg::CreateWorkerResArgs { rid }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::CreateWorkerRes, + ..Default::default() + }, + )) + }).map_err(|err| match err { + errors::RustOrJsError::Js(_) => errors::worker_init_failed(), + errors::RustOrJsError::Rust(err) => err, + }), + ) + }).map_err(DenoError::from); - let op = worker - .execute_mod_async(&module_specifier, false) - .and_then(move |()| { - let mut workers_tl = parent_state.workers.lock().unwrap(); - workers_tl.insert(rid, worker.shared()); - let builder = &mut FlatBufferBuilder::new(); - let msg_inner = msg::CreateWorkerRes::create( - builder, - &msg::CreateWorkerResArgs { rid }, - ); - Ok(serialize_response( - builder, - msg::BaseArgs { - inner: Some(msg_inner.as_union_value()), - inner_type: msg::Any::CreateWorkerRes, - ..Default::default() - }, - )) - }).map_err(|err| match err { - errors::RustOrJsError::Js(_) => errors::worker_init_failed(), - errors::RustOrJsError::Rust(err) => err, - }); - - let result = op.wait()?; - Ok(Op::Sync(result)) + Box::new(match op { + Ok(op) => future::Either::A(op), + Err(err) => future::Either::B(future::result(Err(err))), + }) } /// Return when the worker closes @@ -1997,12 +2121,9 @@ fn op_host_get_worker_closed( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { - if base.sync() { - return Err(errors::no_sync_support()); - } +) -> Box { assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let inner = base.inner_as_host_get_worker_closed().unwrap(); let rid = inner.rid(); let state = state.clone(); @@ -2013,17 +2134,17 @@ fn op_host_get_worker_closed( worker.clone() }; - let op = Box::new(shared_worker_future.then(move |_result| { + Box::new(shared_worker_future.then(move |_result| { let builder = &mut FlatBufferBuilder::new(); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { ..Default::default() }, )) - })); - Ok(Op::Async(Box::new(op))) + })) } /// Get message from guest worker as host @@ -2031,12 +2152,9 @@ fn op_host_get_message( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { - if base.sync() { - return Err(errors::no_sync_support()); - } +) -> Box { assert!(data.is_none()); - + let cmd_id = base.cmd_id(); let inner = base.inner_as_host_get_message().unwrap(); let rid = inner.rid(); @@ -2051,6 +2169,7 @@ fn op_host_get_message( &msg::HostGetMessageResArgs { data }, ); Ok(serialize_response( + cmd_id, builder, msg::BaseArgs { inner: Some(msg_inner.as_union_value()), @@ -2059,7 +2178,7 @@ fn op_host_get_message( }, )) }); - Ok(Op::Async(Box::new(op))) + Box::new(op) } /// Post message to guest worker as host @@ -2067,30 +2186,34 @@ fn op_host_post_message( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { + let cmd_id = base.cmd_id(); let inner = base.inner_as_host_post_message().unwrap(); let rid = inner.rid(); let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - resources::post_message_to_worker(rid, d) - .wait() - .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?; - let builder = &mut FlatBufferBuilder::new(); + let op = resources::post_message_to_worker(rid, d); + let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); + let op = op.and_then(move |_| -> DenoResult { + let builder = &mut FlatBufferBuilder::new(); - ok_buf(serialize_response( - builder, - msg::BaseArgs { - ..Default::default() - }, - )) + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + }); + Box::new(op) } fn op_get_random_values( state: &ThreadSafeState, _base: &msg::Base<'_>, data: Option, -) -> CliOpResult { +) -> Box { if let Some(ref seeded_rng) = state.seeded_rng { let mut rng = seeded_rng.lock().unwrap(); rng.fill(&mut data.unwrap()[..]); @@ -2099,5 +2222,5 @@ fn op_get_random_values( rng.fill(&mut data.unwrap()[..]); } - ok_buf(empty_buf()) + Box::new(ok_future(empty_buf())) } diff --git a/cli/state.rs b/cli/state.rs index aa4690d445..f5eb8ae7ab 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -15,9 +15,9 @@ use crate::resources; use crate::resources::ResourceId; use crate::worker::Worker; use deno::Buf; -use deno::CoreOp; use deno::Loader; use deno::ModuleSpecifier; +use deno::Op; use deno::PinnedBuf; use futures::future::Either; use futures::future::Shared; @@ -106,11 +106,7 @@ impl Deref for ThreadSafeState { } impl ThreadSafeState { - pub fn dispatch( - &self, - control: &[u8], - zero_copy: Option, - ) -> CoreOp { + pub fn dispatch(&self, control: &[u8], zero_copy: Option) -> Op { ops::dispatch_all(self, control, zero_copy, self.dispatch_selector) } } diff --git a/core/core.d.ts b/core/core.d.ts index 0c530e343e..b1d1ac57f4 100644 --- a/core/core.d.ts +++ b/core/core.d.ts @@ -4,13 +4,15 @@ // Deno core. These are not intended to be used directly by runtime users of // Deno and therefore do not flow through to the runtime type library. -declare type MessageCallback = (promiseId: number, msg: Uint8Array) => void; +declare interface MessageCallback { + (msg: Uint8Array): void; +} declare interface DenoCore { dispatch( control: Uint8Array, zeroCopy?: ArrayBufferView | null - ): Uint8Array | null | number; + ): Uint8Array | null; setAsyncHandler(cb: MessageCallback): void; sharedQueue: { head(): number; diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 7e678aecdd..8eb764b55d 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -13,6 +13,7 @@ const responseBuf = new Uint8Array( .map(c => c.charCodeAt(0)) ); const promiseMap = new Map(); +let nextPromiseId = 1; function assert(cond) { if (!cond) { @@ -36,8 +37,8 @@ const scratchBytes = new Uint8Array( ); assert(scratchBytes.byteLength === 4 * 4); -function send(isSync, opId, arg, zeroCopy = null) { - scratch32[0] = isSync; +function send(promiseId, opId, arg, zeroCopy = null) { + scratch32[0] = promiseId; scratch32[1] = opId; scratch32[2] = arg; scratch32[3] = -1; @@ -46,9 +47,10 @@ function send(isSync, opId, arg, zeroCopy = null) { /** Returns Promise */ function sendAsync(opId, arg, zeroCopy = null) { - const promiseId = send(false, opId, arg, zeroCopy); + const promiseId = nextPromiseId++; const p = createResolvable(); promiseMap.set(promiseId, p); + send(promiseId, opId, arg, zeroCopy); return p; } @@ -56,7 +58,7 @@ function recordFromBuf(buf) { assert(buf.byteLength === 16); const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); return { - isSync: !!buf32[0], + promiseId: buf32[0], opId: buf32[1], arg: buf32[2], result: buf32[3] @@ -65,14 +67,14 @@ function recordFromBuf(buf) { /** Returns i32 number */ function sendSync(opId, arg) { - const buf = send(true, opId, arg); + const buf = send(0, opId, arg); const record = recordFromBuf(buf); return record.result; } -function handleAsyncMsgFromRust(promiseId, buf) { +function handleAsyncMsgFromRust(buf) { const record = recordFromBuf(buf); - const { result } = record; + const { promiseId, result } = record; const p = promiseMap.get(promiseId); promiseMap.delete(promiseId); p.resolve(result); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 0f0cd6a4bc..e8c5ec1b73 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -44,7 +44,7 @@ const OP_CLOSE: i32 = 5; #[derive(Clone, Debug, PartialEq)] pub struct Record { - pub is_sync: i32, + pub promise_id: i32, pub op_id: i32, pub arg: i32, pub result: i32, @@ -52,8 +52,8 @@ pub struct Record { impl Into for Record { fn into(self) -> Buf { - let buf32 = - vec![self.is_sync, self.op_id, self.arg, self.result].into_boxed_slice(); + let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result] + .into_boxed_slice(); let ptr = Box::into_raw(buf32) as *mut [u8; 16]; unsafe { Box::from_raw(ptr) } } @@ -65,7 +65,7 @@ impl From<&[u8]> for Record { let ptr = s.as_ptr() as *const i32; let ints = unsafe { std::slice::from_raw_parts(ptr, 4) }; Record { - is_sync: ints[0], + promise_id: ints[0], op_id: ints[1], arg: ints[2], result: ints[3], @@ -81,7 +81,7 @@ impl From for Record { let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) }; assert_eq!(ints.len(), 4); Record { - is_sync: ints[0], + promise_id: ints[0], op_id: ints[1], arg: ints[2], result: ints[3], @@ -92,7 +92,7 @@ impl From for Record { #[test] fn test_record_from() { let r = Record { - is_sync: 1, + promise_id: 1, op_id: 2, arg: 3, result: 4, @@ -111,9 +111,9 @@ fn test_record_from() { pub type HttpBenchOp = dyn Future + Send; -fn dispatch(control: &[u8], zero_copy_buf: Option) -> CoreOp { +fn dispatch(control: &[u8], zero_copy_buf: Option) -> Op { let record = Record::from(control); - let is_sync = record.is_sync == 1; + let is_sync = record.promise_id == 0; let http_bench_op = match record.op_id { OP_LISTEN => { assert!(is_sync); diff --git a/core/isolate.rs b/core/isolate.rs index ae14c00407..14e1b88aad 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -21,29 +21,21 @@ use futures::Async::*; use futures::Future; use futures::Poll; use libc::c_char; -use libc::c_int; use libc::c_void; use std::ffi::CStr; use std::ffi::CString; use std::ptr::null; -use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Mutex, Once, ONCE_INIT}; pub type Buf = Box<[u8]>; -pub type OpAsyncFuture = Box + Send>; +pub type OpAsyncFuture = Box + Send>; -pub enum Op { +pub enum Op { Sync(Buf), - Async(OpAsyncFuture), + Async(OpAsyncFuture), } -pub type CoreError = (); - -type CoreOpAsyncFuture = OpAsyncFuture<(c_int, Buf), CoreError>; - -pub type CoreOp = Op; - /// Stores a script used to initalize a Isolate pub struct Script<'a> { pub source: &'a str, @@ -76,9 +68,7 @@ pub enum StartupData<'a> { None, } -pub type OpResult = Result, E>; - -type CoreDispatchFn = Fn(&[u8], Option) -> CoreOp; +type DispatchFn = Fn(&[u8], Option) -> Op; pub type DynImportFuture = Box + Send>; type DynImportFn = Fn(&str, &str) -> DynImportFuture; @@ -103,12 +93,6 @@ impl Future for DynImport { } } -enum ResponseData { - None, - Buffer(deno_buf), - PromiseId(c_int), -} - /// A single execution context of JavaScript. Corresponds roughly to the "Web /// Worker" concept in the DOM. An Isolate is a Future that can be used with /// Tokio. The Isolate future complete when there is an error or when all @@ -120,15 +104,14 @@ enum ResponseData { pub struct Isolate { libdeno_isolate: *const libdeno::isolate, shared_libdeno_isolate: Arc>>, - dispatch: Option>, + dispatch: Option>, dyn_import: Option>, needs_init: bool, shared: SharedQueue, - pending_ops: FuturesUnordered, + pending_ops: FuturesUnordered, pending_dyn_imports: FuturesUnordered, have_unpolled_ops: bool, startup_script: Option, - next_promise_id: AtomicI32, } unsafe impl Send for Isolate {} @@ -193,7 +176,6 @@ impl Isolate { have_unpolled_ops: false, pending_dyn_imports: FuturesUnordered::new(), startup_script, - next_promise_id: AtomicI32::new(1), } } @@ -202,7 +184,7 @@ impl Isolate { /// corresponds to the second argument of Deno.core.dispatch(). pub fn set_dispatch(&mut self, f: F) where - F: Fn(&[u8], Option) -> CoreOp + Send + Sync + 'static, + F: Fn(&[u8], Option) -> Op + Send + Sync + 'static, { self.dispatch = Some(Arc::new(f)); } @@ -257,10 +239,6 @@ impl Isolate { } } - pub fn get_next_promise_id(&self) -> i32 { - self.next_promise_id.fetch_add(1, Ordering::SeqCst) - } - extern "C" fn pre_dispatch( user_data: *mut c_void, control_argv0: deno_buf, @@ -301,17 +279,9 @@ impl Isolate { // return value. // TODO(ry) check that if JSError thrown during respond(), that it will be // picked up. - let _ = - isolate.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref()))); + let _ = isolate.respond(Some(&buf)); } Op::Async(fut) => { - let promise_id = isolate.get_next_promise_id(); - let _ = isolate.respond(ResponseData::PromiseId(promise_id)); - let fut = Box::new(fut.and_then( - move |buf| -> Result<(c_int, Buf), CoreError> { - Ok((promise_id, buf)) - }, - )); isolate.pending_ops.push(fut); isolate.have_unpolled_ops = true; } @@ -370,34 +340,14 @@ impl Isolate { } } - // the result type is a placeholder for a more specific enum type - fn respond(&mut self, data: ResponseData) -> Result<(), JSError> { - match data { - ResponseData::PromiseId(pid) => unsafe { - libdeno::deno_respond( - self.libdeno_isolate, - self.as_raw_ptr(), - deno_buf::empty(), - &pid, - ) - }, - ResponseData::Buffer(r) => unsafe { - libdeno::deno_respond( - self.libdeno_isolate, - self.as_raw_ptr(), - r, - null(), - ) - }, - ResponseData::None => unsafe { - libdeno::deno_respond( - self.libdeno_isolate, - self.as_raw_ptr(), - deno_buf::empty(), - null(), - ) - }, + fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), JSError> { + let buf = match maybe_buf { + None => deno_buf::empty(), + Some(r) => deno_buf::from(r), }; + unsafe { + libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf) + } if let Some(err) = self.last_exception() { Err(err) } else { @@ -575,7 +525,7 @@ impl Future for Isolate { self.shared_init(); - let mut overflow_response: Option<(c_int, Buf)> = None; + let mut overflow_response: Option = None; loop { // If there are any pending dyn_import futures, do those first. @@ -596,13 +546,13 @@ impl Future for Isolate { Err(_) => panic!("unexpected op error"), Ok(Ready(None)) => break, Ok(NotReady) => break, - Ok(Ready(Some(op))) => { - let successful_push = self.shared.push(op.0, &op.1); + Ok(Ready(Some(buf))) => { + let successful_push = self.shared.push(&buf); if !successful_push { // If we couldn't push the response to the shared queue, because // there wasn't enough size, we will return the buffer via the // legacy route, using the argument of deno_respond. - overflow_response = Some(op); + overflow_response = Some(buf); break; } } @@ -610,16 +560,14 @@ impl Future for Isolate { } if self.shared.size() > 0 { - self.respond(ResponseData::None)?; + self.respond(None)?; // The other side should have shifted off all the messages. assert_eq!(self.shared.size(), 0); } if overflow_response.is_some() { - let op = overflow_response.take().unwrap(); - let promise_id_bytes = op.0.to_be_bytes(); - let buf: Buf = [&promise_id_bytes, &op.1[..]].concat().into(); - self.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())))?; + let buf = overflow_response.take().unwrap(); + self.respond(Some(&buf))?; } self.check_promise_errors(); @@ -716,7 +664,7 @@ pub mod tests { let dispatch_count_ = dispatch_count.clone(); let mut isolate = Isolate::new(StartupData::None, false); - isolate.set_dispatch(move |control, _| -> CoreOp { + isolate.set_dispatch(move |control, _| -> Op { dispatch_count_.fetch_add(1, Ordering::Relaxed); match mode { Mode::AsyncImmediate => { @@ -886,7 +834,7 @@ pub mod tests { "setup2.js", r#" let nrecv = 0; - Deno.core.setAsyncHandler((promiseId, buf) => { + Deno.core.setAsyncHandler((buf) => { assert(buf.byteLength === 1); assert(buf[0] === 43); nrecv++; @@ -1077,7 +1025,7 @@ pub mod tests { "overflow_req_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler((buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); let response = Deno.core.dispatch(control); @@ -1099,7 +1047,7 @@ pub mod tests { "overflow_res_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler((buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); let response = Deno.core.dispatch(control); @@ -1120,7 +1068,7 @@ pub mod tests { "overflow_req_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((cmdId, buf) => { + Deno.core.setAsyncHandler((buf) => { assert(buf.byteLength === 1); assert(buf[0] === 43); asyncRecv++; @@ -1128,8 +1076,8 @@ pub mod tests { // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); let response = Deno.core.dispatch(control); - // Async messages always have number type response. - assert(typeof response == "number"); + // Async messages always have null response. + assert(response == null); assert(asyncRecv == 0); "#, )); @@ -1149,7 +1097,7 @@ pub mod tests { "overflow_res_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((cmdId, buf) => { + Deno.core.setAsyncHandler((buf) => { assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; @@ -1157,7 +1105,7 @@ pub mod tests { // Large message that will overflow the shared space. let control = new Uint8Array([42]); let response = Deno.core.dispatch(control); - assert(typeof response == "number"); + assert(response == null); assert(asyncRecv == 0); "#, )); @@ -1177,7 +1125,7 @@ pub mod tests { "overflow_res_multiple_dispatch_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((cmdId, buf) => { + Deno.core.setAsyncHandler((buf) => { assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; @@ -1185,7 +1133,7 @@ pub mod tests { // Large message that will overflow the shared space. let control = new Uint8Array([42]); let response = Deno.core.dispatch(control); - assert(typeof response == "number"); + assert(response == null); assert(asyncRecv == 0); // Dispatch another message to verify that pending ops // are done even if shared space overflows diff --git a/core/libdeno.rs b/core/libdeno.rs index a17a8e521e..84f21e89e7 100644 --- a/core/libdeno.rs +++ b/core/libdeno.rs @@ -267,7 +267,6 @@ extern "C" { i: *const isolate, user_data: *const c_void, buf: deno_buf, - promise_id: *const c_int, ); pub fn deno_pinned_buf_delete(buf: &mut deno_pinned_buf); pub fn deno_execute( diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc index 30f82b6cca..8a3a561566 100644 --- a/core/libdeno/api.cc +++ b/core/libdeno/api.cc @@ -153,15 +153,11 @@ void deno_pinned_buf_delete(deno_pinned_buf* buf) { auto _ = deno::PinnedBuf(buf); } -void deno_respond(Deno* d_, void* user_data, deno_buf buf, int* promise_id) { +void deno_respond(Deno* d_, void* user_data, deno_buf buf) { auto* d = unwrap(d_); if (d->current_args_ != nullptr) { // Synchronous response. - if (promise_id != nullptr) { - auto number = v8::Number::New(d->isolate_, *promise_id); - d->current_args_->GetReturnValue().Set(number); - } else { - CHECK_NOT_NULL(buf.data_ptr); + if (buf.data_ptr != nullptr) { auto ab = deno::ImportBuf(d, buf); d->current_args_->GetReturnValue().Set(ab); } diff --git a/core/libdeno/deno.h b/core/libdeno/deno.h index 4f29f2c7a8..7452855546 100644 --- a/core/libdeno/deno.h +++ b/core/libdeno/deno.h @@ -81,10 +81,8 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename, // deno_respond sends up to one message back for every deno_recv_cb made. // // If this is called during deno_recv_cb, the issuing libdeno.send() in -// javascript will synchronously return the specified promise_id(number) -// or buf(Uint8Array) (or null if buf and promise_id are both null/empty). -// Calling with non-null for both buf and promise_id will result in the -// promise_id being returned. +// javascript will synchronously return the specified buf as an ArrayBuffer (or +// null if buf is empty). // // If this is called after deno_recv_cb has returned, the deno_respond // will call into the JS callback specified by libdeno.recv(). @@ -94,7 +92,7 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename, // releasing its memory.) // // If a JS exception was encountered, deno_last_exception() will be non-NULL. -void deno_respond(Deno* d, void* user_data, deno_buf buf, int* promise_id); +void deno_respond(Deno* d, void* user_data, deno_buf buf); // consumes zero_copy void deno_pinned_buf_delete(deno_pinned_buf* buf); diff --git a/core/libdeno/libdeno.d.ts b/core/libdeno/libdeno.d.ts index 093e846ab1..1bc7367d95 100644 --- a/core/libdeno/libdeno.d.ts +++ b/core/libdeno/libdeno.d.ts @@ -12,13 +12,14 @@ interface EvalErrorInfo { thrown: any; } -declare type MessageCallbackInternal = (msg: Uint8Array) => void; +declare interface MessageCallback { + (msg: Uint8Array): void; +} declare interface DenoCore { - recv(cb: MessageCallbackInternal): void; + recv(cb: MessageCallback): void; send( - cmdId: number, control: null | ArrayBufferView, data?: ArrayBufferView ): null | Uint8Array; diff --git a/core/libdeno/libdeno_test.cc b/core/libdeno/libdeno_test.cc index b72a8e0982..485c95bff2 100644 --- a/core/libdeno/libdeno_test.cc +++ b/core/libdeno/libdeno_test.cc @@ -75,7 +75,7 @@ TEST(LibDenoTest, RecvReturnBar) { EXPECT_EQ(buf.data_ptr[1], 'b'); EXPECT_EQ(buf.data_ptr[2], 'c'); uint8_t response[] = {'b', 'a', 'r'}; - deno_respond(d, user_data, {response, sizeof response}, nullptr); + deno_respond(d, user_data, {response, sizeof response}); }; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr}); deno_execute(d, d, "a.js", "RecvReturnBar()"); diff --git a/core/shared_queue.js b/core/shared_queue.js index b413f011ef..75f370ce44 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -151,27 +151,14 @@ SharedQueue Binary Layout function handleAsyncMsgFromRust(buf) { if (buf) { - handleAsyncMsgFromRustInner(buf); + asyncHandler(buf); } else { while ((buf = shift()) != null) { - handleAsyncMsgFromRustInner(buf); + asyncHandler(buf); } } } - function handleAsyncMsgFromRustInner(buf) { - // DataView to extract cmdId value. - const dataView = new DataView(buf.buffer, buf.byteOffset, 4); - const promiseId = dataView.getInt32(0); - // Uint8 buffer view shifted right and shortened 4 bytes to remove cmdId from view window. - const bufViewFinal = new Uint8Array( - buf.buffer, - buf.byteOffset + 4, - buf.byteLength - 4 - ); - asyncHandler(promiseId, bufViewFinal); - } - function dispatch(control, zeroCopy = null) { maybeInit(); // First try to push control to shared. diff --git a/core/shared_queue.rs b/core/shared_queue.rs index 1460fb1723..c33a37b90a 100644 --- a/core/shared_queue.rs +++ b/core/shared_queue.rs @@ -17,7 +17,6 @@ SharedQueue Binary Layout */ use crate::libdeno::deno_buf; -use libc::c_int; const MAX_RECORDS: usize = 100; /// Total number of records added. @@ -153,19 +152,17 @@ impl SharedQueue { Some(&self.bytes[off..end]) } - pub fn push(&mut self, promise_id: c_int, record: &[u8]) -> bool { + pub fn push(&mut self, record: &[u8]) -> bool { let off = self.head(); - let end = off + record.len() + 4; + let end = off + record.len(); let index = self.num_records(); if end > self.bytes.len() || index >= MAX_RECORDS { debug!("WARNING the sharedQueue overflowed"); return false; } self.set_end(index, end); - assert_eq!(end - off, record.len() + 4); - let pid_bytes = promise_id.to_be_bytes(); - self.bytes[off..off + 4].copy_from_slice(&pid_bytes); - self.bytes[off + 4..end].copy_from_slice(record); + assert_eq!(end - off, record.len()); + self.bytes[off..end].copy_from_slice(record); let u32_slice = self.as_u32_slice_mut(); u32_slice[INDEX_NUM_RECORDS] += 1; u32_slice[INDEX_HEAD] = end as u32; @@ -192,30 +189,30 @@ mod tests { assert!(h > 0); let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice(); - let len = r.len() + h + 4; - assert!(q.push(1, &r)); + let len = r.len() + h; + assert!(q.push(&r)); assert_eq!(q.head(), len); let r = vec![6, 7].into_boxed_slice(); - assert!(q.push(1, &r)); + assert!(q.push(&r)); let r = vec![8, 9, 10, 11].into_boxed_slice(); - assert!(q.push(1, &r)); + assert!(q.push(&r)); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 3); let r = q.shift().unwrap(); - assert_eq!(&r[4..], vec![1, 2, 3, 4, 5].as_slice()); + assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice()); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 2); let r = q.shift().unwrap(); - assert_eq!(&r[4..], vec![6, 7].as_slice()); + assert_eq!(r, vec![6, 7].as_slice()); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 1); let r = q.shift().unwrap(); - assert_eq!(&r[4..], vec![8, 9, 10, 11].as_slice()); + assert_eq!(r, vec![8, 9, 10, 11].as_slice()); assert_eq!(q.num_records(), 0); assert_eq!(q.size(), 0); @@ -235,19 +232,19 @@ mod tests { #[test] fn overflow() { let mut q = SharedQueue::new(RECOMMENDED_SIZE); - assert!(q.push(1, &alloc_buf(RECOMMENDED_SIZE - 1 - (4 * 2)))); + assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1))); assert_eq!(q.size(), 1); - assert!(!q.push(1, &alloc_buf(2))); + assert!(!q.push(&alloc_buf(2))); assert_eq!(q.size(), 1); - assert!(q.push(1, &alloc_buf(1))); + assert!(q.push(&alloc_buf(1))); assert_eq!(q.size(), 2); - assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1 - 4); + assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1); assert_eq!(q.size(), 1); - assert!(!q.push(1, &alloc_buf(1))); + assert!(!q.push(&alloc_buf(1))); - assert_eq!(q.shift().unwrap().len(), 1 + 4); + assert_eq!(q.shift().unwrap().len(), 1); assert_eq!(q.size(), 0); } @@ -255,11 +252,11 @@ mod tests { fn full_records() { let mut q = SharedQueue::new(RECOMMENDED_SIZE); for _ in 0..MAX_RECORDS { - assert!(q.push(1, &alloc_buf(1))) + assert!(q.push(&alloc_buf(1))) } - assert_eq!(q.push(1, &alloc_buf(1)), false); + assert_eq!(q.push(&alloc_buf(1)), false); // Even if we shift one off, we still cannot push a new record. - assert_eq!(q.shift().unwrap().len(), 1 + 4); - assert_eq!(q.push(1, &alloc_buf(1)), false); + assert_eq!(q.shift().unwrap().len(), 1); + assert_eq!(q.push(&alloc_buf(1)), false); } } diff --git a/js/dispatch.ts b/js/dispatch.ts index 36f97363f5..0c6e707091 100644 --- a/js/dispatch.ts +++ b/js/dispatch.ts @@ -5,30 +5,36 @@ import * as msg from "gen/cli/msg_generated"; import * as errors from "./errors"; import * as util from "./util"; import { + nextPromiseId, recordFromBufMinimal, handleAsyncMsgFromRustMinimal } from "./dispatch_minimal"; const promiseTable = new Map>(); -function flatbufferRecordFromBuf(buf: Uint8Array): msg.Base { - const bb = new flatbuffers.ByteBuffer(buf); - const base = msg.Base.getRootAsBase(bb); - return base; +interface FlatbufferRecord { + promiseId: number; + base: msg.Base; } -export function handleAsyncMsgFromRust( - promiseId: number, - ui8: Uint8Array -): void { +function flatbufferRecordFromBuf(buf: Uint8Array): FlatbufferRecord { + const bb = new flatbuffers.ByteBuffer(buf); + const base = msg.Base.getRootAsBase(bb); + return { + promiseId: base.cmdId(), + base + }; +} + +export function handleAsyncMsgFromRust(ui8: Uint8Array): void { const buf32 = new Int32Array(ui8.buffer, ui8.byteOffset, ui8.byteLength / 4); const recordMin = recordFromBufMinimal(buf32); if (recordMin) { // Fast and new - handleAsyncMsgFromRustMinimal(promiseId, ui8, recordMin); + handleAsyncMsgFromRustMinimal(ui8, recordMin); } else { // Legacy - let base = flatbufferRecordFromBuf(ui8); + let { promiseId, base } = flatbufferRecordFromBuf(ui8); const promise = promiseTable.get(promiseId); util.assert(promise != null, `Expecting promise in table. ${promiseId}`); promiseTable.delete(promiseId); @@ -50,26 +56,14 @@ function sendInternal( innerType: msg.Any, inner: flatbuffers.Offset, zeroCopy: undefined | ArrayBufferView, - isSync: true -): Uint8Array | null; -function sendInternal( - builder: flatbuffers.Builder, - innerType: msg.Any, - inner: flatbuffers.Offset, - zeroCopy: undefined | ArrayBufferView, - isSync: false -): Promise; -function sendInternal( - builder: flatbuffers.Builder, - innerType: msg.Any, - inner: flatbuffers.Offset, - zeroCopy: undefined | ArrayBufferView, - isSync: boolean -): Promise | Uint8Array | null { + sync = true +): [number, null | Uint8Array] { + const cmdId = nextPromiseId(); msg.Base.startBase(builder); - msg.Base.addSync(builder, isSync); msg.Base.addInner(builder, inner); msg.Base.addInnerType(builder, innerType); + msg.Base.addSync(builder, sync); + msg.Base.addCmdId(builder, cmdId); builder.finish(msg.Base.endBase(builder)); const control = builder.asUint8Array(); @@ -80,25 +74,7 @@ function sendInternal( ); builder.inUse = false; - - if (typeof response === "number") { - const promise = util.createResolvable(); - promiseTable.set(response, promise); - util.assert(!isSync); - return promise; - } else { - if (!isSync) { - util.assert(response !== null); - const base = flatbufferRecordFromBuf(response as Uint8Array); - const err = errors.maybeError(base); - if (err != null) { - return Promise.reject(err); - } else { - return Promise.resolve(base); - } - } - return response; - } + return [cmdId, response]; } // @internal @@ -108,7 +84,16 @@ export function sendAsync( inner: flatbuffers.Offset, data?: ArrayBufferView ): Promise { - const promise = sendInternal(builder, innerType, inner, data, false); + const [cmdId, response] = sendInternal( + builder, + innerType, + inner, + data, + false + ); + util.assert(response == null); // null indicates async. + const promise = util.createResolvable(); + promiseTable.set(cmdId, promise); return promise; } @@ -119,8 +104,10 @@ export function sendSync( inner: flatbuffers.Offset, data?: ArrayBufferView ): null | msg.Base { - const response = sendInternal(builder, innerType, inner, data, true); - if (response == null || response.length === 0) { + const [cmdId, response] = sendInternal(builder, innerType, inner, data, true); + util.assert(cmdId >= 0); + util.assert(response != null); // null indicates async. + if (response!.length === 0) { return null; } else { const bb = new flatbuffers.ByteBuffer(response!); diff --git a/js/dispatch_minimal.ts b/js/dispatch_minimal.ts index bf9065f566..17d3281107 100644 --- a/js/dispatch_minimal.ts +++ b/js/dispatch_minimal.ts @@ -5,8 +5,14 @@ import { core } from "./core"; const DISPATCH_MINIMAL_TOKEN = 0xcafe; const promiseTableMin = new Map>(); +let _nextPromiseId = 0; + +export function nextPromiseId(): number { + return _nextPromiseId++; +} export interface RecordMinimal { + promiseId: number; opId: number; arg: number; result: number; @@ -22,9 +28,10 @@ export function hasMinimalToken(i32: Int32Array): boolean { export function recordFromBufMinimal(buf32: Int32Array): null | RecordMinimal { if (hasMinimalToken(buf32)) { return { - opId: buf32[1], - arg: buf32[2], - result: buf32[3] + promiseId: buf32[1], + opId: buf32[2], + arg: buf32[3], + result: buf32[4] }; } return null; @@ -39,13 +46,12 @@ const scratchBytes = new Uint8Array( util.assert(scratchBytes.byteLength === scratch32.length * 4); export function handleAsyncMsgFromRustMinimal( - promiseId: number, ui8: Uint8Array, record: RecordMinimal ): void { // Fast and new util.log("minimal handleAsyncMsgFromRust ", ui8.length); - const { result } = record; + const { promiseId, result } = record; const promise = promiseTableMin.get(promiseId); promiseTableMin.delete(promiseId); promise!.resolve(result); @@ -56,16 +62,16 @@ export function sendAsyncMinimal( arg: number, zeroCopy: Uint8Array ): Promise { + const promiseId = nextPromiseId(); // AKA cmdId + scratch32[0] = DISPATCH_MINIMAL_TOKEN; - scratch32[1] = opId; - scratch32[2] = arg; - - const promiseId = core.dispatch(scratchBytes, zeroCopy); - - util.assert(typeof promiseId == "number"); + scratch32[1] = promiseId; + scratch32[2] = opId; + scratch32[3] = arg; const promise = util.createResolvable(); - promiseTableMin.set(promiseId as number, promise); + promiseTableMin.set(promiseId, promise); + core.dispatch(scratchBytes, zeroCopy); return promise; }