Merged revisions 80552-80556,80564-80566,80568-80571 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/trunk

........
  r80552 | victor.stinner | 2010-04-27 23:46:03 +0200 (mar., 27 avril 2010) | 3 lines

  Issue #7449, part 1: fix test_support.py for Python compiled without thread
........
  r80553 | victor.stinner | 2010-04-27 23:47:01 +0200 (mar., 27 avril 2010) | 1 line

  Issue #7449, part 2: regrtest.py -j option requires thread support
........
  r80554 | victor.stinner | 2010-04-27 23:51:26 +0200 (mar., 27 avril 2010) | 9 lines

  Issue #7449 part 3, test_doctest: import trace module in test_coverage()

  Import trace module fail if the threading module is missing. test_coverage() is
  only used if test_doctest.py is used with the -c option. This commit allows to
  execute the test suite without thread support.

  Move "import trace" in test_coverage() and use
  test_support.import_module('trace').
........
  r80555 | victor.stinner | 2010-04-27 23:56:26 +0200 (mar., 27 avril 2010) | 6 lines

  Issue #7449, part 4: skip test_multiprocessing if thread support is disabled

  import threading after _multiprocessing to raise a more revelant error message:
  "No module named _multiprocessing". _multiprocessing is not compiled without
  thread support.
........
  r80556 | victor.stinner | 2010-04-28 00:01:24 +0200 (mer., 28 avril 2010) | 8 lines

  Issue #7449, part 5: split Test.test_open() of ctypes/test/test_errno.py

   * Split Test.test_open() in 2 functions: test_open() and test_thread_open()
   * Skip test_open() and test_thread_open() if we are unable to find the C
     library
   * Skip test_thread_open() if thread support is disabled
   * Use unittest.skipUnless(os.name == "nt", ...) on test_GetLastError()
........
  r80564 | victor.stinner | 2010-04-28 00:59:35 +0200 (mer., 28 avril 2010) | 4 lines

  Issue #7449, part 6: fix test_hashlib for missing threading module

  Move @test_support.reap_thread decorator from test_main() to test_threaded_hashing().
........
  r80565 | victor.stinner | 2010-04-28 01:01:29 +0200 (mer., 28 avril 2010) | 6 lines

  Issue #7449, part 7: simplify threading detection in test_capi

   * Skip TestPendingCalls if threading module is missing
   * Test if threading module is present or not, instead of test the presence of
     _testcapi._test_thread_state
........
  r80566 | victor.stinner | 2010-04-28 01:03:16 +0200 (mer., 28 avril 2010) | 4 lines

  Issue #7449, part 8: don't skip the whole test_asynchat if threading is missing

  TestFifo can be executed without the threading module
........
  r80568 | victor.stinner | 2010-04-28 01:14:58 +0200 (mer., 28 avril 2010) | 6 lines

  Issue #7449, part 9: fix test_xmlrpclib for missing threading module

   * Skip testcases using threads if threading module is missing
   * Use "http://" instead of URL in ServerProxyTestCase if threading is missing
     because URL is not set in this case
