Test reader thread hardening

The test reader thread could get stuck in case an unexpected Exception
was raised during test runs in parent thread. Catch the Exception, close
the thread and raise the Exception to avoid this.

Change-Id: I3a9298d593c334d96d04b6207d604b684572a2ac
Signed-off-by: juraj.linkes <juraj.linkes@pantheon.tech>
This commit is contained in:
juraj.linkes
2018-11-29 09:56:35 +01:00
committed by Ole Trøan
parent f3b7a5e47a
commit e6b58cf86a

View File

@ -210,13 +210,12 @@ class TestCaseWrapper(object):
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
read_testcases):
read_testcase = None
while read_testcases.is_set() or len(unread_testcases) > 0:
if not read_testcase:
if len(finished_unread_testcases) > 0:
read_testcase = finished_unread_testcases.pop()
unread_testcases.remove(read_testcase)
elif len(unread_testcases) > 0:
read_testcase = unread_testcases.pop()
while read_testcases.is_set() or len(unread_testcases):
if len(finished_unread_testcases):
read_testcase = finished_unread_testcases.pop()
unread_testcases.remove(read_testcase)
elif len(unread_testcases):
read_testcase = unread_testcases.pop()
if read_testcase:
data = ''
while data is not None:
@ -328,103 +327,112 @@ def run_forked(testcase_suites):
failed_wrapped_testcases = set()
stop_run = False
while len(wrapped_testcase_suites) > 0:
finished_testcase_suites = set()
try:
while len(wrapped_testcase_suites) > 0:
finished_testcase_suites = set()
for wrapped_testcase_suite in wrapped_testcase_suites:
while wrapped_testcase_suite.result_parent_end.poll():
wrapped_testcase_suite.result.process_result(
*wrapped_testcase_suite.result_parent_end.recv())
wrapped_testcase_suite.last_heard = time.time()
while wrapped_testcase_suite.keep_alive_parent_end.poll():
wrapped_testcase_suite.last_test, \
wrapped_testcase_suite.last_test_vpp_binary, \
wrapped_testcase_suite.last_test_temp_dir, \
wrapped_testcase_suite.vpp_pid = \
wrapped_testcase_suite.keep_alive_parent_end.recv()
wrapped_testcase_suite.last_heard = time.time()
if wrapped_testcase_suite.finished_parent_end.poll():
wrapped_testcase_suite.finished_parent_end.recv()
wrapped_testcase_suite.last_heard = time.time()
stop_run = process_finished_testsuite(
wrapped_testcase_suite,
finished_testcase_suites,
failed_wrapped_testcases,
results) or stop_run
continue
fail = False
if wrapped_testcase_suite.last_heard + test_timeout < \
time.time():
fail = True
wrapped_testcase_suite.logger.critical(
"Child test runner process timed out "
"(last test running was `%s' in `%s')!" %
(wrapped_testcase_suite.last_test,
wrapped_testcase_suite.last_test_temp_dir))
elif not wrapped_testcase_suite.child.is_alive():
fail = True
wrapped_testcase_suite.logger.critical(
"Child test runner process unexpectedly died "
"(last test running was `%s' in `%s')!" %
(wrapped_testcase_suite.last_test,
wrapped_testcase_suite.last_test_temp_dir))
elif wrapped_testcase_suite.last_test_temp_dir and \
wrapped_testcase_suite.last_test_vpp_binary:
if is_core_present(
wrapped_testcase_suite.last_test_temp_dir):
wrapped_testcase_suite.add_testclass_with_core()
if wrapped_testcase_suite.core_detected_at is None:
wrapped_testcase_suite.core_detected_at = \
time.time()
elif wrapped_testcase_suite.core_detected_at + \
core_timeout < time.time():
wrapped_testcase_suite.logger.critical(
"Child test runner process unresponsive and "
"core-file exists in test temporary directory "
"(last test running was `%s' in `%s')!" %
(wrapped_testcase_suite.last_test,
wrapped_testcase_suite.last_test_temp_dir))
fail = True
if fail:
wrapped_testcase_suite.child.terminate()
try:
# terminating the child process tends to leave orphan
# VPP process around
if wrapped_testcase_suite.vpp_pid:
os.kill(wrapped_testcase_suite.vpp_pid,
signal.SIGTERM)
except OSError:
# already dead
pass
wrapped_testcase_suite.result.crashed = True
wrapped_testcase_suite.result.process_result(
wrapped_testcase_suite.last_test_id, ERROR)
stop_run = process_finished_testsuite(
wrapped_testcase_suite,
finished_testcase_suites,
failed_wrapped_testcases,
results) or stop_run
for finished_testcase in finished_testcase_suites:
finished_testcase.child.join()
finished_testcase.close_pipes()
wrapped_testcase_suites.remove(finished_testcase)
finished_unread_testcases.add(finished_testcase)
finished_testcase.stdouterr_queue.put(None)
if stop_run:
while len(testcase_suites) > 0:
results.append(TestResult(testcase_suites.pop(0)))
elif len(testcase_suites) > 0:
new_testcase = TestCaseWrapper(testcase_suites.pop(0),
manager)
wrapped_testcase_suites.add(new_testcase)
unread_testcases.add(new_testcase)
except Exception:
for wrapped_testcase_suite in wrapped_testcase_suites:
while wrapped_testcase_suite.result_parent_end.poll():
wrapped_testcase_suite.result.process_result(
*wrapped_testcase_suite.result_parent_end.recv())
wrapped_testcase_suite.last_heard = time.time()
wrapped_testcase_suite.child.terminate()
wrapped_testcase_suite.stdouterr_queue.put(None)
raise
finally:
read_from_testcases.clear()
stdouterr_thread.join(test_timeout)
manager.shutdown()
while wrapped_testcase_suite.keep_alive_parent_end.poll():
wrapped_testcase_suite.last_test, \
wrapped_testcase_suite.last_test_vpp_binary, \
wrapped_testcase_suite.last_test_temp_dir, \
wrapped_testcase_suite.vpp_pid = \
wrapped_testcase_suite.keep_alive_parent_end.recv()
wrapped_testcase_suite.last_heard = time.time()
if wrapped_testcase_suite.finished_parent_end.poll():
wrapped_testcase_suite.finished_parent_end.recv()
wrapped_testcase_suite.last_heard = time.time()
stop_run = process_finished_testsuite(
wrapped_testcase_suite,
finished_testcase_suites,
failed_wrapped_testcases,
results) or stop_run
continue
fail = False
if wrapped_testcase_suite.last_heard + test_timeout < time.time():
fail = True
wrapped_testcase_suite.logger.critical(
"Child test runner process timed out "
"(last test running was `%s' in `%s')!" %
(wrapped_testcase_suite.last_test,
wrapped_testcase_suite.last_test_temp_dir))
elif not wrapped_testcase_suite.child.is_alive():
fail = True
wrapped_testcase_suite.logger.critical(
"Child test runner process unexpectedly died "
"(last test running was `%s' in `%s')!" %
(wrapped_testcase_suite.last_test,
wrapped_testcase_suite.last_test_temp_dir))
elif wrapped_testcase_suite.last_test_temp_dir and \
wrapped_testcase_suite.last_test_vpp_binary:
if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
wrapped_testcase_suite.add_testclass_with_core()
if wrapped_testcase_suite.core_detected_at is None:
wrapped_testcase_suite.core_detected_at = time.time()
elif wrapped_testcase_suite.core_detected_at + \
core_timeout < time.time():
wrapped_testcase_suite.logger.critical(
"Child test runner process unresponsive and core-"
"file exists in test temporary directory "
"(last test running was `%s' in `%s')!" %
(wrapped_testcase_suite.last_test,
wrapped_testcase_suite.last_test_temp_dir))
fail = True
if fail:
wrapped_testcase_suite.child.terminate()
try:
# terminating the child process tends to leave orphan
# VPP process around
if wrapped_testcase_suite.vpp_pid:
os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
except OSError:
# already dead
pass
wrapped_testcase_suite.result.crashed = True
wrapped_testcase_suite.result.process_result(
wrapped_testcase_suite.last_test_id, ERROR)
stop_run = process_finished_testsuite(
wrapped_testcase_suite,
finished_testcase_suites,
failed_wrapped_testcases,
results) or stop_run
for finished_testcase in finished_testcase_suites:
finished_testcase.child.join()
finished_testcase.close_pipes()
wrapped_testcase_suites.remove(finished_testcase)
finished_unread_testcases.add(finished_testcase)
finished_testcase.stdouterr_queue.put(None)
if stop_run:
while len(testcase_suites) > 0:
results.append(TestResult(testcase_suites.pop(0)))
elif len(testcase_suites) > 0:
new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager)
wrapped_testcase_suites.add(new_testcase)
unread_testcases.add(new_testcase)
while len(unread_testcases) > 0:
# wait for reader thread to read everything in all loggers
pass
read_from_testcases.clear()
stdouterr_thread.join(test_timeout)
manager.shutdown()
handle_cores(failed_wrapped_testcases)
return results