OXIESEC PANEL
- Current Dir:
/
/
usr
/
lib
/
python3
/
dist-packages
/
twisted
/
trial
/
_dist
Server IP: 139.59.38.164
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
__init__.py
1.9 KB
09/08/2017 10:38:36 AM
rw-r--r--
📁
__pycache__
-
03/31/2022 06:22:40 AM
rwxr-xr-x
📄
distreporter.py
2.44 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
disttrial.py
8.51 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
managercommands.py
1.55 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
options.py
739 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📁
test
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
worker.py
9.21 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
workercommands.py
537 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
workerreporter.py
5.44 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
workertrial.py
2.9 KB
09/08/2017 10:38:36 AM
rw-r--r--
Editing: worker.py
Close
# -*- test-case-name: twisted.trial._dist.test.test_worker -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
This module implements the worker classes.

@since: 12.3
"""

import os

from zope.interface import implementer

from twisted.internet.protocol import ProcessProtocol
from twisted.internet.interfaces import ITransport, IAddress
from twisted.internet.defer import Deferred
from twisted.protocols.amp import AMP
from twisted.python.compat import _PY3, unicode
from twisted.python.failure import Failure
from twisted.python.reflect import namedObject
from twisted.trial.unittest import Todo
from twisted.trial.runner import TrialSuite, TestLoader
from twisted.trial._dist import workercommands, managercommands
from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
from twisted.trial._dist.workerreporter import WorkerReporter



class WorkerProtocol(AMP):
    """
    The worker-side trial distributed protocol.
    """

    def __init__(self, forceGarbageCollection=False):
        """
        @param forceGarbageCollection: Flag forwarded to every L{TrialSuite}
            built by L{run}.
        """
        self._loader = TestLoader()
        self._result = WorkerReporter(self)
        self._forceGarbageCollection = forceGarbageCollection


    def run(self, testCase):
        """
        Run a test case by name.

        @param testCase: The name of the test to load; decoded from UTF-8
            when running on Python 3.

        @return: The C{{'success': True}} response box.
        """
        if _PY3:
            testCase = testCase.decode("utf-8")
        case = self._loader.loadByName(testCase)
        suite = TrialSuite([case], self._forceGarbageCollection)
        suite.run(self._result)
        return {'success': True}

    workercommands.Run.responder(run)


    def start(self, directory):
        """
        Set up the worker, moving into given directory for tests to run in
        them.

        @param directory: The directory to C{chdir} into.

        @return: The C{{'success': True}} response box.
        """
        os.chdir(directory)
        return {'success': True}

    workercommands.Start.responder(start)



class LocalWorkerAMP(AMP):
    """
    Local implementation of the manager commands.

    The reporting responders all act on C{self._testCase} and
    C{self._result}, which are set by L{run}; C{self._testStream} is set by
    L{setTestStream}.
    """

    def addSuccess(self, testName):
        """
        Add a success to the reporter.
        """
        self._result.addSuccess(self._testCase)
        return {'success': True}

    managercommands.AddSuccess.responder(addSuccess)


    def _buildFailure(self, error, errorClass, frames):
        """
        Helper to build a C{Failure} with some traceback.

        @param error: An C{Exception} instance.

        @param errorClass: The class name of the C{error} class.

        @param frames: A flat list of strings representing the information
            need to approximatively rebuild C{Failure} frames. Entries are
            consumed three at a time; the third of each group is converted
            with C{int}.

        @return: A L{Failure} instance with enough information about a test
           error.
        """
        if _PY3:
            errorClass = errorClass.decode("utf-8")
        errorType = namedObject(errorClass)
        failure = Failure(error, errorType)
        for i in range(0, len(frames), 3):
            failure.frames.append(
                (frames[i], frames[i + 1], int(frames[i + 2]), [], []))
        return failure


    def addError(self, testName, error, errorClass, frames):
        """
        Add an error to the reporter.
        """
        failure = self._buildFailure(error, errorClass, frames)
        self._result.addError(self._testCase, failure)
        return {'success': True}

    managercommands.AddError.responder(addError)


    def addFailure(self, testName, fail, failClass, frames):
        """
        Add a failure to the reporter.
        """
        failure = self._buildFailure(fail, failClass, frames)
        self._result.addFailure(self._testCase, failure)
        return {'success': True}

    managercommands.AddFailure.responder(addFailure)


    def addSkip(self, testName, reason):
        """
        Add a skip to the reporter.
        """
        self._result.addSkip(self._testCase, reason)
        return {'success': True}

    managercommands.AddSkip.responder(addSkip)


    def addExpectedFailure(self, testName, error, todo):
        """
        Add an expected failure to the reporter.
        """
        _todo = Todo(todo)
        self._result.addExpectedFailure(self._testCase, error, _todo)
        return {'success': True}

    managercommands.AddExpectedFailure.responder(addExpectedFailure)


    def addUnexpectedSuccess(self, testName, todo):
        """
        Add an unexpected success to the reporter.
        """
        self._result.addUnexpectedSuccess(self._testCase, todo)
        return {'success': True}

    managercommands.AddUnexpectedSuccess.responder(addUnexpectedSuccess)


    def testWrite(self, out):
        """
        Print test output from the worker.

        @param out: The text to write; C{bytes} are decoded from UTF-8 on
            Python 3. A newline is appended and the stream is flushed.
        """
        if _PY3 and isinstance(out, bytes):
            out = out.decode("utf-8")
        self._testStream.write(out + '\n')
        self._testStream.flush()
        return {'success': True}

    managercommands.TestWrite.responder(testWrite)


    def _stopTest(self, result):
        """
        Stop the current running test case, forwarding the result.
        """
        self._result.stopTest(self._testCase)
        return result


    def run(self, testCase, result):
        """
        Run a test.

        @param testCase: The test case to run; its C{id()} is sent to the
            worker.

        @param result: The reporter that receives the events of this run.

        @return: The L{Deferred} of the remote C{Run} command, with
            L{_stopTest} added as a callback.
        """
        self._testCase = testCase
        self._result = result
        self._result.startTest(testCase)
        d = self.callRemote(workercommands.Run, testCase=testCase.id())
        return d.addCallback(self._stopTest)


    def setTestStream(self, stream):
        """
        Set the stream used to log output from tests.
        """
        self._testStream = stream



@implementer(IAddress)
class LocalWorkerAddress(object):
    """
    A L{IAddress} implementation meant to provide stub addresses for
    L{ITransport.getPeer} and L{ITransport.getHost}.
    """



@implementer(ITransport)
class LocalWorkerTransport(object):
    """
    A stub transport implementation used to support L{AMP} over a
    L{ProcessProtocol} transport.
    """

    def __init__(self, transport):
        self._transport = transport


    def write(self, data):
        """
        Forward data to transport.
        """
        self._transport.writeToChild(_WORKER_AMP_STDIN, data)


    def writeSequence(self, sequence):
        """
        Emulate C{writeSequence} by iterating data in the C{sequence}.
        """
        for data in sequence:
            self._transport.writeToChild(_WORKER_AMP_STDIN, data)


    def loseConnection(self):
        """
        Closes the transport.
        """
        self._transport.loseConnection()


    def getHost(self):
        """
        Return a L{LocalWorkerAddress} instance.
        """
        return LocalWorkerAddress()


    def getPeer(self):
        """
        Return a L{LocalWorkerAddress} instance.
        """
        return LocalWorkerAddress()



class LocalWorker(ProcessProtocol):
    """
    Local process worker protocol. This worker runs as a local process and
    communicates via stdin/out.

    @ivar _ampProtocol: The L{AMP} protocol instance used to communicate with
        the worker.

    @ivar _logDirectory: The directory where logs will reside.

    @ivar _logFile: The name of the main log file for tests output.

    @ivar _outLog: Binary file receiving the data passed to L{outReceived};
        opened in L{connectionMade}.

    @ivar _errLog: Binary file receiving the data passed to L{errReceived};
        opened in L{connectionMade}.

    @ivar _testLog: Text file handed to the AMP protocol as its test stream;
        opened in L{connectionMade}.

    @ivar endDeferred: A L{Deferred} fired with the reason given to
        L{processEnded}.
    """

    def __init__(self, ampProtocol, logDirectory, logFile):
        self._ampProtocol = ampProtocol
        self._logDirectory = logDirectory
        self._logFile = logFile
        self.endDeferred = Deferred()


    def connectionMade(self):
        """
        When connection is made, create the AMP protocol instance.
        """
        self._ampProtocol.makeConnection(LocalWorkerTransport(self.transport))
        if not os.path.exists(self._logDirectory):
            os.makedirs(self._logDirectory)
        # The data handed to outReceived/errReceived is written unmodified,
        # so these logs are opened in binary mode: a text-mode file rejects
        # bytes on Python 3.
        self._outLog = open(os.path.join(self._logDirectory, 'out.log'), 'wb')
        self._errLog = open(os.path.join(self._logDirectory, 'err.log'), 'wb')
        # Keep a reference to the test log so that connectionLost can close
        # it along with the other two.
        self._testLog = open(
            os.path.join(self._logDirectory, self._logFile), 'w')
        self._ampProtocol.setTestStream(self._testLog)
        logDirectory = self._logDirectory
        if isinstance(logDirectory, unicode):
            logDirectory = logDirectory.encode("utf-8")
        d = self._ampProtocol.callRemote(workercommands.Start,
                                         directory=logDirectory)
        # Ignore the potential errors, the test suite will fail properly and it
        # would just print garbage.
        d.addErrback(lambda x: None)


    def connectionLost(self, reason):
        """
        On connection lost, close the log files that we're managing: the
        stdout log, the stderr log and the test log.
        """
        self._outLog.close()
        self._errLog.close()
        self._testLog.close()


    def processEnded(self, reason):
        """
        When the process closes, call C{connectionLost} for cleanup purposes
        and forward the information to the C{_ampProtocol}.
        """
        self.connectionLost(reason)
        self._ampProtocol.connectionLost(reason)
        self.endDeferred.callback(reason)


    def outReceived(self, data):
        """
        Send data received from stdout to log.
        """
        self._outLog.write(data)


    def errReceived(self, data):
        """
        Write error data to log.
        """
        self._errLog.write(data)


    def childDataReceived(self, childFD, data):
        """
        Handle data received on the specific pipe for the C{_ampProtocol}.

        Data arriving on C{_WORKER_AMP_STDOUT} is fed to the AMP protocol;
        anything else goes through the default L{ProcessProtocol} handling.
        """
        if childFD == _WORKER_AMP_STDOUT:
            self._ampProtocol.dataReceived(data)
        else:
            ProcessProtocol.childDataReceived(self, childFD, data)