fix(ext/http): close stream on resp body error (#17126)

Previously, errored streaming response bodies did not cause the HTTP
stream to be aborted. It instead caused the stream to be closed gracefully,
which had the result that the client could not detect the difference
between a successful response and an errored response.

This commit fixes the issue by aborting the stream on error.
This commit is contained in:
Luca Casonato 2022-12-20 09:46:45 +01:00 committed by GitHub
parent 948f85216a
commit 8e947bb674
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 369 additions and 28 deletions

1
Cargo.lock generated
View File

@ -1095,6 +1095,7 @@ dependencies = [
"mime",
"percent-encoding",
"phf",
"pin-project",
"ring",
"serde",
"tokio",

View File

@ -2614,6 +2614,129 @@ Deno.test({
},
});
async function httpServerWithErrorBody(
listener: Deno.Listener,
compression: boolean,
): Promise<Deno.HttpConn> {
const conn = await listener.accept();
listener.close();
const httpConn = Deno.serveHttp(conn);
const e = await httpConn.nextRequest();
assert(e);
const { respondWith } = e;
const originalErr = new Error("boom");
const rs = new ReadableStream({
async start(controller) {
controller.enqueue(new Uint8Array([65]));
await delay(1000);
controller.error(originalErr);
},
});
const init = compression ? { headers: { "content-type": "text/plain" } } : {};
const response = new Response(rs, init);
const err = await assertRejects(() => respondWith(response));
assert(err === originalErr);
return httpConn;
}
for (const compression of [true, false]) {
Deno.test({
name: `http server errors stream if response body errors (http/1.1${
compression ? " + compression" : ""
})`,
permissions: { net: true },
async fn() {
const hostname = "localhost";
const port = 4501;
const listener = Deno.listen({ hostname, port });
const server = httpServerWithErrorBody(listener, compression);
const conn = await Deno.connect({ hostname, port });
const msg = new TextEncoder().encode(
`GET / HTTP/1.1\r\nHost: ${hostname}:${port}\r\n\r\n`,
);
const nwritten = await conn.write(msg);
assertEquals(nwritten, msg.byteLength);
const buf = new Uint8Array(1024);
const nread = await conn.read(buf);
assert(nread);
const data = new TextDecoder().decode(buf.subarray(0, nread));
assert(data.endsWith("1\r\nA\r\n"));
const nread2 = await conn.read(buf); // connection should be closed now because the stream errored
assertEquals(nread2, null);
conn.close();
const httpConn = await server;
httpConn.close();
},
});
Deno.test({
name: `http server errors stream if response body errors (http/1.1 + fetch${
compression ? " + compression" : ""
})`,
permissions: { net: true },
async fn() {
const hostname = "localhost";
const port = 4501;
const listener = Deno.listen({ hostname, port });
const server = httpServerWithErrorBody(listener, compression);
const resp = await fetch(`http://${hostname}:${port}/`);
assert(resp.body);
const reader = resp.body.getReader();
const result = await reader.read();
assert(!result.done);
assertEquals(result.value, new Uint8Array([65]));
const err = await assertRejects(() => reader.read());
assert(err instanceof TypeError);
assert(err.message.includes("unexpected EOF"));
const httpConn = await server;
httpConn.close();
},
});
Deno.test({
name: `http server errors stream if response body errors (http/2 + fetch${
compression ? " + compression" : ""
}))`,
permissions: { net: true, read: true },
async fn() {
const hostname = "localhost";
const port = 4501;
const listener = Deno.listenTls({
hostname,
port,
certFile: "cli/tests/testdata/tls/localhost.crt",
keyFile: "cli/tests/testdata/tls/localhost.key",
alpnProtocols: ["h2"],
});
const server = httpServerWithErrorBody(listener, compression);
const caCert = Deno.readTextFileSync("cli/tests/testdata/tls/RootCA.pem");
const client = Deno.createHttpClient({ caCerts: [caCert] });
const resp = await fetch(`https://${hostname}:${port}/`, { client });
client.close();
assert(resp.body);
const reader = resp.body.getReader();
const result = await reader.read();
assert(!result.done);
assertEquals(result.value, new Uint8Array([65]));
const err = await assertRejects(() => reader.read());
assert(err instanceof TypeError);
assert(err.message.includes("unexpected internal error encountered"));
const httpConn = await server;
httpConn.close();
},
});
}
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);

