1
0
mirror of https://github.com/RPCS3/llvm-mirror.git synced 2024-10-19 19:12:56 +02:00

[lit] Add support for multiprocessing, under --use-processes for now.

llvm-svn: 189556
This commit is contained in:
Daniel Dunbar 2013-08-29 00:54:23 +00:00
parent 9d49ec468c
commit ee2ab42c2f
3 changed files with 115 additions and 39 deletions

View File

@ -113,8 +113,8 @@ Infrastructure
module. This is currently blocked on: module. This is currently blocked on:
* The external execution mode is faster in some situations, because it avoids * The external execution mode is faster in some situations, because it avoids
being bottlenecked on the GIL. We could fix this by moving to a good being bottlenecked on the GIL. This can hopefully be obviated simply by
multiprocessing model. using --use-processes.
* Some tests in LLVM/Clang are explicitly disabled with the internal shell * Some tests in LLVM/Clang are explicitly disabled with the internal shell
(because they use features specific to bash). We would need to rewrite these (because they use features specific to bash). We would need to rewrite these
@ -158,8 +158,6 @@ Miscellaneous
* Add --show-unsupported, don't show by default? * Add --show-unsupported, don't show by default?
* Optionally use multiprocessing.
* Support valgrind in all configs, and LLVM style valgrind. * Support valgrind in all configs, and LLVM style valgrind.
* Support a timeout / ulimit. * Support a timeout / ulimit.

View File

@ -142,6 +142,12 @@ def main(builtinParameters = {}):
group.add_option("", "--show-tests", dest="showTests", group.add_option("", "--show-tests", dest="showTests",
help="Show all discovered tests", help="Show all discovered tests",
action="store_true", default=False) action="store_true", default=False)
group.add_option("", "--use-processes", dest="useProcesses",
help="Run tests in parallel with processes (not threads)",
action="store_true", default=False)
group.add_option("", "--use-threads", dest="useProcesses",
help="Run tests in parallel with threads (not processes)",
action="store_false", default=False)
parser.add_option_group(group) parser.add_option_group(group)
(opts, args) = parser.parse_args() (opts, args) = parser.parse_args()
@ -264,7 +270,8 @@ def main(builtinParameters = {}):
startTime = time.time() startTime = time.time()
display = TestingProgressDisplay(opts, len(run.tests), progressBar) display = TestingProgressDisplay(opts, len(run.tests), progressBar)
try: try:
run.execute_tests(display, opts.numThreads, opts.maxTime) run.execute_tests(display, opts.numThreads, opts.maxTime,
opts.useProcesses)
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit(2) sys.exit(2)
display.finish() display.finish()

View File

