From 68a0877f8dc4a821b3688105b24953ddfaa9e189 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Thu, 23 Nov 2023 09:39:17 -0700 Subject: [PATCH] fix(ext/http): avoid lockup in graceful shutdown (#21253) Follow-up to #20822. cc @lrowe The `httpServerExplicitResourceManagement` tests were randomly failing on CI because of a race. The `drain` waker was missing wakeup events if the listeners shut down after the last HTTP response finished. If we lost the race (rare), the server Rc would be dropped and we wouldn't poll it again. This replaces the drain waker system with a signalling Rc that always resolves when the refcount is about to become 1. Fix verified by running serve tests in a loop: ``` for i in {0..100}; do cargo run --features=__http_tracing -- test -A --unstable '/Users/matt/Documents/github/deno/deno/cli/tests/unit/ser ve_test.ts' --filter httpServerExplicitResourceManagement; done; ``` --- ext/http/http_next.rs | 13 +++- ext/http/service.rs | 146 ++++++++++++++++++++++++++++-------------- 2 files changed, 107 insertions(+), 52 deletions(-) diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 9504a6fa48..db63601a05 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,15 +10,18 @@ use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; use crate::response_body::ResponseBytesInner; use crate::service::handle_request; +use crate::service::http_general_trace; use crate::service::http_trace; use crate::service::HttpRecord; use crate::service::HttpRecordResponse; use crate::service::HttpRequestBodyAutocloser; use crate::service::HttpServerState; +use crate::service::SignallingRc; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use cache_control::CacheControl; use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; use deno_core::futures::TryFutureExt; use deno_core::op2; use deno_core::serde_v8::from_v8; @@ -924,7 +927,7 @@ where struct HttpLifetime { connection_cancel_handle: Rc, listen_cancel_handle: Rc, - server_state: Rc, + server_state: SignallingRc, } struct HttpJoinHandle { @@ -932,7 +935,7 @@ struct HttpJoinHandle { connection_cancel_handle: Rc, listen_cancel_handle: Rc, rx: AsyncRefCell>>, - server_state: Rc, + server_state: SignallingRc, } impl HttpJoinHandle { @@ -1179,6 +1182,7 @@ pub async fn op_http_close( .take::(rid)?; if graceful { + http_general_trace!("graceful shutdown"); // TODO(bartlomieju): replace with `state.feature_checker.check_or_exit` // once we phase out `check_or_exit_with_legacy_fallback` state @@ -1191,8 +1195,9 @@ pub async fn op_http_close( // In a graceful shutdown, we close the listener and allow all the remaining connections to drain join_handle.listen_cancel_handle().cancel(); - join_handle.server_state.drain().await; + poll_fn(|cx| join_handle.server_state.poll_complete(cx)).await; } else { + http_general_trace!("forceful shutdown"); // In a forceful shutdown, we close everything join_handle.listen_cancel_handle().cancel(); join_handle.connection_cancel_handle().cancel(); @@ -1200,6 +1205,8 @@ pub async fn op_http_close( tokio::task::yield_now().await; } + http_general_trace!("awaiting shutdown"); + let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle) .borrow_mut() .await; diff --git a/ext/http/service.rs b/ext/http/service.rs index fbd533cacd..c232962be4 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -17,6 +17,7 @@ use hyper1::upgrade::OnUpgrade; use scopeguard::guard; use scopeguard::ScopeGuard; +use std::cell::Cell; use std::cell::Ref; use std::cell::RefCell; use std::cell::RefMut; @@ -31,12 +32,34 @@ use std::task::Waker; pub type Request = hyper1::Request; pub type Response = hyper1::Response; +#[cfg(feature = "__http_tracing")] +pub static RECORD_COUNT: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(0); + +macro_rules! http_general_trace { + ($($args:expr),*) => { + #[cfg(feature = "__http_tracing")] + { + let count = $crate::service::RECORD_COUNT + .load(std::sync::atomic::Ordering::SeqCst); + + println!( + "HTTP [+{count}]: {}", + format!($($args),*), + ); + } + }; +} + macro_rules! http_trace { ($record:expr $(, $args:expr)*) => { #[cfg(feature = "__http_tracing")] { + let count = $crate::service::RECORD_COUNT + .load(std::sync::atomic::Ordering::SeqCst); + println!( - "HTTP id={:p} strong={}: {}", + "HTTP [+{count}] id={:p} strong={}: {}", $record, std::rc::Rc::strong_count(&$record), format!($($args),*), @@ -45,44 +68,83 @@ macro_rules! http_trace { }; } +pub(crate) use http_general_trace; pub(crate) use http_trace; -struct HttpServerStateInner { +pub(crate) struct HttpServerStateInner { pool: Vec<(Rc, HeaderMap)>, - drain_waker: Option, } -pub struct HttpServerState(RefCell); +/// A signalling version of `Rc` that allows one to poll for when all other references +/// to the `Rc` have been dropped. +#[repr(transparent)] +pub(crate) struct SignallingRc(Rc<(T, Cell>)>); -impl HttpServerState { - pub fn new() -> Rc { - Rc::new(Self(RefCell::new(HttpServerStateInner { - pool: Vec::new(), - drain_waker: None, - }))) +impl SignallingRc { + #[inline] + pub fn new(t: T) -> Self { + Self(Rc::new((t, Default::default()))) } - pub fn drain<'a>(self: &'a Rc) -> impl Future + 'a { - struct HttpServerStateDrain<'a>(&'a Rc); + #[inline] + pub fn strong_count(&self) -> usize { + Rc::strong_count(&self.0) + } - impl<'a> Future for HttpServerStateDrain<'a> { - type Output = (); + /// Resolves when this is the only remaining reference. + #[inline] + pub fn poll_complete(&self, cx: &mut Context<'_>) -> Poll<()> { + if Rc::strong_count(&self.0) == 1 { + Poll::Ready(()) + } else { + self.0 .1.set(Some(cx.waker().clone())); + Poll::Pending + } + } +} - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll { - let server_state = self.0; - http_trace!(server_state, "HttpServerState::drain poll"); - if Rc::strong_count(server_state) <= 1 { - return Poll::Ready(()); - } - server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone()); - Poll::Pending +impl Clone for SignallingRc { + #[inline] + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl Drop for SignallingRc { + #[inline] + fn drop(&mut self) { + // Trigger the waker iff the refcount is about to become 1. + if Rc::strong_count(&self.0) == 2 { + if let Some(waker) = self.0 .1.take() { + waker.wake(); } } + } +} - HttpServerStateDrain(self) +impl std::ops::Deref for SignallingRc { + type Target = T; + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 .0 + } +} + +pub(crate) struct HttpServerState(RefCell); + +impl HttpServerState { + pub fn new() -> SignallingRc { + SignallingRc::new(Self(RefCell::new(HttpServerStateInner { + pool: Vec::new(), + }))) + } +} + +impl std::ops::Deref for HttpServerState { + type Target = RefCell; + + fn deref(&self) -> &Self::Target { + &self.0 } } @@ -117,7 +179,7 @@ impl Drop for HttpRequestBodyAutocloser { pub async fn handle_request( request: Request, request_info: HttpConnectionProperties, - server_state: Rc, // Keep server alive for duration of this future. + server_state: SignallingRc, // Keep server alive for duration of this future. tx: tokio::sync::mpsc::Sender>, ) -> Result { // If the underlying TCP connection is closed, this future will be dropped @@ -145,7 +207,7 @@ pub async fn handle_request( } struct HttpRecordInner { - server_state: Rc, + server_state: SignallingRc, request_info: HttpConnectionProperties, request_parts: http::request::Parts, request_body: Option, @@ -163,18 +225,14 @@ struct HttpRecordInner { pub struct HttpRecord(RefCell>); -#[cfg(feature = "__http_tracing")] -pub static RECORD_COUNT: std::sync::atomic::AtomicUsize = - std::sync::atomic::AtomicUsize::new(0); - #[cfg(feature = "__http_tracing")] impl Drop for HttpRecord { fn drop(&mut self) { - let count = RECORD_COUNT + RECORD_COUNT .fetch_sub(1, std::sync::atomic::Ordering::SeqCst) .checked_sub(1) .expect("Count went below zero"); - println!("HTTP count={count}: HttpRecord::drop"); + http_general_trace!("HttpRecord::drop"); } } @@ -182,13 +240,13 @@ impl HttpRecord { fn new( request: Request, request_info: HttpConnectionProperties, - server_state: Rc, + server_state: SignallingRc, ) -> Rc { let (request_parts, request_body) = request.into_parts(); let request_body = Some(request_body.into()); let (mut response_parts, _) = http::Response::new(()).into_parts(); let record = - if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() { + if let Some((record, headers)) = server_state.borrow_mut().pool.pop() { response_parts.headers = headers; http_trace!(record, "HttpRecord::reuse"); record @@ -262,23 +320,13 @@ impl HttpRecord { .. } = self.0.borrow_mut().take().unwrap(); - let mut server_state_mut = server_state.0.borrow_mut(); - let inflight = Rc::strong_count(&server_state); + let inflight = server_state.strong_count(); http_trace!(self, "HttpRecord::recycle inflight={}", inflight); - // Server is shutting down so wake the drain future. - if let Some(waker) = server_state_mut.drain_waker.take() { - drop(server_state_mut); - drop(server_state); - http_trace!(self, "HttpRecord::recycle wake"); - waker.wake(); - return; - } - // Keep a buffer of allocations on hand to be reused by incoming requests. // Estimated target size is 16 + 1/8 the number of inflight requests. let target = 16 + (inflight >> 3); - let pool = &mut server_state_mut.pool; + let pool = &mut server_state.borrow_mut().pool; if target > pool.len() { headers.clear(); pool.push((self, headers)); @@ -634,7 +682,7 @@ mod tests { .await }, )?; - assert_eq!(Rc::strong_count(&server_state_check), 1); + assert_eq!(server_state_check.strong_count(), 1); Ok(()) } }