OXIESEC PANEL
Current Dir: /usr/lib/python3.7/test/libregrtest
Server IP: 139.59.38.164
Name             Size       Modified                Perms
📁 ..            -          05/09/2024 06:49:08 AM  rwxr-xr-x
📄 __init__.py   190 bytes  12/09/2021 05:04:37 PM  rw-r--r--
📁 __pycache__   -          05/09/2024 06:49:08 AM  rwxr-xr-x
📄 cmdline.py    17.74 KB   12/09/2021 05:04:37 PM  rw-r--r--
📄 main.py       23.6 KB    12/09/2021 05:04:37 PM  rw-r--r--
📄 refleak.py    8 KB       12/09/2021 05:04:37 PM  rw-r--r--
📄 runtest.py    10.33 KB   12/09/2021 05:04:37 PM  rw-r--r--
📄 runtest_mp.py 13.29 KB   12/09/2021 05:04:37 PM  rw-r--r--
📄 save_env.py   10.75 KB   12/09/2021 05:04:37 PM  rw-r--r--
📄 setup.py      4.12 KB    12/09/2021 05:04:37 PM  rw-r--r--
📄 utils.py      1.46 KB    12/09/2021 05:04:37 PM  rw-r--r--
📄 win_utils.py  4.95 KB    12/09/2021 05:04:37 PM  rw-r--r--
Editing: runtest_mp.py
import collections
import faulthandler
import json
import os
import queue
import subprocess
import sys
import threading
import time
import traceback
import types

from test import support
from test.libregrtest.runtest import (
    runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
    format_test_result, TestResult, is_failed, TIMEOUT)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning


# Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Time to wait until a worker completes: should be immediate
JOIN_TIMEOUT = 30.0   # seconds


def must_stop(result, ns):
    if result.result == INTERRUPTED:
        return True
    if ns.failfast and is_failed(result, ns):
        return True
    return False


def parse_worker_args(worker_args):
    ns_dict, test_name = json.loads(worker_args)
    ns = types.SimpleNamespace(**ns_dict)
    return (ns, test_name)


def run_test_in_subprocess(testname, ns):
    ns_dict = vars(ns)
    worker_args = (ns_dict, testname)
    worker_args = json.dumps(worker_args)

    cmd = [sys.executable, *support.args_from_interpreter_flags(),
           '-u',    # Unbuffered stdout and stderr
           '-m', 'test.regrtest',
           '--worker-args', worker_args]

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    return subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True,
                            close_fds=(os.name != 'nt'),
                            cwd=support.SAVEDCWD)


def run_tests_worker(ns, test_name):
    setup_tests(ns)

    result = runtest(ns, test_name)

    print()   # Force a newline (just in case)

    # Serialize TestResult as list in JSON
    print(json.dumps(list(result)), flush=True)
    sys.exit(0)


# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            if self.tests_iter is None:
                raise StopIteration
            return next(self.tests_iter)

    def stop(self):
        with self.lock:
            self.tests_iter = None


MultiprocessResult = collections.namedtuple('MultiprocessResult',
                                            'result stdout stderr error_msg')


class ExitThread(Exception):
    pass


class TestWorkerProcess(threading.Thread):
    def __init__(self, worker_id, pending, output, ns, timeout):
        super().__init__()
        self.worker_id = worker_id
        self.pending = pending
        self.output = output
        self.ns = ns
        self.timeout = timeout
        self.current_test_name = None
        self.start_time = None
        self._popen = None
        self._killed = False
        self._stopped = False

    def __repr__(self):
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            dt = time.monotonic() - self.start_time
            info.append("running for %s" % format_duration(dt))
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        popen = self._popen
        if popen:
            info.append(f'pid={popen.pid}')
        return '<%s>' % ' '.join(info)

    def _kill(self):
        if self._killed:
            return
        self._killed = True

        popen = self._popen
        if popen is None:
            return

        print(f"Kill {self}", file=sys.stderr, flush=True)
        try:
            popen.kill()
        except OSError as exc:
            print_warning(f"Failed to kill {self}: {exc!r}")

    def stop(self):
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def mp_result_error(self, test_name, error_type, stdout='', stderr='',
                        err_msg=None):
        test_time = time.monotonic() - self.start_time
        result = TestResult(test_name, error_type, test_time, None)
        return MultiprocessResult(result, stdout, stderr, err_msg)

    def _timedout(self, test_name):
        self._kill()

        stdout = stderr = ''
        popen = self._popen
        try:
            stdout, stderr = popen.communicate(timeout=JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to read {self} output "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

        return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)

    def _run_process(self, test_name):
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            self._killed = False
            self._popen = run_test_in_subprocess(test_name, self.ns)
            popen = self._popen
        except:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If kill() has been called before self._popen is set,
                # self._popen is still running. Call kill() again
                # to ensure that the process is killed.
                self._kill()
                raise ExitThread

            try:
                stdout, stderr = popen.communicate(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread

                return self._timedout(test_name)
            except OSError:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread

                raise

            retcode = popen.returncode
            stdout = stdout.strip()
            stderr = stderr.rstrip()

            return (retcode, stdout, stderr)
        except:
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name):
        result = self._run_process(test_name)

        if isinstance(result, MultiprocessResult):
            # _timedout() case
            return result

        retcode, stdout, stderr = result

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result)
                    result = TestResult(*result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc

        if err_msg is not None:
            return self.mp_result_error(test_name, CHILD_ERROR,
                                        stdout, stderr, err_msg)

        return MultiprocessResult(result, stdout, stderr, err_msg)

    def run(self):
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self):
        popen = self._popen

        # stdout and stderr must be closed to ensure that communicate()
        # does not hang
        popen.stdout.close()
        popen.stderr.close()

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time):
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            print(f"Waiting for {self} thread for {format_duration(dt)}",
                  flush=True)
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break


def get_running(workers):
    running = []
    for worker in workers:
        current_test_name = worker.current_test_name
        if not current_test_name:
            continue
        dt = time.monotonic() - worker.start_time
        if dt >= PROGRESS_MIN_TIME:
            text = '%s (%s)' % (current_test_name, format_duration(dt))
            running.append(text)
    return running


class MultiprocessTestRunner:
    def __init__(self, regrtest):
        self.regrtest = regrtest
        self.ns = regrtest.ns
        self.output = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        if self.ns.timeout is not None:
            self.worker_timeout = self.ns.timeout * 1.5
        else:
            self.worker_timeout = None
        self.workers = None

    def start_workers(self):
        self.workers = [TestWorkerProcess(index, self.pending, self.output,
                                          self.ns, self.worker_timeout)
                        for index in range(1, self.ns.use_mp + 1)]
        print("Run tests in parallel using %s child processes"
              % len(self.workers))
        for worker in self.workers:
            worker.start()

    def stop_workers(self):
        start_time = time.monotonic()
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(start_time)

    def _get_result(self):
        if not any(worker.is_alive() for worker in self.workers):
            # all worker threads are done: consume pending results
            try:
                return self.output.get(timeout=0)
            except queue.Empty:
                return None

        use_faulthandler = (self.ns.timeout is not None)
        timeout = PROGRESS_UPDATE
        while True:
            if use_faulthandler:
                faulthandler.dump_traceback_later(timeout * 2.0, exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=timeout)
            except queue.Empty:
                pass

            # display progress
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                print('running: %s' % ', '.join(running), flush=True)

    def display_result(self, mp_result):
        result = mp_result.result

        text = format_test_result(result)
        if mp_result.error_msg is not None:
            # CHILD_ERROR
            text += ' (%s)' % mp_result.error_msg
        elif (result.test_time >= PROGRESS_MIN_TIME
              and not self.ns.pgo):
            text += ' (%s)' % format_duration(result.test_time)
        running = get_running(self.workers)
        if running and not self.ns.pgo:
            text += ' -- running: %s' % ', '.join(running)
        self.regrtest.display_progress(self.test_index, text)

    def _process_result(self, item):
        if item[0]:
            # Thread got an exception
            format_exc = item[1]
            print(f"regrtest worker thread failed: {format_exc}",
                  file=sys.stderr, flush=True)
            return True

        self.test_index += 1
        mp_result = item[1]
        self.regrtest.accumulate_result(mp_result.result)
        self.display_result(mp_result)

        if mp_result.stdout:
            print(mp_result.stdout, flush=True)
        if mp_result.stderr and not self.ns.pgo:
            print(mp_result.stderr, file=sys.stderr, flush=True)

        if must_stop(mp_result.result, self.ns):
            return True

        return False

    def run_tests(self):
        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None:
                    break

                stop = self._process_result(item)
                if stop:
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()


def run_tests_multiprocess(regrtest):
    MultiprocessTestRunner(regrtest).run_tests()
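
As context for the --worker-args handshake above: run_test_in_subprocess() serializes (vars(ns), test_name) to JSON in the parent, and parse_worker_args() rebuilds the namespace in the child. Below is a minimal, self-contained sketch of that round trip; the namespace fields shown are illustrative, not the full regrtest option set.

import json
import types

# Parent side (cf. run_test_in_subprocess): pack the options namespace
# and the test name into one JSON string for the --worker-args option.
# The fields below are illustrative only.
ns = types.SimpleNamespace(verbose=0, failfast=False, timeout=None)
worker_args = json.dumps((vars(ns), 'test_os'))

# Child side (cf. parse_worker_args): decode the JSON and rebuild the
# namespace before running the single named test.
ns_dict, test_name = json.loads(worker_args)
child_ns = types.SimpleNamespace(**ns_dict)

assert test_name == 'test_os'
assert child_ns.failfast is False

Because the namespace travels as plain JSON, only JSON-serializable option values (numbers, strings, booleans, lists, dicts, None) survive the trip to the worker, which is presumably why a flat types.SimpleNamespace is used rather than a richer object.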