cpython/Lib/test/test_dtrace.py

261 lines
8 KiB
Python

import dis
import os.path
import re
import subprocess
import sys
import sysconfig
import types
import unittest
from test import support
from test.support import findfile
if not support.has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")
def abspath(filename):
return os.path.abspath(findfile(filename, subdir="dtracedata"))
def normalize_trace_output(output):
"""Normalize DTrace output for comparison.
DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
are concatenated. So if the operating system moves our thread around, the
straight result can be "non-causal". So we add timestamps to the probe
firing, sort by that field, then strip it from the output"""
# When compiling with '--with-pydebug', strip '[# refs]' debug output.
output = re.sub(r"\[[0-9]+ refs\]", "", output)
try:
result = [
row.split("\t")
for row in output.splitlines()
if row and not row.startswith('#')
]
result.sort(key=lambda row: int(row[0]))
result = [row[1] for row in result]
return "\n".join(result)
except (IndexError, ValueError):
raise AssertionError(
"tracer produced unparsable output:\n{}".format(output)
)
class TraceBackend:
EXTENSION = None
COMMAND = None
COMMAND_ARGS = []
def run_case(self, name, optimize_python=None):
actual_output = normalize_trace_output(self.trace_python(
script_file=abspath(name + self.EXTENSION),
python_file=abspath(name + ".py"),
optimize_python=optimize_python))
with open(abspath(name + self.EXTENSION + ".expected")) as f:
expected_output = f.read().rstrip()
return (expected_output, actual_output)
def generate_trace_command(self, script_file, subcommand=None):
command = self.COMMAND + [script_file]
if subcommand:
command += ["-c", subcommand]
return command
def trace(self, script_file, subcommand=None):
command = self.generate_trace_command(script_file, subcommand)
stdout, _ = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True).communicate()
return stdout
def trace_python(self, script_file, python_file, optimize_python=None):
python_flags = []
if optimize_python:
python_flags.extend(["-O"] * optimize_python)
subcommand = " ".join([sys.executable] + python_flags + [python_file])
return self.trace(script_file, subcommand)
def assert_usable(self):
try:
output = self.trace(abspath("assert_usable" + self.EXTENSION))
output = output.strip()
except (FileNotFoundError, NotADirectoryError, PermissionError) as fnfe:
output = str(fnfe)
if output != "probe: success":
raise unittest.SkipTest(
"{}(1) failed: {}".format(self.COMMAND[0], output)
)
class DTraceBackend(TraceBackend):
EXTENSION = ".d"
COMMAND = ["dtrace", "-q", "-s"]
class SystemTapBackend(TraceBackend):
EXTENSION = ".stp"
COMMAND = ["stap", "-g"]
class TraceTests:
# unittest.TestCase options
maxDiff = None
# TraceTests options
backend = None
optimize_python = 0
@classmethod
def setUpClass(self):
self.backend.assert_usable()
def run_case(self, name):
actual_output, expected_output = self.backend.run_case(
name, optimize_python=self.optimize_python)
self.assertEqual(actual_output, expected_output)
def test_function_entry_return(self):
self.run_case("call_stack")
def test_verify_call_opcodes(self):
"""Ensure our call stack test hits all function call opcodes"""
opcodes = set(["CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"])
with open(abspath("call_stack.py")) as f:
code_string = f.read()
def get_function_instructions(funcname):
# Recompile with appropriate optimization setting
code = compile(source=code_string,
filename="<string>",
mode="exec",
optimize=self.optimize_python)
for c in code.co_consts:
if isinstance(c, types.CodeType) and c.co_name == funcname:
return dis.get_instructions(c)
return []
for instruction in get_function_instructions('start'):
opcodes.discard(instruction.opname)
self.assertEqual(set(), opcodes)
def test_gc(self):
self.run_case("gc")
def test_line(self):
self.run_case("line")
class DTraceNormalTests(TraceTests, unittest.TestCase):
backend = DTraceBackend()
optimize_python = 0
class DTraceOptimizedTests(TraceTests, unittest.TestCase):
backend = DTraceBackend()
optimize_python = 2
class SystemTapNormalTests(TraceTests, unittest.TestCase):
backend = SystemTapBackend()
optimize_python = 0
class SystemTapOptimizedTests(TraceTests, unittest.TestCase):
backend = SystemTapBackend()
optimize_python = 2
class CheckDtraceProbes(unittest.TestCase):
@classmethod
def setUpClass(cls):
if sysconfig.get_config_var('WITH_DTRACE'):
readelf_major_version, readelf_minor_version = cls.get_readelf_version()
if support.verbose:
print(f"readelf version: {readelf_major_version}.{readelf_minor_version}")
else:
raise unittest.SkipTest("CPython must be configured with the --with-dtrace option.")
@staticmethod
def get_readelf_version():
try:
cmd = ["readelf", "--version"]
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
with proc:
version, stderr = proc.communicate()
if proc.returncode:
raise Exception(
f"Command {' '.join(cmd)!r} failed "
f"with exit code {proc.returncode}: "
f"stdout={version!r} stderr={stderr!r}"
)
except OSError:
raise unittest.SkipTest("Couldn't find readelf on the path")
# Regex to parse:
# 'GNU readelf (GNU Binutils) 2.40.0\n' -> 2.40
match = re.search(r"^(?:GNU) readelf.*?\b(\d+)\.(\d+)", version)
if match is None:
raise unittest.SkipTest(f"Unable to parse readelf version: {version}")
return int(match.group(1)), int(match.group(2))
def get_readelf_output(self):
command = ["readelf", "-n", sys.executable]
stdout, _ = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
).communicate()
return stdout
def test_check_probes(self):
readelf_output = self.get_readelf_output()
available_probe_names = [
"Name: import__find__load__done",
"Name: import__find__load__start",
"Name: audit",
"Name: gc__start",
"Name: gc__done",
]
for probe_name in available_probe_names:
with self.subTest(probe_name=probe_name):
self.assertIn(probe_name, readelf_output)
@unittest.expectedFailure
def test_missing_probes(self):
readelf_output = self.get_readelf_output()
# Missing probes will be added in the future.
missing_probe_names = [
"Name: function__entry",
"Name: function__return",
"Name: line",
]
for probe_name in missing_probe_names:
with self.subTest(probe_name=probe_name):
self.assertIn(probe_name, readelf_output)
if __name__ == '__main__':
unittest.main()