........
  r80569 | victor.stinner | 2010-04-28 01:33:58 +0200 (mer., 28 avril 2010) | 6 lines

  Partial revert of r80556 (Issue #7449, part 5, fix ctypes test)

  Rewrite r80556: the thread test have to be executed just after the test on
  libc_open() and so the test cannot be splitted in two functions (without
  duplicating code, and I don't want to duplicate code).
........
  r80570 | victor.stinner | 2010-04-28 01:51:16 +0200 (mer., 28 avril 2010) | 8 lines

  Issue #7449, part 10: test_cmd imports trace module using test_support.import_module()

  Use test_support.import_module() instead of import to raise a SkipTest
  exception if the import fail. Import trace fails if the threading module is
  missing.

  See also part 3: test_doctest: import trace module in test_coverage().
........
  r80571 | victor.stinner | 2010-04-28 01:55:59 +0200 (mer., 28 avril 2010) | 6 lines

  Issue #7449, last part (11): fix many tests if thread support is disabled

   * Use try/except ImportError or test_support.import_module() to import thread
     and threading modules
   * Add @unittest.skipUnless(threading, ...) to testcases using threads
........
This commit is contained in:
Victor Stinner 2010-04-28 22:31:17 +00:00
parent 480a124973
commit 45df820591
36 changed files with 282 additions and 175 deletions

View file

@ -1,27 +1,31 @@
import unittest, os, errno
from ctypes import *
from ctypes.util import find_library
import threading
try:
import threading
except ImportError:
threading = None
class Test(unittest.TestCase):
def test_open(self):
libc_name = find_library("c")
if libc_name is not None:
libc = CDLL(libc_name, use_errno=True)
if os.name == "nt":
libc_open = libc._open
else:
libc_open = libc.open
if libc_name is None:
raise unittest.SkipTest("Unable to find C library")
libc = CDLL(libc_name, use_errno=True)
if os.name == "nt":
libc_open = libc._open
else:
libc_open = libc.open
libc_open.argtypes = c_char_p, c_int
libc_open.argtypes = c_char_p, c_int
self.assertEqual(libc_open("", 0), -1)
self.assertEqual(get_errno(), errno.ENOENT)
self.assertEqual(set_errno(32), errno.ENOENT)
self.assertEqual(get_errno(), 32)
self.assertEqual(libc_open("", 0), -1)
self.assertEqual(get_errno(), errno.ENOENT)
self.assertEqual(set_errno(32), errno.ENOENT)
self.assertEqual(get_errno(), 32)
if threading:
def _worker():
set_errno(0)
@ -41,36 +45,35 @@ def _worker():
self.assertEqual(get_errno(), 32)
set_errno(0)
if os.name == "nt":
@unittest.skipUnless(os.name == "nt", 'Test specific to Windows')
def test_GetLastError(self):
dll = WinDLL("kernel32", use_last_error=True)
GetModuleHandle = dll.GetModuleHandleA
GetModuleHandle.argtypes = [c_wchar_p]
def test_GetLastError(self):
dll = WinDLL("kernel32", use_last_error=True)
GetModuleHandle = dll.GetModuleHandleA
GetModuleHandle.argtypes = [c_wchar_p]
self.assertEqual(0, GetModuleHandle("foo"))
self.assertEqual(get_last_error(), 126)
self.assertEqual(0, GetModuleHandle("foo"))
self.assertEqual(get_last_error(), 126)
self.assertEqual(set_last_error(32), 126)
self.assertEqual(get_last_error(), 32)
def _worker():
set_last_error(0)
dll = WinDLL("kernel32", use_last_error=False)
GetModuleHandle = dll.GetModuleHandleW
GetModuleHandle.argtypes = [c_wchar_p]
GetModuleHandle("bar")
self.assertEqual(get_last_error(), 0)
t = threading.Thread(target=_worker)
t.start()
t.join()
self.assertEqual(get_last_error(), 32)
self.assertEqual(set_last_error(32), 126)
self.assertEqual(get_last_error(), 32)
def _worker():
set_last_error(0)
dll = WinDLL("kernel32", use_last_error=False)
GetModuleHandle = dll.GetModuleHandleW
GetModuleHandle.argtypes = [c_wchar_p]
GetModuleHandle("bar")
self.assertEqual(get_last_error(), 0)
t = threading.Thread(target=_worker)
t.start()
t.join()
self.assertEqual(get_last_error(), 32)
set_last_error(0)
if __name__ == "__main__":
unittest.main()

View file

@ -22,8 +22,11 @@
# 3. This notice may not be removed or altered from any source distribution.
import unittest
import threading
import sqlite3 as sqlite
try:
import threading
except ImportError:
threading = None
class ModuleTests(unittest.TestCase):
def CheckAPILevel(self):
@ -460,6 +463,7 @@ class Foo: pass
except TypeError:
pass
@unittest.skipUnless(threading, 'This test requires threading.')
class ThreadTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")

View file

@ -1,6 +1,6 @@
"""This test case provides support for checking forking and wait behavior.
To test different wait behavior, overrise the wait_impl method.
To test different wait behavior, override the wait_impl method.
We want fork1() semantics -- only the forking thread survives in the
child after a fork().
@ -9,7 +9,9 @@
active threads survive in the child after a fork(); this is an error.
"""
import os, sys, time, _thread, unittest
import os, sys, time, unittest
import test.support as support
_thread = support.import_module('_thread')
LONGSLEEP = 2
SHORTSLEEP = 0.5

View file

@ -515,7 +515,11 @@ def test_forever(tests=list(selected)):
tests = iter(selected)
if use_mp:
from threading import Thread
try:
from threading import Thread
except ImportError:
print("Multiprocess option requires thread support")
sys.exit(2)
from queue import Queue
from subprocess import Popen, PIPE
debug_output_pat = re.compile(r"\[\d+ refs\]$")

View file

@ -19,6 +19,10 @@
import re
import imp
import time
try:
import _thread
except ImportError:
_thread = None
__all__ = [
"Error", "TestFailed", "ResourceDenied", "import_module",
@ -1111,12 +1115,14 @@ def modules_cleanup(oldmodules):
# at the end of a test run.
def threading_setup():
import _thread
return _thread._count(),
if _thread:
return _thread._count(),
else:
return 1,
def threading_cleanup(nb_threads):
import _thread
if not _thread:
return
_MAX_COUNT = 10
for count in range(_MAX_COUNT):
n = _thread._count()
@ -1126,6 +1132,13 @@ def threading_cleanup(nb_threads):
# XXX print a warning in case of failure?
def reap_threads(func):
"""Use this function when threads are being used. This will
ensure that the threads are cleaned up even when the test fails.
If threading is unavailable this function does nothing.
"""
if not _thread:
return func
@functools.wraps(func)
def decorator(*args):
key = threading_setup()

View file

@ -5,96 +5,101 @@
# If this fails, the test will be skipped.
thread = support.import_module('_thread')
import asyncore, asynchat, socket, threading, time
import asyncore, asynchat, socket, time
import unittest
import sys
try:
import threading
except ImportError:
threading = None
HOST = support.HOST
SERVER_QUIT = b'QUIT\n'
class echo_server(threading.Thread):
# parameter to determine the number of bytes passed back to the
# client each send
chunk_size = 1
if threading:
class echo_server(threading.Thread):
# parameter to determine the number of bytes passed back to the
# client each send
chunk_size = 1
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = support.bind_port(self.sock)
# This will be set if the client wants us to wait before echoing data
# back.
self.start_resend_event = None
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = support.bind_port(self.sock)
# This will be set if the client wants us to wait before echoing data
# back.
self.start_resend_event = None
def run(self):
self.sock.listen(1)
self.event.set()
conn, client = self.sock.accept()
self.buffer = b""
# collect data until quit message is seen
while SERVER_QUIT not in self.buffer:
data = conn.recv(1)
if not data:
break
self.buffer = self.buffer + data
def run(self):
self.sock.listen(1)
self.event.set()
conn, client = self.sock.accept()
self.buffer = b""
# collect data until quit message is seen
while SERVER_QUIT not in self.buffer:
data = conn.recv(1)
if not data:
break
self.buffer = self.buffer + data
# remove the SERVER_QUIT message
self.buffer = self.buffer.replace(SERVER_QUIT, b'')
# remove the SERVER_QUIT message
self.buffer = self.buffer.replace(SERVER_QUIT, b'')
if self.start_resend_event:
self.start_resend_event.wait()
if self.start_resend_event:
self.start_resend_event.wait()
# re-send entire set of collected data
try:
# this may fail on some tests, such as test_close_when_done, since
# the client closes the channel when it's done sending
while self.buffer:
n = conn.send(self.buffer[:self.chunk_size])
time.sleep(0.001)
self.buffer = self.buffer[n:]
except:
pass
# re-send entire set of collected data
try:
# this may fail on some tests, such as test_close_when_done, since
# the client closes the channel when it's done sending
while self.buffer:
n = conn.send(self.buffer[:self.chunk_size])
time.sleep(0.001)
self.buffer = self.buffer[n:]
except:
pass
conn.close()
self.sock.close()
conn.close()
self.sock.close()
class echo_client(asynchat.async_chat):
class echo_client(asynchat.async_chat):
def __init__(self, terminator, server_port):
asynchat.async_chat.__init__(self)
self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((HOST, server_port))
self.set_terminator(terminator)
self.buffer = b""
def __init__(self, terminator, server_port):
asynchat.async_chat.__init__(self)
self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((HOST, server_port))
self.set_terminator(terminator)
self.buffer = b""
def handle_connect(self):
pass
def handle_connect(self):
pass
if sys.platform == 'darwin':
# select.poll returns a select.POLLHUP at the end of the tests
# on darwin, so just ignore it
def handle_expt(self):
pass
if sys.platform == 'darwin':
# select.poll returns a select.POLLHUP at the end of the tests
# on darwin, so just ignore it
def handle_expt(self):
pass
def collect_incoming_data(self, data):
self.buffer += data
def collect_incoming_data(self, data):
self.buffer += data
def found_terminator(self):
self.contents.append(self.buffer)
self.buffer = b""
def start_echo_server():
event = threading.Event()
s = echo_server(event)
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
return s, event
def found_terminator(self):
self.contents.append(self.buffer)
self.buffer = b""
def start_echo_server():
event = threading.Event()
s = echo_server(event)
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
return s, event
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestAsynchat(unittest.TestCase):
usepoll = False

View file

@ -3,7 +3,6 @@
import select
import os
import socket
import threading
import sys
import time
@ -12,6 +11,11 @@
from io import BytesIO
from io import StringIO
try:
import threading
except ImportError:
threading = None
HOST = support.HOST
class dummysocket:
@ -320,6 +324,7 @@ def setUp(self):
def tearDown(self):
asyncore.close_all()
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.reap_threads
def test_send(self):
evt = threading.Event()

View file

@ -7,11 +7,14 @@
import os
import subprocess
import sys
import threading
try:
import threading
except ImportError:
threading = None
# Skip tests if the bz2 module doesn't exist.
bz2 = support.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx")
@ -283,6 +286,7 @@ def testContextProtocol(self):
else:
self.fail("1/0 didn't raise an exception")
@unittest.skipUnless(threading, 'Threading required for this test.')
def testThreading(self):
# Using a BZ2File from several threads doesn't deadlock (issue #7205).
data = b"1" * 2**20

View file

@ -6,8 +6,11 @@
import time
import random
import unittest
import threading
from test import support
try:
import threading
except ImportError:
threading = None
import _testcapi
@ -33,6 +36,7 @@ def test_instancemethod(self):
self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
def pendingcalls_submit(self, l, n):
@ -148,17 +152,10 @@ def callback():
raise support.TestFailed(
"Couldn't find main thread correctly in the list")
try:
_testcapi._test_thread_state
have_thread_state = True
except AttributeError:
have_thread_state = False
if have_thread_state:
if threading:
import _thread
import time
TestThreadState()
import threading
t = threading.Thread(target=TestThreadState)
t.start()
t.join()

View file

@ -7,9 +7,9 @@
import cmd
import sys
import trace
import re
from io import StringIO
from test import support
class samplecmdclass(cmd.Cmd):
"""
@ -169,11 +169,11 @@ def do_exit(self, arg):
return True
def test_main(verbose=None):
from test import support, test_cmd
from test import test_cmd
support.run_doctest(test_cmd, verbose)
def test_coverage(coverdir):
import trace
trace = support.import_module('trace')
tracer=trace.Trace(ignoredirs=[sys.prefix, sys.exec_prefix,],
trace=0, count=1)
tracer.run('reload(cmd);test_main()')

View file

@ -3,9 +3,12 @@
import sys
import tempfile
import unittest
import threading
from contextlib import * # Tests __all__
from test import support
try:
import threading
except ImportError:
threading = None
class ContextManagerTestCase(unittest.TestCase):
@ -151,6 +154,7 @@ def testWithOpen(self):
finally:
support.unlink(tfn)
@unittest.skipUnless(threading, 'Threading required for this test.')
class LockContextTestCase(unittest.TestCase):
def boilerPlate(self, lock, locked):

View file

@ -2341,8 +2341,10 @@ def test_main():
from test import test_doctest
support.run_doctest(test_doctest, verbosity=True)
import trace, sys, re, io
import sys, re, io
def test_coverage(coverdir):
trace = support.import_module('trace')
tracer = trace.Trace(ignoredirs=[sys.prefix, sys.exec_prefix,],
trace=0, count=1)
tracer.run('test_main()')

View file

@ -2,7 +2,7 @@
import http.client
import sys
from test import support
import threading
threading = support.import_module('threading')
import time
import socket
import unittest

View file

@ -6,10 +6,10 @@
import signal
import sys
import time
import threading
from test.fork_wait import ForkWait
from test.support import run_unittest, reap_children, get_attribute
from test.support import run_unittest, reap_children, get_attribute, import_module
threading = import_module('threading')
# Skip test if fork does not exist.
get_attribute(os, 'fork')

View file

@ -4,7 +4,6 @@
# environment
import ftplib
import threading
import asyncore
import asynchat
import socket
@ -19,6 +18,7 @@
from unittest import TestCase
from test import support
from test.support import HOST
threading = support.import_module('threading')
# the dummy data returned by server over the data channel when
# RETR, LIST and NLST commands are issued

View file

@ -311,10 +311,9 @@ def test_gil(self):
m = hashlib.md5(b'x' * gil_minsize)
self.assertEquals(m.hexdigest(), 'cfb767f225d58469c5de3632a8803958')
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.reap_threads
def test_threaded_hashing(self):
if not threading:
raise unittest.SkipTest('No threading module.')
# Updating the same hash object from several threads at once
# using data chunk sizes containing the same byte sequences.
#
@ -349,7 +348,6 @@ def hash_in_chunks(chunk_size, event):
self.assertEqual(expected_hash, hasher.hexdigest())
@support.reap_threads
def test_main():
support.run_unittest(HashLibTestCase)

View file

@ -15,10 +15,10 @@
import urllib.parse
import http.client
import tempfile
import threading
import unittest
from test import support
threading = support.import_module('threading')
class NoLogRequestHandler:
def log_message(self, *args):

View file

@ -23,7 +23,6 @@
import sys
import time
import array
import threading
import random
import unittest
import weakref
@ -35,6 +34,10 @@
import codecs
import io # C implementation of io
import _pyio as pyio # Python implementation of io
try:
import threading
except ImportError:
threading = None
def _default_chunk_size():
@ -742,6 +745,7 @@ def test_read_all(self):
self.assertEquals(b"abcdefg", bufio.read())
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads(self):
try:
# Write out many bytes with exactly the same number of 0's,
@ -988,6 +992,7 @@ def test_truncate(self):
with self.open(support.TESTFN, "rb", buffering=0) as f:
self.assertEqual(f.read(), b"abc")
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads(self):
try:
# Write out many bytes from many threads and test they were
@ -2080,7 +2085,7 @@ def test_errors_property(self):
with self.open(support.TESTFN, "w", errors="replace") as f:
self.assertEqual(f.errors, "replace")
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
# Issue6750: concurrent writes could duplicate data
event = threading.Event()

View file

@ -41,10 +41,13 @@
from test.support import captured_stdout, run_with_locale, run_unittest,\
find_unused_port
import textwrap
import threading
import unittest
import warnings
import weakref
try:
import threading
except ImportError:
threading = None
class BaseTest(unittest.TestCase):
@ -765,6 +768,7 @@ def serve_until_stopped(self):
self.server_close()
@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketHandlerTest(BaseTest):
"""Test for SocketHandler objects."""
@ -1659,6 +1663,7 @@ def test_config12_failure(self):
def test_config13_failure(self):
self.assertRaises(Exception, self.apply_config, self.config13)
@unittest.skipUnless(threading, 'listen() needs threading to work')
def setup_via_listener(self, text):
text = text.encode("utf-8")
port = find_unused_port()

View file

@ -5,7 +5,6 @@
#
import unittest
import threading
import queue as pyqueue
import time
import io
@ -25,6 +24,10 @@
_multiprocessing = test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more revelant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
import threading
import multiprocessing.dummy
import multiprocessing.connection

View file

@ -4,7 +4,6 @@
# a real test suite
import poplib
import threading
import asyncore
import asynchat
import socket
@ -14,6 +13,7 @@
from unittest import TestCase
from test import support as test_support
threading = test_support.import_module('threading')
HOST = test_support.HOST
PORT = 0

View file

@ -1,10 +1,10 @@
# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import queue
import threading
import time
import unittest
from test import support
threading = support.import_module('threading')
QUEUE_SIZE = 5

View file

@ -1,7 +1,6 @@
import asyncore
import email.utils
import socket
import threading
import smtpd
import smtplib
import io
@ -9,9 +8,14 @@
import time
import select
from unittest import TestCase
import unittest
from test import support
try:
import threading
except ImportError:
threading = None
HOST = support.HOST
if sys.platform == 'darwin':
@ -44,7 +48,8 @@ def server(evt, buf, serv):
serv.close()
evt.set()
class GeneralTests(TestCase):
@unittest.skipUnless(threading, 'Threading required for this test.')
class GeneralTests(unittest.TestCase):
def setUp(self):
self._threads = support.threading_setup()
@ -146,7 +151,8 @@ def debugging_server(serv, serv_evt, client_evt):
# test server times out, causing the test to fail.
# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(TestCase):
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):
def setUp(self):
# temporarily replace sys.stdout to capture DebuggingServer output
@ -241,7 +247,7 @@ def testSend(self):
self.assertEqual(self.output.getvalue(), mexpect)
class NonConnectingTests(TestCase):
class NonConnectingTests(unittest.TestCase):
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
@ -262,7 +268,8 @@ def testNonnumericPort(self):
# test response of client to a non-successful HELO message
class BadHELOServerTests(TestCase):
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):
def setUp(self):
self.old_stdout = sys.stdout
@ -386,7 +393,8 @@ def add_feature(self, feature):
# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(TestCase):
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPSimTests(unittest.TestCase):
def setUp(self):
self._threads = support.threading_setup()

View file

@ -7,8 +7,6 @@
import io
import socket
import select
import _thread as thread
import threading
import time
import traceback
import queue
@ -21,6 +19,13 @@
HOST = support.HOST
MSG = b'Michael Gilfix was here\n'
try:
import _thread as thread
import threading
except ImportError:
thread = None
threading = None
class SocketTCPTest(unittest.TestCase):
def setUp(self):
@ -560,6 +565,7 @@ def test_sock_ioctl(self):
s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
def __init__(self, methodName='runTest'):
@ -649,6 +655,7 @@ def _testShutdown(self):
self.serv_conn.send(MSG)
self.serv_conn.shutdown(2)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
def __init__(self, methodName='runTest'):
@ -677,6 +684,7 @@ def testRecvFromNegative(self):
def _testRecvFromNegative(self):
self.cli.sendto(MSG, 0, (HOST, self.port))
@unittest.skipUnless(thread, 'Threading required for this test.')
class TCPCloserTest(ThreadedTCPSocketTest):
def testClose(self):
@ -696,6 +704,7 @@ def _testClose(self):
self.cli.connect((HOST, self.port))
time.sleep(1.0)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
def __init__(self, methodName='runTest'):
@ -715,6 +724,7 @@ def _testSend(self):
msg = self.cli.recv(1024)
self.assertEqual(msg, MSG)
@unittest.skipUnless(thread, 'Threading required for this test.')
class NonBlockingTCPTests(ThreadedTCPSocketTest):
def __init__(self, methodName='runTest'):
@ -783,6 +793,7 @@ def _testRecv(self):
time.sleep(0.1)
self.cli.send(MSG)
@unittest.skipUnless(thread, 'Threading required for this test.')
class FileObjectClassTestCase(SocketConnectedTest):
"""Unit tests for the object returned by socket.makefile()
@ -1098,6 +1109,7 @@ def testWithoutServer(self):
lambda: socket.create_connection((HOST, port))
)
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
@ -1160,6 +1172,7 @@ def _testTimeoutValueNonamed(self):
self.cli = socket.create_connection((HOST, self.port), 30)
self.assertEqual(self.cli.gettimeout(), 30)
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
@ -1305,6 +1318,7 @@ def testNameOverflow(self):
self.assertRaises(socket.error, s.bind, address)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
"""
Test the buffer versions of socket.recv() and socket.send().

View file

@ -9,12 +9,15 @@
import signal
import socket
import tempfile
import threading
import unittest
import socketserver
import test.support
from test.support import reap_children, reap_threads, verbose
try:
import threading
except ImportError:
threading = None
test.support.requires("network")
@ -119,6 +122,7 @@ def handle(self):
self.assertEquals(server.server_address, server.socket.getsockname())
return server
@unittest.skipUnless(threading, 'Threading required for this test.')
@reap_threads
def run_server(self, svrcls, hdlrbase, testfunc):
server = self.make_server(self.pickaddr(svrcls.address_family),

View file

@ -11,6 +11,10 @@
# strings to intern in test_intern()
numruns = 0
try:
import threading
except ImportError:
threading = None
class SysModuleTest(unittest.TestCase):
@ -158,6 +162,7 @@ def test_setcheckinterval(self):
sys.setcheckinterval(n)
self.assertEquals(sys.getcheckinterval(), n)
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_switchinterval(self):
self.assertRaises(TypeError, sys.setswitchinterval)
self.assertRaises(TypeError, sys.setswitchinterval, "a")

View file

@ -1,12 +1,12 @@
import socket
import select
import threading
import telnetlib
import time
import contextlib
from unittest import TestCase
from test import support
threading = support.import_module('threading')
HOST = support.HOST

View file

@ -2,7 +2,7 @@
import unittest
import random
from test import support
import _thread as thread
thread = support.import_module('_thread')
import time
import sys
import weakref

View file

@ -5,9 +5,9 @@
# complains several times about module random having no attribute
# randrange, and then Python hangs.
import _thread as thread
import unittest
from test.support import verbose, TestFailed
from test.support import verbose, TestFailed, import_module
thread = import_module('_thread')
critical_section = thread.allocate_lock()
done = thread.allocate_lock()

View file

@ -16,11 +16,10 @@
NUM_THREADS = 20
FILES_PER_THREAD = 50
import _thread as thread # If this fails, we can't test this module
import threading
import tempfile
from test.support import threading_setup, threading_cleanup, run_unittest
from test.support import threading_setup, threading_cleanup, run_unittest, import_module
threading = import_module('threading')
import unittest
import io
from traceback import print_exc

View file

@ -5,8 +5,8 @@
import random
import re
import sys
import threading
import _thread
_thread = test.support.import_module('_thread')
threading = test.support.import_module('threading')
import time
import unittest
import weakref

View file

@ -1,7 +1,7 @@
import unittest
from doctest import DocTestSuite
from test import support
import threading
threading = support.import_module('threading')
import weakref
import gc

View file

@ -1,11 +1,11 @@
"""PyUnit testing that threads honor our signal semantics"""
import unittest
import _thread as thread
import signal
import os
import sys
from test.support import run_unittest
from test.support import run_unittest, import_module
thread = import_module('_thread')
if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
raise unittest.SkipTest("Can't test signal on %s" % sys.platform)

View file

@ -1,13 +1,13 @@
#!/usr/bin/env python3
import email
import threading
import urllib.parse
import urllib.request
import http.server
import unittest
import hashlib
from test import support
threading = support.import_module('threading')
# Loopback http server infrastructure

View file

@ -5,7 +5,6 @@
import unittest
import xmlrpc.client as xmlrpclib
import xmlrpc.server
import threading
import http.client
import socket
import os
@ -14,6 +13,11 @@
import contextlib
from test import support
try:
import threading
except ImportError:
threading = None
alist = [{'astring': 'foo@bar.baz.spam',
'afloat': 7283.43,
'anint': 2**20,
@ -399,10 +403,12 @@ def make_request_and_skip(self):
return make_request_and_skip
return decorator
@unittest.skipUnless(threading, 'Threading required for this test.')
class BaseServerTestCase(unittest.TestCase):
requestHandler = None
request_count = 1
threadFunc = staticmethod(http_server)
def setUp(self):
# enable traceback reporting
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
@ -678,6 +684,9 @@ def send_content(self, connection, body):
connection.putheader("Content-Encoding", "gzip")
return xmlrpclib.Transport.send_content(self, connection, body)
def setUp(self):
BaseServerTestCase.setUp(self)
def test_gzip_request(self):
t = self.Transport()
t.encode_threshold = None
@ -714,13 +723,23 @@ def test_gsip_response(self):
#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
if threading:
self.url = URL
else:
# Without threading, http_server() and http_multi_server() will not
# be executed and URL is still equal to None. 'http://' is a just
# enough to choose the scheme (HTTP)
self.url = 'http://'
def test_close(self):
p = xmlrpclib.ServerProxy(URL)
p = xmlrpclib.ServerProxy(self.url)
self.assertEqual(p('close')(), None)
def test_transport(self):
t = xmlrpclib.Transport()
p = xmlrpclib.ServerProxy(URL, transport=t)
p = xmlrpclib.ServerProxy(self.url, transport=t)
self.assertEqual(p('transport'), t)
# This is a contrived way to make a failure occur on the server side
@ -733,6 +752,7 @@ def get(self, key, failobj=None):
return super().get(key, failobj)
@unittest.skipUnless(threading, 'Threading required for this test.')
class FailingServerTestCase(unittest.TestCase):
def setUp(self):
self.evt = threading.Event()

View file

@ -1183,6 +1183,9 @@ Documentation
Tests
-----
- Issue #7449: Fix many tests to support Python compiled without thread
support. Patches written by Jerry Seutter.
- Issue #8108: test_ftplib's non-blocking SSL server now has proper handling
of SSL shutdowns.