fix(cli): TestEventSender should be !Clone (#23405)

`TestEventSender` should not be Clone so we don't end up with multiple
copies of the same writer FD. This is probably not the cause of the test
channel lockups, but it's a lot easier to reason about.
This commit is contained in:
Matt Mastracci 2024-04-16 12:54:50 -06:00 committed by GitHub
parent 760d64bc6b
commit c4d0fceec3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 158 additions and 110 deletions

View File

@ -92,7 +92,7 @@ pub async fn kernel(
permissions,
vec![
ops::jupyter::deno_jupyter::init_ops(stdio_tx.clone()),
ops::testing::deno_test::init_ops(test_event_sender.clone()),
ops::testing::deno_test::init_ops(test_event_sender),
],
// FIXME(nayeemrmn): Test output capturing currently doesn't work.
Stdio {
@ -114,7 +114,6 @@ pub async fn kernel(
resolver,
worker,
main_module,
test_event_sender,
test_event_receiver,
)
.await?;

View File

@ -172,9 +172,7 @@ pub async fn run(flags: Flags, repl_flags: ReplFlags) -> Result<i32, AnyError> {
.create_custom_worker(
main_module.clone(),
permissions,
vec![crate::ops::testing::deno_test::init_ops(
test_event_sender.clone(),
)],
vec![crate::ops::testing::deno_test::init_ops(test_event_sender)],
Default::default(),
)
.await?;
@ -186,7 +184,6 @@ pub async fn run(flags: Flags, repl_flags: ReplFlags) -> Result<i32, AnyError> {
resolver,
worker,
main_module,
test_event_sender,
test_event_receiver,
)
.await?;

View File

@ -12,10 +12,10 @@ use crate::tools::test::report_tests;
use crate::tools::test::reporters::PrettyTestReporter;
use crate::tools::test::reporters::TestReporter;
use crate::tools::test::run_tests_for_worker;
use crate::tools::test::send_test_event;
use crate::tools::test::worker_has_tests;
use crate::tools::test::TestEvent;
use crate::tools::test::TestEventReceiver;
use crate::tools::test::TestEventSender;
use deno_ast::diagnostics::Diagnostic;
use deno_ast::swc::ast as swc_ast;
@ -184,7 +184,6 @@ pub struct ReplSession {
referrer: ModuleSpecifier,
main_module: ModuleSpecifier,
test_reporter_factory: Box<dyn Fn() -> Box<dyn TestReporter>>,
test_event_sender: TestEventSender,
/// This is only optional because it's temporarily taken when evaluating.
test_event_receiver: Option<TestEventReceiver>,
jsx: ReplJsxState,
@ -198,7 +197,6 @@ impl ReplSession {
resolver: Arc<CliGraphResolver>,
mut worker: MainWorker,
main_module: ModuleSpecifier,
test_event_sender: TestEventSender,
test_event_receiver: TestEventReceiver,
) -> Result<Self, AnyError> {
let language_server = ReplLanguageServer::new_initialized().await?;
@ -278,7 +276,6 @@ impl ReplSession {
))
}),
main_module,
test_event_sender,
test_event_receiver: Some(test_event_receiver),
jsx: ReplJsxState {
factory: "React.createElement".to_string(),
@ -466,10 +463,11 @@ impl ReplSession {
)
.await
.unwrap();
self
.test_event_sender
.send(TestEvent::ForceEndReport)
.unwrap();
send_test_event(
&self.worker.js_runtime.op_state(),
TestEvent::ForceEndReport,
)
.unwrap();
self.test_event_receiver = Some(report_tests_handle.await.unwrap().1);
}

View File

@ -16,7 +16,6 @@ use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::ready;
use std::task::Poll;
use std::time::Duration;
@ -310,7 +309,6 @@ impl TestEventSenderFactory {
let sender = TestEventSender {
id,
ref_count: Default::default(),
sender: self.sender.clone(),
sync_sender,
stdout_writer,
@ -373,26 +371,12 @@ pub struct TestEventWorkerSender {
/// are not guaranteed to be sent on drop unless flush is explicitly called.
pub struct TestEventSender {
pub id: usize,
ref_count: Arc<()>,
sender: UnboundedSender<(usize, TestEvent)>,
sync_sender: UnboundedSender<(SendMutex, SendMutex)>,
stdout_writer: PipeWrite,
stderr_writer: PipeWrite,
}
impl Clone for TestEventSender {
fn clone(&self) -> Self {
Self {
id: self.id,
ref_count: self.ref_count.clone(),
sender: self.sender.clone(),
sync_sender: self.sync_sender.clone(),
stdout_writer: self.stdout_writer.try_clone().unwrap(),
stderr_writer: self.stderr_writer.try_clone().unwrap(),
}
}
}
impl TestEventSender {
pub fn send(&mut self, message: TestEvent) -> Result<(), ChannelClosedError> {
// Certain messages require us to ensure that all output has been drained to ensure proper

View File

@ -21,6 +21,7 @@ use crate::util::path::is_script_ext;
use crate::util::path::mapped_specifier_for_tsc;
use crate::util::path::matches_pattern_or_exact_path;
use crate::worker::CliMainWorkerFactory;
use crate::worker::CoverageCollector;
use deno_ast::swc::common::comments::CommentKind;
use deno_ast::MediaType;
@ -49,6 +50,7 @@ use deno_core::unsync::spawn_blocking;
use deno_core::url::Url;
use deno_core::v8;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
use deno_core::PollEventLoopOptions;
use deno_runtime::deno_io::Stdio;
use deno_runtime::deno_io::StdioPipe;
@ -66,6 +68,7 @@ use rand::SeedableRng;
use regex::Regex;
use serde::Deserialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
@ -571,23 +574,81 @@ fn get_test_reporter(options: &TestSpecifiersOptions) -> Box<dyn TestReporter> {
reporter
}
async fn configure_main_worker(
worker_factory: Arc<CliMainWorkerFactory>,
specifier: &Url,
permissions: Permissions,
worker_sender: TestEventWorkerSender,
options: &TestSpecifierOptions,
) -> Result<(Option<Box<dyn CoverageCollector>>, MainWorker), anyhow::Error> {
let mut worker = worker_factory
.create_custom_worker(
specifier.clone(),
PermissionsContainer::new(permissions),
vec![ops::testing::deno_test::init_ops(worker_sender.sender)],
Stdio {
stdin: StdioPipe::inherit(),
stdout: StdioPipe::file(worker_sender.stdout),
stderr: StdioPipe::file(worker_sender.stderr),
},
)
.await?;
let coverage_collector = worker.maybe_setup_coverage_collector().await?;
if options.trace_leaks {
worker.execute_script_static(
located_script_name!(),
"Deno[Deno.internal].core.setLeakTracingEnabled(true);",
)?;
}
let res = worker.execute_side_module_possibly_with_npm().await;
let mut worker = worker.into_main_worker();
match res {
Ok(()) => Ok(()),
Err(error) => {
// TODO(mmastrac): It would be nice to avoid having this error pattern repeated
if error.is::<JsError>() {
send_test_event(
&worker.js_runtime.op_state(),
TestEvent::UncaughtError(
specifier.to_string(),
Box::new(error.downcast::<JsError>().unwrap()),
),
)?;
Ok(())
} else {
Err(error)
}
}
}?;
Ok((coverage_collector, worker))
}
/// Test a single specifier as documentation containing test programs, an executable test module or
/// both.
pub async fn test_specifier(
worker_factory: Arc<CliMainWorkerFactory>,
permissions: Permissions,
specifier: ModuleSpecifier,
mut worker_sender: TestEventWorkerSender,
worker_sender: TestEventWorkerSender,
fail_fast_tracker: FailFastTracker,
options: TestSpecifierOptions,
) -> Result<(), AnyError> {
match test_specifier_inner(
if fail_fast_tracker.should_stop() {
return Ok(());
}
let (coverage_collector, mut worker) = configure_main_worker(
worker_factory,
&specifier,
permissions,
worker_sender,
&options,
)
.await?;
match test_specifier_inner(
&mut worker,
coverage_collector,
specifier.clone(),
&mut worker_sender.sender,
StdioPipe::file(worker_sender.stdout),
StdioPipe::file(worker_sender.stderr),
fail_fast_tracker,
options,
)
@ -595,11 +656,15 @@ pub async fn test_specifier(
{
Ok(()) => Ok(()),
Err(error) => {
// TODO(mmastrac): It would be nice to avoid having this error pattern repeated
if error.is::<JsError>() {
worker_sender.sender.send(TestEvent::UncaughtError(
specifier.to_string(),
Box::new(error.downcast::<JsError>().unwrap()),
))?;
send_test_event(
&worker.js_runtime.op_state(),
TestEvent::UncaughtError(
specifier.to_string(),
Box::new(error.downcast::<JsError>().unwrap()),
),
)?;
Ok(())
} else {
Err(error)
@ -612,51 +677,18 @@ pub async fn test_specifier(
/// both.
#[allow(clippy::too_many_arguments)]
async fn test_specifier_inner(
worker_factory: Arc<CliMainWorkerFactory>,
permissions: Permissions,
worker: &mut MainWorker,
mut coverage_collector: Option<Box<dyn CoverageCollector>>,
specifier: ModuleSpecifier,
sender: &mut TestEventSender,
stdout: StdioPipe,
stderr: StdioPipe,
fail_fast_tracker: FailFastTracker,
options: TestSpecifierOptions,
) -> Result<(), AnyError> {
if fail_fast_tracker.should_stop() {
return Ok(());
}
let mut worker = worker_factory
.create_custom_worker(
specifier.clone(),
PermissionsContainer::new(permissions),
vec![ops::testing::deno_test::init_ops(sender.clone())],
Stdio {
stdin: StdioPipe::inherit(),
stdout,
stderr,
},
)
.await?;
let mut coverage_collector = worker.maybe_setup_coverage_collector().await?;
if options.trace_leaks {
worker.execute_script_static(
located_script_name!(),
"Deno[Deno.internal].core.setLeakTracingEnabled(true);",
)?;
}
// We execute the main module as a side module so that import.meta.main is not set.
worker.execute_side_module_possibly_with_npm().await?;
let mut worker = worker.into_main_worker();
// Ensure that there are no pending exceptions before we start running tests
worker.run_up_to_duration(Duration::from_millis(0)).await?;
worker.dispatch_load_event()?;
run_tests_for_worker(&mut worker, &specifier, &options, &fail_fast_tracker)
run_tests_for_worker(worker, &specifier, &options, &fail_fast_tracker)
.await?;
// Ignore `defaultPrevented` of the `beforeunload` event. We don't allow the
@ -665,13 +697,18 @@ async fn test_specifier_inner(
worker.dispatch_unload_event()?;
// Ensure all output has been flushed
_ = sender.flush();
_ = worker
.js_runtime
.op_state()
.borrow_mut()
.borrow_mut::<TestEventSender>()
.flush();
// Ensure the worker has settled so we can catch any remaining unhandled rejections. We don't
// want to wait forever here.
worker.run_up_to_duration(Duration::from_millis(0)).await?;
if let Some(coverage_collector) = coverage_collector.as_mut() {
if let Some(coverage_collector) = &mut coverage_collector {
worker
.js_runtime
.with_event_loop_future(
@ -708,33 +745,42 @@ pub async fn poll_event_loop(worker: &mut MainWorker) -> Result<(), AnyError> {
.await
}
pub fn send_test_event(
op_state: &RefCell<OpState>,
event: TestEvent,
) -> Result<(), AnyError> {
Ok(
op_state
.borrow_mut()
.borrow_mut::<TestEventSender>()
.send(event)?,
)
}
pub async fn run_tests_for_worker(
worker: &mut MainWorker,
specifier: &ModuleSpecifier,
options: &TestSpecifierOptions,
fail_fast_tracker: &FailFastTracker,
) -> Result<(), AnyError> {
let (TestContainer(tests, test_functions), mut sender) = {
let state_rc = worker.js_runtime.op_state();
let mut state = state_rc.borrow_mut();
(
std::mem::take(&mut *state.borrow_mut::<TestContainer>()),
state.borrow::<TestEventSender>().clone(),
)
};
let state_rc = worker.js_runtime.op_state();
// Take whatever tests have been registered
let TestContainer(tests, test_functions) =
std::mem::take(&mut *state_rc.borrow_mut().borrow_mut::<TestContainer>());
let tests: Arc<TestDescriptions> = tests.into();
sender.send(TestEvent::Register(tests.clone()))?;
send_test_event(&state_rc, TestEvent::Register(tests.clone()))?;
let res = run_tests_for_worker_inner(
worker,
specifier,
tests,
test_functions,
&mut sender,
options,
fail_fast_tracker,
)
.await;
_ = sender.send(TestEvent::Completed);
_ = send_test_event(&state_rc, TestEvent::Completed);
res
}
@ -743,11 +789,11 @@ async fn run_tests_for_worker_inner(
specifier: &ModuleSpecifier,
tests: Arc<TestDescriptions>,
test_functions: Vec<v8::Global<v8::Function>>,
sender: &mut TestEventSender,
options: &TestSpecifierOptions,
fail_fast_tracker: &FailFastTracker,
) -> Result<(), AnyError> {
let unfiltered = tests.len();
let state_rc = worker.js_runtime.op_state();
// Build the test plan in a single pass
let mut tests_to_run = Vec::with_capacity(tests.len());
@ -775,12 +821,15 @@ async fn run_tests_for_worker_inner(
tests_to_run.shuffle(&mut SmallRng::seed_from_u64(seed));
}
sender.send(TestEvent::Plan(TestPlan {
origin: specifier.to_string(),
total: tests_to_run.len(),
filtered_out: unfiltered - tests_to_run.len(),
used_only,
}))?;
send_test_event(
&state_rc,
TestEvent::Plan(TestPlan {
origin: specifier.to_string(),
total: tests_to_run.len(),
filtered_out: unfiltered - tests_to_run.len(),
used_only,
}),
)?;
let mut had_uncaught_error = false;
let stats = worker.js_runtime.runtime_activity_stats_factory();
@ -835,14 +884,20 @@ async fn run_tests_for_worker_inner(
.try_take::<deno_runtime::deno_fetch::reqwest::Client>();
if desc.ignore {
sender.send(TestEvent::Result(desc.id, TestResult::Ignored, 0))?;
send_test_event(
&state_rc,
TestEvent::Result(desc.id, TestResult::Ignored, 0),
)?;
continue;
}
if had_uncaught_error {
sender.send(TestEvent::Result(desc.id, TestResult::Cancelled, 0))?;
send_test_event(
&state_rc,
TestEvent::Result(desc.id, TestResult::Cancelled, 0),
)?;
continue;
}
sender.send(TestEvent::Wait(desc.id))?;
send_test_event(&state_rc, TestEvent::Wait(desc.id))?;
// Poll event loop once, to allow all ops that are already resolved, but haven't
// responded to settle.
@ -863,12 +918,18 @@ async fn run_tests_for_worker_inner(
Ok(r) => r,
Err(error) => {
if error.is::<JsError>() {
sender.send(TestEvent::UncaughtError(
specifier.to_string(),
Box::new(error.downcast::<JsError>().unwrap()),
))?;
send_test_event(
&state_rc,
TestEvent::UncaughtError(
specifier.to_string(),
Box::new(error.downcast::<JsError>().unwrap()),
),
)?;
fail_fast_tracker.add_failure();
sender.send(TestEvent::Result(desc.id, TestResult::Cancelled, 0))?;
send_test_event(
&state_rc,
TestEvent::Result(desc.id, TestResult::Cancelled, 0),
)?;
had_uncaught_error = true;
continue;
} else {
@ -886,7 +947,10 @@ async fn run_tests_for_worker_inner(
if matches!(result, TestResult::Failed(_)) {
fail_fast_tracker.add_failure();
let elapsed = earlier.elapsed().as_millis();
sender.send(TestEvent::Result(desc.id, result, elapsed as u64))?;
send_test_event(
&state_rc,
TestEvent::Result(desc.id, result, elapsed as u64),
)?;
continue;
}
@ -907,17 +971,23 @@ async fn run_tests_for_worker_inner(
let failure = TestFailure::Leaked(formatted, trailer_notes);
fail_fast_tracker.add_failure();
let elapsed = earlier.elapsed().as_millis();
sender.send(TestEvent::Result(
desc.id,
TestResult::Failed(failure),
elapsed as u64,
))?;
send_test_event(
&state_rc,
TestEvent::Result(
desc.id,
TestResult::Failed(failure),
elapsed as u64,
),
)?;
continue;
}
}
let elapsed = earlier.elapsed().as_millis();
sender.send(TestEvent::Result(desc.id, result, elapsed as u64))?;
send_test_event(
&state_rc,
TestEvent::Result(desc.id, result, elapsed as u64),
)?;
}
Ok(())
}