fix(cli): fix deadlock in test writer when test pipe is full (#23210)

The tests would deadlock if we tried to write the sync marker into a
pipe that was full because one test streamed just enough data to fill
the pipe, so when we went to actually write the sync marker we blocked
when nobody was reading.

We use a two-phase lock for sync markers now: one to indicate "ready to
sync" and the second to indicate that the sync bytes have been received.
This commit is contained in:
Matt Mastracci 2024-04-04 12:06:58 -06:00 committed by GitHub
parent 207349cfb7
commit 7cc584ed79
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -227,10 +227,10 @@ impl TestEventSenderFactory {
/// Create a [`TestEventWorkerSender`], along with a stdout/stderr stream.
pub fn worker(&self) -> TestEventWorkerSender {
let id = self.worker_id.fetch_add(1, Ordering::AcqRel);
let (stdout_reader, mut stdout_writer) = pipe().unwrap();
let (stderr_reader, mut stderr_writer) = pipe().unwrap();
let (stdout_reader, stdout_writer) = pipe().unwrap();
let (stderr_reader, stderr_writer) = pipe().unwrap();
let (sync_sender, mut sync_receiver) =
tokio::sync::mpsc::unbounded_channel::<SendMutex>();
tokio::sync::mpsc::unbounded_channel::<(SendMutex, SendMutex)>();
let stdout = stdout_writer.try_clone().unwrap();
let stderr = stderr_writer.try_clone().unwrap();
let sender = self.sender.clone();
@ -281,17 +281,17 @@ impl TestEventSenderFactory {
// If the channel closed, we assume that all important data from the streams was synced,
// so we just end this task immediately.
None => { break },
Some(mutex) => {
// If we fail to write the sync marker for flush (likely in the case where the runtime is shutting down),
// we instead just release the mutex and bail.
let success = stdout_writer.write_all(SYNC_MARKER).is_ok()
&& stderr_writer.write_all(SYNC_MARKER).is_ok();
if success {
for stream in [&mut test_stdout, &mut test_stderr] {
Some((mutex1, mutex2)) => {
// Two phase lock: mutex1 indicates that we are done our general read phase and are ready for
// the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock
// when the pipe is too full to accept the sync marker.
drop(mutex1);
for stream in [&mut test_stdout, &mut test_stderr] {
if stream.is_alive() {
stream.read_until_sync_marker().await;
}
}
drop(mutex);
drop(mutex2);
}
}
}
@ -313,6 +313,8 @@ impl TestEventSenderFactory {
ref_count: Default::default(),
sender: self.sender.clone(),
sync_sender,
stdout_writer,
stderr_writer,
};
TestEventWorkerSender {
@ -373,7 +375,9 @@ pub struct TestEventSender {
pub id: usize,
ref_count: Arc<()>,
sender: UnboundedSender<(usize, TestEvent)>,
sync_sender: UnboundedSender<SendMutex>,
sync_sender: UnboundedSender<(SendMutex, SendMutex)>,
stdout_writer: PipeWrite,
stderr_writer: PipeWrite,
}
impl Clone for TestEventSender {
@ -383,6 +387,8 @@ impl Clone for TestEventSender {
ref_count: self.ref_count.clone(),
sender: self.sender.clone(),
sync_sender: self.sync_sender.clone(),
stdout_writer: self.stdout_writer.try_clone().unwrap(),
stderr_writer: self.stderr_writer.try_clone().unwrap(),
}
}
}
@ -400,12 +406,27 @@ impl TestEventSender {
/// Ensure that all output has been fully flushed by writing a sync marker into the
/// stdout and stderr streams and waiting for it on the other side.
pub fn flush(&mut self) -> Result<(), ChannelClosedError> {
let mutex = parking_lot::RawMutex::INIT;
mutex.lock();
self.sync_sender.send(SendMutex(&mutex as _))?;
if !mutex.try_lock_for(Duration::from_secs(30)) {
// Two phase lock: mutex1 indicates that we are done our general read phase and are ready for
// the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock
// when the pipe is too full to accept the sync marker.
let mutex1 = parking_lot::RawMutex::INIT;
mutex1.lock();
let mutex2 = parking_lot::RawMutex::INIT;
mutex2.lock();
self
.sync_sender
.send((SendMutex(&mutex1 as _), SendMutex(&mutex2 as _)))?;
if !mutex1.try_lock_for(Duration::from_secs(30)) {
panic!(
"Test flush deadlock, sender closed = {}",
"Test flush deadlock 1, sender closed = {}",
self.sync_sender.is_closed()
);
}
_ = self.stdout_writer.write_all(SYNC_MARKER);
_ = self.stderr_writer.write_all(SYNC_MARKER);
if !mutex2.try_lock_for(Duration::from_secs(30)) {
panic!(
"Test flush deadlock 2, sender closed = {}",
self.sync_sender.is_closed()
);
}
@ -415,9 +436,8 @@ impl TestEventSender {
#[cfg(test)]
mod tests {
use crate::tools::test::TestResult;
use super::*;
use crate::tools::test::TestResult;
use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
@ -499,6 +519,85 @@ mod tests {
assert_eq!(messages.len(), 100000);
}
/// Test that flushing a large number of times doesn't hang.
#[tokio::test]
async fn test_flush_large() {
test_util::timeout!(240);
let (mut worker, mut receiver) = create_single_test_event_channel();
let recv_handle = spawn(async move {
let mut queue = vec![];
while let Some((_, message)) = receiver.recv().await {
if let TestEvent::StepWait(..) = message {
queue.push(());
}
}
eprintln!("Receiver closed");
queue
});
let send_handle = spawn_blocking(move || {
for _ in 0..25000 {
// Write one pipe buffer's worth of message here. We try a few different sizes of potentially
// blocking writes.
worker.stderr.write_all(&[0; 4 * 1024]).unwrap();
worker.sender.send(TestEvent::StepWait(1)).unwrap();
worker.stderr.write_all(&[0; 16 * 1024]).unwrap();
worker.sender.send(TestEvent::StepWait(1)).unwrap();
worker.stderr.write_all(&[0; 64 * 1024]).unwrap();
worker.sender.send(TestEvent::StepWait(1)).unwrap();
worker.stderr.write_all(&[0; 128 * 1024]).unwrap();
worker.sender.send(TestEvent::StepWait(1)).unwrap();
}
eprintln!("Sent all messages");
});
send_handle.await.unwrap();
let messages = recv_handle.await.unwrap();
assert_eq!(messages.len(), 100000);
}
/// Test that flushing a large number of times doesn't hang.
#[tokio::test]
async fn test_flush_with_close() {
test_util::timeout!(240);
let (worker, mut receiver) = create_single_test_event_channel();
let TestEventWorkerSender {
mut sender,
stderr,
stdout,
} = worker;
let recv_handle = spawn(async move {
let mut queue = vec![];
while let Some((_, _)) = receiver.recv().await {
queue.push(());
}
eprintln!("Receiver closed");
queue
});
let send_handle = spawn_blocking(move || {
let mut stdout = Some(stdout);
let mut stderr = Some(stderr);
for i in 0..100000 {
if i == 20000 {
stdout.take();
}
if i == 40000 {
stderr.take();
}
if i % 2 == 0 {
if let Some(stdout) = &mut stdout {
stdout.write_all(b"message").unwrap();
}
} else if let Some(stderr) = &mut stderr {
stderr.write_all(b"message").unwrap();
}
sender.send(TestEvent::StepWait(1)).unwrap();
}
eprintln!("Sent all messages");
});
send_handle.await.unwrap();
let messages = recv_handle.await.unwrap();
assert_eq!(messages.len(), 130000);
}
/// Test that large numbers of interleaved steps are routed properly.
#[tokio::test]
async fn test_interleave() {