@ -2,42 +2,68 @@ import os
import threading import threading
import time import time
import traceback import traceback
try:
import Queue as queue
except ImportError:
import queue
try: try:
import win32api import win32api
except ImportError: except ImportError:
win32api = None win32api = None
try:
import multiprocessing
except ImportError:
multiprocessing = None
import lit.Test import lit.Test
### ###
# Test Execution Implementation # Test Execution Implementation
class TestProvider(object): class LockedValue(object):
def __init__(self, tests): def __init__(self, value):
self.iter = iter(range(len(tests)))
self.lock = threading.Lock() self.lock = threading.Lock()
self.canceled = False self._value = value
def _get_value(self):
self.lock.acquire()
try:
return self._value
finally:
self.lock.release()
def _set_value(self, value):
self.lock.acquire()
try:
self._value = value
finally:
self.lock.release()
value = property(_get_value, _set_value)
class TestProvider(object):
def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
self.canceled_flag = canceled_flag
# Create a shared queue to provide the test indices.
self.queue = queue_impl()
for i in range(len(tests)):
self.queue.put(i)
for i in range(num_jobs):
self.queue.put(None)
def cancel(self): def cancel(self):
self.lock.acquire() self.canceled_flag.value = 1
self.canceled = True
self.lock.release()
def get(self): def get(self):
# Check if we are cancelled. # Check if we are canceled.
self.lock.acquire() if self.canceled_flag.value:
if self.canceled:
self.lock.release()
return None return None
# Otherwise take the next test. # Otherwise take the next test.
for item in self.iter: return self.queue.get()
break
else:
item = None
self.lock.release()
return item
class Tester(object): class Tester(object):
def __init__(self, run_instance, provider, consumer): def __init__(self, run_instance, provider, consumer):
@ -46,7 +72,7 @@ class Tester(object):
self.consumer = consumer self.consumer = consumer
def run(self): def run(self):
while 1: while True:
item = self.provider.get() item = self.provider.get()
if item is None: if item is None:
break break
@ -82,6 +108,42 @@ class ThreadResultsConsumer(object):
def handle_results(self): def handle_results(self):
pass pass
class MultiprocessResultsConsumer(object):
def __init__(self, run, display, num_jobs):
self.run = run
self.display = display
self.num_jobs = num_jobs
self.queue = multiprocessing.Queue()
def update(self, test_index, test):
# This method is called in the child processes, and communicates the
# results to the actual display implementation via an output queue.
self.queue.put((test_index, test.result))
def task_finished(self):
# This method is called in the child processes, and communicates that
# individual tasks are complete.
self.queue.put(None)
def handle_results(self):
# This method is called in the parent, and consumes the results from the
# output queue and dispatches to the actual display. The method will
# complete after each of num_jobs tasks has signalled completion.
completed = 0
while completed != self.num_jobs:
# Wait for a result item.
item = self.queue.get()
if item is None:
completed += 1
continue
# Update the test result in the parent process.
index,result = item
test = self.run.tests[index]
test.result = result
self.display.update(test)
def run_one_tester(run, provider, display): def run_one_tester(run, provider, display):
tester = Tester(run, provider, display) tester = Tester(run, provider, display)
tester.run() tester.run()
@ -123,7 +185,8 @@ class Run(object):
test.setResult(result) test.setResult(result)
def execute_tests(self, display, jobs, max_time=None): def execute_tests(self, display, jobs, max_time=None,
use_processes=False):
""" """
execute_tests(display, jobs, [max_time]) execute_tests(display, jobs, [max_time])
@ -145,8 +208,20 @@ class Run(object):
be given an UNRESOLVED result. be given an UNRESOLVED result.
""" """
# Create the test provider object. # Choose the appropriate parallel execution implementation.
provider = TestProvider(self.tests) if jobs == 1 or not use_processes or multiprocessing is None:
task_impl = threading.Thread
queue_impl = queue.Queue
canceled_flag = LockedValue(0)
consumer = ThreadResultsConsumer(display)
else:
task_impl = multiprocessing.Process
queue_impl = multiprocessing.Queue
canceled_flag = multiprocessing.Value('i', 0)
consumer = MultiprocessResultsConsumer(self, display, jobs)
# Create the test provider.
provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)
# Install a console-control signal handler on Windows. # Install a console-control signal handler on Windows.
if win32api is not None: if win32api is not None:
@ -162,8 +237,12 @@ class Run(object):
timeout_timer = threading.Timer(max_time, timeout_handler) timeout_timer = threading.Timer(max_time, timeout_handler)
timeout_timer.start() timeout_timer.start()
# Actually execute the tests. # If not using multiple tasks, just run the tests directly.
self._execute_tests_with_provider(provider, display, jobs) if jobs == 1:
run_one_tester(self, provider, consumer)
else:
# Otherwise, execute the tests in parallel
self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
# Cancel the timeout handler. # Cancel the timeout handler.
if max_time is not None: if max_time is not None:
@ -174,18 +253,10 @@ class Run(object):
if test.result is None: if test.result is None:
test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
def _execute_tests_with_provider(self, provider, display, jobs): def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
consumer = ThreadResultsConsumer(display)
# If only using one testing thread, don't use tasks at all; this lets us
# profile, among other things.
if jobs == 1:
run_one_tester(self, provider, consumer)
return
# Start all of the tasks. # Start all of the tasks.
tasks = [threading.Thread(target=run_one_tester, tasks = [task_impl(target=run_one_tester,
args=(self, provider, consumer)) args=(self, provider, consumer))
for i in range(jobs)] for i in range(jobs)]
for t in tasks: for t in tasks:
t.start() t.start()