View File

@ -263,6 +263,7 @@
}
if (isStreamingResponseBody) {
let success = false;
if (
respBody === null ||
!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody)
@ -284,6 +285,7 @@
);
if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid);
readableStreamClose(respBody); // Release JS lock.
success = true;
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (
@ -320,13 +322,16 @@
throw error;
}
}
success = true;
}
try {
await core.opAsync("op_http_shutdown", streamRid);
} catch (error) {
await reader.cancel(error);
throw error;
if (success) {
try {
await core.opAsync("op_http_shutdown", streamRid);
} catch (error) {
await reader.cancel(error);
throw error;
}
}
}

View File

@ -31,6 +31,7 @@ hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "r
mime = "0.3.16"
percent-encoding.workspace = true
phf = { version = "0.10", features = ["macros"] }
pin-project.workspace = true
ring.workspace = true
serde.workspace = true
tokio.workspace = true

View File

@ -70,9 +70,12 @@ use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::task::spawn_local;
use tokio_util::io::ReaderStream;
use crate::reader_stream::ExternallyAbortableReaderStream;
use crate::reader_stream::ShutdownHandle;
pub mod compressible;
mod reader_stream;
pub fn init() -> Extension {
Extension::builder()
@ -414,8 +417,11 @@ impl Default for HttpRequestReader {
/// The write half of an HTTP stream.
enum HttpResponseWriter {
Headers(oneshot::Sender<Response<Body>>),
Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
BodyUncompressed(hyper::body::Sender),
Body {
writer: Pin<Box<dyn tokio::io::AsyncWrite>>,
shutdown_handle: ShutdownHandle,
},
BodyUncompressed(BodyUncompressedSender),
Closed,
}
@ -425,6 +431,36 @@ impl Default for HttpResponseWriter {
}
}
struct BodyUncompressedSender(Option<hyper::body::Sender>);
impl BodyUncompressedSender {
fn sender(&mut self) -> &mut hyper::body::Sender {
// This is safe because we only ever take the sender out of the option
// inside of the shutdown method.
self.0.as_mut().unwrap()
}
fn shutdown(mut self) {
// take the sender out of self so that when self is dropped at the end of
// this block, it doesn't get aborted
self.0.take();
}
}
impl From<hyper::body::Sender> for BodyUncompressedSender {
fn from(sender: hyper::body::Sender) -> Self {
BodyUncompressedSender(Some(sender))
}
}
impl Drop for BodyUncompressedSender {
fn drop(&mut self) {
if let Some(sender) = self.0.take() {
sender.abort();
}
}
}
// We use a tuple instead of struct to avoid serialization overhead of the keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
@ -668,14 +704,22 @@ fn http_response(
Encoding::Gzip => Box::pin(GzipEncoder::new(writer)),
_ => unreachable!(), // forbidden by accepts_compression
};
let (stream, shutdown_handle) =
ExternallyAbortableReaderStream::new(reader);
Ok((
HttpResponseWriter::Body(writer),
Body::wrap_stream(ReaderStream::new(reader)),
HttpResponseWriter::Body {
writer,
shutdown_handle,
},
Body::wrap_stream(stream),
))
}
None => {
let (body_tx, body_rx) = Body::channel();
Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx))
Ok((
HttpResponseWriter::BodyUncompressed(body_tx.into()),
body_rx,
))
}
}
}
@ -768,10 +812,10 @@ async fn op_http_write_resource(
}
match &mut *wr {
HttpResponseWriter::Body(body) => {
let mut result = body.write_all(&view).await;
HttpResponseWriter::Body { writer, .. } => {
let mut result = writer.write_all(&view).await;
if result.is_ok() {
result = body.flush().await;
result = writer.flush().await;
}
if let Err(err) = result {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
@ -784,7 +828,7 @@ async fn op_http_write_resource(
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(view);
if let Err(err) = body.send_data(bytes).await {
if let Err(err) = body.sender().send_data(bytes).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
http_stream.conn.closed().await?;
@ -813,10 +857,10 @@ async fn op_http_write(
match &mut *wr {
HttpResponseWriter::Headers(_) => Err(http_error("no response headers")),
HttpResponseWriter::Closed => Err(http_error("response already completed")),
HttpResponseWriter::Body(body) => {
let mut result = body.write_all(&buf).await;
HttpResponseWriter::Body { writer, .. } => {
let mut result = writer.write_all(&buf).await;
if result.is_ok() {
result = body.flush().await;
result = writer.flush().await;
}
match result {
Ok(_) => Ok(()),
@ -833,7 +877,7 @@ async fn op_http_write(
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(buf);
match body.send_data(bytes).await {
match body.sender().send_data(bytes).await {
Ok(_) => Ok(()),
Err(err) => {
assert!(err.is_closed());
@ -862,17 +906,27 @@ async fn op_http_shutdown(
.get::<HttpStreamResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let wr = take(&mut *wr);
if let HttpResponseWriter::Body(mut body_writer) = wr {
match body_writer.shutdown().await {
Ok(_) => {}
Err(err) => {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
stream.conn.closed().await?;
match wr {
HttpResponseWriter::Body {
mut writer,
shutdown_handle,
} => {
shutdown_handle.shutdown();
match writer.shutdown().await {
Ok(_) => {}
Err(err) => {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
stream.conn.closed().await?;
}
}
}
}
HttpResponseWriter::BodyUncompressed(body) => {
body.shutdown();
}
_ => {}
};
Ok(())
}

157
ext/http/reader_stream.rs Normal file
View File

@ -0,0 +1,157 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use bytes::Bytes;
use deno_core::futures::Stream;
use pin_project::pin_project;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
/// [ExternallyAbortableByteStream] adapts a [tokio::AsyncRead] into a [Stream].
/// It is used to bridge between the HTTP response body resource, and
/// `hyper::Body`. The stream has the special property that it errors if the
/// underlying reader is closed before an explicit EOF is sent (in the form of
/// setting the `shutdown` flag to true).
#[pin_project]
pub struct ExternallyAbortableReaderStream<R: AsyncRead> {
#[pin]
inner: ReaderStream<R>,
done: Arc<AtomicBool>,
}
pub struct ShutdownHandle(Arc<AtomicBool>);
impl ShutdownHandle {
pub fn shutdown(&self) {
self.0.store(true, std::sync::atomic::Ordering::SeqCst);
}
}
impl<R: AsyncRead> ExternallyAbortableReaderStream<R> {
pub fn new(reader: R) -> (Self, ShutdownHandle) {
let done = Arc::new(AtomicBool::new(false));
let this = Self {
inner: ReaderStream::new(reader),
done: done.clone(),
};
(this, ShutdownHandle(done))
}
}
impl<R: AsyncRead> Stream for ExternallyAbortableReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.project();
let val = std::task::ready!(this.inner.poll_next(cx));
match val {
None if this.done.load(Ordering::SeqCst) => Poll::Ready(None),
None => Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"stream reader has shut down",
)))),
Some(val) => Poll::Ready(Some(val)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use deno_core::futures::StreamExt;
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn success() {
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, mut writer) = tokio::io::split(b);
let (mut stream, shutdown_handle) =
ExternallyAbortableReaderStream::new(reader);
writer.write_all(b"hello").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
writer.write_all(b"world").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
shutdown_handle.shutdown();
writer.shutdown().await.unwrap();
drop(writer);
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn error() {
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, mut writer) = tokio::io::split(b);
let (mut stream, _shutdown_handle) =
ExternallyAbortableReaderStream::new(reader);
writer.write_all(b"hello").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
drop(writer);
assert_eq!(
stream.next().await.unwrap().unwrap_err().kind(),
std::io::ErrorKind::UnexpectedEof
);
}
#[tokio::test]
async fn error2() {
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, mut writer) = tokio::io::split(b);
let (mut stream, _shutdown_handle) =
ExternallyAbortableReaderStream::new(reader);
writer.write_all(b"hello").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
writer.shutdown().await.unwrap();
drop(writer);
assert_eq!(
stream.next().await.unwrap().unwrap_err().kind(),
std::io::ErrorKind::UnexpectedEof
);
}
#[tokio::test]
async fn write_after_shutdown() {
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, mut writer) = tokio::io::split(b);
let (mut stream, shutdown_handle) =
ExternallyAbortableReaderStream::new(reader);
writer.write_all(b"hello").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
writer.write_all(b"world").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
shutdown_handle.shutdown();
writer.shutdown().await.unwrap();
assert!(writer.write_all(b"!").await.is_err());
drop(writer);
assert!(stream.next().await.is_none());
}
}