fix #21076: turn signal module constants into enums

This commit is contained in:
Giampaolo Rodola' 2014-04-04 15:34:17 +02:00
parent bcc174615c
commit e09fb7198a
7 changed files with 138 additions and 8 deletions

View file

@ -65,6 +65,16 @@ Besides, only the main thread is allowed to set a new signal handler.
Module contents
---------------
.. versionchanged:: 3.5
signal (SIG*), handler (:const:`SIG_DFL`, :const:`SIG_IGN`) and sigmask
(:const:`SIG_BLOCK`, :const:`SIG_UNBLOCK`, :const:`SIG_SETMASK`)
related constants listed below were turned into
:class:`enums <enum.IntEnum>`.
:func:`getsignal`, :func:`pthread_sigmask`, :func:`sigpending` and
:func:`sigwait` functions return human-readable
:class:`enums <enum.IntEnum>`.
The variables defined in the :mod:`signal` module are:

View file

@ -134,6 +134,11 @@ New Modules
Improved Modules
================
* Different constants of :mod:`signal` module are now enumeration values using
the :mod:`enum` module. This allows meaningful names to be printed during
debugging, instead of integer “magic numbers”. (contribute by Giampaolo
Rodola' in :issue:`21076`)
* :class:`xmlrpc.client.ServerProxy` is now a :term:`context manager`
(contributed by Claudiu Popa in :issue:`20627`).

84
Lib/signal.py Normal file
View file

@ -0,0 +1,84 @@
import _signal
from _signal import *
from functools import wraps as _wraps
from enum import IntEnum as _IntEnum
_globals = globals()
Signals = _IntEnum(
'Signals',
{name: value for name, value in _globals.items()
if name.isupper()
and (name.startswith('SIG') and not name.startswith('SIG_'))
or name.startswith('CTRL_')})
class Handlers(_IntEnum):
SIG_DFL = _signal.SIG_DFL
SIG_IGN = _signal.SIG_IGN
_globals.update(Signals.__members__)
_globals.update(Handlers.__members__)
if 'pthread_sigmask' in _globals:
class Sigmasks(_IntEnum):
SIG_BLOCK = _signal.SIG_BLOCK
SIG_UNBLOCK = _signal.SIG_UNBLOCK
SIG_SETMASK = _signal.SIG_SETMASK
_globals.update(Sigmasks.__members__)
def _int_to_enum(value, enum_klass):
"""Convert a numeric value to an IntEnum member.
If it's not a known member, return the numeric value itself.
"""
try:
return enum_klass(value)
except ValueError:
return value
def _enum_to_int(value):
"""Convert an IntEnum member to a numeric value.
If it's not a IntEnum member return the value itself.
"""
try:
return int(value)
except (ValueError, TypeError):
return value
@_wraps(_signal.signal)
def signal(signalnum, handler):
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
return _int_to_enum(handler, Handlers)
@_wraps(_signal.getsignal)
def getsignal(signalnum):
handler = _signal.getsignal(signalnum)
return _int_to_enum(handler, Handlers)
if 'pthread_sigmask' in _globals:
@_wraps(_signal.pthread_sigmask)
def pthread_sigmask(how, mask):
sigs_set = _signal.pthread_sigmask(how, mask)
return set(_int_to_enum(x, Signals) for x in sigs_set)
pthread_sigmask.__doc__ = _signal.pthread_sigmask.__doc__
@_wraps(_signal.sigpending)
def sigpending():
sigs = _signal.sigpending()
return set(_int_to_enum(x, Signals) for x in sigs)
if 'sigwait' in _globals:
@_wraps(_signal.sigwait)
def sigwait(sigset):
retsig = _signal.sigwait(sigset)
return _int_to_enum(retsig, Signals)
sigwait.__doc__ = _signal.sigwait
del _globals, _wraps

View file

@ -2897,7 +2897,7 @@ def test_CLI(): r"""
def test_main():
# Check the doctest cases in doctest itself:
support.run_doctest(doctest, verbosity=True)
ret = support.run_doctest(doctest, verbosity=True)
# Check the doctest cases defined here:
from test import test_doctest
support.run_doctest(test_doctest, verbosity=True)

View file

@ -1,6 +1,7 @@
import unittest
from test import support
from contextlib import closing
import enum
import gc
import pickle
import select
@ -39,6 +40,22 @@ def ignoring_eintr(__func, *args, **kwargs):
return None
class GenericTests(unittest.TestCase):
def test_enums(self):
for name in dir(signal):
sig = getattr(signal, name)
if name in {'SIG_DFL', 'SIG_IGN'}:
self.assertIsInstance(sig, signal.Handlers)
elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
self.assertIsInstance(sig, signal.Sigmasks)
elif name.startswith('SIG') and not name.startswith('SIG_'):
self.assertIsInstance(sig, signal.Signals)
elif name.startswith('CTRL_'):
self.assertIsInstance(sig, signal.Signals)
self.assertEqual(sys.platform, "win32")
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class InterProcessSignalTests(unittest.TestCase):
MAX_DURATION = 20 # Entire test should last at most 20 sec.
@ -195,6 +212,7 @@ def test_setting_signal_handler_to_none_raises_error(self):
def test_getsignal(self):
hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
self.assertIsInstance(hup, signal.Handlers)
self.assertEqual(signal.getsignal(signal.SIGHUP),
self.trivial_signal_handler)
signal.signal(signal.SIGHUP, hup)
@ -271,7 +289,7 @@ def check_signum(signals):
os.close(read)
os.close(write)
""".format(signals, ordered, test_body)
""".format(tuple(map(int, signals)), ordered, test_body)
assert_python_ok('-c', code)
@ -604,6 +622,8 @@ def handler(signum, frame):
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
os.kill(os.getpid(), signum)
pending = signal.sigpending()
for sig in pending:
assert isinstance(sig, signal.Signals), repr(pending)
if pending != {signum}:
raise Exception('%s != {%s}' % (pending, signum))
try:
@ -660,6 +680,7 @@ def wait_helper(self, blocked, test):
code = '''if 1:
import signal
import sys
from signal import Signals
def handler(signum, frame):
1/0
@ -702,6 +723,7 @@ def test_sigwait(self):
def test(signum):
signal.alarm(1)
received = signal.sigwait([signum])
assert isinstance(received, signal.Signals), received
if received != signum:
raise Exception('received %s, not %s' % (received, signum))
''')
@ -842,8 +864,14 @@ def handler(signum, frame):
def kill(signum):
os.kill(os.getpid(), signum)
def check_mask(mask):
for sig in mask:
assert isinstance(sig, signal.Signals), repr(sig)
def read_sigmask():
return signal.pthread_sigmask(signal.SIG_BLOCK, [])
sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
check_mask(sigmask)
return sigmask
signum = signal.SIGUSR1
@ -852,6 +880,7 @@ def read_sigmask():
# Unblock SIGUSR1 (and copy the old mask) to test our signal handler
old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
check_mask(old_mask)
try:
kill(signum)
except ZeroDivisionError:
@ -861,11 +890,13 @@ def read_sigmask():
# Block and then raise SIGUSR1. The signal is blocked: the signal
# handler is not called, and the signal is now pending
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
check_mask(mask)
kill(signum)
# Check the new mask
blocked = read_sigmask()
check_mask(blocked)
if signum not in blocked:
raise Exception("%s not in %s" % (signum, blocked))
if old_mask ^ blocked != {signum}:
@ -928,7 +959,7 @@ def handler(signum, frame):
def test_main():
try:
support.run_unittest(PosixTests, InterProcessSignalTests,
support.run_unittest(GenericTests, PosixTests, InterProcessSignalTests,
WakeupFDTests, WakeupSignalTests,
SiginterruptTest, ItimerTest, WindowsSignalTests,
PendingSignalsTests)

View file

@ -967,7 +967,7 @@ static struct PyModuleDef signalmodule = {
};
PyMODINIT_FUNC
PyInit_signal(void)
PyInit__signal(void)
{
PyObject *m, *d, *x;
int i;
@ -1380,7 +1380,7 @@ PyErr_SetInterrupt(void)
void
PyOS_InitInterrupts(void)
{
PyObject *m = PyImport_ImportModule("signal");
PyObject *m = PyImport_ImportModule("_signal");
if (m) {
Py_DECREF(m);
}

View file

@ -19,7 +19,7 @@ extern PyObject* PyInit_math(void);
extern PyObject* PyInit__md5(void);
extern PyObject* PyInit_nt(void);
extern PyObject* PyInit__operator(void);
extern PyObject* PyInit_signal(void);
extern PyObject* PyInit__signal(void);
extern PyObject* PyInit__sha1(void);
extern PyObject* PyInit__sha256(void);
extern PyObject* PyInit__sha512(void);