freebsd-src/tests/atf_python/atf_pytest.py
Alexander V. Chernikov 3e5d0784b9 Testing: add framework for the kernel unit tests.
This change intends to lower the bar for kernel unit testing by
 introducing a new kernel-testing framework ("ktest") based on Netlink,
 loadable test modules and Python test suite integration.

This framework provides the following features:
* Integration to the FreeBSD test suite
* Automatic test discovery
* Automatic test module loading
* Minimal boiler-plate code in both kernel and userland
* Passing any metadata to the test
* Convenient environment pre-setup using python testing framework
* Streaming messages from the kernel to the userland
* Running tests in the dedicated taskqueues
* Skipping or parametrizing tests

Differential Revision: https://reviews.freebsd.org/D39385
MFC after:	2 weeks
2023-04-14 15:47:55 +00:00

299 lines
11 KiB
Python

import types
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple
from atf_python.ktest import generate_ktests
from atf_python.utils import nodeid_to_method_name
import pytest
import os
class ATFCleanupItem(pytest.Item):
    """Pytest item that executes a test's cleanup routine in place of its body.

    The methods defined here are also grafted onto already-collected items
    and their classes (see ATFHandler.override_runtest), which is why the
    no-op helpers take the standard setup/teardown method signature.
    """

    def runtest(self):
        """Invoke the cleanup handler of the test instead of the test body.

        A test-specific ``cleanup_<method name>`` attribute on the test class
        instance is preferred; otherwise a generic ``cleanup`` attribute is
        used. When neither exists nothing is called. The handler receives
        the item's nodeid as its only argument.
        """
        test_instance = self.parent.cls()
        specific_name = "cleanup_{}".format(nodeid_to_method_name(self.nodeid))
        try:
            handler = getattr(test_instance, specific_name)
        except AttributeError:
            # No per-test handler: fall back to the class-wide one, if any
            if not hasattr(test_instance, "cleanup"):
                return
            handler = test_instance.cleanup
        handler(self.nodeid)

    def setup_method_noop(self, method):
        """Replacement for setup_method that intentionally does nothing"""

    def teardown_method_noop(self, method):
        """Replacement for teardown_method that intentionally does nothing"""
class ATFTestObj(object):
    """ATF-format view of a single collected pytest test item.

    Wraps the item and renders its identifier, description, cleanup flag and
    requirement marks as the "key: value" lines printed by the test listing.
    """

    def __init__(self, obj, has_cleanup):
        """Store the item and precompute its ATF identifier and description.

        obj is the collected test item (its nodeid, name, function and
        iter_markers() are used); has_cleanup tells whether a cleanup
        routine should be advertised for it.
        """
        # Use nodeid without name to properly name class-derived tests
        self.ident = obj.nodeid.split("::", 1)[1]
        self.description = self._get_test_description(obj)
        self.has_cleanup = has_cleanup
        self.obj = obj

    def _get_test_description(self, obj):
        """Returns first non-blank line from func docstring or func name

        An explicit "descr" attribute on the item takes precedence. Docstring
        lines are stripped, so an indented summary line (a docstring that
        starts with a newline) does not leak its indentation into the
        description and whitespace-only lines are skipped.
        """
        descr = getattr(obj, "descr", None)
        if descr is not None:
            return descr
        docstr = obj.function.__doc__
        if docstr:
            for line in docstr.split("\n"):
                line = line.strip()
                if line:
                    return line
        return obj.name

    @staticmethod
    def _convert_user_mark(mark, obj, ret: Dict):
        """Translate a require_user mark into ATF metadata stored in ret.

        Writes "require.user" and, for the special "unprivileged" user, adds
        "unprivileged_user" to "require.config" (appending to any existing
        value).
        """
        username = mark.args[0]
        if username == "unprivileged":
            # Special unprivileged user requested.
            # First, require the unprivileged-user config option presence
            key = "require.config"
            if key not in ret:
                ret[key] = "unprivileged_user"
            else:
                ret[key] = "{} {}".format(ret[key], "unprivileged_user")
        # Check if the framework requires root
        test_cls = ATFHandler.get_test_class(obj)
        if test_cls and getattr(test_cls, "NEED_ROOT", False):
            # Yes, so we ask kyua to run us under root instead
            # It is up to the implementation to switch back to the desired
            # user
            ret["require.user"] = "root"
        else:
            ret["require.user"] = username

    def _convert_marks(self, obj) -> Dict[str, Any]:
        """Collect the ATF metadata derived from the item's known marks.

        Marks that are not listed in the table below are ignored. For each
        known mark the first positional argument is used as the value; an
        entry may rename the key ("name"), reformat the value ("fmt") or
        take over the conversion entirely ("handler").
        """
        wj_func = lambda x: " ".join(x)  # noqa: E731
        _map: Dict[str, Dict] = {
            "require_user": {"handler": self._convert_user_mark},
            "require_arch": {"name": "require.arch", "fmt": wj_func},
            "require_diskspace": {"name": "require.diskspace"},
            "require_files": {"name": "require.files", "fmt": wj_func},
            "require_machine": {"name": "require.machine", "fmt": wj_func},
            "require_memory": {"name": "require.memory"},
            "require_progs": {"name": "require.progs", "fmt": wj_func},
            "timeout": {},
        }
        ret = {}
        for mark in obj.iter_markers():
            spec = _map.get(mark.name)
            if spec is None:
                continue
            if "handler" in spec:
                # The handler writes into ret itself
                spec["handler"](mark, obj, ret)
                continue
            name = spec.get("name", mark.name)
            if "fmt" in spec:
                val = spec["fmt"](mark.args[0])
            else:
                val = mark.args[0]
            ret[name] = val
        return ret

    def as_lines(self) -> List[str]:
        """Output test definition in ATF-specific format"""
        ret = []
        ret.append("ident: {}".format(self.ident))
        ret.append("descr: {}".format(self._get_test_description(self.obj)))
        if self.has_cleanup:
            ret.append("has.cleanup: true")
        for key, value in self._convert_marks(self.obj).items():
            ret.append("{}: {}".format(key, value))
        return ret
