fix(ext/http): avoid lockup in graceful shutdown (#21253)

Follow-up to #20822. cc @lrowe 

The `httpServerExplicitResourceManagement` tests were randomly failing
on CI because of a race.

The `drain` waker was missing wakeup events if the listeners shut down
after the last HTTP response finished. If we lost the race (rare), the
server Rc would be dropped and we wouldn't poll it again.

This replaces the drain waker system with a signalling Rc whose completion
future always resolves once the refcount is about to drop to 1.

Fix verified by running serve tests in a loop:

```
for i in {0..100}; do cargo run --features=__http_tracing -- test
 -A --unstable '/Users/matt/Documents/github/deno/deno/cli/tests/unit/ser
ve_test.ts' --filter httpServerExplicitResourceManagement; done;
```
This commit is contained in:
Matt Mastracci 2023-11-23 09:39:17 -07:00 committed by GitHub
parent eda3850f84
commit 68a0877f8d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 107 additions and 52 deletions

View file

@ -10,15 +10,18 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytesInner;
use crate::service::handle_request;
use crate::service::http_general_trace;
use crate::service::http_trace;
use crate::service::HttpRecord;
use crate::service::HttpRecordResponse;
use crate::service::HttpRequestBodyAutocloser;
use crate::service::HttpServerState;
use crate::service::SignallingRc;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
use cache_control::CacheControl;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::futures::TryFutureExt;
use deno_core::op2;
use deno_core::serde_v8::from_v8;
@ -924,7 +927,7 @@ where
struct HttpLifetime {
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
server_state: Rc<HttpServerState>,
server_state: SignallingRc<HttpServerState>,
}
struct HttpJoinHandle {
@ -932,7 +935,7 @@ struct HttpJoinHandle {
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
server_state: Rc<HttpServerState>,
server_state: SignallingRc<HttpServerState>,
}
impl HttpJoinHandle {
@ -1179,6 +1182,7 @@ pub async fn op_http_close(
.take::<HttpJoinHandle>(rid)?;
if graceful {
http_general_trace!("graceful shutdown");
// TODO(bartlomieju): replace with `state.feature_checker.check_or_exit`
// once we phase out `check_or_exit_with_legacy_fallback`
state
@ -1191,8 +1195,9 @@ pub async fn op_http_close(
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
join_handle.listen_cancel_handle().cancel();
join_handle.server_state.drain().await;
poll_fn(|cx| join_handle.server_state.poll_complete(cx)).await;
} else {
http_general_trace!("forceful shutdown");
// In a forceful shutdown, we close everything
join_handle.listen_cancel_handle().cancel();
join_handle.connection_cancel_handle().cancel();
@ -1200,6 +1205,8 @@ pub async fn op_http_close(
tokio::task::yield_now().await;
}
http_general_trace!("awaiting shutdown");
let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
.borrow_mut()
.await;

View file

@ -17,6 +17,7 @@ use hyper1::upgrade::OnUpgrade;
use scopeguard::guard;
use scopeguard::ScopeGuard;
use std::cell::Cell;
use std::cell::Ref;
use std::cell::RefCell;
use std::cell::RefMut;
@ -31,12 +32,34 @@ use std::task::Waker;
pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<HttpRecordResponse>;
#[cfg(feature = "__http_tracing")]
pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
/// Prints a general (non-record-scoped) HTTP trace line, prefixed with the
/// current live `HttpRecord` count (`RECORD_COUNT`). Expands to nothing
/// unless the `__http_tracing` feature is enabled.
macro_rules! http_general_trace {
($($args:expr),*) => {
// The whole body is gated on the tracing feature, so release builds
// pay no cost for trace calls.
#[cfg(feature = "__http_tracing")]
{
let count = $crate::service::RECORD_COUNT
.load(std::sync::atomic::Ordering::SeqCst);
println!(
"HTTP [+{count}]: {}",
format!($($args),*),
);
}
};
}
macro_rules! http_trace {
($record:expr $(, $args:expr)*) => {
#[cfg(feature = "__http_tracing")]
{
let count = $crate::service::RECORD_COUNT
.load(std::sync::atomic::Ordering::SeqCst);
println!(
"HTTP id={:p} strong={}: {}",
"HTTP [+{count}] id={:p} strong={}: {}",
$record,
std::rc::Rc::strong_count(&$record),
format!($($args),*),
@ -45,44 +68,83 @@ macro_rules! http_trace {
};
}
pub(crate) use http_general_trace;
pub(crate) use http_trace;
struct HttpServerStateInner {
pub(crate) struct HttpServerStateInner {
pool: Vec<(Rc<HttpRecord>, HeaderMap)>,
drain_waker: Option<Waker>,
}
pub struct HttpServerState(RefCell<HttpServerStateInner>);
/// A signalling version of `Rc` that allows one to poll for when all other references
/// to the `Rc` have been dropped.
///
/// The value is stored alongside a `Cell<Option<Waker>>`: `poll_complete`
/// registers the waker, and the `Drop` impl fires it when the strong count is
/// about to fall to 1.
#[repr(transparent)]
pub(crate) struct SignallingRc<T>(Rc<(T, Cell<Option<Waker>>)>);
impl HttpServerState {
pub fn new() -> Rc<Self> {
Rc::new(Self(RefCell::new(HttpServerStateInner {
pool: Vec::new(),
drain_waker: None,
})))
impl<T> SignallingRc<T> {
#[inline]
pub fn new(t: T) -> Self {
Self(Rc::new((t, Default::default())))
}
pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a {
struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>);
#[inline]
pub fn strong_count(&self) -> usize {
Rc::strong_count(&self.0)
}
impl<'a> Future for HttpServerStateDrain<'a> {
type Output = ();
/// Resolves when this is the only remaining reference.
#[inline]
pub fn poll_complete(&self, cx: &mut Context<'_>) -> Poll<()> {
if Rc::strong_count(&self.0) == 1 {
Poll::Ready(())
} else {
self.0 .1.set(Some(cx.waker().clone()));
Poll::Pending
}
}
}
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let server_state = self.0;
http_trace!(server_state, "HttpServerState::drain poll");
if Rc::strong_count(server_state) <= 1 {
return Poll::Ready(());
}
server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone());
Poll::Pending
impl<T> Clone for SignallingRc<T> {
#[inline]
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> Drop for SignallingRc<T> {
#[inline]
fn drop(&mut self) {
// Trigger the waker iff the refcount is about to become 1.
// A count of 2 here means `self` plus exactly one other holder: after this
// drop finishes, that last holder's `poll_complete` must resolve, so we
// wake it now. `take()` clears the slot, so the waker fires at most once.
if Rc::strong_count(&self.0) == 2 {
if let Some(waker) = self.0 .1.take() {
waker.wake();
}
}
}
}
HttpServerStateDrain(self)
/// Dereferences straight to the wrapped value, hiding the internal waker slot.
impl<T> std::ops::Deref for SignallingRc<T> {
  type Target = T;
  #[inline]
  fn deref(&self) -> &Self::Target {
    // Destructure the inner `(value, waker)` pair and expose only the value.
    let (value, _waker) = &*self.0;
    value
  }
}
/// Per-server shared state (currently the `HttpRecord`/header reuse pool),
/// handed out behind a `SignallingRc` so shutdown can wait for all holders
/// to drop their references.
pub(crate) struct HttpServerState(RefCell<HttpServerStateInner>);
impl HttpServerState {
/// Creates fresh server state wrapped in a `SignallingRc`, which lets
/// graceful shutdown `poll_complete` until it is the sole remaining reference.
pub fn new() -> SignallingRc<Self> {
SignallingRc::new(Self(RefCell::new(HttpServerStateInner {
pool: Vec::new(),
})))
}
}
// Deref to the inner `RefCell` so callers can write `server_state.borrow_mut()`
// directly when accessing the pool (as in `HttpRecord::new` and `recycle`).
impl std::ops::Deref for HttpServerState {
type Target = RefCell<HttpServerStateInner>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
@ -117,7 +179,7 @@ impl Drop for HttpRequestBodyAutocloser {
pub async fn handle_request(
request: Request,
request_info: HttpConnectionProperties,
server_state: Rc<HttpServerState>, // Keep server alive for duration of this future.
server_state: SignallingRc<HttpServerState>, // Keep server alive for duration of this future.
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> Result<Response, hyper::Error> {
// If the underlying TCP connection is closed, this future will be dropped
@ -145,7 +207,7 @@ pub async fn handle_request(
}
struct HttpRecordInner {
server_state: Rc<HttpServerState>,
server_state: SignallingRc<HttpServerState>,
request_info: HttpConnectionProperties,
request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
@ -163,18 +225,14 @@ struct HttpRecordInner {
pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);
#[cfg(feature = "__http_tracing")]
pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[cfg(feature = "__http_tracing")]
impl Drop for HttpRecord {
fn drop(&mut self) {
let count = RECORD_COUNT
RECORD_COUNT
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
.checked_sub(1)
.expect("Count went below zero");
println!("HTTP count={count}: HttpRecord::drop");
http_general_trace!("HttpRecord::drop");
}
}
@ -182,13 +240,13 @@ impl HttpRecord {
fn new(
request: Request,
request_info: HttpConnectionProperties,
server_state: Rc<HttpServerState>,
server_state: SignallingRc<HttpServerState>,
) -> Rc<Self> {
let (request_parts, request_body) = request.into_parts();
let request_body = Some(request_body.into());
let (mut response_parts, _) = http::Response::new(()).into_parts();
let record =
if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() {
if let Some((record, headers)) = server_state.borrow_mut().pool.pop() {
response_parts.headers = headers;
http_trace!(record, "HttpRecord::reuse");
record
@ -262,23 +320,13 @@ impl HttpRecord {
..
} = self.0.borrow_mut().take().unwrap();
let mut server_state_mut = server_state.0.borrow_mut();
let inflight = Rc::strong_count(&server_state);
let inflight = server_state.strong_count();
http_trace!(self, "HttpRecord::recycle inflight={}", inflight);
// Server is shutting down so wake the drain future.
if let Some(waker) = server_state_mut.drain_waker.take() {
drop(server_state_mut);
drop(server_state);
http_trace!(self, "HttpRecord::recycle wake");
waker.wake();
return;
}
// Keep a buffer of allocations on hand to be reused by incoming requests.
// Estimated target size is 16 + 1/8 the number of inflight requests.
let target = 16 + (inflight >> 3);
let pool = &mut server_state_mut.pool;
let pool = &mut server_state.borrow_mut().pool;
if target > pool.len() {
headers.clear();
pool.push((self, headers));
@ -634,7 +682,7 @@ mod tests {
.await
},
)?;
assert_eq!(Rc::strong_count(&server_state_check), 1);
assert_eq!(server_state_check.strong_count(), 1);
Ok(())
}
}