diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 0f23a2bb56..080b94a1d0 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -892,15 +892,13 @@ unitTest( respondPromise, ]); + httpConn.close(); listener.close(); assert(errors.length >= 1); for (const error of errors) { assertEquals(error.name, "Http"); - assertEquals( - error.message, - "connection closed before message completed", - ); + assert(error.message.includes("connection")); } }, ); @@ -975,44 +973,29 @@ unitTest( unitTest( { permissions: { net: true } }, async function droppedConnSenderNoPanic() { - async function server(listener: Deno.Listener) { + async function server() { + const listener = Deno.listen({ port: 8000 }); const conn = await listener.accept(); const http = Deno.serveHttp(conn); - - for (;;) { - const req = await http.nextRequest(); - if (req == null) break; - - nextloop() - .then(() => { - http.close(); - return req.respondWith(new Response("boom")); - }) - .catch(() => {}); - } - + const evt = await http.nextRequest(); + http.close(); try { - http.close(); + await evt!.respondWith(new Response("boom")); } catch { - "nop"; + // Ignore error. } - listener.close(); } async function client() { - const resp = await fetch("http://127.0.0.1:8000/"); - await resp.body?.cancel(); + try { + const resp = await fetch("http://127.0.0.1:8000/"); + await resp.body?.cancel(); + } catch { + // Ignore error + } } - function nextloop() { - return new Promise((resolve) => setTimeout(resolve, 0)); - } - - async function main() { - const listener = Deno.listen({ port: 8000 }); - await Promise.all([server(listener), client()]); - } - await main(); + await Promise.all([server(), client()]); }, ); diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts index a1585ce6b5..eabe26c84d 100644 --- a/cli/tests/unit/net_test.ts +++ b/cli/tests/unit/net_test.ts @@ -117,10 +117,10 @@ unitTest( const listener = Deno.listen({ port: 4501 }); const p = listener.accept(); listener.close(); + // TODO(piscisaureus): the error type should be `Interrupted` here, which + // gets thrown, but then ext/net catches it and rethrows `BadResource`. await assertRejects( - async () => { - await p; - }, + () => p, Deno.errors.BadResource, "Listener has been closed", ); @@ -141,11 +141,8 @@ unitTest( const p = listener.accept(); listener.close(); await assertRejects( - async () => { - await p; - }, - Deno.errors.BadResource, - "Listener has been closed", + () => p, + Deno.errors.Interrupted, ); }, ); @@ -173,27 +170,29 @@ unitTest( }, ); -// TODO(jsouto): Enable when tokio updates mio to v0.7! unitTest( - { ignore: true, permissions: { read: true, write: true } }, + { + ignore: Deno.build.os === "windows", + permissions: { read: true, write: true }, + }, async function netUnixConcurrentAccept() { const filePath = await Deno.makeTempFile(); const listener = Deno.listen({ transport: "unix", path: filePath }); let acceptErrCount = 0; const checkErr = (e: Error) => { - if (e.message === "Listener has been closed") { + if (e instanceof Deno.errors.Interrupted) { // "operation canceled" assertEquals(acceptErrCount, 1); - } else if (e.message === "Another accept task is ongoing") { + } else if (e instanceof Deno.errors.Busy) { // "Listener already in use" acceptErrCount++; } else { - throw new Error("Unexpected error message"); + throw e; } }; const p = listener.accept().catch(checkErr); const p1 = listener.accept().catch(checkErr); await Promise.race([p, p1]); listener.close(); - await [p, p1]; + await Promise.all([p, p1]); assertEquals(acceptErrCount, 1); }, ); @@ -500,11 +499,7 @@ unitTest( ); unitTest( - { - // FIXME(bartlomieju) - ignore: true, - permissions: { net: true }, - }, + { permissions: { net: true } }, async function netListenAsyncIterator() { const addr = { hostname: "127.0.0.1", port: 3500 }; const listener = Deno.listen(addr); @@ -590,8 +585,8 @@ unitTest( await conn.write(new Uint8Array([1, 2, 3])); } } catch (err) { - assert(!!err); - assert(err instanceof Deno.errors.BadResource); + assert(err); + assert(err instanceof Deno.errors.Interrupted); } } diff --git a/core/lib.rs b/core/lib.rs index ea7b322e25..c0419f8abd 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -68,6 +68,7 @@ pub use crate::normalize_path::normalize_path; pub use crate::ops::serialize_op_result; pub use crate::ops::Op; pub use crate::ops::OpAsyncFuture; +pub use crate::ops::OpCall; pub use crate::ops::OpFn; pub use crate::ops::OpId; pub use crate::ops::OpPayload; diff --git a/core/modules.rs b/core/modules.rs index 2af09057fd..31e03196a4 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -722,6 +722,7 @@ impl ModuleMap { #[cfg(test)] mod tests { use super::*; + use crate::ops::OpCall; use crate::serialize_op_result; use crate::JsRuntime; use crate::Op; @@ -1009,7 +1010,7 @@ mod tests { let (control, _): (u8, ()) = payload.deserialize().unwrap(); assert_eq!(control, 42); let resp = (0, 1, serialize_op_result(Ok(43), state)); - Op::Async(Box::pin(futures::future::ready(resp))) + Op::Async(OpCall::ready(resp)) }; let mut runtime = JsRuntime::new(RuntimeOptions { diff --git a/core/ops.rs b/core/ops.rs index 80bb30edaa..05b91f32f3 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -6,6 +6,11 @@ use crate::gotham_state::GothamState; use crate::ops_metrics::OpsTracker; use crate::resources::ResourceTable; use crate::runtime::GetErrorClassFn; +use futures::future::maybe_done; +use futures::future::FusedFuture; +use futures::future::MaybeDone; +use futures::ready; +use futures::task::noop_waker; use futures::Future; use indexmap::IndexMap; use rusty_v8 as v8; @@ -17,10 +22,67 @@ use std::ops::Deref; use std::ops::DerefMut; use std::pin::Pin; use std::rc::Rc; +use std::task::Context; +use std::task::Poll; + +/// Wrapper around a Future, which causes that Future to be polled immediately. +/// (Background: ops are stored in a `FuturesUnordered` structure which polls +/// them, but without the `OpCall` wrapper this doesn't happen until the next +/// turn of the event loop, which is too late for certain ops.) +pub struct OpCall(MaybeDone>>>); + +impl OpCall { + /// Wraps a future, and polls the inner future immediately. + /// This should be the default choice for ops. + pub fn eager(fut: impl Future + 'static) -> Self { + let boxed = Box::pin(fut) as Pin>>; + let mut inner = maybe_done(boxed); + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + let mut pinned = Pin::new(&mut inner); + let _ = pinned.as_mut().poll(&mut cx); + Self(inner) + } + + /// Wraps a future; the inner future is polled the usual way (lazily). + pub fn lazy(fut: impl Future + 'static) -> Self { + let boxed = Box::pin(fut) as Pin>>; + let inner = maybe_done(boxed); + Self(inner) + } + + /// Create a future by specifying its output. This is basically the same as + /// `async { value }` or `futures::future::ready(value)`. + pub fn ready(value: T) -> Self { + Self(MaybeDone::Done(value)) + } +} + +impl Future for OpCall { + type Output = T; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let inner = unsafe { &mut self.get_unchecked_mut().0 }; + let mut pinned = Pin::new(inner); + ready!(pinned.as_mut().poll(cx)); + Poll::Ready(pinned.as_mut().take_output().unwrap()) + } +} + +impl FusedFuture for OpCall +where + F: Future, +{ + fn is_terminated(&self) -> bool { + self.0.is_terminated() + } +} pub type PromiseId = u64; -pub type OpAsyncFuture = - Pin>>; +pub type OpAsyncFuture = OpCall<(PromiseId, OpId, OpResult)>; pub type OpFn = dyn Fn(Rc>, OpPayload) -> Op + 'static; pub type OpId = usize; diff --git a/core/ops_json.rs b/core/ops_json.rs index 0ca7e5ce4e..dca9a9a773 100644 --- a/core/ops_json.rs +++ b/core/ops_json.rs @@ -1,6 +1,7 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. use crate::error::AnyError; +use crate::ops::OpCall; use crate::serialize_op_result; use crate::Op; use crate::OpFn; @@ -35,7 +36,7 @@ pub fn void_op_async() -> Box { let op_id = payload.op_id; let pid = payload.promise_id; let op_result = serialize_op_result(Ok(()), state); - Op::Async(Box::pin(futures::future::ready((pid, op_id, op_result)))) + Op::Async(OpCall::ready((pid, op_id, op_result))) }) } @@ -127,7 +128,7 @@ where use crate::futures::FutureExt; let fut = op_fn(state.clone(), a, b) .map(move |result| (pid, op_id, serialize_op_result(result, state))); - Op::Async(Box::pin(fut)) + Op::Async(OpCall::eager(fut)) }) } @@ -159,7 +160,7 @@ where use crate::futures::FutureExt; let fut = op_fn(state.clone(), a, b) .map(move |result| (pid, op_id, serialize_op_result(result, state))); - Op::AsyncUnref(Box::pin(fut)) + Op::AsyncUnref(OpCall::eager(fut)) }) } diff --git a/core/runtime.rs b/core/runtime.rs index 1928ff31c7..873dcd3f5f 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -28,7 +28,6 @@ use futures::future::FutureExt; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::task::AtomicWaker; -use futures::Future; use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; @@ -36,7 +35,6 @@ use std::convert::TryFrom; use std::ffi::c_void; use std::mem::forget; use std::option::Option; -use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; use std::sync::Mutex; @@ -44,8 +42,7 @@ use std::sync::Once; use std::task::Context; use std::task::Poll; -type PendingOpFuture = - Pin>>; +type PendingOpFuture = OpCall<(PromiseId, OpId, OpResult)>; pub enum Snapshot { Static(&'static [u8]), @@ -1613,6 +1610,7 @@ pub mod tests { use crate::ZeroCopyBuf; use futures::future::lazy; use std::ops::FnOnce; + use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -1645,16 +1643,15 @@ pub mod tests { Mode::Async => { assert_eq!(control, 42); let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state)); - Op::Async(Box::pin(futures::future::ready(resp))) + Op::Async(OpCall::ready(resp)) } Mode::AsyncZeroCopy(has_buffer) => { assert_eq!(buf.is_some(), has_buffer); if let Some(buf) = buf { assert_eq!(buf.len(), 1); } - - let resp = serialize_op_result(Ok(43), rc_op_state); - Op::Async(Box::pin(futures::future::ready((0, 1, resp)))) + let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state)); + Op::Async(OpCall::ready(resp)) } } } diff --git a/ext/net/01_net.js b/ext/net/01_net.js index cc10a1c0ab..39df5a9d4c 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -3,7 +3,7 @@ ((window) => { const core = window.Deno.core; - const { BadResource } = core; + const { BadResource, Interrupted } = core; const { PromiseResolve, SymbolAsyncIterator, @@ -124,7 +124,7 @@ try { conn = await this.accept(); } catch (error) { - if (error instanceof BadResource) { + if (error instanceof BadResource || error instanceof Interrupted) { return { value: undefined, done: true }; } throw error; @@ -191,7 +191,7 @@ try { yield await this.receive(); } catch (err) { - if (err instanceof BadResource) { + if (err instanceof BadResource || err instanceof Interrupted) { break; } throw err; diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index c02b373d66..1430d8327a 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -10,6 +10,7 @@ ((window) => { const core = window.Deno.core; + const { Interrupted } = core; const webidl = window.__bootstrap.webidl; const { setEventTargetData } = window.__bootstrap.eventTarget; const { defineEventHandler } = window.__bootstrap.event; @@ -134,10 +135,16 @@ this[_enabled] = true; while (true) { if (this[_id] === null) break; - const data = await core.opAsync( - "op_message_port_recv_message", - this[_id], - ); + let data; + try { + data = await core.opAsync( + "op_message_port_recv_message", + this[_id], + ); + } catch (err) { + if (err instanceof Interrupted) break; + throw err; + } if (data === null) break; let message, transferables; try {