cpython/Lib/test/libregrtest/result.py
Łukasz Langa 3932b0f7b1
gh-110722: Make -m test -T -j use sys.monitoring (GH-111710)
Now all results from worker processes are aggregated and
displayed together as a summary at the end of a regrtest run.

The traditional trace is left in place for use with sequential
in-process test runs but now raises a warning that those
numbers are not precise.

`-T -j` requires `--with-pydebug` as it relies on `-Xpresite=`.
2023-11-10 18:17:45 +01:00

225 lines
7.4 KiB
Python

import dataclasses
import json
from typing import Any
from .utils import (
StrJSON, TestName, FilterTuple,
format_duration, normalize_test_name, print_warning)
@dataclasses.dataclass(slots=True)
class TestStats:
tests_run: int = 0
failures: int = 0
skipped: int = 0
@staticmethod
def from_unittest(result):
return TestStats(result.testsRun,
len(result.failures),
len(result.skipped))
@staticmethod
def from_doctest(results):
return TestStats(results.attempted,
results.failed,
results.skipped)
def accumulate(self, stats):
self.tests_run += stats.tests_run
self.failures += stats.failures
self.skipped += stats.skipped
# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
    """Plain string constants for test result states, plus predicates on them."""

    PASSED = "PASSED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"
    UNCAUGHT_EXC = "UNCAUGHT_EXC"
    REFLEAK = "REFLEAK"
    ENV_CHANGED = "ENV_CHANGED"
    RESOURCE_DENIED = "RESOURCE_DENIED"
    INTERRUPTED = "INTERRUPTED"
    WORKER_FAILED = "WORKER_FAILED"   # non-zero worker process exit code
    WORKER_BUG = "WORKER_BUG"         # exception when running a worker
    DID_NOT_RUN = "DID_NOT_RUN"
    TIMEOUT = "TIMEOUT"

    @staticmethod
    def is_failed(state):
        """Return True if *state* is one of the failure states.

        ENV_CHANGED is not in this set.
        """
        failed_states = {
            State.FAILED,
            State.UNCAUGHT_EXC,
            State.REFLEAK,
            State.WORKER_FAILED,
            State.WORKER_BUG,
            State.TIMEOUT,
        }
        return state in failed_states

    @staticmethod
    def has_meaningful_duration(state):
        """Return False for states whose duration is considered meaningless."""
        # Consider that the duration is meaningless for these cases.
        # For example, if a whole test file is skipped, its duration
        # is unlikely to be the duration of executing its tests,
        # but just the duration to execute code which skips the test.
        meaningless_states = {
            State.SKIPPED,
            State.RESOURCE_DENIED,
            State.INTERRUPTED,
            State.WORKER_FAILED,
            State.WORKER_BUG,
            State.DID_NOT_RUN,
        }
        return not (state in meaningless_states)

    @staticmethod
    def must_stop(state):
        """Return True if *state* is INTERRUPTED or WORKER_BUG."""
        stop_states = {State.INTERRUPTED, State.WORKER_BUG}
        return state in stop_states
