diff --git a/Cargo.lock b/Cargo.lock index 6ae7e6b81f..665a0901ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1095,6 +1095,7 @@ dependencies = [ "mime", "percent-encoding", "phf", + "pin-project", "ring", "serde", "tokio", diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 3475513620..73bf07b68c 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -2614,6 +2614,129 @@ Deno.test({ }, }); +async function httpServerWithErrorBody( + listener: Deno.Listener, + compression: boolean, +): Promise { + const conn = await listener.accept(); + listener.close(); + const httpConn = Deno.serveHttp(conn); + const e = await httpConn.nextRequest(); + assert(e); + const { respondWith } = e; + const originalErr = new Error("boom"); + const rs = new ReadableStream({ + async start(controller) { + controller.enqueue(new Uint8Array([65])); + await delay(1000); + controller.error(originalErr); + }, + }); + const init = compression ? { headers: { "content-type": "text/plain" } } : {}; + const response = new Response(rs, init); + const err = await assertRejects(() => respondWith(response)); + assert(err === originalErr); + return httpConn; +} + +for (const compression of [true, false]) { + Deno.test({ + name: `http server errors stream if response body errors (http/1.1${ + compression ? " + compression" : "" + })`, + permissions: { net: true }, + async fn() { + const hostname = "localhost"; + const port = 4501; + + const listener = Deno.listen({ hostname, port }); + const server = httpServerWithErrorBody(listener, compression); + + const conn = await Deno.connect({ hostname, port }); + const msg = new TextEncoder().encode( + `GET / HTTP/1.1\r\nHost: ${hostname}:${port}\r\n\r\n`, + ); + const nwritten = await conn.write(msg); + assertEquals(nwritten, msg.byteLength); + + const buf = new Uint8Array(1024); + const nread = await conn.read(buf); + assert(nread); + const data = new TextDecoder().decode(buf.subarray(0, nread)); + assert(data.endsWith("1\r\nA\r\n")); + const nread2 = await conn.read(buf); // connection should be closed now because the stream errored + assertEquals(nread2, null); + conn.close(); + + const httpConn = await server; + httpConn.close(); + }, + }); + + Deno.test({ + name: `http server errors stream if response body errors (http/1.1 + fetch${ + compression ? " + compression" : "" + })`, + permissions: { net: true }, + async fn() { + const hostname = "localhost"; + const port = 4501; + + const listener = Deno.listen({ hostname, port }); + const server = httpServerWithErrorBody(listener, compression); + + const resp = await fetch(`http://${hostname}:${port}/`); + assert(resp.body); + const reader = resp.body.getReader(); + const result = await reader.read(); + assert(!result.done); + assertEquals(result.value, new Uint8Array([65])); + const err = await assertRejects(() => reader.read()); + assert(err instanceof TypeError); + assert(err.message.includes("unexpected EOF")); + + const httpConn = await server; + httpConn.close(); + }, + }); + + Deno.test({ + name: `http server errors stream if response body errors (http/2 + fetch${ + compression ? " + compression" : "" + }))`, + permissions: { net: true, read: true }, + async fn() { + const hostname = "localhost"; + const port = 4501; + + const listener = Deno.listenTls({ + hostname, + port, + certFile: "cli/tests/testdata/tls/localhost.crt", + keyFile: "cli/tests/testdata/tls/localhost.key", + alpnProtocols: ["h2"], + }); + const server = httpServerWithErrorBody(listener, compression); + + const caCert = Deno.readTextFileSync("cli/tests/testdata/tls/RootCA.pem"); + const client = Deno.createHttpClient({ caCerts: [caCert] }); + const resp = await fetch(`https://${hostname}:${port}/`, { client }); + client.close(); + assert(resp.body); + const reader = resp.body.getReader(); + const result = await reader.read(); + assert(!result.done); + assertEquals(result.value, new Uint8Array([65])); + const err = await assertRejects(() => reader.read()); + assert(err instanceof TypeError); + assert(err.message.includes("unexpected internal error encountered")); + + const httpConn = await server; + httpConn.close(); + }, + }); +} + function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader { // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 const tp = new TextProtoReader(r); diff --git a/ext/http/01_http.js b/ext/http/01_http.js index bd740b600c..dfb0f206cf 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -263,6 +263,7 @@ } if (isStreamingResponseBody) { + let success = false; if ( respBody === null || !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody) @@ -284,6 +285,7 @@ ); if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid); readableStreamClose(respBody); // Release JS lock. + success = true; } catch (error) { const connError = httpConn[connErrorSymbol]; if ( @@ -320,13 +322,16 @@ throw error; } } + success = true; } - try { - await core.opAsync("op_http_shutdown", streamRid); - } catch (error) { - await reader.cancel(error); - throw error; + if (success) { + try { + await core.opAsync("op_http_shutdown", streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } } } diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index 2f4ae31e64..65cd4ccfef 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -31,6 +31,7 @@ hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "r mime = "0.3.16" percent-encoding.workspace = true phf = { version = "0.10", features = ["macros"] } +pin-project.workspace = true ring.workspace = true serde.workspace = true tokio.workspace = true diff --git a/ext/http/lib.rs b/ext/http/lib.rs index af117d3f92..812394d94b 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -70,9 +70,12 @@ use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::task::spawn_local; -use tokio_util::io::ReaderStream; + +use crate::reader_stream::ExternallyAbortableReaderStream; +use crate::reader_stream::ShutdownHandle; pub mod compressible; +mod reader_stream; pub fn init() -> Extension { Extension::builder() @@ -414,8 +417,11 @@ impl Default for HttpRequestReader { /// The write half of an HTTP stream. enum HttpResponseWriter { Headers(oneshot::Sender>), - Body(Pin>), - BodyUncompressed(hyper::body::Sender), + Body { + writer: Pin>, + shutdown_handle: ShutdownHandle, + }, + BodyUncompressed(BodyUncompressedSender), Closed, } @@ -425,6 +431,36 @@ impl Default for HttpResponseWriter { } } +struct BodyUncompressedSender(Option); + +impl BodyUncompressedSender { + fn sender(&mut self) -> &mut hyper::body::Sender { + // This is safe because we only ever take the sender out of the option + // inside of the shutdown method. + self.0.as_mut().unwrap() + } + + fn shutdown(mut self) { + // take the sender out of self so that when self is dropped at the end of + // this block, it doesn't get aborted + self.0.take(); + } +} + +impl From for BodyUncompressedSender { + fn from(sender: hyper::body::Sender) -> Self { + BodyUncompressedSender(Some(sender)) + } +} + +impl Drop for BodyUncompressedSender { + fn drop(&mut self) { + if let Some(sender) = self.0.take() { + sender.abort(); + } + } +} + // We use a tuple instead of struct to avoid serialization overhead of the keys. #[derive(Serialize)] #[serde(rename_all = "camelCase")] @@ -668,14 +704,22 @@ fn http_response( Encoding::Gzip => Box::pin(GzipEncoder::new(writer)), _ => unreachable!(), // forbidden by accepts_compression }; + let (stream, shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); Ok(( - HttpResponseWriter::Body(writer), - Body::wrap_stream(ReaderStream::new(reader)), + HttpResponseWriter::Body { + writer, + shutdown_handle, + }, + Body::wrap_stream(stream), )) } None => { let (body_tx, body_rx) = Body::channel(); - Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx)) + Ok(( + HttpResponseWriter::BodyUncompressed(body_tx.into()), + body_rx, + )) } } } @@ -768,10 +812,10 @@ async fn op_http_write_resource( } match &mut *wr { - HttpResponseWriter::Body(body) => { - let mut result = body.write_all(&view).await; + HttpResponseWriter::Body { writer, .. } => { + let mut result = writer.write_all(&view).await; if result.is_ok() { - result = body.flush().await; + result = writer.flush().await; } if let Err(err) = result { assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); @@ -784,7 +828,7 @@ async fn op_http_write_resource( } HttpResponseWriter::BodyUncompressed(body) => { let bytes = Bytes::from(view); - if let Err(err) = body.send_data(bytes).await { + if let Err(err) = body.sender().send_data(bytes).await { assert!(err.is_closed()); // Pull up the failure associated with the transport connection instead. http_stream.conn.closed().await?; @@ -813,10 +857,10 @@ async fn op_http_write( match &mut *wr { HttpResponseWriter::Headers(_) => Err(http_error("no response headers")), HttpResponseWriter::Closed => Err(http_error("response already completed")), - HttpResponseWriter::Body(body) => { - let mut result = body.write_all(&buf).await; + HttpResponseWriter::Body { writer, .. } => { + let mut result = writer.write_all(&buf).await; if result.is_ok() { - result = body.flush().await; + result = writer.flush().await; } match result { Ok(_) => Ok(()), @@ -833,7 +877,7 @@ async fn op_http_write( } HttpResponseWriter::BodyUncompressed(body) => { let bytes = Bytes::from(buf); - match body.send_data(bytes).await { + match body.sender().send_data(bytes).await { Ok(_) => Ok(()), Err(err) => { assert!(err.is_closed()); @@ -862,17 +906,27 @@ async fn op_http_shutdown( .get::(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let wr = take(&mut *wr); - if let HttpResponseWriter::Body(mut body_writer) = wr { - match body_writer.shutdown().await { - Ok(_) => {} - Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - stream.conn.closed().await?; + match wr { + HttpResponseWriter::Body { + mut writer, + shutdown_handle, + } => { + shutdown_handle.shutdown(); + match writer.shutdown().await { + Ok(_) => {} + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + stream.conn.closed().await?; + } } } - } + HttpResponseWriter::BodyUncompressed(body) => { + body.shutdown(); + } + _ => {} + }; Ok(()) } diff --git a/ext/http/reader_stream.rs b/ext/http/reader_stream.rs new file mode 100644 index 0000000000..388b8db814 --- /dev/null +++ b/ext/http/reader_stream.rs @@ -0,0 +1,157 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +use bytes::Bytes; +use deno_core::futures::Stream; +use pin_project::pin_project; +use tokio::io::AsyncRead; +use tokio_util::io::ReaderStream; + +/// [ExternallyAbortableByteStream] adapts a [tokio::AsyncRead] into a [Stream]. +/// It is used to bridge between the HTTP response body resource, and +/// `hyper::Body`. The stream has the special property that it errors if the +/// underlying reader is closed before an explicit EOF is sent (in the form of +/// setting the `shutdown` flag to true). +#[pin_project] +pub struct ExternallyAbortableReaderStream { + #[pin] + inner: ReaderStream, + done: Arc, +} + +pub struct ShutdownHandle(Arc); + +impl ShutdownHandle { + pub fn shutdown(&self) { + self.0.store(true, std::sync::atomic::Ordering::SeqCst); + } +} + +impl ExternallyAbortableReaderStream { + pub fn new(reader: R) -> (Self, ShutdownHandle) { + let done = Arc::new(AtomicBool::new(false)); + let this = Self { + inner: ReaderStream::new(reader), + done: done.clone(), + }; + (this, ShutdownHandle(done)) + } +} + +impl Stream for ExternallyAbortableReaderStream { + type Item = std::io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let val = std::task::ready!(this.inner.poll_next(cx)); + match val { + None if this.done.load(Ordering::SeqCst) => Poll::Ready(None), + None => Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "stream reader has shut down", + )))), + Some(val) => Poll::Ready(Some(val)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use deno_core::futures::StreamExt; + use tokio::io::AsyncWriteExt; + + #[tokio::test] + async fn success() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + writer.write_all(b"world").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); + + shutdown_handle.shutdown(); + writer.shutdown().await.unwrap(); + drop(writer); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn error() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, _shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + drop(writer); + assert_eq!( + stream.next().await.unwrap().unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + #[tokio::test] + async fn error2() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, _shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + writer.shutdown().await.unwrap(); + drop(writer); + assert_eq!( + stream.next().await.unwrap().unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + #[tokio::test] + async fn write_after_shutdown() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + writer.write_all(b"world").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); + + shutdown_handle.shutdown(); + writer.shutdown().await.unwrap(); + + assert!(writer.write_all(b"!").await.is_err()); + + drop(writer); + assert!(stream.next().await.is_none()); + } +}