fix(inspector): allow to poll session with reentry (#16863)

This commit completely rewrites inspector session polling.

Until now, there was a single function responsible for polling inspector
sessions which could have been called when polling the "JsRuntime"
as well as from internal inspector functions. There are some cases
where it's required to have reentrant polling of sessions (eg. when 
"debugger" statement is run) which should be blocking until inspector
sends appropriate message to continue execution. This was not possible
before, because polling of sessions didn't have reentry ability.

As a consequence, session polling was split into two separate functions:
a) one to be used when polling from async context (on each tick of event
loop in "JsRuntime")
b) one to be used when polling synchronously and potentially blocking
(used by various inspector methods).

There are further cleanups and simplifications to be made in inspector
code, but this rewrite solves the problem at hand (being able to
evaluate
"debugger" JS statement and continue inspector functionality).

Co-authored-by: Bert Belder <bertbelder@gmail.com>
This commit is contained in:
Bartek Iwańczuk 2022-12-02 23:17:02 +01:00 committed by GitHub
parent 868068c847
commit e2a0c3f0dc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 271 additions and 132 deletions

View file

@ -1307,4 +1307,108 @@ mod inspector {
child.kill().unwrap();
child.wait().unwrap();
}
// https://github.com/denoland/deno/issues/11570
#[tokio::test]
async fn inspector_repl_debugger_statement() {
let mut child = util::deno_cmd()
.arg("repl")
.arg(inspect_flag_with_unique_port("--inspect"))
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
let stderr = child.stderr.as_mut().unwrap();
let mut stderr_lines = std::io::BufReader::new(stderr)
.lines()
.map(|r| r.unwrap())
.filter(|s| s.as_str() != "Debugger session started.");
let ws_url = extract_ws_url_from_stderr(&mut stderr_lines);
let (socket, response) =
tokio_tungstenite::connect_async(ws_url).await.unwrap();
assert_eq!(response.status(), 101); // Switching protocols.
let (mut socket_tx, socket_rx) = socket.split();
let mut socket_rx = socket_rx
.map(|msg| msg.unwrap().to_string())
.filter(|msg| {
let pass = !msg.starts_with(r#"{"method":"Debugger.scriptParsed","#);
futures::future::ready(pass)
})
.boxed_local();
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.as_mut().unwrap();
let mut stdout_lines = std::io::BufReader::new(stdout)
.lines()
.map(|r| r.unwrap())
.filter(|s| !s.starts_with("Deno "));
assert_stderr_for_inspect(&mut stderr_lines);
assert_eq!(
&stdout_lines.next().unwrap(),
"exit using ctrl+d, ctrl+c, or close()"
);
assert_inspector_messages(
&mut socket_tx,
&[
r#"{"id":1,"method":"Runtime.enable"}"#,
r#"{"id":2,"method":"Debugger.enable"}"#,
],
&mut socket_rx,
&[
r#"{"id":1,"result":{}}"#,
r#"{"id":2,"result":{"debuggerId":"#,
],
&[
r#"{"method":"Runtime.executionContextCreated","params":{"context":{"id":1,"#,
],
)
.await;
assert_inspector_messages(
&mut socket_tx,
&[
r#"{"id":3,"method":"Runtime.evaluate","params":{"expression":"debugger","objectGroup":"console","includeCommandLineAPI":true,"silent":false,"contextId":1,"returnByValue":true,"generatePreview":true,"userGesture":true,"awaitPromise":false,"replMode":true}}"#,
],
&mut socket_rx,
&[],
&[
r#"{"method":"Debugger.paused""#,
],
).await;
assert_inspector_messages(
&mut socket_tx,
&[
r#"{"id":4,"method":"Debugger.resume","params":{"terminateOnResume":false}}"#,
],
&mut socket_rx,
&[
r#"{"id":4,"result":{}}"#,
r#"{"id":3,"result":{"result":{"type":"undefined"}}}"#,
],
&[
r#"{"method":"Debugger.resumed""#,
],
).await;
assert_inspector_messages(
&mut socket_tx,
&[
r#"{"id":5,"method":"Runtime.evaluate","params":{"expression":"1","objectGroup":"console","includeCommandLineAPI":true,"silent":false,"contextId":1,"returnByValue":true,"generatePreview":true,"userGesture":true,"awaitPromise":false,"replMode":true}}"#,
],
&mut socket_rx,
&[
r#"{"id":5,"result":{"result":{"type":"number","value":1,"description":"1"}}}"#,
],
&[
],
).await;
drop(stdin);
child.wait().unwrap();
}
}

