Move BatchTester into a separate file.

BUG=
TEST=

Review URL: https://chromereviews.googleplex.com/3521020

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@52 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
zundel@google.com 2011-10-05 16:50:50 +00:00
parent 4b5b7fe381
commit 705be03363
3 changed files with 284 additions and 246 deletions

View file

@ -22,14 +22,15 @@ import threading
import traceback
from Queue import Queue, Empty
import testing
import testing.test_runner
import utils
TIMEOUT_SECS = 60
VERBOSE = False
ARCH_GUESS = utils.GuessArchitecture()
OS_GUESS = utils.GuessOS()
HOST_CPUS = utils.GuessCpus()
USE_DEFAULT_CPUS = -1
BUILT_IN_TESTS = ['dartc', 'vm', 'dart', 'corelib', 'language', 'co19',
'samples', 'isolate', 'stub-generator', 'client']
@ -98,8 +99,8 @@ class ProgressIndicator(object):
self.Starting()
# Scale the number of tasks to the nubmer of CPUs on the machine
if tasks == USE_DEFAULT_CPUS:
tasks = HOST_CPUS
if tasks == testing.USE_DEFAULT_CPUS:
tasks = testing.HOST_CPUS
# TODO(zundel): Refactor BatchSingle method and TestRunner to
# share code and simplify this method.
@ -122,19 +123,20 @@ class ProgressIndicator(object):
for (cmd, queue) in self.batch_queues.items():
if not queue.empty():
batch_len = queue.qsize();
batch_tester = None
try:
batch_tester = BatchTester(queue, tasks, self,
[cmd, '-batch'])
batch_tester = testing.test_runner.BatchRunner(queue, tasks, self,
[cmd, '-batch'])
except Exception, e:
print "Aborting batch test for " + cmd + ". Problem on startup."
batch_tester.Shutdown()
if batch_tester: batch_tester.Shutdown()
raise
try:
batch_tester.WaitForCompletion()
except:
print "Aborting batch cmd " + cmd + "while waiting for completion."
batch_tester.Shutdown()
if batch_tester: batch_tester.Shutdown()
raise
try:
@ -191,220 +193,6 @@ class ProgressIndicator(object):
self.HasRun(output)
class BatchTester(object):
"""Implements communication with a set of subprocesses using threads."""
def __init__(self, work_queue, tasks, progress, batch_cmd):
self.work_queue = work_queue
self.terminate = False
self.progress = progress
self.threads = []
self.runners = {}
self.last_activity = {}
self.context = progress.context
self.shutdown_lock = threading.Lock()
# Scale the number of tasks to the nubmer of CPUs on the machine
# 1:1 is too much of an overload on many machines in batch mode,
# so scale the ratio of threads to CPUs back.
if tasks == USE_DEFAULT_CPUS:
tasks = .75 * HOST_CPUS
# Start threads
for i in xrange(tasks):
thread = threading.Thread(target=self.RunThread, args=[batch_cmd, i])
self.threads.append(thread)
thread.daemon = True
thread.start()
def RunThread(self, batch_cmd, thread_number):
"""A thread started to feed a single TestRunner."""
try:
runner = None
while not self.terminate and not self.work_queue.empty():
runner = subprocess.Popen(batch_cmd,
stdin=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE)
self.runners[thread_number] = runner
self.FeedTestRunner(runner, thread_number)
if self.last_activity.has_key(thread_number):
del self.last_activity[thread_number]
# cleanup
self.EndRunner(runner)
except:
self.Shutdown()
raise
finally:
if self.last_activity.has_key(thread_number):
del self.last_activity[thread_number]
if runner: self.EndRunner(runner)
def EndRunner(self, runner):
""" Cleans up a single runner, killing the child if necessary"""
with self.shutdown_lock:
if runner:
returncode = runner.poll()
if returncode == None:
runner.kill()
for (found_runner, thread_number) in self.runners.items():
if runner == found_runner:
del self.runners[thread_number]
break
try:
runner.communicate();
except ValueError:
pass
def CheckForTimeouts(self):
now = time.time()
for (thread_number, start_time) in self.last_activity.items():
if now - start_time > self.context.timeout:
self.runners[thread_number].kill()
def WaitForCompletion(self):
""" Wait for threads to finish, and monitor test runners for timeouts."""
for t in self.threads:
while True:
self.CheckForTimeouts()
t.join(timeout=5)
if not t.isAlive():
break
def FeedTestRunner(self, runner, thread_number):
"""Feed commands to the fork'ed TestRunner through a Popen object."""
last_case = {}
last_buf = ''
while not self.terminate:
# Is the runner still alive?
returninfo = runner.poll()
if returninfo is not None:
buf = last_buf + '\n' + runner.stdout.read()
if last_case:
self.RecordPassFail(last_case, buf, CRASH)
else:
with self.progress.lock:
print >>sys. stderr, ("%s: runner unexpectedly exited: %d"
% (threading.currentThread().name, returninfo))
print 'Crash Output: '
print
print buf
return
try:
case = self.work_queue.get_nowait()
with self.progress.lock:
self.progress.AboutToRun(case.case)
except Empty:
return
test_case = case.case
cmd = " ".join(test_case.GetCommand()[1:])
try:
print >>runner.stdin, cmd
except IOError:
with self.progress.lock:
traceback.print_exc()
# Child exited before starting the next command.
buf = last_buf + '\n' + runner.stdout.read()
self.RecordPassFail(last_case, buf, CRASH)
# We never got a chance to run this command - queue it back up.
self.work_queue.put(case)
return
buf = ""
self.last_activity[thread_number] = time.time()
while not self.terminate:
line = runner.stdout.readline()
if self.terminate:
break;
case.case.duration = time.time() - self.last_activity[thread_number];
if not line:
# EOF. Child has exited.
if case.case.duration > self.context.timeout:
with self.progress.lock:
print "Child timed out after %d seconds" % self.context.timeout
self.RecordPassFail(case, buf, TIMEOUT)
elif buf:
self.RecordPassFail(case, buf, CRASH)
return
# Look for TestRunner batch status escape sequence. e.g.
# >>> TEST PASS
if line.startswith('>>> '):
result = line.split()
if result[1] == 'TEST':
outcome = result[2].lower()
# Read the rest of the output buffer (possible crash output)
if outcome == CRASH:
buf += runner.stdout.read()
self.RecordPassFail(case, buf, outcome)
# Always handle crashes by restarting the runner.
if outcome == CRASH:
return
break
elif result[1] == 'BATCH':
pass
else:
print 'Unknown cmd from batch runner: %s' % line
else:
buf += line
# If the process crashes before the next command is executed,
# save info to report diagnostics.
last_buf = buf
last_case = case
def RecordPassFail(self, case, stdout_buf, outcome):
"""An unexpected failure occurred."""
if outcome == PASS or outcome == OKAY:
exit_code = 0
elif outcome == CRASH:
exit_code = -1
elif outcome == FAIL or outcome == TIMEOUT:
exit_code = 1
else:
assert false, "Unexpected outcome: %s" % outcome
cmd_output = CommandOutput(0, exit_code,
outcome == TIMEOUT, stdout_buf, "")
test_output = TestOutput(case.case,
case.case.GetCommand(),
cmd_output)
with self.progress.lock:
if test_output.UnexpectedOutput():
self.progress.failed.append(test_output)
else:
self.progress.succeeded += 1
if outcome == CRASH:
self.progress.crashed += 1
self.progress.remaining -= 1
self.progress.HasRun(test_output)
def Shutdown(self):
"""Kill all active runners"""
print "Shutting down remaining runners"
self.terminate = True;
for runner in self.runners.values():
runner.kill()
# Give threads a chance to exit gracefully
time.sleep(2)
for runner in self.runners.values():
self.EndRunner(runner)
def EscapeCommand(command):
parts = []
for part in command:
@ -679,13 +467,13 @@ class TestOutput(object):
def UnexpectedOutput(self):
if self.HasCrashed():
outcome = CRASH
outcome = testing.CRASH
elif self.HasTimedOut():
outcome = TIMEOUT
elif self.HasFailed():
outcome = FAIL
outcome = testing.FAIL
else:
outcome = PASS
outcome = testing.PASS
return not outcome in self.test.outcomes
def HasCrashed(self):
@ -961,16 +749,6 @@ def RunTestCases(cases_to_run, progress, tasks, context):
# --- T e s t C o n f i g u r a t i o n ---
# -------------------------------------------
SKIP = 'skip'
FAIL = 'fail'
PASS = 'pass'
OKAY = 'okay'
TIMEOUT = 'timeout'
CRASH = 'crash'
SLOW = 'slow'
class Expression(object):
pass
@ -1289,7 +1067,7 @@ class Configuration(object):
outcomes = outcomes.union(rule.GetOutcomes(env, self.defs))
unused_rules.discard(rule)
if not outcomes:
outcomes = [PASS]
outcomes = [testing.PASS]
case.outcomes = outcomes
all_outcomes = all_outcomes.union(outcomes)
result.append(ClassifiedTest(case, outcomes))
@ -1432,8 +1210,8 @@ def BuildOptions():
action="store_true")
result.add_option("-j", "--tasks",
help="The number of parallel tasks to run",
metavar=HOST_CPUS,
default=USE_DEFAULT_CPUS,
metavar=testing.HOST_CPUS,
default=testing.USE_DEFAULT_CPUS,
type="int")
result.add_option("--time",
help="Print timing information after running",
@ -1527,18 +1305,19 @@ Total: %(total)i tests
def PrintReport(cases):
"""Print a breakdown of which tests are marked pass/skip/fail """
def IsFlaky(o):
return (PASS in o) and (FAIL in o) and (not CRASH in o) and (not OKAY in o)
return ((testing.PASS in o) and (testing.FAIL in o)
and (not testing.CRASH in o) and (not testing.OKAY in o))
def IsFailOk(o):
return (len(o) == 2) and (FAIL in o) and (OKAY in o)
unskipped = [c for c in cases if not SKIP in c.outcomes]
return (len(o) == 2) and (testing.FAIL in o) and (testing.OKAY in o)
unskipped = [c for c in cases if not testing.SKIP in c.outcomes]
print REPORT_TEMPLATE % {
'total': len(cases),
'skipped': len(cases) - len(unskipped),
'nocrash': len([t for t in unskipped if IsFlaky(t.outcomes)]),
'pass': len([t for t in unskipped if list(t.outcomes) == [PASS]]),
'pass': len([t for t in unskipped if list(t.outcomes) == [testing.PASS]]),
'fail_ok': len([t for t in unskipped if IsFailOk(t.outcomes)]),
'fail': len([t for t in unskipped if list(t.outcomes) == [FAIL]]),
'crash': len([t for t in unskipped if list(t.outcomes) == [CRASH]]),
'fail': len([t for t in unskipped if list(t.outcomes) == [testing.FAIL]]),
'crash': len([t for t in unskipped if list(t.outcomes) == [testing.CRASH]]),
'batched' : len([t for t in unskipped if t.case.IsBatchable()])
}
@ -1553,7 +1332,7 @@ def PrintTests(cases):
has_errors = True
if has_errors:
raise Exception('Errors in above files')
for case in [c for c in cases if not SKIP in c.outcomes]:
for case in [c for c in cases if not testing.SKIP in c.outcomes]:
print "%s\t%s\t%s\t%s" %('/'.join(case.case.path),
','.join(case.outcomes),
case.case.IsNegative(),
@ -1688,7 +1467,7 @@ def Main():
result = None
def DoSkip(case):
return SKIP in case.outcomes or SLOW in case.outcomes
return testing.SKIP in case.outcomes or testing.SLOW in case.outcomes
cases_to_run = [ c for c in all_cases if not DoSkip(c) ]
if len(cases_to_run) == 0:
print "No tests to run."

View file

@ -1,3 +1,20 @@
# Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
# for details. All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
import test_runner
import utils
# Constants used for test outcomes
SKIP = 'skip'
FAIL = 'fail'
PASS = 'pass'
OKAY = 'okay'
TIMEOUT = 'timeout'
CRASH = 'crash'
SLOW = 'slow'
HOST_CPUS = utils.GuessCpus()
USE_DEFAULT_CPUS = -1

242
tools/testing/test_runner.py Executable file
View file

@ -0,0 +1,242 @@
#!/usr/bin/env python
#
# Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
# for details. All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
#
import os
import subprocess
import sys
import tempfile
import time
import threading
import traceback
import Queue
import test
import testing
import utils
class Error(Exception):
pass
class TestRunner(object):
"""Base class for runners """
def __init__(self, work_queue, tasks, progress):
self.work_queue = work_queue
self.tasks = tasks
self.terminate = False
self.progress = progress
self.threads = []
self.shutdown_lock = threading.Lock()
class BatchRunner(TestRunner):
"""Implements communication with a set of subprocesses using threads."""
def __init__(self, work_queue, tasks, progress, batch_cmd):
super(BatchRunner, self).__init__(work_queue, tasks, progress)
self.runners = {}
self.last_activity = {}
self.context = progress.context
# Scale the number of tasks to the nubmer of CPUs on the machine
# 1:1 is too much of an overload on many machines in batch mode,
# so scale the ratio of threads to CPUs back.
if tasks == testing.USE_DEFAULT_CPUS:
tasks = .75 * testing.HOST_CPUS
# Start threads
for i in xrange(tasks):
thread = threading.Thread(target=self.RunThread, args=[batch_cmd, i])
self.threads.append(thread)
thread.daemon = True
thread.start()
def RunThread(self, batch_cmd, thread_number):
"""A thread started to feed a single TestRunner."""
try:
runner = None
while not self.terminate and not self.work_queue.empty():
runner = subprocess.Popen(batch_cmd,
stdin=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE)
self.runners[thread_number] = runner
self.FeedTestRunner(runner, thread_number)
if self.last_activity.has_key(thread_number):
del self.last_activity[thread_number]
# cleanup
self.EndRunner(runner)
except:
self.Shutdown()
raise
finally:
if self.last_activity.has_key(thread_number):
del self.last_activity[thread_number]
if runner: self.EndRunner(runner)
def EndRunner(self, runner):
""" Cleans up a single runner, killing the child if necessary"""
with self.shutdown_lock:
if runner:
returncode = runner.poll()
if returncode == None:
runner.kill()
for (found_runner, thread_number) in self.runners.items():
if runner == found_runner:
del self.runners[thread_number]
break
try:
runner.communicate();
except ValueError:
pass
def CheckForTimeouts(self):
now = time.time()
for (thread_number, start_time) in self.last_activity.items():
if now - start_time > self.context.timeout:
self.runners[thread_number].kill()
def WaitForCompletion(self):
""" Wait for threads to finish, and monitor test runners for timeouts."""
for t in self.threads:
while True:
self.CheckForTimeouts()
t.join(timeout=5)
if not t.isAlive():
break
def FeedTestRunner(self, runner, thread_number):
"""Feed commands to the fork'ed TestRunner through a Popen object."""
last_case = {}
last_buf = ''
while not self.terminate:
# Is the runner still alive?
returninfo = runner.poll()
if returninfo is not None:
buf = last_buf + '\n' + runner.stdout.read()
if last_case:
self.RecordPassFail(last_case, buf, testing.CRASH)
else:
with self.progress.lock:
print >>sys. stderr, ("%s: runner unexpectedly exited: %d"
% (threading.currentThread().name, returninfo))
print 'Crash Output: '
print
print buf
return
try:
case = self.work_queue.get_nowait()
with self.progress.lock:
self.progress.AboutToRun(case.case)
except Queue.Empty:
return
test_case = case.case
cmd = " ".join(test_case.GetCommand()[1:])
try:
print >>runner.stdin, cmd
except IOError:
with self.progress.lock:
traceback.print_exc()
# Child exited before starting the next command.
buf = last_buf + '\n' + runner.stdout.read()
self.RecordPassFail(last_case, buf, testing.CRASH)
# We never got a chance to run this command - queue it back up.
self.work_queue.put(case)
return
buf = ""
self.last_activity[thread_number] = time.time()
while not self.terminate:
line = runner.stdout.readline()
if self.terminate:
break;
case.case.duration = time.time() - self.last_activity[thread_number];
if not line:
# EOF. Child has exited.
if case.case.duration > self.context.timeout:
with self.progress.lock:
print "Child timed out after %d seconds" % self.context.timeout
self.RecordPassFail(case, buf, testing.TIMEOUT)
elif buf:
self.RecordPassFail(case, buf, testing.CRASH)
return
# Look for TestRunner batch status escape sequence. e.g.
# >>> TEST PASS
if line.startswith('>>> '):
result = line.split()
if result[1] == 'TEST':
outcome = result[2].lower()
# Read the rest of the output buffer (possible crash output)
if outcome == testing.CRASH:
buf += runner.stdout.read()
self.RecordPassFail(case, buf, outcome)
# Always handle crashes by restarting the runner.
if outcome == testing.CRASH:
return
break
elif result[1] == 'BATCH':
pass
else:
print 'Unknown cmd from batch runner: %s' % line
else:
buf += line
# If the process crashes before the next command is executed,
# save info to report diagnostics.
last_buf = buf
last_case = case
def RecordPassFail(self, case, stdout_buf, outcome):
"""An unexpected failure occurred."""
if outcome == testing.PASS or outcome == testing.OKAY:
exit_code = 0
elif outcome == testing.CRASH:
exit_code = -1
elif outcome == testing.FAIL or outcome == testing.TIMEOUT:
exit_code = 1
else:
assert false, "Unexpected outcome: %s" % outcome
cmd_output = test.CommandOutput(0, exit_code,
outcome == testing.TIMEOUT, stdout_buf, "")
test_output = test.TestOutput(case.case,
case.case.GetCommand(),
cmd_output)
with self.progress.lock:
if test_output.UnexpectedOutput():
self.progress.failed.append(test_output)
else:
self.progress.succeeded += 1
if outcome == testing.CRASH:
self.progress.crashed += 1
self.progress.remaining -= 1
self.progress.HasRun(test_output)
def Shutdown(self):
"""Kill all active runners"""
print "Shutting down remaining runners"
self.terminate = True;
for runner in self.runners.values():
runner.kill()
# Give threads a chance to exit gracefully
time.sleep(2)
for runner in self.runners.values():
self.EndRunner(runner)