class ATFHandler(object):
    """Glue between pytest hooks and the ATF test-program conventions.

    Covers test listing in ATF format, turning tests into their cleanup
    counterparts, mapping pytest reports to ATF result states and writing
    the final result line to the report file.
    """

    class ReportState(NamedTuple):
        # ATF state name recorded for the test ("passed", "failed", ...)
        state: str
        # Explanation extracted from the pytest report; None if unavailable
        reason: Optional[str]

    def __init__(self, report_file_name: Optional[str]):
        """Remember the report file name; the file is opened later."""
        self._tests_state_map: Dict[str, "ATFHandler.ReportState"] = {}
        self._report_file_name = report_file_name
        self._report_file_handle = None

    def setup_configure(self):
        """Open the report file for writing if a file name was supplied."""
        fname = self._report_file_name
        if fname:
            self._report_file_handle = open(fname, mode="w")

    def setup_method_pre(self, item):
        """Called before actually running the test setup_method"""
        # Check if we need to manually drop the privileges
        for mark in item.iter_markers():
            if mark.name == "require_user":
                cls = self.get_test_class(item)
                # Items without a test class have nowhere to store the user
                if cls is not None:
                    cls.TARGET_USER = mark.args[0]
                break

    def override_runtest(self, obj):
        """Make the item run its cleanup routine instead of the test body."""
        # Override basic runtest command
        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
        # Override class setup/teardown
        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop

    @staticmethod
    def get_test_class(obj):
        """Return the class of the item's parent, or None if there is none."""
        if hasattr(obj, "parent") and obj.parent is not None:
            if hasattr(obj.parent, "cls"):
                return obj.parent.cls
        return None

    def has_object_cleanup(self, obj):
        """Tell whether the item's class defines a cleanup for this test.

        Either a generic "cleanup" attribute or a test-specific
        "cleanup_<method name>" attribute on the class counts.
        """
        cls = self.get_test_class(obj)
        if cls is not None:
            method_name = nodeid_to_method_name(obj.nodeid)
            cleanup_name = "cleanup_{}".format(method_name)
            if hasattr(cls, "cleanup") or hasattr(cls, cleanup_name):
                return True
        return False

    def _generate_test_cleanups(self, items):
        """Reduce items, in place, to cleanup runners for tests having one."""
        new_items = []
        for obj in items:
            if self.has_object_cleanup(obj):
                self.override_runtest(obj)
                new_items.append(obj)
        # Mutate the caller's list rather than rebinding it
        items.clear()
        items.extend(new_items)

    def expand_tests(self, collector, name, obj):
        """Delegate kernel-test expansion to generate_ktests()."""
        return generate_ktests(collector, name, obj)

    def modify_tests(self, items, config):
        """Switch the collected items to cleanup mode when requested."""
        if config.option.atf_cleanup:
            self._generate_test_cleanups(items)

    def list_tests(self, tests: List[Any]):
        """Print the ATF test-program listing for the collected test items."""
        print('Content-Type: application/X-atf-tp; version="1"')
        print()
        for test_obj in tests:
            has_cleanup = self.has_object_cleanup(test_obj)
            atf_test = ATFTestObj(test_obj, has_cleanup)
            for line in atf_test.as_lines():
                print(line)
            print()

    def set_report_state(self, test_name: str, state: str, reason: Optional[str]):
        """Record (or overwrite) the state and reason for the given test."""
        self._tests_state_map[test_name] = self.ReportState(state, reason)

    def _extract_report_reason(self, report):
        """Return a one-line reason taken from report.longrepr, or None.

        Tuple-style longrepr values carry the message in the third element;
        a leading "Skipped: " is removed from it. Any other value is
        stringified and its last line is returned.
        """
        data = report.longrepr
        if data is None:
            return None
        if isinstance(data, tuple):
            # ('/path/to/test.py', 23, 'Skipped: unable to test')
            reason = data[2]
            # Strip whole prefixes only; iterating over a bare string here
            # would strip single characters and mangle unrelated reasons
            for prefix in ("Skipped: ",):
                if reason.startswith(prefix):
                    reason = reason[len(prefix):]
            return reason
        # string/ traceback / exception report. Capture the last line
        return str(data).split("\n")[-1]

    def add_report(self, report):
        """Fold one pytest stage report into the per-test ATF state map."""
        # MAP pytest report state to the atf-desired state
        #
        # ATF test states:
        # (1) expected_death, (2) expected_exit, (3) expected_failure
        # (4) expected_signal, (5) expected_timeout, (6) passed
        # (7) skipped, (8) failed
        #
        # Note that ATF doesn't have the concept of "soft xfail" - xpass
        # is a failure. It also calls teardown routine in a separate
        # process, thus teardown states (pytest-only) are handled as
        # body continuation.
        # (stage, state, wasxfail)
        # Just a passing test: WANT: passed
        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
        #
        # Failing body test: WANT: failed
        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
        #
        # pytest.skip test decorator: WANT: skipped
        # GOT: (setup,skipped, False), (teardown, passed, False)
        #
        # pytest.skip call inside test function: WANT: skipped
        # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
        #
        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
        #
        # mark.xfail decorator+pass: WANT: failed
        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)
        test_name = report.location[2]
        stage = report.when
        state = report.outcome
        reason = self._extract_report_reason(report)
        # The xfail marker lives on the report itself, not on the reason text
        was_xfail = hasattr(report, "wasxfail")

        # We don't care about strict xfail - it gets translated to False
        if stage == "setup":
            if state in ("skipped", "failed"):
                # failed init -> failed test, skipped setup -> xskip
                # for the whole test
                self.set_report_state(test_name, state, reason)
        elif stage == "call":
            # "call" stage shouldn't matter if setup failed
            if test_name in self._tests_state_map:
                if self._tests_state_map[test_name].state == "failed":
                    return
            if state == "failed":
                # Record failure & override "skipped" state
                self.set_report_state(test_name, state, reason)
            elif state == "skipped":
                if was_xfail:
                    # xfail() called in the test body
                    state = "expected_failure"
                # otherwise: plain skip inside the body
                self.set_report_state(test_name, state, reason)
            elif state == "passed":
                if was_xfail:
                    # the test was expected to fail but didn't
                    # mark as hard failure
                    state = "failed"
                self.set_report_state(test_name, state, reason)
        elif stage == "teardown":
            if state == "failed":
                # teardown should be empty, as the cleanup
                # procedures should be implemented as a separate
                # function/method, so mark teardown failure as
                # global failure
                self.set_report_state(test_name, state, reason)

    def write_report(self):
        """Write the result line of the first recorded test and close the file.

        Does nothing when no report file is open. The handle is detached
        before use and always closed, so a repeated call is a no-op rather
        than a write to a closed file.
        """
        handle = self._report_file_handle
        if handle is None:
            return
        self._report_file_handle = None
        try:
            if self._tests_state_map:
                # If we're executing in ATF mode, there has to be just one
                # test. Anyway, deterministically pick the first one
                first_test_name = next(iter(self._tests_state_map))
                test = self._tests_state_map[first_test_name]
                if test.state == "passed":
                    line = test.state
                else:
                    line = "{}: {}".format(test.state, test.reason)
                print(line, file=handle)
        finally:
            handle.close()

    @staticmethod
    def get_atf_vars() -> Dict[str, str]:
        """Return environment variables prefixed _ATF_VAR_, prefix removed."""
        px = "_ATF_VAR_"
        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}