View file

@ -23,10 +23,10 @@ use crate::serde_json::json;
use crate::serde_json::Value;
use anyhow::Error;
use parking_lot::Mutex;
use std::cell::BorrowMutError;
use std::cell::RefCell;
use std::collections::HashMap;
use std::ffi::c_void;
use std::mem::replace;
use std::mem::take;
use std::mem::MaybeUninit;
use std::pin::Pin;
@ -34,7 +34,6 @@ use std::ptr;
use std::ptr::NonNull;
use std::rc::Rc;
use std::sync::Arc;
use std::thread;
use v8::HandleScope;
pub enum InspectorMsgKind {
@ -55,12 +54,21 @@ pub struct InspectorSessionProxy {
pub rx: SessionProxyReceiver,
}
#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum PollState {
// Inspector is not being polled at this moment, it's waiting for more events
// from the inspector.
Idle,
// `InspectorWaker` has been called - either explicitly by outside code
// (like WS server), or from one of the futures we were polling.
Woken,
// Inspector is being polled asynchronously from the owning runtime.
Polling,
Parked,
// Inspector is being polled synchronously, possibly in a reentrant way
// (e.g. from a callback invoked by V8).
SyncPolling,
// Inspector has been dropped already, but wakers might outlive the inspector
// so make sure nothing gets woken at this point.
Dropped,
}
@ -115,14 +123,24 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector {
&mut self.v8_inspector_client
}
/// This method id called when a breakpoint is triggered, eg. using `debugger` statement. In that case
/// inspector sends `Debugger.paused` notification. Nested message loop should be run and process all
/// sent protocol commands until `quit_message_loop_on_pause` is called. After that execution will
/// return to inspector and then JavaScript execution will resume.
fn run_message_loop_on_pause(&mut self, context_group_id: i32) {
assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
self.flags.borrow_mut().on_pause = true;
let _ = self.poll_sessions(None);
self.poll_sessions_sync();
assert!(
!self.flags.borrow().on_pause,
"V8InspectorClientImpl::run_message_loop_on_pause returned before quit_message_loop_on_pause was called"
);
}
fn quit_message_loop_on_pause(&mut self) {
self.flags.borrow_mut().on_pause = false;
let mut flags = self.flags.borrow_mut();
assert!(flags.on_pause);
flags.on_pause = false;
}
fn run_if_waiting_for_debugger(&mut self, context_group_id: i32) {
@ -139,7 +157,10 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector {
impl Future for JsRuntimeInspector {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
self.poll_sessions(Some(cx)).unwrap()
// Here we actually want to set up waker so we are notified when new
// messages arrive. Note that other call sites might want to reenter
// and pump sessions synchronously.
self.poll_sessions(cx)
}
}
@ -206,9 +227,7 @@ impl JsRuntimeInspector {
aux_data_view,
);
// Poll the session handler so we will get notified whenever there is
// new incoming debugger activity.
let _ = self_.poll_sessions(None).unwrap();
self_.poll_sessions_sync();
drop(self_);
self__
@ -236,20 +255,55 @@ impl JsRuntimeInspector {
self.sessions.borrow().has_blocking_sessions()
}
fn poll_sessions(
&self,
mut invoker_cx: Option<&mut Context>,
) -> Result<Poll<()>, BorrowMutError> {
// The futures this function uses do not have re-entrant poll() functions.
// However it is can happpen that poll_sessions() gets re-entered, e.g.
// when an interrupt request is honored while the inspector future is polled
// by the task executor. We let the caller know by returning some error.
let mut sessions = self.sessions.try_borrow_mut()?;
fn poll_sessions_sync(&self) {
let (prev_poll_state, mut prev_task_waker) = self.waker.update(|w| {
let prev_poll_state = replace(&mut w.poll_state, PollState::SyncPolling);
assert!(prev_poll_state != PollState::SyncPolling);
let prev_task_waker = w.task_waker.take();
(prev_poll_state, prev_task_waker)
});
futures::executor::block_on(futures::future::poll_fn(|cx| {
self.poll_sessions_inner(cx);
// Block the thread if either the `on_pause` or the `waiting_for_session`.
// is set. Otherwise, return `Ready(_)` to make `block_on()` return.
let flags = self.flags.borrow();
if flags.on_pause || flags.waiting_for_session {
Poll::Pending
} else {
Poll::Ready(())
}
}));
// Restore the previous poll state.
self.waker.update(|w| {
let replaced = replace(&mut w.poll_state, prev_poll_state);
assert_eq!(replaced, PollState::SyncPolling);
});
// The `block_on(...)` call above must have created a new `Waker` that will
// now be registered with `sessions.session_rx` and `sessions.established`.
// This has the consequence that when either of those streams transitions
// from `Pending` to `Ready`, they'll wake that (stale) waker, and the
// inspector task won't get notified. To avoid a hang, explicitly wake the
// inspector task here; when it gets polled, it will re-register the right
// waker (the `InspectorWaker`) with those streams.
if let Some(waker) = prev_task_waker.take() {
waker.wake();
}
}
fn poll_sessions(&self, invoker_cx: &mut Context) -> Poll<()> {
self.waker.update(|w| {
match w.poll_state {
PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling,
_ => unreachable!(),
PollState::Idle | PollState::Woken => {
w.poll_state = PollState::Polling;
w.inspector_ptr = Some(NonNull::from(self));
}
s => unreachable!("state in poll_sessions {:#?}", s),
};
});
@ -259,94 +313,83 @@ impl JsRuntimeInspector {
let cx = &mut Context::from_waker(&waker_ref);
loop {
loop {
// Do one "handshake" with a newly connected session at a time.
if let Some(mut session) = sessions.handshake.take() {
let poll_result = session.poll_next_unpin(cx);
match poll_result {
Poll::Pending => {
sessions.established.push(session);
continue;
}
Poll::Ready(Some(session_stream_item)) => {
let (v8_session_ptr, msg) = session_stream_item;
InspectorSession::dispatch_message(v8_session_ptr, msg);
sessions.established.push(session);
continue;
}
Poll::Ready(None) => {}
}
}
self.poll_sessions_inner(cx);
// Accept new connections.
let poll_result = sessions.session_rx.poll_next_unpin(cx);
if let Poll::Ready(Some(session_proxy)) = poll_result {
let session = InspectorSession::new(
sessions.v8_inspector.clone(),
session_proxy,
false,
);
let prev = sessions.handshake.replace(session);
assert!(prev.is_none());
}
// Poll established sessions.
match sessions.established.poll_next_unpin(cx) {
Poll::Ready(Some(session_stream_item)) => {
let (v8_session_ptr, msg) = session_stream_item;
InspectorSession::dispatch_message(v8_session_ptr, msg);
continue;
}
Poll::Ready(None) => break,
Poll::Pending => break,
};
{
let flags = self.flags.borrow();
assert!(!flags.on_pause);
assert!(!flags.waiting_for_session);
}
let should_block =
self.flags.borrow().on_pause || self.flags.borrow().waiting_for_session;
let new_state = self.waker.update(|w| {
let new_poll_state = self.waker.update(|w| {
match w.poll_state {
PollState::Woken => {
// The inspector was woken while the session handler was being
// polled, so we poll it another time.
// The inspector got woken up before the last round of polling was
// even over, so we need to do another round.
w.poll_state = PollState::Polling;
}
PollState::Polling if !should_block => {
// The session handler doesn't need to be polled any longer, and
// there's no reason to block (execution is not paused), so this
// function is about to return.
PollState::Polling => {
// Since all streams were polled until they all yielded `Pending`,
// there's nothing else we can do right now.
w.poll_state = PollState::Idle;
// Register the task waker that can be used to wake the parent
// task that will poll the inspector future.
if let Some(cx) = invoker_cx.take() {
w.task_waker.replace(cx.waker().clone());
}
// Register the address of the inspector, which allows the waker
// to request an interrupt from the isolate.
w.inspector_ptr = NonNull::new(self as *const _ as *mut Self);
}
PollState::Polling if should_block => {
// Isolate execution has been paused but there are no more
// events to process, so this thread will be parked. Therefore,
// store the current thread handle in the waker so it knows
// which thread to unpark when new events arrive.
w.poll_state = PollState::Parked;
w.parked_thread.replace(thread::current());
// Capture the waker that, when used, will get the inspector polled.
w.task_waker.replace(invoker_cx.waker().clone());
}
_ => unreachable!(),
};
w.poll_state
});
match new_state {
PollState::Idle => break Ok(Poll::Pending), // Yield to task.
PollState::Polling => {} // Poll the session handler again.
PollState::Parked => thread::park(), // Park the thread.
match new_poll_state {
PollState::Idle => break Poll::Pending,
PollState::Polling => continue, // Poll the session handler again.
_ => unreachable!(),
};
}
}
/// Accepts incoming connections from inspector clients, and polls established
/// inspector sessions for messages that need to be dispatched to V8. This
/// function will repeatedly poll its innner streams and will not return until
/// they all yield `Pending` or have ended.
fn poll_sessions_inner(&self, cx: &mut Context) {
loop {
let mut sessions = self.sessions.borrow_mut();
// Accept new connections.
let poll_result = sessions.session_rx.poll_next_unpin(cx);
match poll_result {
Poll::Ready(Some(session_proxy)) => {
let session = InspectorSession::new(
self.v8_inspector.clone(),
session_proxy,
false,
);
sessions.established.push(session);
// `session_rx` needs to be polled repeatedly until it is `Pending`.
continue;
}
Poll::Ready(None) => unreachable!(), // `session_rx` should never end.
Poll::Pending => {}
}
// Poll established inspector sessions.
let poll_result = sessions.established.poll_next_unpin(cx);
if let Poll::Ready(Some(session_stream_item)) = poll_result {
let (v8_session_ptr, msg) = session_stream_item;
// Don't hold the borrow on sessions while dispatching a message, as it
// might result in a call to `poll_sessions_sync`.
drop(sessions);
InspectorSession::dispatch_message(v8_session_ptr, msg);
// Loop around. We need to keep polling established sessions and
// accepting new ones until eventually everything is `Pending`.
continue;
}
break;
}
}
/// This function blocks the thread until at least one inspector client has
/// established a websocket connection.
///
@ -356,10 +399,12 @@ impl JsRuntimeInspector {
pub fn wait_for_session_and_break_on_next_statement(&mut self) {
loop {
match self.sessions.get_mut().established.iter_mut().next() {
Some(session) => break session.break_on_next_statement(),
Some(session) => {
break session.break_on_next_statement();
}
None => {
self.flags.get_mut().waiting_for_session = true;
let _ = self.poll_sessions(None).unwrap();
self.poll_sessions_sync();
}
};
}
@ -423,7 +468,6 @@ struct InspectorFlags {
struct SessionContainer {
v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>,
session_rx: UnboundedReceiver<InspectorSessionProxy>,
handshake: Option<Box<InspectorSession>>,
established: SelectAll<Box<InspectorSession>>,
}
@ -435,7 +479,6 @@ impl SessionContainer {
Self {
v8_inspector,
session_rx: new_session_rx,
handshake: None,
established: SelectAll::new(),
}
}
@ -446,12 +489,11 @@ impl SessionContainer {
/// all sessions before dropping the inspector instance.
fn drop_sessions(&mut self) {
self.v8_inspector = Default::default();
self.handshake.take();
self.established.clear();
}
fn has_active_sessions(&self) -> bool {
!self.established.is_empty() || self.handshake.is_some()
!self.established.is_empty()
}
fn has_blocking_sessions(&self) -> bool {
@ -467,7 +509,6 @@ impl SessionContainer {
Self {
v8_inspector: Default::default(),
session_rx: rx,
handshake: None,
established: SelectAll::new(),
}
}
@ -476,7 +517,6 @@ impl SessionContainer {
struct InspectorWakerInner {
poll_state: PollState,
task_waker: Option<task::Waker>,
parked_thread: Option<thread::Thread>,
inspector_ptr: Option<NonNull<JsRuntimeInspector>>,
isolate_handle: v8::IsolateHandle,
}
@ -491,7 +531,6 @@ impl InspectorWaker {
let inner = InspectorWakerInner {
poll_state: PollState::Idle,
task_waker: None,
parked_thread: None,
inspector_ptr: None,
isolate_handle,
};
@ -507,43 +546,38 @@ impl InspectorWaker {
}
}
extern "C" fn handle_interrupt(_isolate: &mut v8::Isolate, arg: *mut c_void) {
// SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the
// pointer to the latter is valid as long as waker is alive.
let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) };
inspector.poll_sessions_sync();
}
impl task::ArcWake for InspectorWaker {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.update(|w| {
// Determine whether, given the current poll state, waking up is possible
// and necessary. If it is, change the poll state to `Woken`.
match w.poll_state {
PollState::Idle => {
// Wake the task, if any, that has polled the Inspector future last.
if let Some(waker) = w.task_waker.take() {
waker.wake()
}
// Request an interrupt from the isolate if it's running and there's
// not unhandled interrupt request in flight.
if let Some(arg) = w
.inspector_ptr
.take()
.map(|ptr| ptr.as_ptr() as *mut c_void)
{
w.isolate_handle.request_interrupt(handle_interrupt, arg);
}
extern "C" fn handle_interrupt(
_isolate: &mut v8::Isolate,
arg: *mut c_void,
) {
// SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the
// pointer to the latter is valid as long as waker is alive.
let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) };
let _ = inspector.poll_sessions(None);
}
}
PollState::Parked => {
// Unpark the isolate thread.
let parked_thread = w.parked_thread.take().unwrap();
assert_ne!(parked_thread.id(), thread::current().id());
parked_thread.unpark();
}
_ => {}
PollState::Idle | PollState::Polling => w.poll_state = PollState::Woken,
PollState::Woken | PollState::Dropped => return, // Nothing to do.
PollState::SyncPolling => panic!("wake() called while sync polling"),
};
w.poll_state = PollState::Woken;
// Wake the task, if any, that has polled the Inspector future last.
if let Some(waker) = w.task_waker.take() {
waker.wake()
}
// Request an interrupt from the isolate, if the isolate is currently
// running and there isn't already an interrupt request in flight.
if let Some(arg) = w
.inspector_ptr
.take()
.map(|ptr| ptr.cast::<c_void>().as_ptr())
{
w.isolate_handle.request_interrupt(handle_interrupt, arg);
}
});
}
}
@ -568,6 +602,7 @@ impl InspectorSession {
blocking: bool,
) -> Box<Self> {
new_box_with(move |self_ptr| {
// TODO(bartlomieju): channel should probably be a separate struct
let v8_channel = v8::inspector::ChannelBase::new::<Self>();
let mut v8_inspector = v8_inspector_rc.borrow_mut();
let v8_inspector_ptr = v8_inspector.as_mut().unwrap();