refactor(ext/http): Use HttpRecord as response body to track until body completion (#20822)

Use HttpRecord as response body so requests can be tracked all the way
to response body completion.

This allows Request properties to be accessed while the response body is
streaming.

Graceful shutdown now awaits a future instead of async spinning waiting
for requests to finish.

On the minimal benchmark this refactor improves performance an
additional 2% over pooling alone for a net 3% increase over the previous
deno main branch.

Builds upon https://github.com/denoland/deno/pull/20809 and
https://github.com/denoland/deno/pull/20770.

---------

Co-authored-by: Matt Mastracci <matthew@mastracci.com>
This commit is contained in:
Laurence Rowe 2023-11-13 11:17:31 -08:00 committed by GitHub
parent 0209f7b469
commit e5819777c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 341 additions and 336 deletions

View file

@ -43,6 +43,7 @@ const {
ObjectHasOwn,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeCatch,
PromisePrototypeThen,
Symbol,
TypeError,
Uint8Array,
@ -50,6 +51,7 @@ const {
} = primordials;
const {
op_http_close_after_finish,
op_http_get_request_headers,
op_http_get_request_method_and_url,
op_http_read_request_body,
@ -386,9 +388,10 @@ class ServeHandlerInfo {
}
}
function fastSyncResponseOrStream(req, respBody, status) {
function fastSyncResponseOrStream(req, respBody, status, innerRequest) {
if (respBody === null || respBody === undefined) {
// Don't set the body
innerRequest?.close();
op_http_set_promise_complete(req, status);
return;
}
@ -397,36 +400,43 @@ function fastSyncResponseOrStream(req, respBody, status) {
const body = stream.body;
if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
innerRequest?.close();
op_http_set_response_body_bytes(req, body, status);
return;
}
if (typeof body === "string") {
innerRequest?.close();
op_http_set_response_body_text(req, body, status);
return;
}
// At this point in the response it needs to be a stream
if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) {
innerRequest?.close();
throw TypeError("invalid response");
}
const resourceBacking = getReadableStreamResourceBacking(stream);
let rid, autoClose;
if (resourceBacking) {
op_http_set_response_body_resource(
req,
resourceBacking.rid,
resourceBacking.autoClose,
status,
);
rid = resourceBacking.rid;
autoClose = resourceBacking.autoClose;
} else {
const rid = resourceForReadableStream(stream);
rid = resourceForReadableStream(stream);
autoClose = true;
}
PromisePrototypeThen(
op_http_set_response_body_resource(
req,
rid,
true,
autoClose,
status,
);
}
),
() => {
innerRequest?.close();
op_http_close_after_finish(req);
},
);
}
/**
@ -499,8 +509,7 @@ function mapToCallback(context, callback, onError) {
}
}
innerRequest?.close();
fastSyncResponseOrStream(req, inner.body, status);
fastSyncResponseOrStream(req, inner.body, status, innerRequest);
};
}

View file

@ -8,11 +8,11 @@ use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::service::handle_request;
use crate::service::http_trace;
use crate::service::HttpRecord;
use crate::service::HttpRecordResponse;
use crate::service::HttpRequestBodyAutocloser;
use crate::service::HttpServerState;
use crate::websocket_upgrade::WebSocketUpgrade;
@ -68,7 +68,6 @@ use std::io;
use std::pin::Pin;
use std::ptr::null;
use std::rc::Rc;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@ -182,7 +181,7 @@ pub fn op_http_upgrade_raw(
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
spawn(async move {
let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
let mut upgrade_stream = WebSocketUpgrade::<()>::default();
// Stage 2: Extract the Upgraded connection
let mut buf = [0; 1024];
@ -191,7 +190,8 @@ pub fn op_http_upgrade_raw(
match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue,
Ok(Some((response, bytes))) => {
*http.response() = response;
let (response_parts, _) = response.into_parts();
*http.response_parts() = response_parts;
http.complete();
let mut upgraded = TokioIo::new(upgrade.await?);
upgraded.write_all(&bytes).await?;
@ -250,10 +250,10 @@ pub async fn op_http_upgrade_websocket_next(
// Stage 1: set the response to 101 Switching Protocols and send it
let upgrade = http.upgrade()?;
{
let mut response = http.response();
*response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
let mut response_parts = http.response_parts();
response_parts.status = StatusCode::SWITCHING_PROTOCOLS;
for (name, value) in headers {
response.headers_mut().append(
response_parts.headers.append(
HeaderName::from_bytes(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
@ -274,10 +274,14 @@ pub fn op_http_set_promise_complete(external: *const c_void, status: u16) {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_set_promise_complete") };
set_promise_complete(http, status);
}
fn set_promise_complete(http: Rc<HttpRecord>, status: u16) {
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
*http.response().status_mut() = code;
http.response_parts().status = code;
}
http.complete();
}
@ -441,7 +445,7 @@ pub fn op_http_read_request_body(
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_read_request_body") };
let rid = if let Some(incoming) = http.take_body() {
let rid = if let Some(incoming) = http.take_request_body() {
let body_resource = Rc::new(HttpRequestBody::new(incoming));
state.borrow_mut().resource_table.add_rc(body_resource)
} else {
@ -462,8 +466,7 @@ pub fn op_http_set_response_header(
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_header") };
let mut response = http.response();
let resp_headers = response.headers_mut();
let mut response_parts = http.response_parts();
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = match value {
@ -473,7 +476,7 @@ pub fn op_http_set_response_header(
HeaderValue::from_maybe_shared_unchecked(bytes::Bytes::from(bytes_vec))
},
};
resp_headers.append(name, value);
response_parts.headers.append(name, value);
}
#[op2]
@ -486,12 +489,13 @@ pub fn op_http_set_response_headers(
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_headers") };
// TODO(mmastrac): Invalid headers should be handled?
let mut response = http.response();
let resp_headers = response.headers_mut();
let mut response_parts = http.response_parts();
let len = headers.length();
let header_len = len * 2;
resp_headers.reserve(header_len.try_into().unwrap());
response_parts
.headers
.reserve(header_len.try_into().unwrap());
for i in 0..len {
let item = headers.get_index(scope, i).unwrap();
@ -505,7 +509,7 @@ pub fn op_http_set_response_headers(
let header_value =
// SAFETY: These are valid latin-1 strings
unsafe { HeaderValue::from_maybe_shared_unchecked(v8_value) };
resp_headers.append(header_name, header_value);
response_parts.headers.append(header_name, header_value);
}
}
@ -525,7 +529,7 @@ pub fn op_http_set_response_trailers(
let value = unsafe { HeaderValue::from_maybe_shared_unchecked(value) };
trailer_map.append(name, value);
}
*http.trailers().borrow_mut() = Some(trailer_map);
*http.trailers() = Some(trailer_map);
}
fn is_request_compressible(
@ -652,31 +656,28 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
/// Sets the appropriate response body. Use `force_instantiate_body` if you need
/// to ensure that the response is cleaned up correctly (eg: for resources).
fn set_response(
external: *const c_void,
http: Rc<HttpRecord>,
length: Option<usize>,
status: u16,
force_instantiate_body: bool,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
// SAFETY: external is deleted before calling this op.
let http = unsafe { take_external!(external, "set_response") };
// The request may have been cancelled by this point and if so, there's no need for us to
// do all of this work to send the response.
if !http.cancelled() {
let resource = http.take_resource();
let compression =
is_request_compressible(length, &http.request_parts().headers);
let mut response = http.response();
let mut response_headers =
std::cell::RefMut::map(http.response_parts(), |this| &mut this.headers);
let compression =
modify_compressibility_from_response(compression, response.headers_mut());
response
.body_mut()
.initialize(response_fn(compression), resource);
modify_compressibility_from_response(compression, &mut response_headers);
drop(response_headers);
http.set_response_body(response_fn(compression));
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
*response.status_mut() = code;
http.response_parts().status = code;
}
} else if force_instantiate_body {
response_fn(Compression::None).abort();
@ -685,14 +686,20 @@ fn set_response(
http.complete();
}
#[op2(fast)]
pub fn op_http_set_response_body_resource(
/// Returned promise resolves when body streaming finishes.
/// Call [`op_http_close_after_finish`] when done with the external.
#[op2(async)]
pub async fn op_http_set_response_body_resource(
state: Rc<RefCell<OpState>>,
external: *const c_void,
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
) -> Result<(), AnyError> {
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_body_resource") };
// IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request
// body resource so we _cannot_ hold the OpState lock longer than necessary.
@ -710,7 +717,7 @@ pub fn op_http_set_response_body_resource(
};
set_response(
external,
http.clone(),
resource.size_hint().1.map(|s| s as usize),
status,
true,
@ -719,21 +726,34 @@ pub fn op_http_set_response_body_resource(
},
);
*http.needs_close_after_finish() = true;
http.response_body_finished().await;
Ok(())
}
#[op2(fast)]
pub fn op_http_close_after_finish(external: *const c_void) {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_close_after_finish") };
http.close_after_finish();
}
#[op2(fast)]
pub fn op_http_set_response_body_text(
external: *const c_void,
#[string] text: String,
status: u16,
) {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_set_response_body_text") };
if !text.is_empty() {
set_response(external, Some(text.len()), status, false, |compression| {
set_response(http, Some(text.len()), status, false, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
} else {
op_http_set_promise_complete::call(external, status);
set_promise_complete(http, status);
}
}
@ -743,45 +763,21 @@ pub fn op_http_set_response_body_bytes(
#[buffer] buffer: JsBuffer,
status: u16,
) {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_set_response_body_bytes") };
if !buffer.is_empty() {
set_response(external, Some(buffer.len()), status, false, |compression| {
set_response(http, Some(buffer.len()), status, false, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
} else {
op_http_set_promise_complete::call(external, status);
}
}
#[op2(async)]
pub async fn op_http_track(
state: Rc<RefCell<OpState>>,
external: *const c_void,
#[smi] server_rid: ResourceId,
) -> Result<(), AnyError> {
// SAFETY: op is called with external.
let http = unsafe { clone_external!(external, "op_http_track") };
let handle = http.body_promise();
let join_handle = state
.borrow_mut()
.resource_table
.get::<HttpJoinHandle>(server_rid)?;
match handle
.or_cancel(join_handle.connection_cancel_handle())
.await
{
Ok(true) => Ok(()),
Ok(false) => {
Err(AnyError::msg("connection closed before message completed"))
}
Err(_e) => Ok(()),
set_promise_complete(http, status);
}
}
fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn = http1::Builder::new()
@ -803,7 +799,7 @@ fn serve_http11_unconditional(
fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn =
@ -821,7 +817,7 @@ fn serve_http2_unconditional(
async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
@ -1194,14 +1190,13 @@ pub async fn op_http_close(
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
join_handle.listen_cancel_handle().cancel();
// Async spin on the server_state while we wait for everything to drain
while Rc::strong_count(&join_handle.server_state) > 1 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
join_handle.server_state.drain().await;
} else {
// In a forceful shutdown, we close everything
join_handle.listen_cancel_handle().cancel();
join_handle.connection_cancel_handle().cancel();
// Give streaming responses a tick to close
tokio::task::yield_now().await;
}
let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)

View file

@ -108,6 +108,7 @@ deno_core::extension!(
op_http_write_headers,
op_http_write_resource,
op_http_write,
http_next::op_http_close_after_finish,
http_next::op_http_get_request_header,
http_next::op_http_get_request_headers,
http_next::op_http_get_request_method_and_url<HTTP>,
@ -121,7 +122,6 @@ deno_core::extension!(
http_next::op_http_set_response_header,
http_next::op_http_set_response_headers,
http_next::op_http_set_response_trailers,
http_next::op_http_track,
http_next::op_http_upgrade_websocket_next,
http_next::op_http_upgrade_raw,
http_next::op_raw_write_vectored,

View file

@ -1,10 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Waker;
use brotli::enc::encode::BrotliEncoderParameter;
use brotli::ffi::compressor::BrotliEncoderState;
@ -18,16 +15,13 @@ use deno_core::BufView;
use deno_core::Resource;
use flate2::write::GzEncoder;
use http::HeaderMap;
use hyper1::body::Body;
use hyper1::body::Frame;
use hyper1::body::SizeHint;
use pin_project::pin_project;
use crate::service::HttpRequestBodyAutocloser;
/// Simplification for nested types we use for our streams. We provide a way to convert from
/// this type into Hyper's body [`Frame`].
enum ResponseStreamResult {
pub enum ResponseStreamResult {
/// Stream is over.
EndOfStream,
/// Stream provided non-empty data.
@ -57,53 +51,7 @@ impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> {
}
}
#[derive(Clone, Debug, Default)]
pub struct CompletionHandle {
inner: Rc<RefCell<CompletionHandleInner>>,
}
#[derive(Debug, Default)]
struct CompletionHandleInner {
complete: bool,
success: bool,
waker: Option<Waker>,
}
impl CompletionHandle {
pub fn complete(&self, success: bool) {
let mut mut_self = self.inner.borrow_mut();
mut_self.complete = true;
mut_self.success = success;
if let Some(waker) = mut_self.waker.take() {
drop(mut_self);
waker.wake();
}
}
#[allow(dead_code)]
pub fn is_completed(&self) -> bool {
self.inner.borrow().complete
}
}
impl Future for CompletionHandle {
type Output = bool;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut mut_self = self.inner.borrow_mut();
if mut_self.complete {
return std::task::Poll::Ready(mut_self.success);
}
mut_self.waker = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
trait PollFrame: Unpin {
pub trait PollFrame: Unpin {
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
@ -166,52 +114,6 @@ impl std::fmt::Debug for ResponseBytesInner {
}
}
/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
/// required by hyper. As the API requires information about request completion (including a success/fail
/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
#[derive(Default)]
pub struct ResponseBytes {
inner: ResponseBytesInner,
completion_handle: CompletionHandle,
headers: Rc<RefCell<Option<HeaderMap>>>,
res: Option<HttpRequestBodyAutocloser>,
}
impl ResponseBytes {
pub fn initialize(
&mut self,
inner: ResponseBytesInner,
req_body_resource: Option<HttpRequestBodyAutocloser>,
) {
debug_assert!(matches!(self.inner, ResponseBytesInner::Empty));
self.inner = inner;
self.res = req_body_resource;
}
pub fn completion_handle(&self) -> CompletionHandle {
self.completion_handle.clone()
}
pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> {
self.headers.clone()
}
fn complete(&mut self, success: bool) -> ResponseBytesInner {
if matches!(self.inner, ResponseBytesInner::Done) {
return ResponseBytesInner::Done;
}
let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
self.completion_handle.complete(success);
if success {
current
} else {
current.abort();
ResponseBytesInner::Done
}
}
}
impl ResponseBytesInner {
pub fn abort(self) {
match self {
@ -298,75 +200,6 @@ impl ResponseBytesInner {
}
}
impl Body for ResponseBytes {
type Data = BufView;
type Error = AnyError;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let res = loop {
let res = match &mut self.inner {
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
if let Some(trailers) = self.headers.borrow_mut().take() {
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
unreachable!()
}
ResponseBytesInner::Bytes(..) => {
let ResponseBytesInner::Bytes(data) = self.complete(true) else {
unreachable!();
};
return std::task::Poll::Ready(Some(Ok(Frame::data(data))));
}
ResponseBytesInner::UncompressedStream(stm) => {
ready!(Pin::new(stm).poll_frame(cx))
}
ResponseBytesInner::GZipStream(stm) => {
ready!(Pin::new(stm).poll_frame(cx))
}
ResponseBytesInner::BrotliStream(stm) => {
ready!(Pin::new(stm).poll_frame(cx))
}
};
// This is where we retry the NoData response
if matches!(res, ResponseStreamResult::NoData) {
continue;
}
break res;
};
if matches!(res, ResponseStreamResult::EndOfStream) {
if let Some(trailers) = self.headers.borrow_mut().take() {
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
self.complete(true);
}
std::task::Poll::Ready(res.into())
}
fn is_end_stream(&self) -> bool {
matches!(
self.inner,
ResponseBytesInner::Done | ResponseBytesInner::Empty
) && self.headers.borrow_mut().is_none()
}
fn size_hint(&self) -> SizeHint {
// The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
// anyways just in case hyper needs it.
self.inner.size_hint()
}
}
impl Drop for ResponseBytes {
fn drop(&mut self) {
// We won't actually poll_frame for Empty responses so this is where we return success
self.complete(matches!(self.inner, ResponseBytesInner::Empty));
}
}
pub struct ResourceBodyAdapter {
auto_close: bool,
stm: Rc<dyn Resource>,

View file

@ -1,13 +1,18 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::request_properties::HttpConnectionProperties;
use crate::response_body::CompletionHandle;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::response_body::ResponseStreamResult;
use deno_core::error::AnyError;
use deno_core::futures::ready;
use deno_core::BufView;
use deno_core::OpState;
use deno_core::ResourceId;
use http::request::Parts;
use http::HeaderMap;
use hyper1::body::Body;
use hyper1::body::Frame;
use hyper1::body::Incoming;
use hyper1::body::SizeHint;
use hyper1::upgrade::OnUpgrade;
use scopeguard::guard;
@ -16,10 +21,15 @@ use std::cell::Ref;
use std::cell::RefCell;
use std::cell::RefMut;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
pub type Response = hyper1::Response<HttpRecordResponse>;
macro_rules! http_trace {
($record:expr $(, $args:expr)*) => {
@ -39,6 +49,7 @@ pub(crate) use http_trace;
struct HttpServerStateInner {
pool: Vec<(Rc<HttpRecord>, HeaderMap)>,
drain_waker: Option<Waker>,
}
pub struct HttpServerState(RefCell<HttpServerStateInner>);
@ -47,8 +58,32 @@ impl HttpServerState {
pub fn new() -> Rc<Self> {
Rc::new(Self(RefCell::new(HttpServerStateInner {
pool: Vec::new(),
drain_waker: None,
})))
}
pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a {
struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>);
impl<'a> Future for HttpServerStateDrain<'a> {
type Output = ();
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let server_state = self.0;
http_trace!(server_state, "HttpServerState::drain poll");
if Rc::strong_count(server_state) <= 1 {
return Poll::Ready(());
}
server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone());
Poll::Pending
}
}
HttpServerStateDrain(self)
}
}
enum RequestBodyState {
@ -102,25 +137,28 @@ pub async fn handle_request(
http_trace!(*guarded_record, "handle_request response_ready.await");
guarded_record.response_ready().await;
// Defuse the guard. Must not await after the point.
// Defuse the guard. Must not await after this point.
let record = ScopeGuard::into_inner(guarded_record);
http_trace!(record, "handle_request complete");
let response = record.take_response();
record.recycle();
let response = record.into_response();
Ok(response)
}
struct HttpRecordInner {
server_state: Rc<HttpServerState>,
request_info: HttpConnectionProperties,
request_parts: Parts,
request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
/// The response may get taken before we tear this down
response: Option<Response>,
response_parts: Option<http::response::Parts>,
response_ready: bool,
response_waker: Option<std::task::Waker>,
trailers: Rc<RefCell<Option<HeaderMap>>>,
response_waker: Option<Waker>,
response_body: ResponseBytesInner,
response_body_finished: bool,
response_body_waker: Option<Waker>,
trailers: Option<HeaderMap>,
been_dropped: bool,
finished: bool,
needs_close_after_finish: bool,
}
pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);
@ -147,43 +185,70 @@ impl HttpRecord {
server_state: Rc<HttpServerState>,
) -> Rc<Self> {
let (request_parts, request_body) = request.into_parts();
let body = ResponseBytes::default();
let trailers = body.trailers();
let request_body = Some(request_body.into());
let mut response = Response::new(body);
let reuse_record =
let (mut response_parts, _) = http::Response::new(()).into_parts();
let record =
if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() {
*response.headers_mut() = headers;
Some(record)
response_parts.headers = headers;
http_trace!(record, "HttpRecord::reuse");
record
} else {
None
#[cfg(feature = "__http_tracing")]
{
RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
#[allow(clippy::let_and_return)]
let record = Rc::new(Self(RefCell::new(None)));
http_trace!(record, "HttpRecord::new");
record
};
let inner = HttpRecordInner {
*record.0.borrow_mut() = Some(HttpRecordInner {
server_state,
request_info,
request_parts,
request_body,
response: Some(response),
response_parts: Some(response_parts),
response_ready: false,
response_waker: None,
trailers,
response_body: ResponseBytesInner::Empty,
response_body_finished: false,
response_body_waker: None,
trailers: None,
been_dropped: false,
};
if let Some(record) = reuse_record {
*record.0.borrow_mut() = Some(inner);
http_trace!(record, "HttpRecord::reuse");
record
} else {
#[cfg(feature = "__http_tracing")]
{
RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
finished: false,
needs_close_after_finish: false,
});
record
}
#[allow(clippy::let_and_return)]
let record = Rc::new(Self(RefCell::new(Some(inner))));
http_trace!(record, "HttpRecord::new");
record
fn finish(self: Rc<Self>) {
http_trace!(self, "HttpRecord::finish");
let mut inner = self.self_mut();
inner.response_body_finished = true;
let response_body_waker = inner.response_body_waker.take();
let needs_close_after_finish = inner.needs_close_after_finish;
drop(inner);
if let Some(waker) = response_body_waker {
waker.wake();
}
if !needs_close_after_finish {
self.recycle();
}
}
pub fn close_after_finish(self: Rc<Self>) {
debug_assert!(self.self_ref().needs_close_after_finish);
let mut inner = self.self_mut();
inner.needs_close_after_finish = false;
if !inner.finished {
drop(inner);
self.recycle();
}
}
pub fn needs_close_after_finish(&self) -> RefMut<'_, bool> {
RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish)
}
fn recycle(self: Rc<Self>) {
@ -196,12 +261,19 @@ impl HttpRecord {
request_parts: Parts { mut headers, .. },
..
} = self.0.borrow_mut().take().unwrap();
let mut server_state_mut = server_state.0.borrow_mut();
let inflight = Rc::strong_count(&server_state);
http_trace!(self, "HttpRecord::recycle inflight={}", inflight);
// TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling
// the to-drop objects off to another thread.
// Server is shutting down so wake the drain future.
if let Some(waker) = server_state_mut.drain_waker.take() {
drop(server_state_mut);
drop(server_state);
http_trace!(self, "HttpRecord::recycle wake");
waker.wake();
return;
}
// Keep a buffer of allocations on hand to be reused by incoming requests.
// Estimated target size is 16 + 1/8 the number of inflight requests.
@ -235,7 +307,7 @@ impl HttpRecord {
}
/// Take the Hyper body from this record.
pub fn take_body(&self) -> Option<Incoming> {
pub fn take_request_body(&self) -> Option<Incoming> {
let body_holder = &mut self.self_mut().request_body;
let body = body_holder.take();
match body {
@ -247,18 +319,6 @@ impl HttpRecord {
}
}
pub fn take_resource(&self) -> Option<HttpRequestBodyAutocloser> {
let body_holder = &mut self.self_mut().request_body;
let body = body_holder.take();
match body {
Some(RequestBodyState::Resource(res)) => Some(res),
x => {
*body_holder = x;
None
}
}
}
/// Replace the request body with a resource ID and the OpState we'll need to shut it down.
/// We cannot keep just the resource itself, as JS code might be reading from the resource ID
/// to generate the response data (requiring us to keep it in the resource table).
@ -273,7 +333,7 @@ impl HttpRecord {
if inner.response_ready {
// Future dropped between wake() and async fn resuming.
drop(inner);
self.recycle();
self.finish();
return;
}
inner.been_dropped = true;
@ -291,7 +351,7 @@ impl HttpRecord {
);
if inner.been_dropped {
drop(inner);
self.recycle();
self.finish();
return;
}
inner.response_ready = true;
@ -301,25 +361,44 @@ impl HttpRecord {
}
}
fn take_response_body(&self) -> ResponseBytesInner {
let mut inner = self.self_mut();
debug_assert!(
!matches!(inner.response_body, ResponseBytesInner::Done),
"HTTP state error: response body already complete"
);
std::mem::replace(&mut inner.response_body, ResponseBytesInner::Done)
}
/// Has the future for this record been dropped? ie, has the underlying TCP connection
/// been closed?
pub fn cancelled(&self) -> bool {
self.self_ref().been_dropped
}
/// Get a mutable reference to the response.
pub fn response(&self) -> RefMut<'_, Response> {
RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap())
/// Get a mutable reference to the response status and headers.
pub fn response_parts(&self) -> RefMut<'_, http::response::Parts> {
RefMut::map(self.self_mut(), |inner| {
inner.response_parts.as_mut().unwrap()
})
}
/// Get a mutable reference to the trailers.
pub fn trailers(&self) -> Ref<'_, Rc<RefCell<Option<HeaderMap>>>> {
Ref::map(self.self_ref(), |inner| &inner.trailers)
pub fn trailers(&self) -> RefMut<'_, Option<HeaderMap>> {
RefMut::map(self.self_mut(), |inner| &mut inner.trailers)
}
pub fn set_response_body(&self, response_body: ResponseBytesInner) {
let mut inner = self.self_mut();
debug_assert!(matches!(inner.response_body, ResponseBytesInner::Empty));
inner.response_body = response_body;
}
/// Take the response.
fn take_response(&self) -> Response {
self.self_mut().response.take().unwrap()
fn into_response(self: Rc<Self>) -> Response {
let parts = self.self_mut().response_parts.take().unwrap();
let body = HttpRecordResponse(ManuallyDrop::new(self));
http::Response::from_parts(parts, body)
}
/// Get a reference to the connection properties.
@ -332,38 +411,131 @@ impl HttpRecord {
Ref::map(self.self_ref(), |inner| &inner.request_parts)
}
/// Get a reference to the completion handle.
/// Resolves when response head is ready.
fn response_ready(&self) -> impl Future<Output = ()> + '_ {
struct HttpRecordComplete<'a>(&'a HttpRecord);
struct HttpRecordReady<'a>(&'a HttpRecord);
impl<'a> Future for HttpRecordComplete<'a> {
impl<'a> Future for HttpRecordReady<'a> {
type Output = ();
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let mut mut_self = self.0.self_mut();
if mut_self.response_ready {
return std::task::Poll::Ready(());
return Poll::Ready(());
}
mut_self.response_waker = Some(cx.waker().clone());
std::task::Poll::Pending
Poll::Pending
}
}
HttpRecordComplete(self)
HttpRecordReady(self)
}
/// Get a reference to the response body completion handle.
pub fn body_promise(&self) -> CompletionHandle {
self
.self_ref()
.response
.as_ref()
.unwrap()
.body()
.completion_handle()
/// Resolves when response body has finished streaming.
pub fn response_body_finished(&self) -> impl Future<Output = ()> + '_ {
struct HttpRecordFinished<'a>(&'a HttpRecord);
impl<'a> Future for HttpRecordFinished<'a> {
type Output = ();
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let mut mut_self = self.0.self_mut();
if mut_self.response_body_finished {
return Poll::Ready(());
}
mut_self.response_body_waker = Some(cx.waker().clone());
Poll::Pending
}
}
HttpRecordFinished(self)
}
}
#[repr(transparent)]
pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>);
impl Body for HttpRecordResponse {
type Data = BufView;
type Error = AnyError;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
use crate::response_body::PollFrame;
let record = &self.0;
let res = loop {
let mut inner = record.self_mut();
let res = match &mut inner.response_body {
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
if let Some(trailers) = inner.trailers.take() {
return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
unreachable!()
}
ResponseBytesInner::Bytes(..) => {
drop(inner);
let ResponseBytesInner::Bytes(data) = record.take_response_body()
else {
unreachable!();
};
return Poll::Ready(Some(Ok(Frame::data(data))));
}
ResponseBytesInner::UncompressedStream(stm) => {
ready!(Pin::new(stm).poll_frame(cx))
}
ResponseBytesInner::GZipStream(stm) => {
ready!(Pin::new(stm).poll_frame(cx))
}
ResponseBytesInner::BrotliStream(stm) => {
ready!(Pin::new(stm).poll_frame(cx))
}
};
// This is where we retry the NoData response
if matches!(res, ResponseStreamResult::NoData) {
continue;
}
break res;
};
if matches!(res, ResponseStreamResult::EndOfStream) {
if let Some(trailers) = record.self_mut().trailers.take() {
return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
record.take_response_body();
}
Poll::Ready(res.into())
}
fn is_end_stream(&self) -> bool {
let inner = self.0.self_ref();
matches!(
inner.response_body,
ResponseBytesInner::Done | ResponseBytesInner::Empty
) && inner.trailers.is_none()
}
fn size_hint(&self) -> SizeHint {
// The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
// anyways just in case hyper needs it.
self.0.self_ref().response_body.size_hint()
}
}
impl Drop for HttpRecordResponse {
fn drop(&mut self) {
// SAFETY: this ManuallyDrop is not used again.
let record = unsafe { ManuallyDrop::take(&mut self.0) };
http_trace!(record, "HttpRecordResponse::drop");
record.finish();
}
}
@ -442,14 +614,10 @@ mod tests {
async move {
// JavaScript handler produces response
let record = rx.recv().await.unwrap();
let resource = record.take_resource();
record.response().body_mut().initialize(
ResponseBytesInner::from_vec(
Compression::None,
b"hello world".to_vec(),
),
resource,
);
record.set_response_body(ResponseBytesInner::from_vec(
Compression::None,
b"hello world".to_vec(),
));
record.complete();
Ok(())
},