# Aliases naming the two components of a source location.
FileName = str
LineNo = int
# A (file name, line number) pair; TestResult.covered_lines is a list of these.
Location = tuple[FileName, LineNo]
@dataclasses.dataclass(slots=True)
class TestResult:
test_name: TestName
state: str | None = None
# Test duration in seconds
duration: float | None = None
xml_data: list[str] | None = None
stats: TestStats | None = None
# errors and failures copied from support.TestFailedWithDetails
errors: list[tuple[str, str]] | None = None
failures: list[tuple[str, str]] | None = None
# partial coverage in a worker run; not used by sequential in-process runs
covered_lines: list[Location] | None = None
def is_failed(self, fail_env_changed: bool) -> bool:
if self.state == State.ENV_CHANGED:
return fail_env_changed
return State.is_failed(self.state)
def _format_failed(self):
if self.errors and self.failures:
le = len(self.errors)
lf = len(self.failures)
error_s = "error" + ("s" if le > 1 else "")
failure_s = "failure" + ("s" if lf > 1 else "")
return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
if self.errors:
le = len(self.errors)
error_s = "error" + ("s" if le > 1 else "")
return f"{self.test_name} failed ({le} {error_s})"
if self.failures:
lf = len(self.failures)
failure_s = "failure" + ("s" if lf > 1 else "")
return f"{self.test_name} failed ({lf} {failure_s})"
return f"{self.test_name} failed"
def __str__(self) -> str:
match self.state:
case State.PASSED:
return f"{self.test_name} passed"
case State.FAILED:
return self._format_failed()
case State.SKIPPED:
return f"{self.test_name} skipped"
case State.UNCAUGHT_EXC:
return f"{self.test_name} failed (uncaught exception)"
case State.REFLEAK:
return f"{self.test_name} failed (reference leak)"
case State.ENV_CHANGED:
return f"{self.test_name} failed (env changed)"
case State.RESOURCE_DENIED:
return f"{self.test_name} skipped (resource denied)"
case State.INTERRUPTED:
return f"{self.test_name} interrupted"
case State.WORKER_FAILED:
return f"{self.test_name} worker non-zero exit code"
case State.WORKER_BUG:
return f"{self.test_name} worker bug"
case State.DID_NOT_RUN:
return f"{self.test_name} ran no tests"
case State.TIMEOUT:
return f"{self.test_name} timed out ({format_duration(self.duration)})"
case _:
raise ValueError("unknown result state: {state!r}")
def has_meaningful_duration(self):
return State.has_meaningful_duration(self.state)
def set_env_changed(self):
if self.state is None or self.state == State.PASSED:
self.state = State.ENV_CHANGED
def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
if State.must_stop(self.state):
return True
if fail_fast and self.is_failed(fail_env_changed):
return True
return False
def get_rerun_match_tests(self) -> FilterTuple | None:
match_tests = []
errors = self.errors or []
failures = self.failures or []
for error_list, is_error in (
(errors, True),
(failures, False),
):
for full_name, *_ in error_list:
match_name = normalize_test_name(full_name, is_error=is_error)
if match_name is None:
# 'setUpModule (test.test_sys)': don't filter tests
return None
if not match_name:
error_type = "ERROR" if is_error else "FAIL"
print_warning(f"rerun failed to parse {error_type} test name: "
f"{full_name!r}: don't filter tests")
return None
match_tests.append(match_name)
if not match_tests:
return None
return tuple(match_tests)
def write_json_into(self, file) -> None:
json.dump(self, file, cls=_EncodeTestResult)
@staticmethod
def from_json(worker_json: StrJSON) -> 'TestResult':
return json.loads(worker_json, object_hook=_decode_test_result)
class _EncodeTestResult(json.JSONEncoder):
    """JSON encoder that also accepts TestResult instances."""

    def default(self, o: Any) -> dict[str, Any]:
        """Encode a TestResult as a dict tagged with "__test_result__".

        Any other object is handed to the base class implementation.
        """
        if not isinstance(o, TestResult):
            return super().default(o)

        # asdict() turns the dataclass into a plain dict; the extra key
        # records the class name of the encoded object.
        encoded = dataclasses.asdict(o)
        encoded["__test_result__"] = o.__class__.__name__
        return encoded
def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
    """Rebuild a TestResult from a dict tagged with "__test_result__".

    Dicts without the tag are returned unchanged. Tagged dicts are modified
    in place: the tag is removed, 'stats' becomes a TestStats and each entry
    of 'covered_lines' becomes a tuple, before the dict is passed to
    TestResult as keyword arguments.
    """
    if "__test_result__" not in data:
        return data

    del data['__test_result__']

    raw_stats = data['stats']
    if raw_stats is not None:
        data['stats'] = TestStats(**raw_stats)

    raw_lines = data['covered_lines']
    if raw_lines is not None:
        # JSON has no tuple type, so locations arrive as lists.
        data['covered_lines'] = [tuple(location) for location in raw_lines]

    return TestResult(**data)