diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs index 611310538c..a024393bb1 100644 --- a/cli/tools/test/channel.rs +++ b/cli/tools/test/channel.rs @@ -227,10 +227,10 @@ impl TestEventSenderFactory { /// Create a [`TestEventWorkerSender`], along with a stdout/stderr stream. pub fn worker(&self) -> TestEventWorkerSender { let id = self.worker_id.fetch_add(1, Ordering::AcqRel); - let (stdout_reader, mut stdout_writer) = pipe().unwrap(); - let (stderr_reader, mut stderr_writer) = pipe().unwrap(); + let (stdout_reader, stdout_writer) = pipe().unwrap(); + let (stderr_reader, stderr_writer) = pipe().unwrap(); let (sync_sender, mut sync_receiver) = - tokio::sync::mpsc::unbounded_channel::(); + tokio::sync::mpsc::unbounded_channel::<(SendMutex, SendMutex)>(); let stdout = stdout_writer.try_clone().unwrap(); let stderr = stderr_writer.try_clone().unwrap(); let sender = self.sender.clone(); @@ -281,17 +281,17 @@ impl TestEventSenderFactory { // If the channel closed, we assume that all important data from the streams was synced, // so we just end this task immediately. None => { break }, - Some(mutex) => { - // If we fail to write the sync marker for flush (likely in the case where the runtime is shutting down), - // we instead just release the mutex and bail. - let success = stdout_writer.write_all(SYNC_MARKER).is_ok() - && stderr_writer.write_all(SYNC_MARKER).is_ok(); - if success { - for stream in [&mut test_stdout, &mut test_stderr] { + Some((mutex1, mutex2)) => { + // Two phase lock: mutex1 indicates that we are done our general read phase and are ready for + // the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock + // when the pipe is too full to accept the sync marker. + drop(mutex1); + for stream in [&mut test_stdout, &mut test_stderr] { + if stream.is_alive() { stream.read_until_sync_marker().await; } } - drop(mutex); + drop(mutex2); } } } @@ -313,6 +313,8 @@ impl TestEventSenderFactory { ref_count: Default::default(), sender: self.sender.clone(), sync_sender, + stdout_writer, + stderr_writer, }; TestEventWorkerSender { @@ -373,7 +375,9 @@ pub struct TestEventSender { pub id: usize, ref_count: Arc<()>, sender: UnboundedSender<(usize, TestEvent)>, - sync_sender: UnboundedSender, + sync_sender: UnboundedSender<(SendMutex, SendMutex)>, + stdout_writer: PipeWrite, + stderr_writer: PipeWrite, } impl Clone for TestEventSender { @@ -383,6 +387,8 @@ impl Clone for TestEventSender { ref_count: self.ref_count.clone(), sender: self.sender.clone(), sync_sender: self.sync_sender.clone(), + stdout_writer: self.stdout_writer.try_clone().unwrap(), + stderr_writer: self.stderr_writer.try_clone().unwrap(), } } } @@ -400,12 +406,27 @@ impl TestEventSender { /// Ensure that all output has been fully flushed by writing a sync marker into the /// stdout and stderr streams and waiting for it on the other side. pub fn flush(&mut self) -> Result<(), ChannelClosedError> { - let mutex = parking_lot::RawMutex::INIT; - mutex.lock(); - self.sync_sender.send(SendMutex(&mutex as _))?; - if !mutex.try_lock_for(Duration::from_secs(30)) { + // Two phase lock: mutex1 indicates that we are done our general read phase and are ready for + // the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock + // when the pipe is too full to accept the sync marker. + let mutex1 = parking_lot::RawMutex::INIT; + mutex1.lock(); + let mutex2 = parking_lot::RawMutex::INIT; + mutex2.lock(); + self + .sync_sender + .send((SendMutex(&mutex1 as _), SendMutex(&mutex2 as _)))?; + if !mutex1.try_lock_for(Duration::from_secs(30)) { panic!( - "Test flush deadlock, sender closed = {}", + "Test flush deadlock 1, sender closed = {}", + self.sync_sender.is_closed() + ); + } + _ = self.stdout_writer.write_all(SYNC_MARKER); + _ = self.stderr_writer.write_all(SYNC_MARKER); + if !mutex2.try_lock_for(Duration::from_secs(30)) { + panic!( + "Test flush deadlock 2, sender closed = {}", self.sync_sender.is_closed() ); } @@ -415,9 +436,8 @@ impl TestEventSender { #[cfg(test)] mod tests { - use crate::tools::test::TestResult; - use super::*; + use crate::tools::test::TestResult; use deno_core::unsync::spawn; use deno_core::unsync::spawn_blocking; @@ -499,6 +519,85 @@ mod tests { assert_eq!(messages.len(), 100000); } + /// Test that flushing a large number of times doesn't hang. + #[tokio::test] + async fn test_flush_large() { + test_util::timeout!(240); + let (mut worker, mut receiver) = create_single_test_event_channel(); + let recv_handle = spawn(async move { + let mut queue = vec![]; + while let Some((_, message)) = receiver.recv().await { + if let TestEvent::StepWait(..) = message { + queue.push(()); + } + } + eprintln!("Receiver closed"); + queue + }); + let send_handle = spawn_blocking(move || { + for _ in 0..25000 { + // Write one pipe buffer's worth of message here. We try a few different sizes of potentially + // blocking writes. + worker.stderr.write_all(&[0; 4 * 1024]).unwrap(); + worker.sender.send(TestEvent::StepWait(1)).unwrap(); + worker.stderr.write_all(&[0; 16 * 1024]).unwrap(); + worker.sender.send(TestEvent::StepWait(1)).unwrap(); + worker.stderr.write_all(&[0; 64 * 1024]).unwrap(); + worker.sender.send(TestEvent::StepWait(1)).unwrap(); + worker.stderr.write_all(&[0; 128 * 1024]).unwrap(); + worker.sender.send(TestEvent::StepWait(1)).unwrap(); + } + eprintln!("Sent all messages"); + }); + send_handle.await.unwrap(); + let messages = recv_handle.await.unwrap(); + assert_eq!(messages.len(), 100000); + } + + /// Test that flushing a large number of times doesn't hang. + #[tokio::test] + async fn test_flush_with_close() { + test_util::timeout!(240); + let (worker, mut receiver) = create_single_test_event_channel(); + let TestEventWorkerSender { + mut sender, + stderr, + stdout, + } = worker; + let recv_handle = spawn(async move { + let mut queue = vec![]; + while let Some((_, _)) = receiver.recv().await { + queue.push(()); + } + eprintln!("Receiver closed"); + queue + }); + let send_handle = spawn_blocking(move || { + let mut stdout = Some(stdout); + let mut stderr = Some(stderr); + for i in 0..100000 { + if i == 20000 { + stdout.take(); + } + if i == 40000 { + stderr.take(); + } + if i % 2 == 0 { + if let Some(stdout) = &mut stdout { + stdout.write_all(b"message").unwrap(); + } + } else if let Some(stderr) = &mut stderr { + stderr.write_all(b"message").unwrap(); + } + sender.send(TestEvent::StepWait(1)).unwrap(); + } + eprintln!("Sent all messages"); + }); + send_handle.await.unwrap(); + let messages = recv_handle.await.unwrap(); + assert_eq!(messages.len(), 130000); + } + /// Test that large numbers of interleaved steps are routed properly. #[tokio::test] async fn test_interleave() {